Posted to user@beam.apache.org by ayush sharma <17...@gmail.com> on 2020/07/17 15:51:22 UTC

ReadFromKafka returns error - RuntimeError: cannot encode a null byte[]

Hi,

I am trying to build a streaming Beam pipeline in Python which should
capture messages from Kafka and then execute further stages of data
fetching from other sources and aggregation. Here, step by step, is what
I have built so far:

   1. Run a Kafka instance on localhost:9092

      ./bin/kafka-server-start.sh ./config/server.properties

   2. Run the beam-flink job server using Docker (this exposes the job
      endpoint on localhost:8099 and, by default, an expansion service on
      localhost:8097, both of which the pipeline below points at)

      docker run --net=host apache/beam_flink1.10_job_server:latest

   3. Run the beam-kafka pipeline

import apache_beam as beam
from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

if __name__ == '__main__':
    options = PipelineOptions([
        "--job_endpoint=localhost:8099",
        "--environment_type=LOOPBACK",
        "--streaming",
        "--environment_config={\"command\":\"/opt/apache/beam/boot\"}",
    ])

    options = options.view_as(StandardOptions)
    options.streaming = True

    pipeline = beam.Pipeline(options=options)

    result = (
        pipeline

        | "Read from kafka" >> ReadFromKafka(
            consumer_config={
                "bootstrap.servers": 'localhost:9092',
            },
            topics=['mytopic'],
            expansion_service='localhost:8097',
        )

        | beam.Map(print)
    )

    pipeline.run()
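
With this wiring, each element that ReadFromKafka hands to beam.Map(print)
should be the record's key/value pair as raw bytes (assuming the default
byte-array deserializers in this Beam version), so a keyed record would
print roughly as:

(b'tryKey', b'tryValue')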


   4. Publish a new message using kafka-console-producer.sh

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
> tryme
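
Note that the console producer publishes this record with a null key, which
turns out to be the root of the error below. As later replies in this thread
show, a keyed record can be published by enabling key parsing:

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic \
    --property "parse.key=true" --property "key.separator=:"
> tryKey:tryValue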

After publishing this trial message, the Beam pipeline picks the message
up but crashes with this error:

RuntimeError: org.apache.beam.sdk.util.UserCodeException:
org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1014)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
    at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1042)
    at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
    at org.apache.beam
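
For intuition about the error itself: Beam's ByteArrayCoder has no encoding
for a null byte[] (here, the record's missing key), which is exactly what the
CoderException reports. A null-tolerant coder must write a presence marker
before the payload; below is a minimal illustrative sketch in Python of that
idea (conceptual only, not Beam's actual implementation):

# Conceptual sketch: prefix a presence byte so None can round-trip
# instead of raising "cannot encode a null byte[]".
class NullableBytesCoder:
    def encode(self, value):
        if value is None:
            return b'\x00'          # null: presence byte only
        return b'\x01' + value      # present: marker, then payload

    def decode(self, data):
        if data[0:1] == b'\x00':
            return None
        return data[1:]

assert NullableBytesCoder().decode(NullableBytesCoder().encode(None)) is None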

Regards,

Ayush Sharma.

Re: ReadFromKafka returns error - RuntimeError: cannot encode a null byte[]

Posted by ayush sharma <17...@gmail.com>.
I tried enabling auto-commit with the Flink and direct runners individually,
using the code below.
Irrespective of the environment type ("DOCKER" or "LOOPBACK") for the Flink
runner, the pipeline crashes with ERROR: java.lang.UnsupportedOperationException:
The ActiveBundle does not have a registered bundle checkpoint handler.
And the direct runner (no args in PipelineOptions()) never reacts to the
published Kafka message.

# USAGE:
# 1. python sof_kafka_read_v2.py --runner flink
# 2. python sof_kafka_read_v2.py --runner flink --environment-type DOCKER
# 3. python sof_kafka_read_v2.py

import apache_beam as beam
from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions, SetupOptions
import argparse

if __name__ == '__main__':

    parser = argparse.ArgumentParser()

    parser.add_argument(
        '--runner',
        type=str,
        default="direct",
    )
    parser.add_argument(
        '--environment-type',
        type=str,
        default="LOOPBACK",
    )

    args = parser.parse_args()

    runner = args.runner
    environment_type = args.environment_type

    if runner == "direct":
        options = PipelineOptions()
    elif runner == "flink":
        options = PipelineOptions([
        "--runner=FlinkRunner",
        "--flink_version=1.9",
        "--flink_master=localhost:8081",
        "--environment_type=" + environment_type,
        "--job_name=try_kafka",
        "--streaming",
    ])

    options.view_as(SetupOptions).save_main_session = True
    options.view_as(StandardOptions).streaming = True

    pipeline = beam.Pipeline(options=options)

    result = (
        pipeline

        | "Read from kafka" >> ReadFromKafka(
            consumer_config={
                "bootstrap.servers": 'localhost:9092',
                "enable.auto.commit": 'true',
            },
            topics=['mytopic'],
            expansion_service='localhost:8097',
        )

        | "print" >> beam.Map(print)
    )

    res = pipeline.run()
    res.wait_until_finish()
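
To separate broker-side problems from runner problems, it can help to confirm
that the message actually lands in the topic using a plain (non-Beam) consumer.
A minimal sketch, assuming the third-party kafka-python package is installed
(pip install kafka-python):

# Broker sanity check, independent of Beam.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'mytopic',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    consumer_timeout_ms=10000,  # stop iterating after 10s of silence
)
for record in consumer:
    print(record.key, record.value)  # record.key is None for unkeyed messages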



On Wed, Jul 22, 2020 at 7:39 PM Robert Bradshaw <ro...@google.com> wrote:

> On Sat, Jul 18, 2020 at 12:08 PM Chamikara Jayalath <ch...@google.com>
> wrote:
>
>>
>>
>> On Fri, Jul 17, 2020 at 10:04 PM ayush sharma <17...@gmail.com>
>> wrote:
>>
>>> Thank you guys for the reply. I am really stuck and could not proceed
>>> further.
>>> Yes, the previously published trial message had a null key.
>>> But when I send a key:value pair through the producer using
>>>
>>> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
>>> mytopic --property *"parse.key=true" --property "key.separator=:"*
>>> > tryKey:tryValue
>>>
>>> I do not get any error but beam does not print the received message.
>>> Here is what my pipeline looks like:
>>> result = (
>>>         pipeline
>>>
>>>         | "Read from kafka" >> ReadFromKafka(
>>>             consumer_config={
>>>                 "bootstrap.servers": 'localhost:9092',
>>>             },
>>>             topics=['mytopic'],
>>>             expansion_service='localhost:8097',
>>>         )
>>>
>>>         | "print" >> beam.Map(print)
>>> )
>>>
>>>
>> I suspect DirectRunner in LOOPBACK mode might not be working for
>> cross-language transforms today.
>>
>
> When running a streaming pipeline, the DirectRunner falls back to the old
> runner, which does not support cross-language transforms.
> https://issues.apache.org/jira/browse/BEAM-7514
>
>> Please note that the cross-language transforms framework is fairly new [1] and
>> we are adding support for various runners and environment configurations.
>> Can you try with Flink in DOCKER mode ?
>>
>>
>>> If this is not the way we make beam and kafka communicate then please
>>> share a working example which showcases how a message published in kafka
>>> gets received by beam while streaming.
>>>
>>
>> I'm adding an example, but so far I've only tested it with Dataflow. I hope
>> to test that example for more runners and add additional instructions
>> there.
>> https://github.com/apache/beam/pull/12188
>>
>> Thanks,
>> Cham
>>
>> [1] https://beam.apache.org/roadmap/connectors-multi-sdk/
>>
>>>
>>> Regards,
>>> Ayush Sharma
>>>
>>> On Fri, Jul 17, 2020 at 11:39 PM Chamikara Jayalath <
>>> chamikara@google.com> wrote:
>>>
>>>> Yes, seems like this is due to the key being null. XLang KafkaIO has to
>>>> be updated to support this. You should not run into this error if you
>>>> publish keys and values that are not null.
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Jul 17, 2020 at 8:04 PM Luke Cwik <lc...@google.com> wrote:
>>>>
>>>>> +dev <de...@beam.apache.org>
>>>>>
>>>>> On Fri, Jul 17, 2020 at 8:03 PM Luke Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> +Heejong Lee <he...@google.com> +Chamikara Jayalath
>>>>>> <ch...@google.com>
>>>>>>
>>>>>> Do you know if your trial record has an empty key or value?
>>>>>> If so, then you hit a bug; it seems as though this use case was
>>>>>> missed.
>>>>>>
>>>>>> Heejong and Cham,
>>>>>> It looks like the Javadoc for ByteArrayDeserializer and other
>>>>>> Deserializers can return null[1, 2] and we aren't using
>>>>>> NullableCoder.of(ByteArrayCoder.of()) in the expansion[3]. Note that the
>>>>>> non XLang KafkaIO does this correctly in its regular coder inference
>>>>>> logic[4]. I filed BEAM-10529[5]
>>>>>>
>>>>>> 1:
>>>>>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/ByteArrayDeserializer.html#deserialize-java.lang.String-byte:A-
>>>>>> 2:
>>>>>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html#deserialize-java.lang.String-byte:A-
>>>>>> 3:
>>>>>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L478
>>>>>> 4:
>>>>>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/LocalDeserializerProvider.java#L85
>>>>>> 5: https://issues.apache.org/jira/browse/BEAM-10529
>>>>>>
>>>>>>
>>>>>> On Fri, Jul 17, 2020 at 8:51 AM ayush sharma <17...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am trying to build a streaming beam pipeline in python which
>>>>>>> should capture messages from kafka and then execute further stages of data
>>>>>>> fetching from other sources and aggregation. The step-by-step process of
>>>>>>> what I have built till now is:
>>>>>>>
>>>>>>>    1. Run a Kafka instance on localhost:9092
>>>>>>>
>>>>>>>       ./bin/kafka-server-start.sh ./config/server.properties
>>>>>>>
>>>>>>>    2. Run the beam-flink job server using Docker
>>>>>>>
>>>>>>>       docker run --net=host apache/beam_flink1.10_job_server:latest
>>>>>>>
>>>>>>>    3. Run the beam-kafka pipeline
>>>>>>>
>>>>>>> import apache_beam as beam
>>>>>>> from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
>>>>>>>
>>>>>>> if __name__ == '__main__':
>>>>>>>     options = PipelineOptions([
>>>>>>>         "--job_endpoint=localhost:8099",
>>>>>>>         "--environment_type=LOOPBACK",
>>>>>>>         "--streaming",
>>>>>>>         "--environment_config={\"command\":\"/opt/apache/beam/boot\"}",
>>>>>>>     ])
>>>>>>>
>>>>>>>     options = options.view_as(StandardOptions)
>>>>>>>     options.streaming = True
>>>>>>>
>>>>>>>     pipeline = beam.Pipeline(options=options)
>>>>>>>
>>>>>>>     result = (
>>>>>>>         pipeline
>>>>>>>
>>>>>>>         | "Read from kafka" >> ReadFromKafka(
>>>>>>>             consumer_config={
>>>>>>>                 "bootstrap.servers": 'localhost:9092',
>>>>>>>             },
>>>>>>>             topics=['mytopic'],
>>>>>>>             expansion_service='localhost:8097',
>>>>>>>         )
>>>>>>>
>>>>>>>         | beam.Map(print)
>>>>>>>     )
>>>>>>>
>>>>>>>     pipeline.run()
>>>>>>>
>>>>>>>
>>>>>>>    4. Publish a new message using kafka-console-producer.sh
>>>>>>>
>>>>>>> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
>>>>>>> > tryme
>>>>>>>
>>>>>>> After publishing this trial message, the beam pipeline picks up the
>>>>>>> message but crashes with this error:
>>>>>>>
>>>>>>> RuntimeError: org.apache.beam.sdk.util.UserCodeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
>>>>>>>     at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1014)
>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
>>>>>>>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1042)
>>>>>>>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
>>>>>>>     at org.apache.beam
>>>>>>>
>>>>>>> Regards,
>>>>>>>
>>>>>>> Ayush Sharma.
>>>>>>>
>>>>>>>

Re: ReadFromKafka returns error - RuntimeError: cannot encode a null byte[]

Posted by Robert Bradshaw <ro...@google.com>.
On Sat, Jul 18, 2020 at 12:08 PM Chamikara Jayalath <ch...@google.com>
wrote:

>
>
> On Fri, Jul 17, 2020 at 10:04 PM ayush sharma <17...@gmail.com> wrote:
>
>> Thank you guys for the reply. I am really stuck and could not proceed
>> further.
>> Yes, the previously published trial message had a null key.
>> But when I send a key:value pair through the producer using
>>
>> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
>> mytopic --property *"parse.key=true" --property "key.separator=:"*
>> > tryKey:tryValue
>>
>> I do not get any error but beam does not print the received message.
>> Here is what my pipeline looks like:
>> result = (
>>         pipeline
>>
>>         | "Read from kafka" >> ReadFromKafka(
>>             consumer_config={
>>                 "bootstrap.servers": 'localhost:9092',
>>             },
>>             topics=['mytopic'],
>>             expansion_service='localhost:8097',
>>         )
>>
>>         | "print" >> beam.Map(print)
>> )
>>
>>
> I suspect DirectRunner in LOOPBACK mode might not be working for
> cross-language transforms today.
>

When running a streaming pipeline, the DirectRunner falls back to the old
runner, which does not support cross-language transforms.
https://issues.apache.org/jira/browse/BEAM-7514

> Please note that the cross-language transforms framework is fairly new [1] and
> we are adding support for various runners and environment configurations.
> Can you try with Flink in DOCKER mode ?
>
>
>> If this is not the way we make beam and kafka communicate then please
>> share a working example which showcases how a message published in kafka
>> gets received by beam while streaming.
>>
>
> I'm adding an example, but so far I've only tested it with Dataflow. I hope
> to test that example for more runners and add additional instructions
> there.
> https://github.com/apache/beam/pull/12188
>
> Thanks,
> Cham
>
> [1] https://beam.apache.org/roadmap/connectors-multi-sdk/
>
>>
>> Regards,
>> Ayush Sharma
>>
>> On Fri, Jul 17, 2020 at 11:39 PM Chamikara Jayalath <ch...@google.com>
>> wrote:
>>
>>> Yes, seems like this is due to the key being null. XLang KafkaIO has to
>>> be updated to support this. You should not run into this error if you
>>> publish keys and values that are not null.
>>>
>>>
>>>
>>>
>>> On Fri, Jul 17, 2020 at 8:04 PM Luke Cwik <lc...@google.com> wrote:
>>>
>>>> +dev <de...@beam.apache.org>
>>>>
>>>> On Fri, Jul 17, 2020 at 8:03 PM Luke Cwik <lc...@google.com> wrote:
>>>>
>>>>> +Heejong Lee <he...@google.com> +Chamikara Jayalath
>>>>> <ch...@google.com>
>>>>>
>>>>> Do you know if your trial record has an empty key or value?
>>>>> If so, then you hit a bug; it seems as though this use case was
>>>>> missed.
>>>>>
>>>>> Heejong and Cham,
>>>>> It looks like the Javadoc for ByteArrayDeserializer and other
>>>>> Deserializers can return null[1, 2] and we aren't using
>>>>> NullableCoder.of(ByteArrayCoder.of()) in the expansion[3]. Note that the
>>>>> non XLang KafkaIO does this correctly in its regular coder inference
>>>>> logic[4]. I filed BEAM-10529[5]
>>>>>
>>>>> 1:
>>>>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/ByteArrayDeserializer.html#deserialize-java.lang.String-byte:A-
>>>>> 2:
>>>>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html#deserialize-java.lang.String-byte:A-
>>>>> 3:
>>>>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L478
>>>>> 4:
>>>>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/LocalDeserializerProvider.java#L85
>>>>> 5: https://issues.apache.org/jira/browse/BEAM-10529
>>>>>
>>>>>
>>>>> On Fri, Jul 17, 2020 at 8:51 AM ayush sharma <17...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am trying to build a streaming beam pipeline in python which should
>>>>>> capture messages from kafka and then execute further stages of data
>>>>>> fetching from other sources and aggregation. The step-by-step process of
>>>>>> what I have built till now is:
>>>>>>
>>>>>>    1. Run a Kafka instance on localhost:9092
>>>>>>
>>>>>>       ./bin/kafka-server-start.sh ./config/server.properties
>>>>>>
>>>>>>    2. Run the beam-flink job server using Docker
>>>>>>
>>>>>>       docker run --net=host apache/beam_flink1.10_job_server:latest
>>>>>>
>>>>>>    3. Run the beam-kafka pipeline
>>>>>>
>>>>>> import apache_beam as beam
>>>>>> from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
>>>>>> from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
>>>>>>
>>>>>> if __name__ == '__main__':
>>>>>>     options = PipelineOptions([
>>>>>>         "--job_endpoint=localhost:8099",
>>>>>>         "--environment_type=LOOPBACK",
>>>>>>         "--streaming",
>>>>>>         "--environment_config={\"command\":\"/opt/apache/beam/boot\"}",
>>>>>>     ])
>>>>>>
>>>>>>     options = options.view_as(StandardOptions)
>>>>>>     options.streaming = True
>>>>>>
>>>>>>     pipeline = beam.Pipeline(options=options)
>>>>>>
>>>>>>     result = (
>>>>>>         pipeline
>>>>>>
>>>>>>         | "Read from kafka" >> ReadFromKafka(
>>>>>>             consumer_config={
>>>>>>                 "bootstrap.servers": 'localhost:9092',
>>>>>>             },
>>>>>>             topics=['mytopic'],
>>>>>>             expansion_service='localhost:8097',
>>>>>>         )
>>>>>>
>>>>>>         | beam.Map(print)
>>>>>>     )
>>>>>>
>>>>>>     pipeline.run()
>>>>>>
>>>>>>
>>>>>>    4. Publish a new message using kafka-console-producer.sh
>>>>>>
>>>>>> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
>>>>>> > tryme
>>>>>>
>>>>>> After publishing this trial message, the beam pipeline picks up the
>>>>>> message but crashes with this error:
>>>>>>
>>>>>> RuntimeError: org.apache.beam.sdk.util.UserCodeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
>>>>>>     at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1014)
>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
>>>>>>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1042)
>>>>>>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
>>>>>>     at org.apache.beam
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Ayush Sharma.
>>>>>>
>>>>>>

Re: ReadFromKafka returns error - RuntimeError: cannot encode a null byte[]

Posted by Chamikara Jayalath <ch...@google.com>.
Yeah, this is a known issue. According to +Boyuan Zhang <bo...@google.com>'s
comment in the bug, you should still be able to read as long as the Kafka
cluster is set up to auto-commit, and these errors can be safely ignored. For
example, you can set "enable.auto.commit" to "true" in the consumer config
passed to ReadFromKafka.
I haven't tried this myself, though, so please comment in the JIRA if this is
a true blocker for you for reading from Kafka; that will help us identify the
true priority of this issue.
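
Concretely, mirroring the code posted elsewhere in this thread, the workaround
would look something like:

ReadFromKafka(
    consumer_config={
        "bootstrap.servers": "localhost:9092",
        # Kafka consumer properties are passed through as strings
        "enable.auto.commit": "true",
    },
    topics=['mytopic'],
    expansion_service='localhost:8097',
)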

Thanks,
Cham

On Mon, Jul 20, 2020 at 2:40 PM ayush sharma <17...@gmail.com> wrote:

> Is there any workaround to this issue?
>
> On Mon, Jul 20, 2020 at 5:33 PM ayush sharma <17...@gmail.com> wrote:
>
>> Thank you for the suggestions. I tried using FlinkRunner and
>> setting environment_type either to DOCKER or LOOPBACK gives an error -
>> java.lang.UnsupportedOperationException: The ActiveBundle does not have a
>> registered bundle checkpoint handler.
>>
>> I found that this issue has been reported (
>> https://issues.apache.org/jira/browse/BEAM-6868) and hence upvoting it.
>> Thank you for the prompt responses and looking forward to using this
>> feature in the future.
>>
>> Regards,
>> Ayush.
>>
>> On Sat, Jul 18, 2020 at 3:14 PM Chamikara Jayalath <ch...@google.com>
>> wrote:
>>
>>>
>>>
>>> On Fri, Jul 17, 2020 at 10:04 PM ayush sharma <17...@gmail.com>
>>> wrote:
>>>
>>>> Thank you guys for the reply. I am really stuck and could not proceed
>>>> further.
>>>> Yes, the previously published trial message had a null key.
>>>> But when I send a key:value pair through the producer using
>>>>
>>>> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
>>>> mytopic --property *"parse.key=true" --property "key.separator=:"*
>>>> > tryKey:tryValue
>>>>
>>>> I do not get any error but beam does not print the received message.
>>>> Here is what my pipeline looks like:
>>>> result = (
>>>>         pipeline
>>>>
>>>>         | "Read from kafka" >> ReadFromKafka(
>>>>             consumer_config={
>>>>                 "bootstrap.servers": 'localhost:9092',
>>>>             },
>>>>             topics=['mytopic'],
>>>>             expansion_service='localhost:8097',
>>>>         )
>>>>
>>>>         | "print" >> beam.Map(print)
>>>> )
>>>>
>>>>
>>> I suspect DirectRunner in LOOPBACK mode might not be working for
>>> cross-language transforms today. Please note that cross-language transforms
>>> framework is fairly new [1] and we are adding support for various runners
>>> and environment configurations.
>>> Can you try with Flink in DOCKER mode ?
>>>
>>>
>>>> If this is not the way we make beam and kafka communicate then please
>>>> share a working example which showcases how a message published in kafka
>>>> gets received by beam while streaming.
>>>>
>>>
>>> I'm adding an example, but so far I've only tested it with Dataflow. I
>>> hope to test that example for more runners and add additional instructions
>>> there.
>>> https://github.com/apache/beam/pull/12188
>>>
>>> Thanks,
>>> Cham
>>>
>>> [1] https://beam.apache.org/roadmap/connectors-multi-sdk/
>>>
>>>>
>>>> Regards,
>>>> Ayush Sharma
>>>>
>>>> On Fri, Jul 17, 2020 at 11:39 PM Chamikara Jayalath <
>>>> chamikara@google.com> wrote:
>>>>
>>>>> Yes, seems like this is due to the key being null. XLang KafkaIO has
>>>>> to be updated to support this. You should not run into this error if you
>>>>> publish keys and values that are not null.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Jul 17, 2020 at 8:04 PM Luke Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> +dev <de...@beam.apache.org>
>>>>>>
>>>>>> On Fri, Jul 17, 2020 at 8:03 PM Luke Cwik <lc...@google.com> wrote:
>>>>>>
>>>>>>> +Heejong Lee <he...@google.com> +Chamikara Jayalath
>>>>>>> <ch...@google.com>
>>>>>>>
>>>>>>> Do you know if your trial record has an empty key or value?
>>>>>>> If so, then you hit a bug; it seems as though this use case was
>>>>>>> missed.
>>>>>>>
>>>>>>> Heejong and Cham,
>>>>>>> It looks like the Javadoc for ByteArrayDeserializer and other
>>>>>>> Deserializers can return null[1, 2] and we aren't using
>>>>>>> NullableCoder.of(ByteArrayCoder.of()) in the expansion[3]. Note that the
>>>>>>> non XLang KafkaIO does this correctly in its regular coder inference
>>>>>>> logic[4]. I filed BEAM-10529[5]
>>>>>>>
>>>>>>> 1:
>>>>>>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/ByteArrayDeserializer.html#deserialize-java.lang.String-byte:A-
>>>>>>> 2:
>>>>>>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html#deserialize-java.lang.String-byte:A-
>>>>>>> 3:
>>>>>>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L478
>>>>>>> 4:
>>>>>>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/LocalDeserializerProvider.java#L85
>>>>>>> 5: https://issues.apache.org/jira/browse/BEAM-10529
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Jul 17, 2020 at 8:51 AM ayush sharma <17...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I am trying to build a streaming beam pipeline in python which
>>>>>>>> should capture messages from kafka and then execute further stages of data
>>>>>>>> fetching from other sources and aggregation. The step-by-step process of
>>>>>>>> what I have built till now is:
>>>>>>>>
>>>>>>>>    1. Run a Kafka instance on localhost:9092
>>>>>>>>
>>>>>>>>       ./bin/kafka-server-start.sh ./config/server.properties
>>>>>>>>
>>>>>>>>    2. Run the beam-flink job server using Docker
>>>>>>>>
>>>>>>>>       docker run --net=host apache/beam_flink1.10_job_server:latest
>>>>>>>>
>>>>>>>>    3. Run the beam-kafka pipeline
>>>>>>>>
>>>>>>>> import apache_beam as beam
>>>>>>>> from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
>>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
>>>>>>>>
>>>>>>>> if __name__ == '__main__':
>>>>>>>>     options = PipelineOptions([
>>>>>>>>         "--job_endpoint=localhost:8099",
>>>>>>>>         "--environment_type=LOOPBACK",
>>>>>>>>         "--streaming",
>>>>>>>>         "--environment_config={\"command\":\"/opt/apache/beam/boot\"}",
>>>>>>>>     ])
>>>>>>>>
>>>>>>>>     options = options.view_as(StandardOptions)
>>>>>>>>     options.streaming = True
>>>>>>>>
>>>>>>>>     pipeline = beam.Pipeline(options=options)
>>>>>>>>
>>>>>>>>     result = (
>>>>>>>>         pipeline
>>>>>>>>
>>>>>>>>         | "Read from kafka" >> ReadFromKafka(
>>>>>>>>             consumer_config={
>>>>>>>>                 "bootstrap.servers": 'localhost:9092',
>>>>>>>>             },
>>>>>>>>             topics=['mytopic'],
>>>>>>>>             expansion_service='localhost:8097',
>>>>>>>>         )
>>>>>>>>
>>>>>>>>         | beam.Map(print)
>>>>>>>>     )
>>>>>>>>
>>>>>>>>     pipeline.run()
>>>>>>>>
>>>>>>>>
>>>>>>>>    4. Publish a new message using kafka-console-producer.sh
>>>>>>>>
>>>>>>>> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
>>>>>>>> > tryme
>>>>>>>>
>>>>>>>> After publishing this trial message, the beam pipeline picks up
>>>>>>>> the message but crashes with this error:
>>>>>>>>
>>>>>>>> RuntimeError: org.apache.beam.sdk.util.UserCodeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
>>>>>>>>     at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1014)
>>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
>>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
>>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
>>>>>>>>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1042)
>>>>>>>>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
>>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
>>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
>>>>>>>>     at org.apache.beam
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>>
>>>>>>>> Ayush Sharma.
>>>>>>>>
>>>>>>>>

Re: ReadFromKafka returns error - RuntimeError: cannot encode a null byte[]

Posted by ayush sharma <17...@gmail.com>.
Is there any workaround to this issue?

On Mon, Jul 20, 2020 at 5:33 PM ayush sharma <17...@gmail.com> wrote:

> Thank you for the suggestions. I tried using FlinkRunner and
> setting environment_type either to DOCKER or LOOPBACK gives an error -
> java.lang.UnsupportedOperationException: The ActiveBundle does not have a
> registered bundle checkpoint handler.
>
> I found that this issue has been reported (
> https://issues.apache.org/jira/browse/BEAM-6868) and hence upvoting it.
> Thank you for the prompt responses and looking forward to using this
> feature in the future.
>
> Regards,
> Ayush.
>
> On Sat, Jul 18, 2020 at 3:14 PM Chamikara Jayalath <ch...@google.com>
> wrote:
>
>>
>>
>> On Fri, Jul 17, 2020 at 10:04 PM ayush sharma <17...@gmail.com>
>> wrote:
>>
>>> Thank you guys for the reply. I am really stuck and could not proceed
>>> further.
>>> Yes, the previously published trial message had a null key.
>>> But when I send a key:value pair through the producer using
>>>
>>> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
>>> mytopic --property *"parse.key=true" --property "key.separator=:"*
>>> > tryKey:tryValue
>>>
>>> I do not get any error but beam does not print the received message.
>>> Here is what my pipeline looks like:
>>> result = (
>>>         pipeline
>>>
>>>         | "Read from kafka" >> ReadFromKafka(
>>>             consumer_config={
>>>                 "bootstrap.servers": 'localhost:9092',
>>>             },
>>>             topics=['mytopic'],
>>>             expansion_service='localhost:8097',
>>>         )
>>>
>>>         | "print" >> beam.Map(print)
>>> )
>>>
>>>
>> I suspect DirectRunner in LOOPBACK mode might not be working for
>> cross-language transforms today. Please note that cross-language transforms
>> framework is fairly new [1] and we are adding support for various runners
>> and environment configurations.
>> Can you try with Flink in DOCKER mode ?
>>
>>
>>> If this is not the way we make beam and kafka communicate then please
>>> share a working example which showcases how a message published in kafka
>>> gets received by beam while streaming.
>>>
>>
>> I'm adding an example, but so far I've only tested it with Dataflow. I hope
>> to test that example for more runners and add additional instructions
>> there.
>> https://github.com/apache/beam/pull/12188
>>
>> Thanks,
>> Cham
>>
>> [1] https://beam.apache.org/roadmap/connectors-multi-sdk/
>>
>>>
>>> Regards,
>>> Ayush Sharma
>>>
>>> On Fri, Jul 17, 2020 at 11:39 PM Chamikara Jayalath <
>>> chamikara@google.com> wrote:
>>>
>>>> Yes, seems like this is due to the key being null. XLang KafkaIO has to
>>>> be updated to support this. You should not run into this error if you
>>>> publish keys and values that are not null.
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Jul 17, 2020 at 8:04 PM Luke Cwik <lc...@google.com> wrote:
>>>>
>>>>> +dev <de...@beam.apache.org>
>>>>>
>>>>> On Fri, Jul 17, 2020 at 8:03 PM Luke Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> +Heejong Lee <he...@google.com> +Chamikara Jayalath
>>>>>> <ch...@google.com>
>>>>>>
>>>>>> Do you know if your trial record has an empty key or value?
>>>>>> If so, then you hit a bug; it seems as though this use case was
>>>>>> missed.
>>>>>>
>>>>>> Heejong and Cham,
>>>>>> It looks like the Javadoc for ByteArrayDeserializer and other
>>>>>> Deserializers can return null[1, 2] and we aren't using
>>>>>> NullableCoder.of(ByteArrayCoder.of()) in the expansion[3]. Note that the
>>>>>> non XLang KafkaIO does this correctly in its regular coder inference
>>>>>> logic[4]. I filed BEAM-10529[5]
>>>>>>
>>>>>> 1:
>>>>>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/ByteArrayDeserializer.html#deserialize-java.lang.String-byte:A-
>>>>>> 2:
>>>>>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html#deserialize-java.lang.String-byte:A-
>>>>>> 3:
>>>>>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L478
>>>>>> 4:
>>>>>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/LocalDeserializerProvider.java#L85
>>>>>> 5: https://issues.apache.org/jira/browse/BEAM-10529
>>>>>>
>>>>>>
>>>>>> On Fri, Jul 17, 2020 at 8:51 AM ayush sharma <17...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am trying to build a streaming beam pipeline in python which
>>>>>>> should capture messages from kafka and then execute further stages of data
>>>>>>> fetching from other sources and aggregation. The step-by-step process of
>>>>>>> what I have built till now is:
>>>>>>>
>>>>>>>    1. Run a Kafka instance on localhost:9092
>>>>>>>
>>>>>>>       ./bin/kafka-server-start.sh ./config/server.properties
>>>>>>>
>>>>>>>    2. Run the beam-flink job server using Docker
>>>>>>>
>>>>>>>       docker run --net=host apache/beam_flink1.10_job_server:latest
>>>>>>>
>>>>>>>    3. Run the beam-kafka pipeline
>>>>>>>
>>>>>>> import apache_beam as beam
>>>>>>> from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
>>>>>>>
>>>>>>> if __name__ == '__main__':
>>>>>>>     options = PipelineOptions([
>>>>>>>         "--job_endpoint=localhost:8099",
>>>>>>>         "--environment_type=LOOPBACK",
>>>>>>>         "--streaming",
>>>>>>>         "--environment_config={\"command\":\"/opt/apache/beam/boot\"}",
>>>>>>>     ])
>>>>>>>
>>>>>>>     options = options.view_as(StandardOptions)
>>>>>>>     options.streaming = True
>>>>>>>
>>>>>>>     pipeline = beam.Pipeline(options=options)
>>>>>>>
>>>>>>>     result = (
>>>>>>>         pipeline
>>>>>>>
>>>>>>>         | "Read from kafka" >> ReadFromKafka(
>>>>>>>             consumer_config={
>>>>>>>                 "bootstrap.servers": 'localhost:9092',
>>>>>>>             },
>>>>>>>             topics=['mytopic'],
>>>>>>>             expansion_service='localhost:8097',
>>>>>>>         )
>>>>>>>
>>>>>>>         | beam.Map(print)
>>>>>>>     )
>>>>>>>
>>>>>>>     pipeline.run()
>>>>>>>
>>>>>>>
>>>>>>>    4. Publish a new message using kafka-console-producer.sh
>>>>>>>
>>>>>>> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
>>>>>>> > tryme
>>>>>>>
>>>>>>> After publishing this trial message, the beam pipeline picks up the
>>>>>>> message but crashes with this error:
>>>>>>>
>>>>>>> RuntimeError: org.apache.beam.sdk.util.UserCodeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
>>>>>>>     at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1014)
>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
>>>>>>>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1042)
>>>>>>>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
>>>>>>>     at org.apache.beam
>>>>>>>
>>>>>>> Regards,
>>>>>>>
>>>>>>> Ayush Sharma.
>>>>>>>
>>>>>>>

Re: ReadFromKafka returns error - RuntimeError: cannot encode a null byte[]

Posted by ayush sharma <17...@gmail.com>.
Is there any workaround to this issue?

On Mon, Jul 20, 2020 at 5:33 PM ayush sharma <17...@gmail.com> wrote:

> Thank you for the suggestions. I tried using FlinkRunner and
> setting environment_type either to DOCKER or LOOPBACK gives an error -
> java.lang.UnsupportedOperationException: The ActiveBundle does not have a
> registered bundle checkpoint handler.
>
> I found that this issue has been reported (
> https://issues.apache.org/jira/browse/BEAM-6868) and hence upvoting it.
> Thank you for the prompt responses and looking forward to using this
> feature in the future.
>
> Regards,
> Ayush.
>
> On Sat, Jul 18, 2020 at 3:14 PM Chamikara Jayalath <ch...@google.com>
> wrote:
>
>>
>>
>> On Fri, Jul 17, 2020 at 10:04 PM ayush sharma <17...@gmail.com>
>> wrote:
>>
>>> Thank you guys for the reply. I am really stuck and could not proceed
>>> further.
>>> Yes, the previous trial published message had null key.
>>> But when I send key:value pair through producer using
>>>
>>> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
>>> mytopic --property *"parse.key=true" --property "key.separator=:"*
>>> > tryKey:tryValue
>>>
>>> I do not get any error but beam does not print the received message.
>>> Here is how my pipeline looks like,
>>> result = (
>>>         pipeline
>>>
>>>         | "Read from kafka" >> ReadFromKafka(
>>>             consumer_config={
>>>                 "bootstrap.servers": 'localhost:9092',
>>>             },
>>>             topics=['mytopic'],
>>>             expansion_service='localhost:8097',
>>>         )
>>>
>>>         | "print" >> beam.Map(print)
>>> )
>>>
>>>
>> I suspect DirectRunner in LOOPBACK mode might not be working for
>> cross-language transforms today. Please note that the cross-language transforms
>> framework is fairly new [1] and we are adding support for various runners
>> and environment configurations.
>> Can you try with Flink in DOCKER mode?
>>
>>
>>> If this is not the way to make beam and kafka communicate, then please
>>> share a working example that shows how a message published in kafka
>>> gets received by beam while streaming.
>>>
>>
>> I'm adding an example, but I've only tested it with Dataflow so far. I hope
>> to test that example with more runners and add additional instructions
>> there.
>> https://github.com/apache/beam/pull/12188
>>
>> Thanks,
>> Cham
>>
>> [1] https://beam.apache.org/roadmap/connectors-multi-sdk/
>>
>>>
>>> Regards,
>>> Ayush Sharma
>>>
>>> On Fri, Jul 17, 2020 at 11:39 PM Chamikara Jayalath <
>>> chamikara@google.com> wrote:
>>>
>>>> Yes, seems like this is due to the key being null. XLang KafkaIO has to
>>>> be updated to support this. You should not run into this error if you
>>>> publish keys and values that are not null.
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Jul 17, 2020 at 8:04 PM Luke Cwik <lc...@google.com> wrote:
>>>>
>>>>> +dev <de...@beam.apache.org>
>>>>>
>>>>> On Fri, Jul 17, 2020 at 8:03 PM Luke Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> +Heejong Lee <he...@google.com> +Chamikara Jayalath
>>>>>> <ch...@google.com>
>>>>>>
>>>>>> Do you know if your trial record has an empty key or value?
>>>>>> If so, then you hit a bug; it seems we missed supporting this use
>>>>>> case.
>>>>>>
>>>>>> Heejong and Cham,
>>>>>> It looks like the Javadoc for ByteArrayDeserializer and other
>>>>>> Deserializers can return null[1, 2] and we aren't using
>>>>>> NullableCoder.of(ByteArrayCoder.of()) in the expansion[3]. Note that the
>>>>>> non XLang KafkaIO does this correctly in its regular coder inference
>>>>>> logic[4]. I filed BEAM-10529 [5].
>>>>>>
>>>>>> 1:
>>>>>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/ByteArrayDeserializer.html#deserialize-java.lang.String-byte:A-
>>>>>> 2:
>>>>>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html#deserialize-java.lang.String-byte:A-
>>>>>> 3:
>>>>>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L478
>>>>>> 4:
>>>>>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/LocalDeserializerProvider.java#L85
>>>>>> 5: https://issues.apache.org/jira/browse/BEAM-10529
>>>>>>
>>>>>>
>>>>>> On Fri, Jul 17, 2020 at 8:51 AM ayush sharma <17...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am trying to build a streaming beam pipeline in python which
>>>>>>> should capture messages from kafka and then execute further stages of data
>>>>>>> fetching from other sources and aggregation. The step-by-step process of
>>>>>>> what I have built till now is:
>>>>>>>
>>>>>>>    1.
>>>>>>>
>>>>>>>    Running Kafka instance on localhost:9092
>>>>>>>
>>>>>>>    ./bin/kafka-server-start.sh ./config/server.properties
>>>>>>>    2.
>>>>>>>
>>>>>>>    Run beam-flink job server using docker
>>>>>>>
>>>>>>>    docker run --net=host apache/beam_flink1.10_job_server:latest
>>>>>>>    3.
>>>>>>>
>>>>>>>    Run beam-kafka pipeline
>>>>>>>
>>>>>>> import apache_beam as beam
>>>>>>> from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
>>>>>>>
>>>>>>> if __name__ == '__main__':
>>>>>>>     options = PipelineOptions([
>>>>>>>         "--job_endpoint=localhost:8099",
>>>>>>>         "--environment_type=LOOPBACK",
>>>>>>>         "--streaming",
>>>>>>>         "--environment_config={\"command\":\"/opt/apache/beam/boot\"}",
>>>>>>>     ])
>>>>>>>
>>>>>>>     options = options.view_as(StandardOptions)
>>>>>>>     options.streaming = True
>>>>>>>
>>>>>>>     pipeline = beam.Pipeline(options=options)
>>>>>>>
>>>>>>>     result = (
>>>>>>>         pipeline
>>>>>>>
>>>>>>>         | "Read from kafka" >> ReadFromKafka(
>>>>>>>             consumer_config={
>>>>>>>                 "bootstrap.servers": 'localhost:9092',
>>>>>>>             },
>>>>>>>             topics=['mytopic'],
>>>>>>>             expansion_service='localhost:8097',
>>>>>>>         )
>>>>>>>
>>>>>>>         | beam.Map(print)
>>>>>>>     )
>>>>>>>
>>>>>>>     pipeline.run()
>>>>>>>
>>>>>>>
>>>>>>>    1. Publish new message using kafka-producer.sh
>>>>>>>
>>>>>>> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
>>>>>>> > tryme
>>>>>>>
>>>>>>> After publishing this trial message, the beam pipeline receives the
>>>>>>> message but crashes with this error:
>>>>>>>
>>>>>>> RuntimeError: org.apache.beam.sdk.util.UserCodeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
>>>>>>>     at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1014)
>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
>>>>>>>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1042)
>>>>>>>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
>>>>>>>     at org.apache.beam
>>>>>>>
>>>>>>> Regards,
>>>>>>>
>>>>>>> Ayush Sharma.
>>>>>>>
>>>>>>>

Re: ReadFromKafka returns error - RuntimeError: cannot encode a null byte[]

Posted by ayush sharma <17...@gmail.com>.
Thank you for the suggestions. I tried using FlinkRunner, and setting
environment_type to either DOCKER or LOOPBACK gives this error:
java.lang.UnsupportedOperationException: The ActiveBundle does not have a
registered bundle checkpoint handler.
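
For concreteness, a sketch of the options used for this attempt (the exact
flags are assumptions, since they are not included above, and the Flink
master address is hypothetical):

    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions([
        "--runner=FlinkRunner",
        "--flink_master=localhost:8081",  # hypothetical local Flink cluster
        "--environment_type=DOCKER",      # or LOOPBACK; both hit the error above
        "--streaming",
    ])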

I found that this issue has been reported (
https://issues.apache.org/jira/browse/BEAM-6868) and have upvoted it.
Thank you for the prompt responses and looking forward to using this
feature in the future.

Regards,
Ayush.

On Sat, Jul 18, 2020 at 3:14 PM Chamikara Jayalath <ch...@google.com>
wrote:

>
>
> On Fri, Jul 17, 2020 at 10:04 PM ayush sharma <17...@gmail.com> wrote:
>
>> Thank you guys for the reply. I am really stuck and could not proceed
>> further.
>> Yes, the previously published trial message had a null key.
>> But when I send a key:value pair through the producer using
>>
>> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
>> mytopic --property "parse.key=true" --property "key.separator=:"
>> > tryKey:tryValue
>>
>> I do not get any error, but beam does not print the received message. Here
>> is what my pipeline looks like:
>> result = (
>>         pipeline
>>
>>         | "Read from kafka" >> ReadFromKafka(
>>             consumer_config={
>>                 "bootstrap.servers": 'localhost:9092',
>>             },
>>             topics=['mytopic'],
>>             expansion_service='localhost:8097',
>>         )
>>
>>         | "print" >> beam.Map(print)
>> )
>>
>>
> I suspect DirectRunner in LOOPBACK mode might not be working for
> cross-language transforms today. Please note that the cross-language transforms
> framework is fairly new [1] and we are adding support for various runners
> and environment configurations.
> Can you try with Flink in DOCKER mode?
>
>
>> If this is not the way to make beam and kafka communicate, then please
>> share a working example that shows how a message published in kafka
>> gets received by beam while streaming.
>>
>
> I'm adding an example, but I've only tested it with Dataflow so far. I hope
> to test that example with more runners and add additional instructions
> there.
> https://github.com/apache/beam/pull/12188
>
> Thanks,
> Cham
>
> [1] https://beam.apache.org/roadmap/connectors-multi-sdk/
>
>>
>> Regards,
>> Ayush Sharma
>>
>> On Fri, Jul 17, 2020 at 11:39 PM Chamikara Jayalath <ch...@google.com>
>> wrote:
>>
>>> Yes, seems like this is due to the key being null. XLang KafkaIO has to
>>> be updated to support this. You should not run into this error if you
>>> publish keys and values that are not null.
>>>
>>>
>>>
>>>
>>> On Fri, Jul 17, 2020 at 8:04 PM Luke Cwik <lc...@google.com> wrote:
>>>
>>>> +dev <de...@beam.apache.org>
>>>>
>>>> On Fri, Jul 17, 2020 at 8:03 PM Luke Cwik <lc...@google.com> wrote:
>>>>
>>>>> +Heejong Lee <he...@google.com> +Chamikara Jayalath
>>>>> <ch...@google.com>
>>>>>
>>>>> Do you know if your trial record has an empty key or value?
>>>>> If so, then you hit a bug; it seems we missed supporting this use
>>>>> case.
>>>>>
>>>>> Heejong and Cham,
>>>>> It looks like the Javadoc for ByteArrayDeserializer and other
>>>>> Deserializers can return null[1, 2] and we aren't using
>>>>> NullableCoder.of(ByteArrayCoder.of()) in the expansion[3]. Note that the
>>>>> non XLang KafkaIO does this correctly in its regular coder inference
>>>>> logic[4]. I filed BEAM-10529 [5].
>>>>>
>>>>> 1:
>>>>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/ByteArrayDeserializer.html#deserialize-java.lang.String-byte:A-
>>>>> 2:
>>>>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html#deserialize-java.lang.String-byte:A-
>>>>> 3:
>>>>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L478
>>>>> 4:
>>>>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/LocalDeserializerProvider.java#L85
>>>>> 5: https://issues.apache.org/jira/browse/BEAM-10529
>>>>>
>>>>>
>>>>> On Fri, Jul 17, 2020 at 8:51 AM ayush sharma <17...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am trying to build a streaming beam pipeline in python which should
>>>>>> capture messages from kafka and then execute further stages of data
>>>>>> fetching from other sources and aggregation. The step-by-step process of
>>>>>> what I have built till now is:
>>>>>>
>>>>>>    1.
>>>>>>
>>>>>>    Running Kafka instance on localhost:9092
>>>>>>
>>>>>>    ./bin/kafka-server-start.sh ./config/server.properties
>>>>>>    2.
>>>>>>
>>>>>>    Run beam-flink job server using docker
>>>>>>
>>>>>>    docker run --net=host apache/beam_flink1.10_job_server:latest
>>>>>>    3.
>>>>>>
>>>>>>    Run beam-kafka pipeline
>>>>>>
>>>>>> import apache_beam as beam
>>>>>> from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
>>>>>> from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
>>>>>>
>>>>>> if __name__ == '__main__':
>>>>>>     options = PipelineOptions([
>>>>>>         "--job_endpoint=localhost:8099",
>>>>>>         "--environment_type=LOOPBACK",
>>>>>>         "--streaming",
>>>>>>         "--environment_config={\"command\":\"/opt/apache/beam/boot\"}",
>>>>>>     ])
>>>>>>
>>>>>>     options = options.view_as(StandardOptions)
>>>>>>     options.streaming = True
>>>>>>
>>>>>>     pipeline = beam.Pipeline(options=options)
>>>>>>
>>>>>>     result = (
>>>>>>         pipeline
>>>>>>
>>>>>>         | "Read from kafka" >> ReadFromKafka(
>>>>>>             consumer_config={
>>>>>>                 "bootstrap.servers": 'localhost:9092',
>>>>>>             },
>>>>>>             topics=['mytopic'],
>>>>>>             expansion_service='localhost:8097',
>>>>>>         )
>>>>>>
>>>>>>         | beam.Map(print)
>>>>>>     )
>>>>>>
>>>>>>     pipeline.run()
>>>>>>
>>>>>>
>>>>>>    1. Publish new message using kafka-producer.sh
>>>>>>
>>>>>> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
>>>>>> > tryme
>>>>>>
>>>>>> After publishing this trial message, the beam pipeline receives the
>>>>>> message but crashes with this error:
>>>>>>
>>>>>> RuntimeError: org.apache.beam.sdk.util.UserCodeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
>>>>>>     at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1014)
>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
>>>>>>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1042)
>>>>>>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
>>>>>>     at org.apache.beam
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Ayush Sharma.
>>>>>>
>>>>>>

Re: ReadFromKafka returns error - RuntimeError: cannot encode a null byte[]

Posted by Robert Bradshaw <ro...@google.com>.
On Sat, Jul 18, 2020 at 12:08 PM Chamikara Jayalath <ch...@google.com>
wrote:

>
>
> On Fri, Jul 17, 2020 at 10:04 PM ayush sharma <17...@gmail.com> wrote:
>
>> Thank you guys for the reply. I am really stuck and could not proceed
>> further.
> Yes, the previously published trial message had a null key.
> But when I send a key:value pair through the producer using
>>
>> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
> mytopic --property "parse.key=true" --property "key.separator=:"
>> > tryKey:tryValue
>>
> I do not get any error, but beam does not print the received message. Here
> is what my pipeline looks like:
>> result = (
>>         pipeline
>>
>>         | "Read from kafka" >> ReadFromKafka(
>>             consumer_config={
>>                 "bootstrap.servers": 'localhost:9092',
>>             },
>>             topics=['mytopic'],
>>             expansion_service='localhost:8097',
>         )
>
>         | "print" >> beam.Map(print)
> )
>>
>>
> I suspect DirectRunner in LOOPBACK mode might not be working for
> cross-language transforms today.
>

When running a streaming pipeline, the DirectRunner falls back to the old
runner, which does not support cross-language transforms:
https://issues.apache.org/jira/browse/BEAM-7514

> Please note that the cross-language transforms framework is fairly new [1] and
> we are adding support for various runners and environment configurations.
> Can you try with Flink in DOCKER mode?
>
>
> If this is not the way to make beam and kafka communicate, then please
> share a working example that shows how a message published in kafka
>> gets received by beam while streaming.
>>
>
> I'm adding an example, but I've only tested it with Dataflow so far. I hope
> to test that example with more runners and add additional instructions
> there.
> https://github.com/apache/beam/pull/12188
>
> Thanks,
> Cham
>
> [1] https://beam.apache.org/roadmap/connectors-multi-sdk/
>
>>
>> Regards,
>> Ayush Sharma
>>
>> On Fri, Jul 17, 2020 at 11:39 PM Chamikara Jayalath <ch...@google.com>
>> wrote:
>>
>>> Yes, seems like this is due to the key being null. XLang KafkaIO has to
>>> be updated to support this. You should not run into this error if you
>>> publish keys and values that are not null.
>>>
>>>
>>>
>>>
>>> On Fri, Jul 17, 2020 at 8:04 PM Luke Cwik <lc...@google.com> wrote:
>>>
>>>> +dev <de...@beam.apache.org>
>>>>
>>>> On Fri, Jul 17, 2020 at 8:03 PM Luke Cwik <lc...@google.com> wrote:
>>>>
>>>>> +Heejong Lee <he...@google.com> +Chamikara Jayalath
>>>>> <ch...@google.com>
>>>>>
>>>>> Do you know if your trial record has an empty key or value?
>>>>> If so, then you hit a bug; it seems we missed supporting this use
>>>>> case.
>>>>>
>>>>> Heejong and Cham,
>>>>> It looks like the Javadoc for ByteArrayDeserializer and other
>>>>> Deserializers can return null[1, 2] and we aren't using
>>>>> NullableCoder.of(ByteArrayCoder.of()) in the expansion[3]. Note that the
>>>>> non XLang KafkaIO does this correctly in its regular coder inference
>>>>> logic[4]. I filed BEAM-10529 [5].
>>>>>
>>>>> 1:
>>>>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/ByteArrayDeserializer.html#deserialize-java.lang.String-byte:A-
>>>>> 2:
>>>>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html#deserialize-java.lang.String-byte:A-
>>>>> 3:
>>>>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L478
>>>>> 4:
>>>>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/LocalDeserializerProvider.java#L85
>>>>> 5: https://issues.apache.org/jira/browse/BEAM-10529
>>>>>
>>>>>
>>>>> On Fri, Jul 17, 2020 at 8:51 AM ayush sharma <17...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am trying to build a streaming beam pipeline in python which should
>>>>>> capture messages from kafka and then execute further stages of data
>>>>>> fetching from other sources and aggregation. The step-by-step process of
>>>>>> what I have built till now is:
>>>>>>
>>>>>>    1.
>>>>>>
>>>>>>    Running Kafka instance on localhost:9092
>>>>>>
>>>>>>    ./bin/kafka-server-start.sh ./config/server.properties
>>>>>>    2.
>>>>>>
>>>>>>    Run beam-flink job server using docker
>>>>>>
>>>>>>    docker run --net=host apache/beam_flink1.10_job_server:latest
>>>>>>    3.
>>>>>>
>>>>>>    Run beam-kafka pipeline
>>>>>>
>>>>>> import apache_beam as beam
>>>>>> from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
>>>>>> from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
>>>>>>
>>>>>> if __name__ == '__main__':
>>>>>>     options = PipelineOptions([
>>>>>>         "--job_endpoint=localhost:8099",
>>>>>>         "--environment_type=LOOPBACK",
>>>>>>         "--streaming",
>>>>>>         "--environment_config={\"command\":\"/opt/apache/beam/boot\"}",
>>>>>>     ])
>>>>>>
>>>>>>     options = options.view_as(StandardOptions)
>>>>>>     options.streaming = True
>>>>>>
>>>>>>     pipeline = beam.Pipeline(options=options)
>>>>>>
>>>>>>     result = (
>>>>>>         pipeline
>>>>>>
>>>>>>         | "Read from kafka" >> ReadFromKafka(
>>>>>>             consumer_config={
>>>>>>                 "bootstrap.servers": 'localhost:9092',
>>>>>>             },
>>>>>>             topics=['mytopic'],
>>>>>>             expansion_service='localhost:8097',
>>>>>>         )
>>>>>>
>>>>>>         | beam.Map(print)
>>>>>>     )
>>>>>>
>>>>>>     pipeline.run()
>>>>>>
>>>>>>
>>>>>>    1. Publish new message using kafka-producer.sh
>>>>>>
>>>>>> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
>>>>>> > tryme
>>>>>>
>>>>>> After publishing this trial message, the beam pipeline receives the
>>>>>> message but crashes with this error:
>>>>>>
>>>>>> RuntimeError: org.apache.beam.sdk.util.UserCodeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
>>>>>>     at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1014)
>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
>>>>>>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1042)
>>>>>>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
>>>>>>     at org.apache.beam
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Ayush Sharma.
>>>>>>
>>>>>>

Re: ReadFromKafka returns error - RuntimeError: cannot encode a null byte[]

Posted by Chamikara Jayalath <ch...@google.com>.
On Fri, Jul 17, 2020 at 10:04 PM ayush sharma <17...@gmail.com> wrote:

> Thank you guys for the reply. I am really stuck and could not proceed
> further.
> Yes, the previously published trial message had a null key.
> But when I send a key:value pair through the producer using
>
> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
> mytopic --property "parse.key=true" --property "key.separator=:"
> > tryKey:tryValue
>
> I do not get any error, but beam does not print the received message. Here
> is what my pipeline looks like:
> result = (
>         pipeline
>
>         | "Read from kafka" >> ReadFromKafka(
>             consumer_config={
>                 "bootstrap.servers": 'localhost:9092',
>             },
>             topics=['mytopic'],
>             expansion_service='localhost:8097',
>         )
>
>         | "print" >> beam.Map(print)
> )
>
>
I suspect DirectRunner in LOOPBACK mode might not be working for
cross-language transforms today. Please note that the cross-language transforms
framework is fairly new [1] and we are adding support for various runners
and environment configurations.
Can you try with Flink in DOCKER mode?
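
A minimal sketch of that configuration, assuming the Flink job server from
the original post is still listening on localhost:8099 (these flags are an
assumption, not something verified in this thread):

    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions([
        "--runner=PortableRunner",
        "--job_endpoint=localhost:8099",
        "--environment_type=DOCKER",  # run the SDK harness in Beam's Docker image
        "--streaming",
    ])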


> If this is not the way to make beam and kafka communicate, then please
> share a working example that shows how a message published in kafka
> gets received by beam while streaming.
>

I'm adding an example, but I've only tested it with Dataflow so far. I hope
to test that example with more runners and add additional instructions
there.
https://github.com/apache/beam/pull/12188

Thanks,
Cham

[1] https://beam.apache.org/roadmap/connectors-multi-sdk/

>
> Regards,
> Ayush Sharma
>
> On Fri, Jul 17, 2020 at 11:39 PM Chamikara Jayalath <ch...@google.com>
> wrote:
>
>> Yes, seems like this is due to the key being null. XLang KafkaIO has to
>> be updated to support this. You should not run into this error if you
>> publish keys and values that are not null.
>>
>>
>>
>>
>> On Fri, Jul 17, 2020 at 8:04 PM Luke Cwik <lc...@google.com> wrote:
>>
>>> +dev <de...@beam.apache.org>
>>>
>>> On Fri, Jul 17, 2020 at 8:03 PM Luke Cwik <lc...@google.com> wrote:
>>>
>>>> +Heejong Lee <he...@google.com> +Chamikara Jayalath
>>>> <ch...@google.com>
>>>>
>>>> Do you know if your trial record has an empty key or value?
>>>> If so, then you hit a bug; it seems we missed supporting this use
>>>> case.
>>>>
>>>> Heejong and Cham,
>>>> It looks like the Javadoc for ByteArrayDeserializer and other
>>>> Deserializers can return null[1, 2] and we aren't using
>>>> NullableCoder.of(ByteArrayCoder.of()) in the expansion[3]. Note that the
>>>> non XLang KafkaIO does this correctly in its regular coder inference
>>>> logic[4]. I filed BEAM-10529 [5].
>>>>
>>>> 1:
>>>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/ByteArrayDeserializer.html#deserialize-java.lang.String-byte:A-
>>>> 2:
>>>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html#deserialize-java.lang.String-byte:A-
>>>> 3:
>>>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L478
>>>> 4:
>>>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/LocalDeserializerProvider.java#L85
>>>> 5: https://issues.apache.org/jira/browse/BEAM-10529
>>>>
>>>>
>>>> On Fri, Jul 17, 2020 at 8:51 AM ayush sharma <17...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am trying to build a streaming beam pipeline in python which should
>>>>> capture messages from kafka and then execute further stages of data
>>>>> fetching from other sources and aggregation. The step-by-step process of
>>>>> what I have built till now is:
>>>>>
>>>>>    1.
>>>>>
>>>>>    Running Kafka instance on localhost:9092
>>>>>
>>>>>    ./bin/kafka-server-start.sh ./config/server.properties
>>>>>    2.
>>>>>
>>>>>    Run beam-flink job server using docker
>>>>>
>>>>>    docker run --net=host apache/beam_flink1.10_job_server:latest
>>>>>    3.
>>>>>
>>>>>    Run beam-kafka pipeline
>>>>>
>>>>> import apache_beam as beam
>>>>> from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
>>>>> from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
>>>>>
>>>>> if __name__ == '__main__':
>>>>>     options = PipelineOptions([
>>>>>         "--job_endpoint=localhost:8099",
>>>>>         "--environment_type=LOOPBACK",
>>>>>         "--streaming",
>>>>>         "--environment_config={\"command\":\"/opt/apache/beam/boot\"}",
>>>>>     ])
>>>>>
>>>>>     options = options.view_as(StandardOptions)
>>>>>     options.streaming = True
>>>>>
>>>>>     pipeline = beam.Pipeline(options=options)
>>>>>
>>>>>     result = (
>>>>>         pipeline
>>>>>
>>>>>         | "Read from kafka" >> ReadFromKafka(
>>>>>             consumer_config={
>>>>>                 "bootstrap.servers": 'localhost:9092',
>>>>>             },
>>>>>             topics=['mytopic'],
>>>>>             expansion_service='localhost:8097',
>>>>>         )
>>>>>
>>>>>         | beam.Map(print)
>>>>>     )
>>>>>
>>>>>     pipeline.run()
>>>>>
>>>>>
>>>>>    1. Publish new message using kafka-producer.sh
>>>>>
>>>>> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
>>>>> > tryme
>>>>>
>>>>> After publishing this trial message, the beam pipeline receives the
>>>>> message but crashes with this error:
>>>>>
>>>>> RuntimeError: org.apache.beam.sdk.util.UserCodeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
>>>>>     at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1014)
>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
>>>>>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1042)
>>>>>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
>>>>>     at org.apache.beam
>>>>>
>>>>> Regards,
>>>>>
>>>>> Ayush Sharma.
>>>>>
>>>>>

Re: ReadFromKafka returns error - RuntimeError: cannot encode a null byte[]

Posted by ayush sharma <17...@gmail.com>.
Thank you guys for the reply. I am really stuck and could not proceed
further.
Yes, the previously published trial message had a null key.
But when I send a key:value pair through the producer using

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
mytopic --property "parse.key=true" --property "key.separator=:"
> tryKey:tryValue
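
As a sanity check that the key actually reaches the topic, one can read it
back (a sketch assuming the third-party kafka-python client, which is not
otherwise used in this thread):

    # Print each record's key and value; key should be b'tryKey', not None.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        "mytopic",
        bootstrap_servers="localhost:9092",
        auto_offset_reset="earliest",
        consumer_timeout_ms=5000,  # stop iterating once the topic is drained
    )
    for record in consumer:
        print(record.key, record.value)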

I do not get any error, but beam does not print the received message. Here
is what my pipeline looks like:
result = (
        pipeline

        | "Read from kafka" >> ReadFromKafka(
            consumer_config={
                "bootstrap.servers": 'localhost:9092',
            },
            topics=['mytopic'],
            expansion_service='localhost:8097',
        )

        | "print" >> beam.Map(print)
)

If this is not the way to make beam and kafka communicate, then please share
a working example that shows how a message published in kafka gets
received by beam while streaming.

Regards,
Ayush Sharma

On Fri, Jul 17, 2020 at 11:39 PM Chamikara Jayalath <ch...@google.com>
wrote:

> Yes, seems like this is due to the key being null. XLang KafkaIO has to be
> updated to support this. You should not run into this error if you publish
> keys and values that are not null.
>
>
>
>
> On Fri, Jul 17, 2020 at 8:04 PM Luke Cwik <lc...@google.com> wrote:
>
>> +dev <de...@beam.apache.org>
>>
>> On Fri, Jul 17, 2020 at 8:03 PM Luke Cwik <lc...@google.com> wrote:
>>
>>> +Heejong Lee <he...@google.com> +Chamikara Jayalath
>>> <ch...@google.com>
>>>
>>> Do you know if your trial record has an empty key or value?
>>> If so, then you hit a bug; it seems we missed supporting this use
>>> case.
>>>
>>> Heejong and Cham,
>>> It looks like the Javadoc for ByteArrayDeserializer and other
>>> Deserializers can return null[1, 2] and we aren't using
>>> NullableCoder.of(ByteArrayCoder.of()) in the expansion[3]. Note that the
>>> non XLang KafkaIO does this correctly in its regular coder inference
>>> logic[4]. I filed BEAM-10529 [5].
>>>
>>> 1:
>>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/ByteArrayDeserializer.html#deserialize-java.lang.String-byte:A-
>>> 2:
>>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html#deserialize-java.lang.String-byte:A-
>>> 3:
>>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L478
>>> 4:
>>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/LocalDeserializerProvider.java#L85
>>> 5: https://issues.apache.org/jira/browse/BEAM-10529
>>>
>>>
>>> On Fri, Jul 17, 2020 at 8:51 AM ayush sharma <17...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am trying to build a streaming beam pipeline in python which should
>>>> capture messages from kafka and then execute further stages of data
>>>> fetching from other sources and aggregation. The step-by-step process of
>>>> what I have built till now is:
>>>>
>>>>    1.
>>>>
>>>>    Running Kafka instance on localhost:9092
>>>>
>>>>    ./bin/kafka-server-start.sh ./config/server.properties
>>>>    2.
>>>>
>>>>    Run beam-flink job server using docker
>>>>
>>>>    docker run --net=host apache/beam_flink1.10_job_server:latest
>>>>    3.
>>>>
>>>>    Run beam-kafka pipeline
>>>>
>>>> import apache_beam as beam
>>>> from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
>>>> from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
>>>>
>>>> if __name__ == '__main__':
>>>>     options = PipelineOptions([
>>>>         "--job_endpoint=localhost:8099",
>>>>         "--environment_type=LOOPBACK",
>>>>         "--streaming",
>>>>         "--environment_config={\"command\":\"/opt/apache/beam/boot\"}",
>>>>     ])
>>>>
>>>>     options = options.view_as(StandardOptions)
>>>>     options.streaming = True
>>>>
>>>>     pipeline = beam.Pipeline(options=options)
>>>>
>>>>     result = (
>>>>         pipeline
>>>>
>>>>         | "Read from kafka" >> ReadFromKafka(
>>>>             consumer_config={
>>>>                 "bootstrap.servers": 'localhost:9092',
>>>>             },
>>>>             topics=['mytopic'],
>>>>             expansion_service='localhost:8097',
>>>>         )
>>>>
>>>>         | beam.Map(print)
>>>>     )
>>>>
>>>>     pipeline.run()
>>>>
>>>>
>>>>    1. Publish new message using kafka-producer.sh
>>>>
>>>> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
>>>> > tryme
>>>>
>>>> After publishing this trial message, the beam pipeline receives the
>>>> message but crashes with this error:
>>>>
>>>> RuntimeError: org.apache.beam.sdk.util.UserCodeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
>>>>     at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1014)
>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
>>>>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1042)
>>>>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
>>>>     at org.apache.beam
>>>>
>>>> Regards,
>>>>
>>>> Ayush Sharma.
>>>>
>>>>

Re: ReadFromKafka returns error - RuntimeError: cannot encode a null byte[]

Posted by Chamikara Jayalath <ch...@google.com>.
Yes, seems like this is due to the key being null. XLang KafkaIO has to be
updated to support this. You should not run into this error if you publish
keys and values that are not null.
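
To illustrate the coder-level cause Luke describes below: the inferred
byte-array coder has no encoding for null, while a nullable wrapper writes a
presence marker first. A sketch in terms of the Python SDK's coders (an
analogue only; the failing coder in the stack trace is the Java
ByteArrayCoder, and the proposed fix wraps it with NullableCoder on the
Java side):

    from apache_beam.coders.coders import BytesCoder, NullableCoder

    BytesCoder().encode(b"tryme")             # fine: a plain byte string
    NullableCoder(BytesCoder()).encode(None)  # fine: null marker byte, no payload
    # BytesCoder().encode(None) would raise, the Python-side analogue of
    # "cannot encode a null byte[]".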




On Fri, Jul 17, 2020 at 8:04 PM Luke Cwik <lc...@google.com> wrote:

> +dev <de...@beam.apache.org>
>
> On Fri, Jul 17, 2020 at 8:03 PM Luke Cwik <lc...@google.com> wrote:
>
>> +Heejong Lee <he...@google.com> +Chamikara Jayalath
>> <ch...@google.com>
>>
>> Do you know if your trial record has an empty key or value?
>> If so, then you hit a bug; it seems we missed supporting this use
>> case.
>>
>> Heejong and Cham,
>> It looks like the Javadoc for ByteArrayDeserializer and other
>> Deserializers can return null[1, 2] and we aren't using
>> NullableCoder.of(ByteArrayCoder.of()) in the expansion[3]. Note that the
>> non XLang KafkaIO does this correctly in its regular coder inference
>> logic[4]. I filed BEAM-10529 [5].
>>
>> 1:
>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/ByteArrayDeserializer.html#deserialize-java.lang.String-byte:A-
>> 2:
>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html#deserialize-java.lang.String-byte:A-
>> 3:
>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L478
>> 4:
>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/LocalDeserializerProvider.java#L85
>> 5: https://issues.apache.org/jira/browse/BEAM-10529
>>
>>
>> On Fri, Jul 17, 2020 at 8:51 AM ayush sharma <17...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am trying to build a streaming beam pipeline in python which should
>>> capture messages from kafka and then execute further stages of data
>>> fetching from other sources and aggregation. The step-by-step process of
>>> what I have built till now is:
>>>
>>>    1.
>>>
>>>    Running Kafka instance on localhost:9092
>>>
>>>    ./bin/kafka-server-start.sh ./config/server.properties
>>>    2.
>>>
>>>    Run beam-flink job server using docker
>>>
>>>    docker run --net=host apache/beam_flink1.10_job_server:latest
>>>    3.
>>>
>>>    Run beam-kafka pipeline
>>>
>>> import apache_beam as beam
>>> from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
>>> from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
>>>
>>> if __name__ == '__main__':
>>>     options = PipelineOptions([
>>>         "--job_endpoint=localhost:8099",
>>>         "--environment_type=LOOPBACK",
>>>         "--streaming",
>>>         "--environment_config={\"command\":\"/opt/apache/beam/boot\"}",
>>>     ])
>>>
>>>     options = options.view_as(StandardOptions)
>>>     options.streaming = True
>>>
>>>     pipeline = beam.Pipeline(options=options)
>>>
>>>     result = (
>>>         pipeline
>>>
>>>         | "Read from kafka" >> ReadFromKafka(
>>>             consumer_config={
>>>                 "bootstrap.servers": 'localhost:9092',
>>>             },
>>>             topics=['mytopic'],
>>>             expansion_service='localhost:8097',
>>>         )
>>>
>>>         | beam.Map(print)
>>>     )
>>>
>>>     pipeline.run()
>>>
>>>
>>>    1. Publish new message using kafka-producer.sh
>>>
>>> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
>>> > tryme
>>>
>>> After publishing this trial message, the beam pipeline receives the
>>> message but crashes with this error:
>>>
>>> RuntimeError: org.apache.beam.sdk.util.UserCodeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
>>>     at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1014)
>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
>>>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1042)
>>>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
>>>     at org.apache.beam
>>>
>>> Regards,
>>>
>>> Ayush Sharma.
>>>
>>>

Re: ReadFromKafka returns error - RuntimeError: cannot encode a null byte[]

Posted by Chamikara Jayalath <ch...@google.com>.
Yes, seems like this is due to the key being null. XLang KafkaIO has to be
updated to support this. You should not run into this error if you publish
keys and values that are not null.




On Fri, Jul 17, 2020 at 8:04 PM Luke Cwik <lc...@google.com> wrote:

> +dev <de...@beam.apache.org>
>
> On Fri, Jul 17, 2020 at 8:03 PM Luke Cwik <lc...@google.com> wrote:
>
>> +Heejong Lee <he...@google.com> +Chamikara Jayalath
>> <ch...@google.com>
>>
>> Do you know if your trial record has an empty key or value?
>> If so, then you hit a bug and it seems as though there was a miss
>> supporting this usecase.
>>
>> Heejong and Cham,
>> It looks like the Javadoc for ByteArrayDeserializer and other
>> Deserializers can return null[1, 2] and we aren't using
>> NullableCoder.of(ByteArrayCoder.of()) in the expansion[3]. Note that the
>> non XLang KafkaIO does this correctly in its regular coder inference
>> logic[4]. I flied BEAM-10529[5]
>>
>> 1:
>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/ByteArrayDeserializer.html#deserialize-java.lang.String-byte:A-
>> 2:
>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html#deserialize-java.lang.String-byte:A-
>> 3:
>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L478
>> 4:
>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/LocalDeserializerProvider.java#L85
>> 5: https://issues.apache.org/jira/browse/BEAM-10529
>>
>>
>> On Fri, Jul 17, 2020 at 8:51 AM ayush sharma <17...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am trying to build a streaming beam pipeline in python which should
>>> capture messages from kafka and then execute further stages of data
>>> fetching from other sources and aggregation. The step-by-step process of
>>> what I have built till now is:
>>>
>>>    1.
>>>
>>>    Running Kafka instance on localhost:9092
>>>
>>>    ./bin/kafka-server-start.sh ./config/server.properties
>>>    2.
>>>
>>>    Run beam-flink job server using docker
>>>
>>>    docker run --net=host apache/beam_flink1.10_job_server:latest
>>>    3.
>>>
>>>    Run beam-kafka pipeline
>>>
>>> import apache_beam as beamfrom apache_beam.io.external.kafka import ReadFromKafka, WriteToKafkafrom apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
>>>
>>> if __name__ == '__main__':
>>>     options = PipelineOptions([
>>>         "--job_endpoint=localhost:8099",
>>>         "--environment_type=LOOPBACK",
>>>         "--streaming",
>>>         "--environment_config={\"command\":\"/opt/apache/beam/boot\"}",
>>>     ])
>>>
>>>     options = options.view_as(StandardOptions)
>>>     options.streaming = True
>>>
>>>     pipeline = beam.Pipeline(options=options)
>>>
>>>     result = (
>>>         pipeline
>>>
>>>         | "Read from kafka" >> ReadFromKafka(
>>>             consumer_config={
>>>                 "bootstrap.servers": 'localhost:9092',
>>>             },
>>>             topics=['mytopic'],
>>>             expansion_service='localhost:8097',
>>>         )
>>>
>>>         | beam.Map(print)
>>>     )
>>>
>>>     pipeline.run()
>>>
>>>
>>>    4. Publish a new message using kafka-console-producer.sh
>>>
>>> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
>>> >tryme
>>>
>>> After publishing this trial message, the Beam pipeline receives the
>>> message but crashes with this error:
>>>
>>> RuntimeError: org.apache.beam.sdk.util.UserCodeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
>>>     at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1014)
>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
>>>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1042)
>>>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
>>>     at org.apache.beam
>>>
>>> Regards,
>>>
>>> Ayush Sharma.
>>>
>>>

Re: ReadFromKafka returns error - RuntimeError: cannot encode a null byte[]

Posted by Luke Cwik <lc...@google.com>.
+dev <de...@beam.apache.org>

On Fri, Jul 17, 2020 at 8:03 PM Luke Cwik <lc...@google.com> wrote:

> +Heejong Lee <he...@google.com> +Chamikara Jayalath
> <ch...@google.com>
>
> Do you know if your trial record has a null key or value?
> If so, then you hit a bug; it seems as though this use case was
> missed.
>
> Heejong and Cham,
> It looks like ByteArrayDeserializer and other Deserializers can return
> null per their Javadoc[1, 2], and we aren't using
> NullableCoder.of(ByteArrayCoder.of()) in the expansion[3]. Note that the
> non-XLang KafkaIO does this correctly in its regular coder inference
> logic[4]. I filed BEAM-10529[5].
>
> 1:
> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/ByteArrayDeserializer.html#deserialize-java.lang.String-byte:A-
> 2:
> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html#deserialize-java.lang.String-byte:A-
> 3:
> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L478
> 4:
> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/LocalDeserializerProvider.java#L85
> 5: https://issues.apache.org/jira/browse/BEAM-10529
>
>
> On Fri, Jul 17, 2020 at 8:51 AM ayush sharma <17...@gmail.com> wrote:
>
>> Hi,
>>
>> I am trying to build a streaming beam pipeline in python which should
>> capture messages from kafka and then execute further stages of data
>> fetching from other sources and aggregation. The step-by-step process of
>> what I have built till now is:
>>
>>    1. Run a Kafka instance on localhost:9092
>>
>>       ./bin/kafka-server-start.sh ./config/server.properties
>>
>>    2. Run the Beam Flink job server using Docker
>>
>>       docker run --net=host apache/beam_flink1.10_job_server:latest
>>
>>    3. Run the Beam Kafka pipeline
>>
>> import apache_beam as beam
>> from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
>> from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
>>
>> if __name__ == '__main__':
>>     options = PipelineOptions([
>>         "--job_endpoint=localhost:8099",
>>         "--environment_type=LOOPBACK",
>>         "--streaming",
>>         "--environment_config={\"command\":\"/opt/apache/beam/boot\"}",
>>     ])
>>
>>     options = options.view_as(StandardOptions)
>>     options.streaming = True
>>
>>     pipeline = beam.Pipeline(options=options)
>>
>>     result = (
>>         pipeline
>>
>>         | "Read from kafka" >> ReadFromKafka(
>>             consumer_config={
>>                 "bootstrap.servers": 'localhost:9092',
>>             },
>>             topics=['mytopic'],
>>             expansion_service='localhost:8097',
>>         )
>>
>>         | beam.Map(print)
>>     )
>>
>>     pipeline.run()
>>
>>
>>    4. Publish a new message using kafka-console-producer.sh
>>
>> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
>> >tryme
>>
>> After publishing this trial message, the Beam pipeline receives the
>> message but crashes with this error:
>>
>> RuntimeError: org.apache.beam.sdk.util.UserCodeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
>>     at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1014)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
>>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1042)
>>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
>>     at org.apache.beam
>>
>> Regards,
>>
>> Ayush Sharma.
>>
>>

Re: ReadFromKafka returns error - RuntimeError: cannot encode a null byte[]

Posted by Luke Cwik <lc...@google.com>.
+Heejong Lee <he...@google.com> +Chamikara Jayalath <ch...@google.com>


Do you know if your trial record has a null key or value?
If so, then you hit a bug; it seems as though this use case was missed.

Heejong and Cham,
It looks like ByteArrayDeserializer and other Deserializers can return
null per their Javadoc[1, 2], and we aren't using
NullableCoder.of(ByteArrayCoder.of()) in the expansion[3]. Note that the
non-XLang KafkaIO does this correctly in its regular coder inference
logic[4]. I filed BEAM-10529[5]. A short Python sketch of what the
missing wrapper provides follows the links below.

1:
https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/ByteArrayDeserializer.html#deserialize-java.lang.String-byte:A-
2:
https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html#deserialize-java.lang.String-byte:A-
3:
https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L478
4:
https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/LocalDeserializerProvider.java#L85
5: https://issues.apache.org/jira/browse/BEAM-10529
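
For illustration, a minimal Python sketch of what the missing wrapper
provides, assuming NullableCoder is available in the installed Beam
Python SDK (the actual fix belongs on the Java side, in the expansion
at [3]):

from apache_beam.coders.coders import BytesCoder, NullableCoder

# NullableCoder writes a presence marker before the wrapped coder's
# payload, so a None value round-trips instead of failing to encode.
nullable = NullableCoder(BytesCoder())

print(nullable.decode(nullable.encode(None)))      # -> None
print(nullable.decode(nullable.encode(b'tryme')))  # -> b'tryme'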


On Fri, Jul 17, 2020 at 8:51 AM ayush sharma <17...@gmail.com> wrote:

> Hi,
>
> I am trying to build a streaming beam pipeline in python which should
> capture messages from kafka and then execute further stages of data
> fetching from other sources and aggregation. The step-by-step process of
> what I have built till now is:
>
>    1. Run a Kafka instance on localhost:9092
>
>       ./bin/kafka-server-start.sh ./config/server.properties
>
>    2. Run the Beam Flink job server using Docker
>
>       docker run --net=host apache/beam_flink1.10_job_server:latest
>
>    3. Run the Beam Kafka pipeline
>
> import apache_beam as beam
> from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
> from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
>
> if __name__ == '__main__':
>     options = PipelineOptions([
>         "--job_endpoint=localhost:8099",
>         "--environment_type=LOOPBACK",
>         "--streaming",
>         "--environment_config={\"command\":\"/opt/apache/beam/boot\"}",
>     ])
>
>     options = options.view_as(StandardOptions)
>     options.streaming = True
>
>     pipeline = beam.Pipeline(options=options)
>
>     result = (
>         pipeline
>
>         | "Read from kafka" >> ReadFromKafka(
>             consumer_config={
>                 "bootstrap.servers": 'localhost:9092',
>             },
>             topics=['mytopic'],
>             expansion_service='localhost:8097',
>         )
>
>         | beam.Map(print)
>     )
>
>     pipeline.run()
>
>
>    4. Publish a new message using kafka-console-producer.sh
>
> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
> >tryme
>
> After publishing this trial message, the Beam pipeline receives the
> message but crashes with this error:
>
> RuntimeError: org.apache.beam.sdk.util.UserCodeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
>     at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>     at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1014)
>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1042)
>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
>     at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
>     at org.apache.beam
>
> Regards,
>
> Ayush Sharma.
>
>