Posted to users@pulsar.apache.org by Apache Pulsar Slack <ap...@gmail.com> on 2019/11/11 09:11:02 UTC

Slack digest for #general - 2019-11-11

2019-11-10 11:14:57 UTC - sundar: Hello everyone, I am trying to set up a Pulsar cluster with 2 nodes. I installed Pulsar following the steps given at <https://pulsar.apache.org/docs/en/deploy-bare-metal/> and also watched a video on the same topic at <https://www.youtube.com/watch?v=oVfMm9sLjLs>. Everything works well up through ZooKeeper, but when I tried to start the bookie and checked the logs, they were empty. I tried jps, and there was no BookKeeper process, so I think BookKeeper is not starting properly. I am a beginner, so I am not able to figure this out on my own. Can someone help me with this? Thank you
----
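The bare-metal guide linked above starts each bookie with the pulsar-daemon script, and BookKeeper ships a bookie sanity check that writes and reads a small test ledger. A minimal sketch of that sequence, assuming it is run from the Pulsar installation directory on each storage node:
```
# Start the bookie as a background daemon; output goes to the logs/ directory
bin/pulsar-daemon start bookie

# Verify the bookie works by creating a test ledger, writing entries, and reading them back
bin/bookkeeper shell bookiesanity
```
----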
2019-11-10 13:53:26 UTC - Viji: We are porting our application from Kafka + Flink to Pulsar + Flink using PulsarSourceBuilder and FlinkPulsarProducer. Is it mandatory to set subscriptionInitialPosition? If it is not set, will consumption happen from the last consumed position based on the Flink checkpoint?
----
2019-11-10 13:58:28 UTC - Viji: Does any specific option need to be set to get new-partition discovery, similar to flink.partition-discovery.interval-millis in the case of Kafka?
----
2019-11-10 14:20:49 UTC - Matteo Merli: The default is that if the subscription does not exist, it will be created at the end of the topic; otherwise it will restart where it left off
----
2019-11-10 14:21:05 UTC - Viji: Can setAutoUpdatePartitions(true) in ProducerConfigurationData be used for partition discovery?
----
2019-11-10 14:23:17 UTC - Viji: ok thanks
----
2019-11-11 02:04:20 UTC - Sijie Guo: Can you create a GitHub issue with your steps, so we can see how we can help you?
----
2019-11-11 02:04:56 UTC - Sijie Guo: yes
----
2019-11-11 02:10:14 UTC - vincent: @vincent has joined the channel
----
2019-11-11 07:24:53 UTC - Jasper Li: @Sijie Guo Hello, I have defined a serde (I don't know if it is correct or not):
```
from distutils.util import strtobool
from pulsar import SerDe
from pulsar.schema import (
    Record, String, Integer, Boolean
)


class Message(Record):
    a = String()
    b = Integer()
    c = Boolean()

    def __init__(self, a, b, c):
        self.a = a
        self.b = int(b)
        self.c = bool(strtobool(c))


class MessageSerde(SerDe):
    def __init__(self, message):
        self.message = message

    def serialize(self, input):
        return bytes("{0},{1},{2}".format(
            self.message.a, self.message.b, self.message.c))

    def deserialize(self, input_bytes):
        components = str(input_bytes).split(",")
        return Message(*components)

```

and I have executed the `pulsar-admin functions` command in several ways, but all of them told me they cannot find my serde:
```
[INFO] util.py: Failed to import class serde.MessageSerde from path /pulsar
[INFO] util.py: 'module' object has no attribute 'MessageSerde'
Traceback (most recent call last):
  File "/pulsar/instances/python-instance/util.py", line 46, in import_class
    return import_class_from_path(api_dir, full_class_name)
  File "/pulsar/instances/python-instance/util.py", line 65, in import_class_from_path
    retval = getattr(mod, class_name)
AttributeError: 'module' object has no attribute 'MessageSerde'
```

Where should I put the serde? Or what should I pass to `pulsar-admin functions` for the `--custom-serde-inputs` argument?

Thanks again!!!
----
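For reference, the custom SerDe contract in the Python client is just the serialize/deserialize pair, and the function runtime constructs the SerDe class itself, so it is usually written with no constructor arguments. A minimal sketch of that shape (the CsvSerDe name and the comma-separated layout are only illustrative, and this sketch does not by itself resolve the import error above):
```
from pulsar import SerDe


class CsvSerDe(SerDe):
    """Illustrative SerDe packing a list of fields into a comma-separated byte string."""

    def serialize(self, input):
        # input is the value produced by the function; encode it to bytes
        return ",".join(str(part) for part in input).encode("utf-8")

    def deserialize(self, input_bytes):
        # input_bytes is the raw message payload; return the value handed to process()
        return input_bytes.decode("utf-8").split(",")
```
----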
2019-11-11 07:32:20 UTC - Pradeep Mishra: Hi everyone, I want to replace Kafka with Pulsar for log streaming. Right now I am using Kafka + Fluentd + Elasticsearch for log processing. Does anybody know of a Fluentd connector for Pulsar?
----
2019-11-11 08:09:13 UTC - Naveen Kumar: We are planning to use the AvroConverter for Debezium CDC instead of JsonConverter. We downloaded kafka-connect-avro-converter-5.3.1.jar into the $APACHE_PULSAR/lib directory, yet the connector job still fails with "java.lang.ClassNotFoundException: io.confluent.connect.avro.AvroConverter". Are we missing something?
----
2019-11-11 08:10:39 UTC - tuteng: Pulsar already supports Elastic Beats and Flume, but there is no support for Fluentd yet. I think you can create an issue on <https://github.com/apache/pulsar>, and later we can implement this connector with a Ruby-based client such as <https://github.com/hiroakiwater/ruby-pulsar-client>.
----
2019-11-11 08:12:28 UTC - tuteng: Can JsonConverter be used normally?
----
2019-11-11 08:12:41 UTC - Pradeep Mishra: sure, thanks
----
2019-11-11 08:20:54 UTC - Sijie Guo: @tuteng Can you help with the question?
----
2019-11-11 08:21:32 UTC - Naveen Kumar: Yes. No need to include any library for JsonConverter.
----
2019-11-11 08:24:18 UTC - tuteng: Can you show me the commands you use? @Jasper Li
----
2019-11-11 08:26:34 UTC - tuteng: How do you run the function? Can you show me your command?
----
2019-11-11 08:28:35 UTC - leonidv: Hi all! By default, when I create a subscription I read only messages that were added to the topic after the subscription was created. I can use consumer.seek(MessageId.earliest) to read all messages in the topic, but I don't understand how to read all the messages only once. In other words, I want to read all the existing messages in the topic and after that receive only new messages.
----
2019-11-11 08:30:03 UTC - Jasper Li: @tuteng
```bin/pulsar-admin functions create --tenant public --namespace default --name hellopulsar --py functions/function.py --classname function.PulsarFunction --custom-serde-inputs '{"public/default/topic":"function.MessageSerde"}' --output public/default/outTopic --log-topic public/default/logTopic --cpu 0.1```

Thanks!
----
2019-11-11 08:32:10 UTC - leonidv: I'm worried about the service restart case:
1. Run a new service (microservice) which creates a new subscription and should process all messages that already exist in the topic.
2. Restart the service.
3. The service continues to receive the unprocessed messages (messages without an ack).
----
2019-11-11 08:32:43 UTC - tuteng: Can you show me your function.py file? I'd like to see the .py files you import.
----
2019-11-11 08:35:37 UTC - Jasper Li: @tuteng Of coz.
```
from pulsar import Function
from distutils.util import strtobool
from pulsar import SerDe
from pulsar.schema import (
    Record, String, Integer, Boolean
)


class Message(Record):
    a = String()
    b = Integer()
    c = Boolean()

    def __init__(self, a, b, c):
        self.a = a
        self.b = int(b)
        self.c = bool(strtobool(c))


class MessageSerde(SerDe):
    def __init__(self, message):
        self.message = message

    def serialize(self, input):
        return bytes("{0},{1},{2}".format(
            self.message.a, self.message.b, self.message.c))

    def deserialize(self, input_bytes):
        components = str(input_bytes).split(",")
        return Message(*components)


class PulsarFunction(Function):
    def __init__(self):
        pass

    def process(self, input, context):
        logger = context.get_logger()
        logger.info("----- got message -----")
        logger.info("%s" % ", ".join([input.a, str(input.b), str(input.c)]))
        return "/".join([input.a, str(input.b), str(input.c)])
```
----
2019-11-11 08:44:49 UTC - Jasper Li: @Naveen Kumar In my experience, you need to add the io.confluent dependencies to the pom.xml and build a new .nar file yourself.
----
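A hedged sketch of what that pom.xml change might look like, assuming the converter is pulled from Confluent's Maven repository and that the 5.3.1 version mentioned above is the one wanted:
```
<!-- Confluent artifacts are not on Maven Central; add their repository -->
<repositories>
  <repository>
    <id>confluent</id>
    <url>https://packages.confluent.io/maven/</url>
  </repository>
</repositories>

<dependencies>
  <dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-connect-avro-converter</artifactId>
    <version>5.3.1</version>
  </dependency>
</dependencies>
```
----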
2019-11-11 09:06:30 UTC - Sijie Guo: By default a new subscription starts from the latest message, hence you only receive the messages produced after creating the new subscription.

You can set SubscriptionInitialPosition to `earliest` to receive all the messages.
----
2019-11-11 09:07:54 UTC - leonidv: If I write code like:
consumer.seek(earliest)
consumer.subscribe

will it reread all the messages on every microservice start?
----
2019-11-11 09:10:52 UTC - Sijie Guo: ```
consumer.subscribe 
consumer.seek(earliest)
```

however, this will rewind every time your consumer connects; that's probably not what you want.

Instead, you can subscribe with `subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)`, which only takes effect when the subscription is first created.
----
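For the restart scenario discussed above, a minimal sketch with the Python client (the topic and subscription names are made up here) shows the idea: pass the initial position at subscribe time instead of seeking, so the whole topic is read once when the subscription is created, while later restarts resume from the last acknowledged message:
```
import pulsar

client = pulsar.Client('pulsar://localhost:6650')

# The initial position is only honoured when the subscription is first created;
# after a restart the consumer resumes from the last acknowledged message.
consumer = client.subscribe(
    'persistent://public/default/my-topic',
    subscription_name='my-service-sub',
    initial_position=pulsar.InitialPosition.Earliest)

while True:
    msg = consumer.receive()
    # ... process msg.data() ...
    consumer.acknowledge(msg)
```
----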