Posted to user@beam.apache.org by Byron Ellis via user <us...@beam.apache.org> on 2022/10/06 20:59:27 UTC

Re: PulsarIO not connecting

Hi Phani,

I believe what you want to do here is construct the PulsarClient object
within an implementation of SerializableFunction so that it can be executed
remotely rather than constructing the client in your main function. That
also means your certificate files must be accessible from those remote
workers.
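Here is a plain-JDK sketch of why the factory approach helps. It rests on a few assumptions. SerializableFunction below is a local stand-in for Beam's org.apache.beam.sdk.transforms.SerializableFunction. FakeClient is a hypothetical placeholder for PulsarClient. The certificate path and broker URL are made up.

The factory captures only a path string, so it survives Java serialization, which is roughly what a runner does when it ships a transform to workers. An already-built client does not survive. The readability check runs inside apply(), so it runs on the machine that actually builds the client.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.function.Function;

public class ClientFactoryDemo {
  // Local stand-in for Beam's org.apache.beam.sdk.transforms.SerializableFunction.
  interface SerializableFunction<I, O> extends Function<I, O>, Serializable {}

  // Hypothetical stand-in for PulsarClient: a real client holds live connections, so this is not Serializable.
  static final class FakeClient {
    final String serviceUrl;
    final String trustCertsPath;

    FakeClient(String serviceUrl, String trustCertsPath) {
      this.serviceUrl = serviceUrl;
      this.trustCertsPath = trustCertsPath;
    }
  }

  // Only the path string is captured; the client is built when apply() runs (on the worker).
  static SerializableFunction<String, FakeClient> clientFactory(String trustCertsPath) {
    return serviceUrl -> {
      // Fail fast if the certificate is not present on the machine building the client.
      if (!Files.isReadable(Paths.get(trustCertsPath))) {
        throw new IllegalStateException("certificate not readable here: " + trustCertsPath);
      }
      return new FakeClient(serviceUrl, trustCertsPath);
    };
  }

  // Java-serializes and deserializes a value, roughly what shipping it to a worker involves.
  @SuppressWarnings("unchecked")
  static <T> T roundTrip(T value) throws IOException {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
      out.writeObject(value);
    }
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
      return (T) in.readObject();
    } catch (ClassNotFoundException e) {
      throw new IOException(e);
    }
  }

  static boolean isShippable(Object value) {
    try {
      roundTrip(value);
      return true;
    } catch (IOException e) { // NotSerializableException is a subclass of IOException
      return false;
    }
  }

  public static void main(String[] args) throws Exception {
    Path cert = Files.createTempFile("ca", ".cert.pem"); // empty placeholder for a real CA certificate
    try {
      SerializableFunction<String, FakeClient> shipped = roundTrip(clientFactory(cert.toString()));
      FakeClient client = shipped.apply("pulsar+ssl://broker.example:6651"); // hypothetical URL
      System.out.println("client built for " + client.serviceUrl);
      System.out.println("client itself shippable? " + isShippable(client)); // prints false
    } finally {
      Files.deleteIfExists(cert);
    }
  }
}
```

The sketch assumes withPulsarClient in your Beam version takes a SerializableFunction from the service URL to a PulsarClient. In a real pipeline, the lambda body would instead return something like PulsarClient.builder().serviceUrl(url).tlsTrustCertsFilePath(path).build(). That build() call throws the checked PulsarClientException, so it needs a try/catch inside the lambda.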

Best,
B

On Thu, Oct 6, 2022 at 3:19 AM Phani Geeth <bv...@gmail.com> wrote:

> Hi Team,
>
> I am using the native PulsarIO connector to connect to an existing Pulsar
> server that uses SSL certificates. But when I add withPulsarClient to the
> pipeline, I get a cast error.
>
> More details and code are posted at the Stack Overflow link below:
>
> https://stackoverflow.com/questions/73937922/not-able-to-connect-to-pulsario-using-apache-beam-java-sdk
>
> Regards,
> Phani Geeth
>
> Sent from Mail <https://go.microsoft.com/fwlink/?LinkId=550986> for
> Windows
>

Re: PulsarIO not connecting

Posted by Byron Ellis via user <us...@beam.apache.org>.
Adding the Beam user list in case other folks might run into similar issues.

Hi Phani,

PulsarMessage's getMessageRecord() returns the message from the Pulsar
client itself. I haven't really used Pulsar myself, but it looks like under
normal circumstances the Pulsar client would handle the translation from a
byte array to a Java object for you, as in this
<https://github.com/streamnative/examples/blob/master/clients/schema/src/main/java/io/streamnative/examples/schema/avro/AvroSchemaConsumerExample.java>
example parsing Avro messages. Looking at the current Beam implementation,
however, it would appear that the Pulsar client is configured to simply
take messages as a raw byte array and not use Pulsar's built-in
translation. This would mean that in your code you would probably want
something along the lines of calling getData() on the record returned by
c.element().getMessageRecord() (cast to Pulsar's Message<byte[]> if your
Beam version exposes it as an Object) to get at the raw bytes. You would
then have to parse those bytes according to however they were encoded.
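Here is a minimal sketch of the parsing step. It assumes the producer wrote UTF-8 text, for example with Pulsar's Schema.STRING. Avro, JSON or protobuf payloads need the matching decoder instead. The byte array is a hypothetical payload standing in for what getData() returns.

The sketch also shows why printing an object directly is unhelpful. Java's default toString() prints a type name and hash code, the same pattern as the MessageImpl@6cb47e3 output.

```java
import java.nio.charset.StandardCharsets;

public class PayloadDecoding {
  // Assumes the payload bytes are UTF-8 encoded text; other encodings need a different decoder.
  static String decodeUtf8(byte[] data) {
    return new String(data, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    // Hypothetical payload standing in for the bytes returned by Message.getData().
    byte[] data = "hello pulsar".getBytes(StandardCharsets.UTF_8);

    // Default Object.toString(): type name plus hash code (starts with "[B@"), not the text.
    System.out.println(data);

    // Decoding with the producer's charset recovers the text: prints "hello pulsar".
    System.out.println(decodeUtf8(data));
  }
}
```

Inside the DoFn, this would become something like c.output(decodeUtf8(message.getData())). Here message is the Pulsar Message obtained from getMessageRecord().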

Best,
B


On Mon, Jan 23, 2023 at 8:56 AM phani geeth <bv...@gmail.com> wrote:

> Hi Byron,
>
> Hope you are doing well.
>
> I need your help with Apache Beam's PulsarIO again. I am finally able to
> read messages from the Pulsar server and pass them to the next transform,
> but I can't see the message content: it is displayed in some encoded
> format, and I'm not sure how to retrieve the string message from it.
>
>
> Custom transform:
>
> class MessagePrint extends DoFn<PulsarMessage, String> {
>     @ProcessElement
>     public void processElement(ProcessContext c) throws IOException {
>         System.out.println("printing message");
>         System.out.println(c.element().getMessageRecord());
>     }
> }
>
> Pipeline to read messages from the Pulsar topic:
>
> PCollection<PulsarMessage> records = p.apply("read from pulsar",
>     PulsarIO.read()
>         .withTopic(topic_name)
>         .withPublishTime()
>         .withClientUrl(client_url)
>         .withAdminUrl(admin_url));
>
> records.apply("print message", ParDo.of(new MessagePrint()));
>
> c.element().getMessageRecord() returns
> "org.apache.pulsar.client.impl.MessageImpl@6cb47e3".
>
> Any help on this would be highly appreciated.
>
>
>
> Thanks for all the help so far.
> Regards,
> Phani Geeth
>
>
> On Mon, Oct 10, 2022 at 10:39 PM Phani Geeth <bv...@gmail.com>
> wrote:
>
>> Hi Byron,
>>
>> I am running the pipeline in my Eclipse workspace. It is able to connect
>> to the PulsarIO topic and I get a message-received status, but the
>> messages do not move on to the next PTransform. My use case is to read
>> from Pulsar and write to Pub/Sub.
>>
>> Any help on this would be highly appreciated.
>>
>> Thanks for all the help so far.
>>
>> Regards,
>> Phani Geeth
>>
>> *From: *Byron Ellis <by...@google.com>
>> *Sent: *07 October 2022 19:38
>> *To: *phani geeth <bv...@gmail.com>
>> *Cc: *user@beam.apache.org
>> *Subject: *Re: PulsarIO not connecting
>>
>>
>>
>> There could be any number of reasons for that; it's hard to say without
>> knowing how you're running the pipeline. There's a pretty good chance the
>> message is indeed being printed, but not on a console you can see easily.
>> Personally, for this kind of testing I tend to use metrics to get quick
>> feedback rather than logging output (if you do log, I'd recommend switching
>> to a logging library rather than using println, for a variety of reasons).
>>
>>
>>
>> Best,
>>
>> B
>>
>>
>>
>> On Fri, Oct 7, 2022 at 2:18 AM phani geeth <bv...@gmail.com>
>> wrote:
>>
>> Thanks, Byron, for the quick response. It worked once I created a
>> SerializableFunction and built the client inside it.
>>
>> One more question: how can I display the Pulsar messages? When I add a
>> DoFn to the pipeline, it doesn't print anything. I can see that the
>> pipeline is receiving messages, but I'm not able to display them.
>>
>> class MessagePrint extends DoFn<PulsarMessage, String> {
>>   @ProcessElement
>>   public void processElement(ProcessContext c) {
>>     System.out.println(c.element());
>>     System.out.println(c.element().getMessageRecord());
>>   }
>> }
>>
>> I added this DoFn to the pipeline after PulsarIO, but it doesn't print
>> any messages.
>>
>> Regards,
>> Phani Geeth
>>
>

Re: PulsarIO not connecting

Posted by Byron Ellis via user <us...@beam.apache.org>.
There could be any number of reasons for that; it's hard to say without
knowing how you're running the pipeline. There's a pretty good chance the
message is indeed being printed, but not on a console you can see easily.
Personally, for this kind of testing I tend to use metrics to get quick
feedback rather than logging output (if you do log, I'd recommend switching
to a logging library rather than using println, for a variety of reasons).
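Here is a minimal sketch of the logging alternative. It uses java.util.logging only because that ships with the JDK. SLF4J is a common choice in Beam pipelines. processElement is a hypothetical stand-in for a DoFn body.

On a distributed runner, log records typically end up in the runner's worker logs rather than in the console that launched the job. That is one common reason println output seems to vanish. The metrics route uses Beam's Metrics.counter(...) API, which needs the Beam SDK, so it is not shown here.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class LoggingInsteadOfPrintln {
  // One logger per class; level and destination are configured outside the code (package-private for tests).
  static final Logger LOG = Logger.getLogger(LoggingInsteadOfPrintln.class.getName());

  // Hypothetical per-element step standing in for the body of a DoFn's @ProcessElement method.
  static String processElement(String payload) {
    // Parameterized message: the {0} placeholder is only filled in if a handler publishes the record.
    LOG.log(Level.INFO, "received message: {0}", payload);
    return payload;
  }

  public static void main(String[] args) {
    // With the default configuration, the record goes to the console handler (stderr).
    processElement("hello pulsar");
  }
}
```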

Best,
B


Re: PulsarIO not connecting

Posted by phani geeth <bv...@gmail.com>.
Thanks, Byron, for the quick response. It worked once I created a
SerializableFunction and built the client inside it.

One more question: how can I display the Pulsar messages? When I add a
DoFn to the pipeline, it doesn't print anything. I can see that the
pipeline is receiving messages, but I'm not able to display them.

class MessagePrint extends DoFn<PulsarMessage, String> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    System.out.println(c.element());
    System.out.println(c.element().getMessageRecord());
  }
}

I added this DoFn to the pipeline after PulsarIO, but it doesn't print
any messages.

Regards,
Phani Geeth
