Posted to users@pulsar.apache.org by Apache Pulsar Slack <ap...@gmail.com> on 2019/04/09 09:11:03 UTC

Slack digest for #general - 2019-04-09

2019-04-08 13:24:50 UTC - Mark Marijnissen: How do I enable websockets on the Pulsar Proxy?
----
2019-04-08 16:00:01 UTC - Gene Fojtik: @Gene Fojtik has joined the channel
----
2019-04-08 16:11:46 UTC - Devin G. Bost: I'm looking for ideas of how to create graceful continuous deployments at-scale (with downtime < 300 ms). I'm considering deploying to a new tenant, renaming the old tenant, and then renaming the new tenant to the old tenant name. However, I'm not sure if that will allow me to gracefully roll over existing consumers and producers.
Are there any thoughts on this approach or suggestions of how to create a better continuous deployment process?
----
2019-04-08 16:16:05 UTC - Devin G. Bost: We also can't have data loss, so that's another issue that must be considered.
----
2019-04-08 16:22:49 UTC - Kenan Dalley: Hi.  I've run into an issue with Pulsar Functions and the Jackson JSON library.  My function is defined as Function<MyModel,MyModel>, where MyModel has a String id, FruitEnum fruitType (Apple, Orange, etc) and a long stateCounter.  My function fills in the id & counter and my Producer sends in the fruitType.

When I run with my own Producer, which sends out a MyModel, it runs fine.  But when I attempted to use "bin/pulsar-client functions produce -m "{fruitType=\"Orange\"}", Jackson blew up the function with the message "JsonParseException: Unexpected character ('f' (code 102)): was expecting double-quote to start field name."

I understand what Jackson wants here, but the underlying issue is that, because of this error and that the offset wasn't committed, my function was pushed into an infinite loop of restarts on the server because it kept trying to read that message over and over when it was restarted.

3 questions
1. Is there a way to configure Jackson to be looser with its parsing for Functions?
2. Is there a way to manually, or automatically, change the commit the f(n) is looking at on the fly?
3. Is there a way to intercept the message prior to the Jackson call to do message type validation so that this can be prevented?

This situation is definitely not good for an Enterprise-level application.
----
2019-04-08 16:42:08 UTC - David Kjerrumgaard: @Kenan Dalley For #1, you can configure the Jackson parser to allow unquoted field names. `mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);`
----
2019-04-08 16:46:51 UTC - David Kjerrumgaard: @Kenan Dalley for #2, you can add the following property to the function deployment which will cause the messages to be acknowledged automatically upon processing, rather than requiring you to perform the ack in the code.  `"autoAck": true,`
----
2019-04-08 17:05:04 UTC - Kenan Dalley: Adding "--auto-ack true" to the function's create/update statement didn't fix anything.  It's still going into an infinite loop.
----
2019-04-08 17:12:15 UTC - Matteo Merli: @Mark Marijnissen Currently the websocket service is not integrated in the Pulsar proxy. You have to start it on its own with `pulsar websocket`
----
2019-04-08 17:13:15 UTC - Matteo Merli: @Devin G. Bost You mean Pulsar deployments or your application deployments?
----
2019-04-08 17:21:44 UTC - Kenan Dalley: @David Kjerrumgaard Where would I go to configure the mapper for either that function or all functions?  Since the function class itself is after the conversion, that would be too late and I don't know anywhere else that makes sense.
----
2019-04-08 17:22:38 UTC - Mandi Goddard: @Jon Bock I am trying to find some way to communicate with students who don't read their emails. Any suggestions?
----
2019-04-08 17:34:17 UTC - Devin G. Bost: @Matteo Merli
Great question. Thanks for asking. I mean application deployments (e.g. a set of Pulsar functions, sinks, sources, topics, and namespaces).
----
2019-04-08 17:37:29 UTC - Guy Feldman: @Mandi Goddard sounds like more of a job for an email sending service like Mailjet or easysendy
----
2019-04-08 17:37:31 UTC - Matteo Merli: Ok, in any case (even if it were for a Pulsar deployment) there would be no downtime or data loss.

For functions, you will issue an “update” of a running function and that will trigger a rolling restart of the instances. Across multiple instances there will still be some instance running.

Regarding data loss, functions use a consumer which has a subscription associated. If the function doesn’t process the messages, the messages won’t be acknowledged and therefore the broker will keep them.
----
2019-04-08 17:40:14 UTC - Devin G. Bost: Thanks for the guidance.
----
2019-04-08 17:43:16 UTC - Sanjeev Kulkarni: @Kenan Dalley There is a maxMessageRetries parameter in the FunctionConfig. By default it's -1 (which means retry forever), but you can set a finite value, which should make the framework attempt to deliver the message at most that many times before giving up.
----
2019-04-08 17:43:40 UTC - Devin G. Bost: What would be involved if I wanted to create something like an "upsert" operation that would create a component (if it doesn't exist) or update a component (if it does)?
----
2019-04-08 17:48:35 UTC - Kenan Dalley: @Sanjeev Kulkarni I tried that too (set to 1) and I tried the dead-letter queue setup as well and nothing has worked.  It continues to fail the function, no ack and restarts the function an infinite number of times.  It's as though all of these other config settings are activated after the failure occurs and are made irrelevant.
----
2019-04-08 17:50:01 UTC - Sanjeev Kulkarni: when/where is the error occurring?
----
2019-04-08 17:50:04 UTC - Sanjeev Kulkarni: is there a stack trace
----
2019-04-08 17:56:36 UTC - Matteo Merli: Not sure :slightly_smiling_face:
----
2019-04-08 17:57:05 UTC - Matteo Merli: You can try “functions create” and if it fails fallback to update.. I guess it’s not ideal..
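
A minimal sketch of the create-then-fall-back-to-update idea, for illustration only. `ComponentAdmin`, `AlreadyExistsException`, and `InMemoryAdmin` are hypothetical stand-ins, not the pulsar-admin API, and the sketch assumes a failed create can be told apart from other failures by its exception type.

```java
import java.util.HashMap;
import java.util.Map;

public class CreateOrUpdate {
    /** Hypothetical exception: the component is already registered. */
    static class AlreadyExistsException extends Exception {
        AlreadyExistsException(String name) {
            super(name + " already exists");
        }
    }

    /** Hypothetical admin interface, not the real pulsar-admin client. */
    interface ComponentAdmin {
        void create(String name, String config) throws AlreadyExistsException;

        void update(String name, String config);
    }

    /** In-memory stand-in so the sketch runs without a cluster. */
    static class InMemoryAdmin implements ComponentAdmin {
        final Map<String, String> components = new HashMap<>();

        @Override
        public void create(String name, String config) throws AlreadyExistsException {
            if (components.containsKey(name)) {
                throw new AlreadyExistsException(name);
            }
            components.put(name, config);
        }

        @Override
        public void update(String name, String config) {
            components.put(name, config);
        }
    }

    /** Try create first; fall back to update only when the component exists. */
    static String upsert(ComponentAdmin admin, String name, String config) {
        try {
            admin.create(name, config);
            return "created";
        } catch (AlreadyExistsException e) {
            admin.update(name, config);
            return "updated";
        }
    }

    public static void main(String[] args) {
        InMemoryAdmin admin = new InMemoryAdmin();
        System.out.println(upsert(admin, "FnFruit", "parallelism=1"));
        System.out.println(upsert(admin, "FnFruit", "parallelism=2"));
    }
}
```

Falling back only on an "already exists" failure keeps other errors (bad config, auth problems) visible instead of masking them with a second call. The two calls are not atomic, so concurrent deployers could still race between them.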
----
2019-04-08 17:58:33 UTC - Jerry Peng: i see
----
2019-04-08 17:58:50 UTC - Kenan Dalley: Not easy on my phone. :slightly_smiling_face:

Here's what the pulsar function log has:

17:01:23.343 [pulsar-client-io-1-1] INFO  com.scurrilous.circe.checksum.Crc32cIntChecksum - SSE4.2 CRC32C provider initialized
17:01:23.413 [public/default/FnFruit-0] ERROR org.apache.pulsar.functions.instance.JavaInstanceRunnable - [public/default/FnFruit:0] Uncaught exception in Java Instance
org.apache.pulsar.client.api.SchemaSerializationException: org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.core.JsonParseException: Unexpected character ('f' (code 102)): was expecting double-quote to start field name
 at [Source: (byte[])"{fruitType="Pear"}"; line: 1, column: 3]
        at org.apache.pulsar.client.impl.schema.JSONSchema.decode(JSONSchema.java:84) ~[java-instance.jar:2.3.0]
        at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:233) ~[java-instance.jar:2.3.0]
        at org.apache.pulsar.functions.source.PulsarRecord.getValue(PulsarRecord.java:74) ~[java-instance.jar:2.3.0]
        at org.apache.pulsar.functions.instance.JavaInstanceRunnable.readInput(JavaInstanceRunnable.java:458) ~[java-instance.jar:2.3.0]
        at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:243) [java-instance.jar:2.3.0]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
Caused by: org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.core.JsonParseException: Unexpected character ('f' (code 102)): was expecting double-quote to start field name
 at [Source: (byte[])"{fruitType="Pear"}"; line: 1, column: 3]
        at org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804) ~[java-instance.jar:2.3.0]
        at org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:669) ~[java-instance.jar:2.3.0]
        at org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:567) ~[java-instance.jar:2.3.0]
        at org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleOddName(UTF8StreamJsonParser.java:1988) ~[java-instance.jar:2.3.0]
        at org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parseName(UTF8StreamJsonParser.java:1639) ~[java-instance.jar:2.3.0]
        at org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:725) ~[java-instance.jar:2.3.0]
        at org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:151) ~[java-instance.jar:2.3.0]
        at org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4013) ~[java-instance.jar:2.3.0]
        at org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3091) ~[java-instance.jar:2.3.0]
        at org.apache.pulsar.client.impl.schema.JSONSchema.decode(JSONSchema.java:82) ~[java-instance.jar:2.3.0]
        ... 5 more
17:01:23.420 [public/default/FnFruit-0] INFO  org.apache.pulsar.functions.instance.JavaInstanceRunnable - Closing instance
----
2019-04-08 17:58:56 UTC - Jerry Peng: any interesting logs printed in the broker log related to that topic?
----
2019-04-08 17:59:22 UTC - Jerry Peng: partitioned topic or not partitioned topic?
----
2019-04-08 18:19:28 UTC - Sanjeev Kulkarni: so this is Pulsar’s schema trying to interpret the JSON and failing. For now, can you interpret the messages as byte[] and do the conversion yourself?
----
2019-04-08 18:19:53 UTC - Sanjeev Kulkarni: essentially your function signature will change to <byte[], MyModel>
----
2019-04-08 18:20:15 UTC - Sanjeev Kulkarni: and inside your function, you can then try to interpret loosely the bytes to json object and then do the manipulation
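
A rough sketch of the `<byte[], MyModel>` workaround, written against `java.util.function.Function`, which Pulsar Functions accepts as a language-native interface. Everything else here is an assumption for illustration: the `MyModel` and `FruitEnum` shapes are guessed from the description in the thread, the regex is a stand-in for whatever lenient parsing is actually wanted, and the sketch assumes that returning `null` results in no output message rather than a crash. Note that the logged payload uses `=` rather than `:`, so allowing unquoted field names alone would probably not be enough to parse it.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LenientFruitFunction implements Function<byte[], LenientFruitFunction.MyModel> {
    /** Assumed enum values; the thread mentions Apple, Orange and Pear. */
    enum FruitEnum { Apple, Orange, Pear }

    /** Stand-in for the MyModel described in the thread. */
    static class MyModel {
        String id;
        FruitEnum fruitType;
        long stateCounter;
    }

    // Accepts "fruitType":"Orange", fruitType:"Orange" and fruitType="Orange".
    private static final Pattern FRUIT =
            Pattern.compile("\"?fruitType\"?\\s*[:=]\\s*\"(\\w+)\"");

    private long counter = 0;

    @Override
    public MyModel apply(byte[] input) {
        String text = new String(input, StandardCharsets.UTF_8);
        Matcher m = FRUIT.matcher(text);
        if (!m.find()) {
            return null; // unparseable input: skip it instead of throwing
        }
        FruitEnum fruit;
        try {
            fruit = FruitEnum.valueOf(m.group(1));
        } catch (IllegalArgumentException e) {
            return null; // unknown fruit name: skip it as well
        }
        MyModel model = new MyModel();
        model.fruitType = fruit;
        model.stateCounter = ++counter;
        model.id = fruit + "-" + model.stateCounter;
        return model;
    }

    public static void main(String[] args) {
        LenientFruitFunction fn = new LenientFruitFunction();
        MyModel ok = fn.apply("{fruitType=\"Pear\"}".getBytes(StandardCharsets.UTF_8));
        System.out.println(ok.id + " " + ok.fruitType);
        System.out.println(fn.apply("not json".getBytes(StandardCharsets.UTF_8)));
    }
}
```

Because the function never throws on bad input, a malformed message should no longer take the instance down before it can be acknowledged.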
----
2019-04-08 18:24:57 UTC - Kenan Dalley: That seems like a valid approach, but definitely not ideal.   I'll try it out.
----
2019-04-08 18:43:31 UTC - Sanjeev Kulkarni: @Kenan Dalley agreed. <https://github.com/apache/pulsar/pull/4004> should fix the behavior for next release
----
2019-04-08 18:49:24 UTC - Devin G. Bost: I'm adding an `upsert` method to Pulsar-Admin to enable conditional creation/update (where it creates a component if it doesn't exist and updates the component if it does exist). This method will make rolling deployments easier because we can just call this single method on every component in our project tree during our deployments.

In my fork, I added `upsertFunction` to
`pulsar-functions/worker/src/main/java/org.apache.pulsar.functions.worker/rest/api/ComponentImpl.java` and added upsert methods to:
`pulsar-broker/src/main/java/org.apache.pulsar/broker/admin/impl/FunctionsBase.java`, `SinkBase.java`, and `SourceBase.java`.

Adding upsert to NamespacesBase and TenantsBase looks more involved, so I haven't done that yet.

Are there any other points that I need to include before I submit a PR with these changes?
----
2019-04-08 18:50:02 UTC - Devin G. Bost: At some point, I’d also like to implement a bulk-upsert method where I can pass in a manifest file and trigger upserts in parallel for everything in the manifest file.
----
2019-04-08 18:50:16 UTC - Devin G. Bost: The purpose of these changes is to simplify (and speed up) continuous deployment.
+1 : David Kjerrumgaard, Karthik Ramasamy
----
2019-04-08 19:08:06 UTC - John Crawford: non-partitioned
----
2019-04-08 19:08:10 UTC - John Crawford: seeing this log again:
----
2019-04-08 19:08:36 UTC - John Crawford: ```
19:02:46.150 [bookkeeper-ml-scheduler-OrderedScheduler-1-0] WARN  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [02efa741-a96f-4124-a463-ae13a704b8fc/pXeyflmLEUEnpjUa5radVPTTj44PSG1P/persistent/raw-pallets]-4372 read entry timeout for 0-0 after 120 sec
19:02:46.151 [bookkeeper-ml-scheduler-OrderedScheduler-1-0] WARN  org.apache.bookkeeper.mledger.impl.OpReadEntry - [02efa741-a96f-4124-a463-ae13a704b8fc/pXeyflmLEUEnpjUa5radVPTTj44PSG1P/persistent/raw-pallets][etl-production] read failed from ledger at position:4372:0 : Bookie operation timeout
19:02:46.151 [bookkeeper-ml-scheduler-OrderedScheduler-1-0] ERROR org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers - [<persistent://02efa741-a96f-4124-a463-ae13a704b8fc/pXeyflmLEUEnpjUa5radVPTTj44PSG1P/raw-pallets> / etl-production] Error reading entries at 4372:0 : Bookie operation timeout, Read Type Normal - Retrying to read in 58.832 seconds
```
----
2019-04-08 19:08:37 UTC - Devin G. Bost: Alternatively, if there are objections to the `upsert` approach, we could still build a tree of components from a manifest file with a `bulkDeploy` method, compute differences between the manifest tree and the tree of components currently existing in Pulsar, and then conditionally call create, update, and/or delete methods to bring Pulsar in-sync with the manifest file.

Feedback on preferences would be helpful.
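
The manifest-diff idea can be sketched roughly as follows. This is not code from the fork: `ManifestDiff`, `Plan`, and the name-to-config string maps are hypothetical simplifications of the component trees described above.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class ManifestDiff {
    /** The three sets of component names a deployment would act on. */
    static class Plan {
        final Set<String> toCreate = new TreeSet<>();
        final Set<String> toUpdate = new TreeSet<>();
        final Set<String> toDelete = new TreeSet<>();
    }

    /** Compare desired (manifest) and actual (cluster) name-to-config maps. */
    static Plan diff(Map<String, String> desired, Map<String, String> actual) {
        Plan plan = new Plan();
        for (Map.Entry<String, String> e : desired.entrySet()) {
            if (!actual.containsKey(e.getKey())) {
                plan.toCreate.add(e.getKey());   // in manifest, not in cluster
            } else if (!e.getValue().equals(actual.get(e.getKey()))) {
                plan.toUpdate.add(e.getKey());   // in both, config differs
            }
        }
        for (String name : actual.keySet()) {
            if (!desired.containsKey(name)) {
                plan.toDelete.add(name);         // in cluster, not in manifest
            }
        }
        return plan;
    }

    public static void main(String[] args) {
        Map<String, String> desired = new TreeMap<>();
        desired.put("fn-a", "v2");
        desired.put("fn-b", "v1");
        Map<String, String> actual = new TreeMap<>();
        actual.put("fn-a", "v1");
        actual.put("fn-c", "v1");

        Plan plan = diff(desired, actual);
        System.out.println("create=" + plan.toCreate
                + " update=" + plan.toUpdate
                + " delete=" + plan.toDelete);
    }
}
```

Components whose config is unchanged land in none of the three sets, so a repeated deployment of the same manifest would issue no calls at all.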
----
2019-04-08 19:09:59 UTC - John Crawford: then further down the log (58 seconds later):
----
2019-04-08 19:10:07 UTC - John Crawford: ```
19:03:44.983 [pulsar-io-23-3] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers - [<persistent://02efa741-a96f-4124-a463-ae13a704b8fc/pXeyflmLEUEnpjUa5radVPTTj44PSG1P/raw-pallets> / etl-production] Retrying read operation
19:03:44.983 [pulsar-io-23-3] ERROR org.apache.bookkeeper.common.util.SafeRunnable - Unexpected throwable caught 
java.lang.ArrayIndexOutOfBoundsException: null
```
----
2019-04-08 19:10:34 UTC - John Crawford: don’t see anything but GC logs in bookkeeper logs
----
2019-04-08 19:14:04 UTC - Grant Wu: @Kenan Dalley Why can’t you quote your field names?
----
2019-04-08 19:14:24 UTC - Grant Wu: Where is the input coming from that is so blatantly non-compliant with the spec
----
2019-04-08 19:14:46 UTC - Grant Wu: I was considering using the JSON schema in the future, I would prefer for this to either be tunable or not changed
----
2019-04-08 19:16:04 UTC - Grant Wu: if we allow non-quoted field names, we immediately break the Python JSON parser as well:
```
>>> import json
>>> json.loads('{foo: "bar"}')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/json/__init__.py", line 348, in loads
    return _default_decoder.decode(s)
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/json/decoder.py", line 353, in raw_decode
    obj, end = self.scan_once(s, idx)
json.decoder.JSONDecodeError: Expecting property name enclosed in double quotes: line 1 column 2 (char 1)
```
----
2019-04-08 19:22:13 UTC - Grant Wu: To clarify my objection: if we allow this, then consumers assuming that the input they get is valid JSON will all now need to be able to handle this non-compliant JSON.  So, for example, the standard JSON parsing library in Python would no longer suffice; we would need to pull in a third party dependency.  <https://stackoverflow.com/questions/39491420/python-jsonexpecting-property-name-enclosed-in-double-quotes>
----
2019-04-08 19:23:42 UTC - Kenan Dalley: @Grant Wu I'd prefer it to be tunable because  I cannot guarantee that the data that comes in will be 100% compliant with the spec.

Also, I personally don't agree that field names not being surrounded by double quotes is something that should completely fail the object mapper since it's a very widely used pattern.  Otherwise, it wouldn't be an option to override.
----
2019-04-08 19:28:58 UTC - Grant Wu: We should probably move this discussion to the PR
+1 : Devin G. Bost
----
2019-04-08 19:30:06 UTC - Ryan Samo: Hey guys,
When using Pulsar SQL (Presto) I do not see an obvious way to pass in the admin certificates. Since we are using TLS, do you have any idea which config the tlsCert, tlsKey, and rootCA would reside in for Presto?
----
2019-04-08 19:30:36 UTC - Ryan Samo: Get 401 unauthorized if we don’t provide the certs somehow 
----
2019-04-08 19:32:40 UTC - Grant Wu: I put a comment there
----
2019-04-08 19:44:05 UTC - Grant Wu: I’m not sure what the best approach is here, but here are my thoughts:
1. This would be welcome, my deployment script for Pulsar Functions currently is quite ugly, it greps the output of pulsar-admin functions get.
2. A manifest approach kind of feels somewhat heavyweight, and it might overlap with native k8s tooling (for those of us running Pulsar in k8s).  Pulsar Functions are the exception here because they’re not materialized as k8s resources anywhere (possible future avenue of work)
3. Possible alternative name to “upsert” - “put” - better matches the HTTP verbs that pulsar-admin already uses
----
2019-04-08 19:44:16 UTC - Kenan Dalley: Dittos.   :blush:
----
2019-04-08 20:46:50 UTC - Devin G. Bost: Thanks for the feedback.
----
2019-04-08 20:51:34 UTC - Devin G. Bost: If I were to create a PR, how long do you think it would take before it would end up in a release?
----
2019-04-08 20:52:47 UTC - Grant Wu: I don’t know exactly - not a maintainer - but <https://github.com/apache/pulsar/releases> can give you some sense of how frequently releases happen
----
2019-04-08 21:50:39 UTC - Devin G. Bost: Is there going to be a Pulsar conference?
----
2019-04-08 21:52:26 UTC - Sanjeev Kulkarni: Hopefully soon
+1 : Devin G. Bost, Ali Ahmed, Ezequiel Lovelle, Karthik Ramasamy, Yuvaraj Loganathan, Shivji Kumar Jha
bananadance : Yuvaraj Loganathan
----
2019-04-08 23:34:53 UTC - Devin G. Bost: Regarding the upsert function, I made the changes in my fork and got the build to succeed. I think I updated all of the touch-points correctly (though I may be missing something). When I try to call it from the admin API, I get null output with:

`Reason: javax.ws.rs.ProcessingException: Java heap space`

I tried updating my pulsar_env.sh line to:
`PULSAR_MEM=${PULSAR_MEM:-"-Xms2g -Xmx8g -XX:MaxDirectMemorySize=12g"}`
but I got the same issue. Any ideas?
----
2019-04-08 23:45:59 UTC - Ali Ahmed: @Devin G. Bost you have to change pulsar_tools_env.sh
----
2019-04-08 23:46:18 UTC - Devin G. Bost: Oh, thanks! I'll try that!
----
2019-04-08 23:52:38 UTC - Sijie Guo: @young sorry that I forgot to reply to you yesterday. from the exception, it seems that you are using both pulsar-client and pulsar-client-admin? what dependencies do you include for your java program?
----
2019-04-08 23:54:24 UTC - Devin G. Bost: It worked! Thanks!
----
2019-04-09 02:00:31 UTC - young: @Sijie Guo thanks for reply. I only import org.apache.pulsar.client.api.Producer  and org.apache.pulsar.client.api.PulsarClientException , no pulsar-client-admin.
----
2019-04-09 02:07:19 UTC - Sijie Guo: are you using pulsar-client or pulsar-client-original? what version are you using?
----
2019-04-09 02:08:04 UTC - young: Furthermore, when I run a standalone Pulsar cluster on my own machine, the Java code passes the test, but the same code running on the server machine reports an exception.
----
2019-04-09 02:10:45 UTC - young: @Sijie Guo the pulsar-client, version is pulsar-client-api-2.3.0.jar.
----
2019-04-09 02:11:40 UTC - Sijie Guo: are you using maven or gradle to build the java program?
----
2019-04-09 02:14:31 UTC - young: both not yet, just a simple test.
----
2019-04-09 02:17:03 UTC - Sijie Guo: it seems that you are invoking the test from eclipse and the exception seems to show it failed to load some default configuration from async http client.
----
2019-04-09 02:17:19 UTC - Sijie Guo: I am not sure how your test is running in eclipse.
----
2019-04-09 02:17:29 UTC - Sijie Guo: can you try to use pulsar-client-original dependency?
----
2019-04-09 02:22:15 UTC - young: Running the test in eclipse  connect to my own machine that deploy the standalone pulsar is ok, but I package to a jar file and exec  the command" java -jar  MessageProducer.jar " by command line  on the server machine is failed.
----
2019-04-09 02:23:27 UTC - Sijie Guo: > but I package to a jar file and exec  the command” java -jar  MessageProducer.jar ” by command line  on the server machine is failed.

how do you package the jar file?
----
2019-04-09 02:25:59 UTC - young: eclipse file menu ---export ---runnable jar file---Package required libraries into generated JAR...
----
2019-04-09 02:27:44 UTC - Sijie Guo: okay, I am not sure how Eclipse does that, but asynchttpclient has some resource files which should be packaged into the JAR; otherwise asynchttpclient will fail to load them.
----
2019-04-09 02:33:44 UTC - young: oh, and is there a standard way of deploying Java code to a server machine that you would recommend?
----
2019-04-09 02:36:30 UTC - Sijie Guo: can you try using pulsar-client-original?
----
2019-04-09 02:41:01 UTC - young: thanks, I will try it.
----
2019-04-09 07:02:32 UTC - Mark Marijnissen: Thanks for the info. :slightly_smiling_face:
----
2019-04-09 07:04:56 UTC - Mark Marijnissen: Do Pulsar clients need to be able to connect to a specific broker, or will any broker do? The docs say they are stateless, so I expect any broker to work.
(Follow-up question: If any broker will do, why is the Pulsar proxy enabled in a Kubernetes deployment?)

(Context: I want to use nodejs to send messages. The node client with the c++ lib doesn't compile, so I want to use the websocket API as it's a low-volume use case. But the websocket API is not supported on the proxy)
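
For the low-volume WebSocket route, a producer connects to a topic-specific URL and sends JSON frames whose `payload` field is base64-encoded. The sketch below only builds the URL and the frame, with no WebSocket client involved; the host, port, tenant, namespace, and topic values are placeholders, and the path layout is taken from the Pulsar WebSocket API documentation as best understood here, so verify it against the version in use.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class WebSocketProducerMessage {
    /** Build the producer endpoint for a persistent topic (path layout assumed from the docs). */
    static String producerUrl(String hostPort, String tenant, String namespace, String topic) {
        return "ws://" + hostPort + "/ws/v2/producer/persistent/"
                + tenant + "/" + namespace + "/" + topic;
    }

    /** Wrap a text message as a JSON frame with a base64-encoded payload. */
    static String messageJson(String text) {
        String payload = Base64.getEncoder()
                .encodeToString(text.getBytes(StandardCharsets.UTF_8));
        // Base64 output contains no characters that need JSON escaping.
        return "{\"payload\":\"" + payload + "\"}";
    }

    public static void main(String[] args) {
        // Placeholder values; substitute the address of the websocket service.
        System.out.println(producerUrl("localhost:8080", "public", "default", "my-topic"));
        System.out.println(messageJson("hello"));
    }
}
```

Any WebSocket client library, including one for nodejs, could then open the URL and send the frame as a text message.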
----
2019-04-09 07:10:59 UTC - Ali Ahmed: @Mark Marijnissen connecting to any broker is fine. The proxy is a transparent TCP proxy; it’s there for convenience in certain kinds of deployments.
The nodejs client should start to stabilize; it’s in the early stages of development. You can open issues if you are unable to compile.
The websocket is supported by the proxy. If it’s having problems, please open an issue on GitHub.
----
2019-04-09 07:23:55 UTC - Matteo Merli: > Do Pulsar clients need to be able to connect to a specific broker, or will any broker do? The docs say they are stateless, so I expect any broker to work.

A client can connect initially to any broker, though it needs to be able to connect directly to all the brokers.

If direct connection is not possible (or desirable), then the proxy is a possible solution.
----
2019-04-09 08:50:33 UTC - Mark Marijnissen: Ah, that is useful to know.

In a Kubernetes deployment, will the client receive the IP address of a pod? Or will it receive the service IP address, thus obscuring the actual pod (i.e. broker) behind it?
----