Posted to users@pulsar.apache.org by Apache Pulsar Slack <ap...@gmail.com> on 2019/12/24 09:11:04 UTC

Slack digest for #general - 2019-12-24

2019-12-23 10:04:22 UTC - Markus Binsteiner: @Markus Binsteiner has joined the channel
----
2019-12-23 12:50:47 UTC - Vivek Prasad: @Vivek Prasad has joined the channel
----
2019-12-23 13:41:54 UTC - Himanshu Singh: @Himanshu Singh has joined the channel
----
2019-12-23 14:52:27 UTC - Roman Popenov: I think we need to write our own connectors to do the job. This is what I will be focusing on today
----
2019-12-23 14:53:00 UTC - Roman Popenov: I need sink and source connectors for Pulsar
----
2019-12-23 15:23:38 UTC - Mathieu Druart: @Mathieu Druart has joined the channel
----
2019-12-23 15:34:32 UTC - Mathieu Druart: Hi all! I'm figuring out how I can use the Pulsar file connector to parse log files if I have multiple file connector instances deployed on my cluster. All the brokers are behind the proxy, so from the outside I cannot really access a specific broker/worker pod to drop a file in a particular folder. Any idea if there is a way to achieve this cleanly? Or do I misunderstand something? Thanks in advance!
----
2019-12-23 16:12:03 UTC - Guilherme Perinazzo: @Guilherme Perinazzo has joined the channel
----
2019-12-23 20:04:19 UTC - ivan: @ivan has joined the channel
----
2019-12-23 21:17:09 UTC - Anthony Wu: @Anthony Wu has joined the channel
----
2019-12-23 22:41:01 UTC - Roman Popenov: My pulsar functions keep on restarting in a loop for some reason:
```root@peddling-chicken-pulsar-broker-579595f6cb-kdd9t:/pulsar# /pulsar/bin/pulsar-admin functions getstatus --namespace test-ns --tenant forcepoint-tenant --name PulsarSysmonETL
{
  "numInstances" : 1,
  "numRunning" : 0,
  "instances" : [ {
    "instanceId" : 0,
    "status" : {
      "running" : false,
      "error" : "",
      "numRestarts" : 3,
      "numReceived" : 0,
      "numSuccessfullyProcessed" : 0,
      "numUserExceptions" : 0,
      "latestUserExceptions" : [ ],
      "numSystemExceptions" : 0,
      "latestSystemExceptions" : [ ],
      "averageLatency" : 0.0,
      "lastInvocationTime" : 0,
      "workerId" : "c-peddling-chicken-pulsar-fw-10.0.0.188-8080"
    }
  } ]
}```
Initially I had:
```root@peddling-chicken-pulsar-broker-579595f6cb-kdd9t:/pulsar# /pulsar/bin/pulsar-admin functions getstatus --namespace test-ns --tenant forcepoint-tenant --name PulsarSysmonETL
{
  "numInstances" : 1,
  "numRunning" : 0,
  "instances" : [ {
    "instanceId" : 0,
    "status" : {
      "running" : false,
      "error" : "UNAVAILABLE: io exception",
      "numRestarts" : 0,
      "numReceived" : 0,
      "numSuccessfullyProcessed" : 0,
      "numUserExceptions" : 0,
      "latestUserExceptions" : [ ],
      "numSystemExceptions" : 0,
      "latestSystemExceptions" : [ ],
      "averageLatency" : 0.0,
      "lastInvocationTime" : 0,
      "workerId" : "c-peddling-chicken-pulsar-fw-10.0.0.188-8080"
    }
  } ]
}```
----
2019-12-23 22:42:22 UTC - Jerry Peng: @roman how are you running functions?
----
2019-12-23 22:42:37 UTC - Jerry Peng: As threads, process, or kubernetes?
----
2019-12-23 22:43:03 UTC - Roman Popenov: Inside kubernetes pod:
```/pulsar/bin/pulsar-admin functions create --jar /tmp/Pulsar-Demo-SKO-1.0-SNAPSHOT-uberjar.jar --classname com.forcepoint.gemini.pulsar.functions.PulsarSysmonETL --namespace test-ns --tenant forcepoint-tenant --inputs persistent://forcepoint-tenant/test-ns/demo-topic-sysmon --output persistent://forcepoint-tenant/test-ns/demo-topic-sysmon-cim```
----
2019-12-23 22:43:40 UTC - Jerry Peng: can you check log for the pod to see if there are any errors?
----
2019-12-23 22:46:12 UTC - Jerry Peng: @Mathieu Druart usually people will have the files served from some file server that the connectors can access.  The connectors will connect to the file server and pull files from the server to be ingested into Pulsar
----
2019-12-23 22:46:49 UTC - Roman Popenov: ```22:32:18.601 [DL-io-0] ERROR org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl - Unable to allocate memory
io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 117440519, max: 134217728)
	at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:655) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:610) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:769) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:745) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:244) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.buffer.PoolArena.allocate(PoolArena.java:226) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.buffer.PoolArena.allocate(PoolArena.java:146) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:324) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:164) [org.apache.bookkeeper-bookkeeper-common-allocator-4.9.2.jar:4.9.2]
	at org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:158) [org.apache.bookkeeper-bookkeeper-common-allocator-4.9.2.jar:4.9.2]
	at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:176) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.epoll.AbstractEpollChannel.newDirectBuffer0(AbstractEpollChannel.java:323) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.epoll.AbstractEpollChannel.newDirectBuffer(AbstractEpollChannel.java:309) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.epoll.AbstractEpollChannel.newDirectBuffer(AbstractEpollChannel.java:292) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.epoll.AbstractEpollStreamChannel.filterOutboundMessage(AbstractEpollStreamChannel.java:533) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:881) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1391) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:816) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at org.apache.bookkeeper.util.ByteBufList$Encoder.write(ByteBufList.java:328) [org.apache.bookkeeper-bookkeeper-server-4.9.2.jar:4.9.2]
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:816) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:704) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.handler.codec.MessageToMessageEncoder.writePromiseCombiner(MessageToMessageEncoder.java:137) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:119) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:816) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at org.apache.bookkeeper.proto.BookieProtoEncoding$RequestEncoder.write(BookieProtoEncoding.java:391) [org.apache.bookkeeper-bookkeeper-server-4.9.2.jar:4.9.2]
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:816) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.ChannelDuplexHandler.write(ChannelDuplexHandler.java:106) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at org.apache.bookkeeper.proto.AuthHandler$ClientSideHandler.write(AuthHandler.java:336) [org.apache.bookkeeper-bookkeeper-server-4.9.2.jar:4.9.2]
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.access$1700(AbstractChannelHandlerContext.java:38) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:1104) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1151) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1075) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:335) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_232]```
----
2019-12-23 22:47:06 UTC - Roman Popenov: :thinking_face:
----
2019-12-23 22:47:19 UTC - Roman Popenov: I would assume that I should try to run with --cpu and --ram params?
----
2019-12-23 22:47:32 UTC - Roman Popenov: and allocate less memory for the functions?
----
2019-12-23 22:48:14 UTC - Jerry Peng: it seems you need to allocate more memory
----
2019-12-23 22:50:08 UTC - Jerry Peng: by default 1GB allocated per pod / instance of a function
----
2019-12-23 22:50:54 UTC - Roman Popenov: Alright
----
2019-12-23 22:51:01 UTC - Roman Popenov: Let me try and play with these parameters
----
2019-12-23 22:52:05 UTC - Roman Popenov: Is it safe to assume that --cpu can take in fractional numbers and --ram and --disk are in bytes?
----
2019-12-23 22:52:35 UTC - Jerry Peng: yes
----
2019-12-23 22:53:59 UTC - Roman Popenov: Thanks
----
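Following up on the resource flags discussed above: `--ram` and `--disk` take raw byte counts while `--cpu` takes a (possibly fractional) core count, so a small helper makes the values readable. This is an illustrative, stdlib-only sketch (the `to_bytes` helper is hypothetical, not part of Pulsar):

```python
# Hypothetical helper for pulsar-admin's resource flags: --ram and --disk
# expect raw byte counts, --cpu a fractional core count. This converts
# human-readable sizes into the byte values those flags expect.
UNITS = {"": 1, "K": 1024, "M": 1024 ** 2, "G": 1024 ** 3}

def to_bytes(size: str) -> int:
    """Convert '512M', '1G', or a plain byte count into an int byte value."""
    size = size.strip().upper()
    suffix = size[-1] if size and size[-1] in UNITS else ""
    number = size[:-1] if suffix else size
    return int(float(number) * UNITS[suffix])

# The 8 MiB that later in the thread proves too small for a JVM instance:
print(to_bytes("8M"))   # 8388608
# The default per-instance allocation mentioned by Jerry (1 GiB):
print(to_bytes("1G"))   # 1073741824
```

So a command like `--ram 536870912` is `to_bytes("0.5G")`, i.e. half a GiB per function instance.
----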
2019-12-23 23:05:41 UTC - Roman Popenov: I have created a very small function:
```23:04:34.963 [pulsar-external-listener-48-1] INFO  org.apache.pulsar.functions.worker.FunctionAssignmentTailer - Received assignment update: instance {
  functionMetaData {
    functionDetails {
      tenant: "forcepoint-tenant"
      namespace: "test-ns"
      name: "PulsarSysmonETL"
      className: "com.forcepoint.gemini.pulsar.functions.PulsarSysmonETL"
      autoAck: true
      parallelism: 1
      source {
        typeClassName: "java.lang.String"
        inputSpecs {
          key: "persistent://forcepoint-tenant/test-ns/demo-topic-sysmon"
          value {
          }
        }
        cleanupSubscription: true
      }
      sink {
        topic: "persistent://forcepoint-tenant/test-ns/demo-topic-sysmon-cim"
        typeClassName: "java.lang.String"
      }
      resources {
        cpu: 0.1
        ram: 8388608
        disk: 104857600
      }
      componentType: FUNCTION
    }
    packageLocation {
      packagePath: "forcepoint-tenant/test-ns/PulsarSysmonETL/529f9d8e-bbec-4b1b-a243-b8be2597b2bf-Pulsar-Demo-SKO-1.0-SNAPSHOT-uberjar.jar"
      originalFileName: "Pulsar-Demo-SKO-1.0-SNAPSHOT-uberjar.jar"
    }
    createTime: 1577142270955
  }
}
workerId: "c-peddling-chicken-pulsar-fw-10.0.0.188-8080"```
----
2019-12-23 23:05:58 UTC - Roman Popenov: And I am still seeing reboots :confused:
----
2019-12-23 23:07:14 UTC - Jerry Peng: 8388608 = 8MB
The JVM will need more than that amount of memory to operate
----
2019-12-23 23:07:43 UTC - Roman Popenov: Hm… looks like I would need to create bigger brokers with bigger memory
----
2019-12-23 23:08:27 UTC - Jerry Peng: wait are you deploying functions as threads within brokers?
----
2019-12-23 23:08:51 UTC - Roman Popenov: I have uploaded the uberjar to broker pod
----
2019-12-23 23:09:11 UTC - Roman Popenov: and I am trying to run `/pulsar/bin/pulsar-admin functions create --jar /tmp/Pulsar-Demo-SKO-1.0-SNAPSHOT-uberjar.jar --classname com.forcepoint.gemini.pulsar.functions.PulsarSysmonETL --namespace test-ns --tenant forcepoint-tenant --inputs persistent://forcepoint-tenant/test-ns/demo-topic-sysmon --output persistent://forcepoint-tenant/test-ns/demo-topic-sysmon-cim --ram 8388608 --disk 104857600 --cpu 0.1`
----
2019-12-23 23:09:31 UTC - Roman Popenov: I guess that isn’t how I register it as a thread
----
2019-12-23 23:09:44 UTC - Jerry Peng: if the kubernetes runtime is configured correctly for functions, a statefulset is submitted for each function and each function instance will run in a pod
----
2019-12-23 23:09:59 UTC - Roman Popenov: No, they are not running as pods
----
2019-12-23 23:10:09 UTC - Jerry Peng: oh
----
2019-12-23 23:10:21 UTC - Jerry Peng: that is the recommended way to run functions on kubernetes
----
2019-12-23 23:10:42 UTC - Jerry Peng: <https://pulsar.apache.org/docs/en/functions-runtime/#configure-kubernetes-runtime>
----
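Per the linked functions-runtime docs, enabling the Kubernetes runtime comes down to configuring the container factory in the worker configuration. A rough sketch for a Pulsar 2.4-era `conf/functions_worker.yml` follows; field names and values here are illustrative and vary between Pulsar versions, so treat the linked documentation as authoritative:

```yaml
# conf/functions_worker.yml -- illustrative sketch only; check the
# functions-runtime docs linked above for your Pulsar version.
kubernetesContainerFactory:
  # Kubernetes API URI; typically left empty when the worker runs in-cluster.
  k8Uri:
  # Namespace the per-function StatefulSets are created in.
  jobNamespace: pulsar
  pulsarDockerImageName: apachepulsar/pulsar:2.4.2
  pulsarRootDir: /pulsar
  # Hypothetical in-cluster service addresses for this deployment.
  pulsarServiceUrl: pulsar://broker.pulsar:6650
  pulsarAdminUrl: http://broker.pulsar:8080
```

With this in place, `functions create` submits a StatefulSet per function and each instance gets its own pod with the requested `--cpu`/`--ram` resources.
----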
2019-12-23 23:11:16 UTC - Roman Popenov: Alright, I shall try this, thank you
----
2019-12-23 23:13:11 UTC - Roman Popenov: Out of curiosity, how would I run functions in threaded mode inside a worker pod in Kubernetes?
----
2019-12-23 23:14:05 UTC - Jerry Peng: you would have to deploy functions workers as a statefulset
----
2019-12-23 23:14:22 UTC - Jerry Peng: create a service endpoint for them
----
2019-12-23 23:14:31 UTC - Jerry Peng: and submit functions to that endpoint
----
2019-12-23 23:16:22 UTC - Roman Popenov: and just a process inside a broker pod?
----
2019-12-23 23:17:51 UTC - Roman Popenov: And what about a thread inside a broker pod?
----
2019-12-23 23:18:52 UTC - Jerry Peng: take a look this page:
<https://pulsar.apache.org/docs/en/functions-runtime/>
----
2019-12-23 23:19:06 UTC - Jerry Peng: it explains the various ways functions can be deployed and run
----
2019-12-23 23:30:02 UTC - Mathieu Druart: thank you @Jerry Peng that's what I was thinking of, but first I was trying to use the built-in Pulsar file connector that scans a local folder to find files to process. I was just wondering if there was a clean way to use this built-in connector.
----
2019-12-23 23:30:59 UTC - Jerry Peng: If you can mount the folder with the files as a volume in the pod, that might work
----
2019-12-23 23:35:04 UTC - Mathieu Druart: I was thinking of that too, but I looked at the connector's code and unfortunately I think that if I mount the same network folder (with NFS) on each pod, some files could be processed multiple times (by different nodes).
----
2019-12-23 23:45:50 UTC - Jerry Peng: can you split the files into distinct folders and run several distinct file sources, each reading only from a specific folder?
----
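Jerry's suggestion of disjoint folders can be automated without knowing which pod is which: a small router on the ingest side deterministically assigns each incoming file to one of N per-instance folders, and each file source is configured with its own input directory. The sketch below is hypothetical (the `assign_folder` helper is mine, not part of any connector), illustrating the idea with a stable hash:

```python
# Hypothetical sketch: route each incoming file into one of N folders so that
# N file source instances (each watching its own folder) never see the same
# file twice. sha256 gives a stable assignment across processes and restarts,
# unlike Python's built-in hash(), which is salted per process.
import hashlib

def assign_folder(filename: str, num_instances: int) -> int:
    """Deterministically map a filename to one of num_instances folders."""
    digest = hashlib.sha256(filename.encode("utf-8")).hexdigest()
    return int(digest, 16) % num_instances

# Every ingest node computes the same assignment, so no file lands in two
# folders and no folder is skipped:
files = ["app-2019-12-23.log", "app-2019-12-24.log", "audit.log"]
print({f: assign_folder(f, 3) for f in files})
```

The drop-off side then writes to `/ingest/<bucket>/` and each connector instance reads only its own bucket folder.
----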
2019-12-24 00:17:20 UTC - Steven Duncan: @Steven Duncan has joined the channel
----
2019-12-24 00:18:39 UTC - Mathieu Druart: I would like to do that, but I dont know how to achieve this with the file connector, each instance of the connector have the same parameters and from the outside (where I put the files) I have no knowledge of the cluster's structure (and I dont want to have to know how many pods are deployed and identity each worker individually).
----
2019-12-24 00:22:40 UTC - Roman Popenov: How do I get the schema for a topic using the admin CLI?
I have tried the following:
```root@peddling-chicken-pulsar-broker-579595f6cb-kdd9t:/pulsar# bin/pulsar-admin topics list forcepoint-tenant/test-ns
persistent://forcepoint-tenant/test-ns/demo-topic-sysmon
root@peddling-chicken-pulsar-broker-579595f6cb-kdd9t:/pulsar# bin/pulsar-admin schemas get demo-topic-sysmon
HTTP 404 Not Found
Reason: HTTP 404 Not Found
root@peddling-chicken-pulsar-broker-579595f6cb-kdd9t:/pulsar# bin/pulsar-admin schemas get persistent://forcepoint-tenant/test-ns/demo-topic-sysmon
HTTP 404 Not Found
Reason: HTTP 404 Not Found```
----
2019-12-24 00:23:12 UTC - Roman Popenov: I have also tried `/admin/v2/schemas/forcepoint-tenant/test-ns/demo-topic-sysmon/schema` and get
```<html>

<head>
	<meta http-equiv="Content-Type" content="text/html;charset=utf-8" />
	<title>Error 404 Not Found</title>
</head>

<body>
	<h2>HTTP ERROR 404</h2>
	<p>Problem accessing /admin/v2/schemas/forcepoint-tenant/test-ns/demo-topic-sysmon/schema. Reason:
		<pre>    Not Found</pre>
	</p>
	<hr><a href="http://eclipse.org/jetty">Powered by Jetty:// 9.4.20.v20190813</a>
	<hr />

</body>

</html>```
----
2019-12-24 00:30:31 UTC - tuteng: Your command seems correct, but does your topic have a schema?
----
2019-12-24 00:30:50 UTC - Roman Popenov: Does it autogenerate a schema?
----
2019-12-24 00:32:02 UTC - Roman Popenov: I didn’t register a schema, I simply sent a byte string using Python:
```producer.send(b''.join(event_list))```

----
2019-12-24 00:32:26 UTC - Roman Popenov: I am trying to read back the message using Pulsar functions in java now
----
2019-12-24 00:33:18 UTC - tuteng: The bytes type does not seem to generate a schema
----
2019-12-24 00:33:29 UTC - Roman Popenov: I don’t know
----
2019-12-24 00:33:59 UTC - tuteng: You can try JSON or Avro
----
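Beyond producing with a JSON or Avro client schema, a schema can also be attached to a topic explicitly with `pulsar-admin schemas upload` and a definition file, after which `schemas get` returns it instead of a 404. A rough sketch of such a file; the record name and field here are hypothetical, invented for illustration:

```json
{
  "type": "JSON",
  "schema": "{\"type\":\"record\",\"name\":\"SysmonEvent\",\"fields\":[{\"name\":\"host\",\"type\":\"string\"}]}",
  "properties": {}
}
```

Uploaded with something like `bin/pulsar-admin schemas upload persistent://forcepoint-tenant/test-ns/demo-topic-sysmon --filename schema.json` (exact flags may vary by Pulsar version; check `pulsar-admin schemas upload --help`).
----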
2019-12-24 00:43:56 UTC - Roman Popenov: Yeah, it’s byte[]
----
2019-12-24 00:44:04 UTC - Roman Popenov: but it doesn’t show it unfortunately
----
2019-12-24 07:06:29 UTC - Mike: @Mike has joined the channel
----
2019-12-24 07:59:06 UTC - nbluedx: @nbluedx has joined the channel
----