Posted to users@pulsar.apache.org by Apache Pulsar Slack <ap...@gmail.com> on 2020/01/30 09:11:04 UTC

Slack digest for #general - 2020-01-30

2020-01-29 11:43:26 UTC - Alex Yaroslavsky: @Alex Yaroslavsky has joined the channel
----
2020-01-29 12:01:47 UTC - Alex Yaroslavsky: Hello! The company that I work for is looking for a solution for pub/sub over the internet with thousands (or more) physical devices sending and receiving messages to/from the cloud. We initially looked at MQTT with Kafka, but then found Pulsar, which seems very promising. Does anyone have experience with Pulsar having thousands (or more) consumers and producers (with even more topics) over the internet? Are there any special security considerations when exposing Pulsar to the internet (is it even recommended)? Will be happy to provide more details if such are required. Thanks a lot in advance!
----
2020-01-29 12:12:42 UTC - Eugen: @Alex Yaroslavsky No experience yet. But you should have a look at Pulsar proxy if you want to expose Pulsar to the internet: <https://pulsar.apache.org/docs/en/concepts-architecture-overview/#pulsar-proxy>
----
2020-01-29 13:23:30 UTC - Joshua Dunham: Hi @Sijie Guo. Thx for the reply. I have separate containers for pulsar and bookkeeper. In docker-compose this makes a new host for BK which I need to tell pulsar about. Can't find an option like BOOKKEEPER_HOST.
----
2020-01-29 13:29:47 UTC - Antti Kaikkonen: Should effectively once guarantee work on a splitter function like this <https://streaml.io/blog/eda-event-processing-design-patterns-with-pulsar-functions#split> or is it only for single output functions?
----
2020-01-29 14:21:17 UTC - TAREK ALSALEH: @TAREK ALSALEH has joined the channel
----
2020-01-29 14:37:29 UTC - Miroslav Prymek: Hello, I have some problem with Presto in the official 2.5.0 docker image:

```presto> show schemas in pulsar;
Query 20200129_142802_00002_h8d66 failed: Failed to get schemas from pulsar: Cannot cast org.glassfish.jersey.inject.hk2.Hk2InjectionManagerFactory to org.glassfish.jersey.internal.inject.InjectionManagerFactory```
I absolutely don't have a clue what's wrong. Could you please help me?
----
2020-01-29 15:44:50 UTC - Roman Popenov: I see! Thanks, that solved the issue! :thanks:
----
2020-01-29 16:10:05 UTC - Eric Simon: I am trying to create a function that takes in an object (Avro serialization). When the function is triggered, it throws the following error:
```ERROR :java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to```
----
2020-01-29 16:13:43 UTC - Eric Simon: @Mathieu Druart Did you ever figure out this issue? I am running into the same issue with my function :disappointed:
----
2020-01-29 16:34:36 UTC - David Kjerrumgaard: @Antti Kaikkonen Can you elaborate on your question a little?  The pattern is for producing multiple events from a single large data file or source, such as S3.  So the effectively-once guarantee would apply to both the splitter function (to ensure the large data source is consumed just once) and the consumers of the output topic, which would have effectively-once processing guarantees as well.
----
2020-01-29 16:56:22 UTC - Antti Kaikkonen: @David Kjerrumgaard I was wondering if each record returned by s.split would appear in the output topic exactly once if the process crashes and is restarted when only half of the individual records have been published.
----
2020-01-29 17:11:24 UTC - David Kjerrumgaard: @Antti Kaikkonen Not as written, but you can easily add some highwater mark logic to handle such a failure.
----
2020-01-29 17:13:14 UTC - Antti Kaikkonen: @David Kjerrumgaard Yeah I was thinking that manually managing the sequenceId of messages should work.
----
2020-01-29 17:13:22 UTC - David Kjerrumgaard: Just store a <source_name, record_num> pair in the Function's state.  Increment the record_num on each record, and if the source is re-processed, then you would just skip the first N records until you catch up to where you left off.
----
2020-01-29 17:14:29 UTC - Antti Kaikkonen: Hmm that should also work, thanks.
----
2020-01-29 17:14:42 UTC - David Kjerrumgaard: no problem.
----
2020-01-29 17:23:18 UTC - Antti Kaikkonen: @David Kjerrumgaard Actually I think that your solution is not foolproof because you have to increment the counter either before or after publishing a record. You can't do both in a transaction as far as I know. I think manually setting the sequenceId of published messages should work because then pulsar de-duplication should be able to remove duplicates.
----
2020-01-29 17:41:05 UTC - David Kjerrumgaard: @Antti Kaikkonen You would check the counter before sending the message. The `context.publish` method returns a future that completes when the framework is done publishing the message, so you can use that to determine if/when to increment the counter.
----
2020-01-29 17:41:48 UTC - David Kjerrumgaard: The deduplication approach would work as well, or a combination of both :smiley:
----
2020-01-29 17:42:32 UTC - Antti Kaikkonen: But what if the process crashes after the future completes but before the counter is incremented? Wouldn't that result in a duplicate in the output topic?
----
2020-01-29 17:44:15 UTC - David Kjerrumgaard: Yes, that is the one gap in the approach. Which is why I am thinking a combination of the two is best, as it would limit the number of duplicates that had to be handled by Pulsar, but still handle this one-off scenario.
100 : Antti Kaikkonen
----
2020-01-29 17:44:50 UTC - David Kjerrumgaard: If a file had 1M records and you failed halfway through, I don't think you want to send the first 500K again  :smiley:
+1 : Antti Kaikkonen
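The combined approach discussed above (a high-water mark kept in state, plus de-duplication by sequence ID) can be sketched as a small simulation. This is only a toy model under stated assumptions: `DedupTopic`, `Crash`, and `split` are hypothetical names, not Pulsar APIs; the topic is modeled as dropping any message whose sequence ID is not higher than the last one accepted from the same producer; and the function state is a plain dict.

```python
class DedupTopic:
    """Toy stand-in for an output topic with de-duplication enabled (hypothetical)."""

    def __init__(self):
        self.messages = []
        self.last_seq = {}  # producer name -> highest sequence ID accepted so far

    def publish(self, producer, seq_id, payload):
        # Drop anything that is not newer than what this producer already sent.
        if seq_id <= self.last_seq.get(producer, -1):
            return False
        self.last_seq[producer] = seq_id
        self.messages.append(payload)
        return True


class Crash(Exception):
    """Simulated process failure between publishing and updating the counter."""


def split(source_name, records, state, topic, crash_after_publish=None):
    """Publish each record once, resuming from the stored high-water mark."""
    done = state.get(source_name, 0)  # number of records known to be published
    for record_num, record in enumerate(records):
        if record_num < done:
            continue  # already handled before the restart, skip cheaply
        # The record number doubles as the sequence ID, so a replay is detectable.
        topic.publish(source_name, record_num, record)
        if record_num == crash_after_publish:
            raise Crash(f"crashed after publishing record {record_num}")
        state[source_name] = record_num + 1
```

If a run crashes after publishing record 1 but before the counter is updated, the rerun skips record 0 through the high-water mark and re-sends record 1, which the topic drops because its sequence ID was already seen. The counter limits how much is re-sent; the sequence ID covers the one-record gap.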
----
2020-01-29 17:48:51 UTC - David Kjerrumgaard: @Alex Yaroslavsky I wouldn't be too concerned with Pulsar's ability to scale out to handle 1000s of clients, and 100s of thousands of topics. However, one security precaution you will want to use in such a scenario is to enable TLS client certificate authentication to prevent unwanted access. This would require the creation and distribution of these certificates to all of the physical devices.
----
2020-01-29 18:07:34 UTC - Mathieu Druart: @Eric Simon nop we did not ... and we were stuck with State management too (cannot activate State API in the function with the k8s deployment)
----
2020-01-29 18:10:04 UTC - Mathieu Druart: @Sijie Guo I will do it
----
2020-01-29 18:46:54 UTC - Sijie Guo: There is no problem exposing Pulsar to the internet, since Pulsar provides a lot of security features for that purpose: TLS, authentication, authorization, and encryption.
----
2020-01-29 18:48:05 UTC - Sijie Guo: The current version of Presto doesn't work well on Ubuntu and OpenJDK.
----
2020-01-29 18:48:15 UTC - Sijie Guo: There is an issue tracking this.
----
2020-01-29 18:48:24 UTC - Sijie Guo: We need to upgrade presto to a newer version.
----
2020-01-29 19:27:02 UTC - Miroslav Prymek: Ok, thanks
----
2020-01-29 20:56:56 UTC - Addison Higham: :thinking_face: quick question, I had filled up a bookie, then deleted some topics that had some ledgers on that bookie. I expected the disk to free up some space, but it hasn't happened yet (after like 30 minutes). Does the deletion of ledgers after deleting topics happen periodically through some background job?
----
2020-01-29 21:23:39 UTC - Sam R: @Sam R has joined the channel
----
2020-01-29 21:27:11 UTC - Joe Francis: Ledgers don't physically exist. Multiple ledgers are multiplexed into an entry log on the bookie. Entry logs physically exist, and are deleted when all the ledger entries in it get deleted.
----
2020-01-29 21:33:08 UTC - Joe Francis: GC will clean up the "empty" entry logs
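A minimal sketch of why deleting a topic may not free disk right away, assuming the simplified model described above: an entry log holds entries from several ledgers and can only be removed once none of those ledgers is still live. The function name and the data layout are hypothetical, chosen for illustration only.

```python
def reclaimable_logs(entry_logs, live_ledgers):
    """Return IDs of entry logs that no longer hold entries of any live ledger.

    entry_logs:   dict mapping entry log ID -> set of ledger IDs stored in it
    live_ledgers: set of ledger IDs that have not been deleted
    """
    return sorted(
        log_id
        for log_id, ledgers in entry_logs.items()
        if not (ledgers & live_ledgers)  # no overlap means nothing live remains
    )
```

For example, with `{0: {1, 2}, 1: {2, 3}, 2: {3}}` and ledger 3 deleted, only entry log 2 is reclaimable; entry log 1 still carries entries of ledger 2, so its space stays in use.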
----
2020-01-29 21:39:31 UTC - Addison Higham: is there a 1:1 relation between topics and entry logs?
----
2020-01-29 21:45:15 UTC - Joe Francis: No.  It's all multiplexed everywhere - kind of like turtles all the way down.
----
2020-01-29 21:45:59 UTC - Addison Higham: gotcha, okay, so in the event I do have a lot of disk bloat, really, I just need to scale up disk, then eventually I can get back the space as offload happens and GC occurs
----
2020-01-29 21:46:40 UTC - Addison Higham: We offload pretty much everything over some period of time, but just haven't been seeing disk released as expected, which is fine, just need to keep a bit more disk around than I initially expected
----
2020-01-29 21:47:42 UTC - Joe Francis: A topic is, at any given time, written to one ledger. A ledger is, at any given time, written to an ensemble of bookie hosts.  A given bookie, at any given time, writes all its data (from all the ledgers that it's writing into) into one entry log.
----
2020-01-29 21:48:39 UTC - Joe Francis: There are two settings you can control for compaction, which might help you
----
2020-01-29 21:49:34 UTC - Joe Francis: Compaction controls allow you to reclaim space at the cost of I/O
----
2020-01-29 21:52:48 UTC - Joe Francis: You can set the threshold at which an entry log is  compacted.
----
2020-01-29 21:53:18 UTC - Joe Francis: Refer to these: <https://bookkeeper.apache.org/docs/4.10.0/reference/config/#entry-log-compaction-settings>
----
2020-01-29 22:01:22 UTC - Addison Higham: many thanks! that is super helpful, I honestly haven't had to dig a ton into BK details yet so that is a good jumpoff point :slightly_smiling_face:
----
2020-01-29 22:55:39 UTC - Konstantinos Papalias: @Konstantinos Papalias has joined the channel
----
2020-01-30 02:10:33 UTC - Roman Popenov: What might be causing the following error inside the broker:
```02:09:36.145 [bookkeeper-io-14-1] WARN  org.apache.bookkeeper.proto.PerChannelBookieClient - Exception caught on:[id: 0xb498631d, L:/10.0.1.117:56098 ! R:/10.0.1.190:3181] cause:
java.io.IOException: Connection reset by peer
	at sun.nio.ch.FileDispatcherImpl.write0(Native Method) ~[?:1.8.0_232]
	at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) ~[?:1.8.0_232]
	at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) ~[?:1.8.0_232]
	at sun.nio.ch.IOUtil.write(IOUtil.java:51) ~[?:1.8.0_232]
	at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) ~[?:1.8.0_232]
	at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:408) ~[io.netty-netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:931) [io.netty-netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.forceFlush(AbstractNioChannel.java:361) [io.netty-netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:694) [io.netty-netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635) [io.netty-netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552) [io.netty-netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514) [io.netty-netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050) [io.netty-netty-common-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [io.netty-netty-common-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.43.Final.jar:4.1.43.Final]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_232]
02:09:36.145 [bookkeeper-io-14-1] WARN  org.apache.bookkeeper.proto.PerChannelBookieClient - Exception caught on:[id: 0xb498631d, L:/10.0.1.117:56098 ! R:/10.0.1.190:3181] cause:
java.io.IOException: Connection reset by peer
	at sun.nio.ch.FileDispatcherImpl.write0(Native Method) ~[?:1.8.0_232]
	at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) ~[?:1.8.0_232]
	at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) ~[?:1.8.0_232]
	at sun.nio.ch.IOUtil.write(IOUtil.java:51) ~[?:1.8.0_232]
	at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) ~[?:1.8.0_232]
	at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:408) ~[io.netty-netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:931) [io.netty-netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.forceFlush(AbstractNioChannel.java:361) [io.netty-netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:694) [io.netty-netty-transport-4.1.43.Final.jar:4.1.43.Final]```
----
2020-01-30 02:12:41 UTC - Roman Popenov: It also appears that the function doesn't read from a topic. I am running it in a Kubernetes pod
----
2020-01-30 02:13:04 UTC - Roman Popenov: the function appears to be running and the logs are clean, but nothing is happening
----
2020-01-30 02:19:20 UTC - Eugen: Question about striping in BookKeeper (and I assume by extension, in Pulsar). sijieg states the following on <https://streaml.io/blog/why-bookkeeper-part-2#single-stream-throughput>

> BookKeeper also provides the ability to scale the throughput for a single stream by increasing the size of ensemble (an ensemble is a subset of bookies used for storing a given ledger or stream) and striping the data across the bookies.
But afaiui, a single topic (stream) has only one log segment that it writes to at any point in time, and the log segment is replicated to a number of bookies (the write quorum), and which bookies out of the ensemble are used only changes when a new log segment is started. So how then can there be striping?  I understand striping as `different log entries being written to different sets of bookies to increase performance`.
----
2020-01-30 03:19:47 UTC - Joe Francis: You can set the ensemble size to be higher than the write quorum (E > W). Then striping (yes, just as you define it) will occur.  Read about ensemble (E) vs. write quorum (W) starting here: <https://bookkeeper.apache.org/docs/4.10.0/development/protocol/#ensembles>
----
2020-01-30 03:32:21 UTC - Eugen: @Joe Francis So striping is possible because of the concept of fragments - which are finer grained than ledgers, and every fragment has its own set of bookies. Is that how it works?
----
2020-01-30 03:37:56 UTC - Joe Francis: Not really. A Fragment is associated with an ensemble. When the ensemble changes, there is a new fragment.  Striping is within an ensemble.  There is an example for striping provided in the "Write Quorum"  section of the doc I linked above.
----
2020-01-30 03:48:51 UTC - Eugen: @Joe Francis I see this now. So the set of bookies for every entry can be different. Interesting, I had thought that all instances of a ledger (or fragment, once I learned about them) on the bookies involved would hold the exact same data. Apparently not. So now I understand how striping works. I'm surprised though that non-identical ledgers do not lead to maintenance hell down the road... (which entry is stored on which bookie?)
----
2020-01-30 03:51:39 UTC - Eugen: Different question: What is the difference between a major compaction and a minor compaction? The official BookKeeper docs at <https://bookkeeper.apache.org/docs/latest/getting-started/concepts/> seem to say there is no difference in what they actually do, i.e. how they compact, the only difference is the timing when they are triggered.
> There are two kinds of compaction running with different frequency: minor compaction and major compaction. The differences between minor compaction and major compaction lies in their threshold value and compaction interval.
----
2020-01-30 03:54:08 UTC - Joe Francis: There is only a fixed number of quorums possible for a given E/W, so this set is well defined. The entries are deterministically written to this set (either hashed or round robin; I'd have to look at the code, don't recollect now). Given an entry ID and ensemble, it's trivial to determine which bookies have the entry.
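A minimal sketch of how an entry ID can be mapped to bookies deterministically, assuming the round-robin variant (the message above leaves open whether placement is hashed or round robin, so treat the scheme as an assumption). The function name `write_set` and the bookie labels are hypothetical.

```python
def write_set(entry_id, ensemble, write_quorum):
    """Return the bookies that store a given entry under round-robin striping.

    ensemble:     ordered list of bookies for the current fragment (size E)
    write_quorum: number of copies written per entry (W), with W <= E
    """
    size = len(ensemble)
    start = entry_id % size  # each entry starts one bookie further along
    return [ensemble[(start + i) % size] for i in range(write_quorum)]
```

With `ensemble = ["b0", "b1", "b2"]` and `write_quorum = 2`, entries 0, 1, and 2 go to `["b0", "b1"]`, `["b1", "b2"]`, and `["b2", "b0"]`. Every bookie stores only two thirds of the entries, yet a reader can compute where any entry lives from its ID alone.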
----
2020-01-30 03:54:52 UTC - Eugen: @Joe Francis I see, makes sense - clever!
----
2020-01-30 04:05:41 UTC - Joe Francis: Major and minor are exactly what their default settings imply (0.8 and 0.2 utilization factor of an entry log). The idea is that you set minor to run more frequently, but it moves very little data, because the UF has to fall very low for an entry log to be considered for minor GC, so that log has very little data to be moved.  You set major to run less frequently, because it will move much more data.
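A small sketch of the threshold idea, using the 0.2 and 0.8 values quoted above. It assumes a simplified rule in which an entry log becomes a compaction candidate once the fraction of live data in it drops below the threshold; the function names and the input layout are hypothetical and do not mirror BookKeeper's code.

```python
MINOR_THRESHOLD = 0.2  # value quoted in the message above
MAJOR_THRESHOLD = 0.8  # value quoted in the message above


def compaction_candidates(usage, threshold):
    """Return entry log IDs whose live-data fraction is below the threshold.

    usage: dict mapping entry log ID -> fraction of the log that is still live
    """
    return sorted(log_id for log_id, live in usage.items() if live < threshold)


def live_data_moved(usage, threshold):
    """Total live fraction (in units of one entry log) rewritten by a pass."""
    return sum(usage[log_id] for log_id in compaction_candidates(usage, threshold))
```

For `usage = {1: 0.05, 2: 0.5, 3: 0.9}`, a minor pass picks only log 1 and rewrites about 0.05 of a log, while a major pass picks logs 1 and 2 and rewrites about 0.55. That is the trade-off described above: minor is cheap enough to run often, major reclaims more at a higher I/O cost.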
----
2020-01-30 04:57:05 UTC - Eugen: Then my understanding of the use of that utilization factor was wrong - will read up on it! Thanks
----
2020-01-30 05:11:00 UTC - Hemant Dindi: @Hemant Dindi has joined the channel
----
2020-01-30 05:12:41 UTC - Antti Kaikkonen: Is it possible to concurrently process multiple messages with pulsar functions while retaining ordering? Example use case: transforming HTTP URLs to response contents which can be slow without concurrent HTTP requests. As an alternative I'm thinking of implementing a source connector which includes a pulsar consumer that puts CompletableFutures to a BlockingQueue while the reader takes them from the queue.
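The queue-of-futures idea in the question can be sketched without any Pulsar code. This is a minimal illustration under assumptions: it uses a thread pool and a FIFO of futures in a single generator rather than a separate consumer and reader, and `process_in_order` and its parameters are hypothetical names.

```python
from collections import deque
from concurrent.futures import ThreadPoolExecutor


def process_in_order(items, fn, max_workers=4, max_in_flight=8):
    """Apply fn to items concurrently while yielding results in input order."""
    pending = deque()  # futures, kept in the order their inputs arrived
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for item in items:
            pending.append(pool.submit(fn, item))
            # Bound the number of outstanding requests: once the window is
            # full, wait for the oldest one before accepting more input.
            if len(pending) >= max_in_flight:
                yield pending.popleft().result()
        # Input exhausted: drain the remaining futures, still oldest first.
        while pending:
            yield pending.popleft().result()
```

Slow calls overlap because several futures are in flight at once, but results are always taken from the head of the queue, so a fast later request never overtakes a slow earlier one. The cost is head-of-line blocking: one slow request delays everything queued behind it.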
----
2020-01-30 06:37:34 UTC - Moacy Barros: @Moacy Barros has joined the channel
----