Posted to commits@beam.apache.org by "Daniel Halperin (JIRA)" <ji...@apache.org> on 2017/04/28 14:35:04 UTC

[jira] [Commented] (BEAM-2114) KafkaIO broken with CoderException

    [ https://issues.apache.org/jira/browse/BEAM-2114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15988914#comment-15988914 ] 

Daniel Halperin commented on BEAM-2114:
---------------------------------------


When using `withXDeserializer` to parse data from the Kafka source, we still need a Coder for the `X` type (key or value) once the data is in the pipeline. Here is how this works today: https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L620

* We get the type produced by the Deserializer (e.g. the KeyDeserializer) and look for a registered coder for that type.
* If no coder is found for that type, we use the coders you specify.
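The two steps above can be modelled roughly as follows. This is a self-contained sketch, not KafkaIO's actual code: `Deserializer`, `Coder`, the `Map`-based registry, `Point`, `PointDeserializer` and `inferCoder` are all hypothetical stand-ins for the real Kafka and Beam types. It reads `T` from a deserializer's `implements Deserializer<T>` clause via reflection, looks `T` up in the registry, falls back to a user-specified coder (in the order described above), and otherwise throws the error message proposed in item 1b.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class CoderInferenceSketch {

  // Hypothetical stand-in for Kafka's Deserializer<T>; not the real interface.
  interface Deserializer<T> {
    T deserialize(String topic, byte[] data);
  }

  // Hypothetical stand-in for a Beam Coder; only its name matters here.
  static final class Coder {
    private final String name;

    Coder(String name) {
      this.name = name;
    }

    String name() {
      return name;
    }
  }

  static class StringDeserializer implements Deserializer<String> {
    @Override
    public String deserialize(String topic, byte[] data) {
      return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }
  }

  // A payload type with no registered coder, used to exercise the error path.
  static class Point {}

  static class PointDeserializer implements Deserializer<Point> {
    @Override
    public Point deserialize(String topic, byte[] data) {
      return new Point();
    }
  }

  // Step 1a: find T in "implements Deserializer<T>", or null if not declared directly.
  static Type producedType(Class<?> deserializerClass) {
    for (Type iface : deserializerClass.getGenericInterfaces()) {
      if (iface instanceof ParameterizedType
          && ((ParameterizedType) iface).getRawType() == Deserializer.class) {
        return ((ParameterizedType) iface).getActualTypeArguments()[0];
      }
    }
    return null;
  }

  static Coder inferCoder(Class<?> deserializerClass, Coder userCoder, Map<Class<?>, Coder> registry) {
    Type produced = producedType(deserializerClass);
    // Step 1b: look up a registered coder for T.
    Coder registered = produced == null ? null : registry.get(produced);
    if (registered != null) {
      return registered;
    }
    // Step 2: otherwise use the coder the user specified, if any.
    if (userCoder != null) {
      return userCoder;
    }
    // Neither worked: report which deserializer and which type failed.
    throw new IllegalStateException(String.format(
        "Unable to automatically infer a Coder for the Kafka Deserializer %s: "
            + "no coder registered for type %s",
        deserializerClass.getName(),
        produced == null ? "<unknown>" : produced.getTypeName()));
  }

  public static void main(String[] args) {
    Map<Class<?>, Coder> registry = new HashMap<>();
    registry.put(String.class, new Coder("StringUtf8Coder"));

    System.out.println(inferCoder(StringDeserializer.class, null, registry).name());
    System.out.println(inferCoder(PointDeserializer.class, new Coder("PointCoder"), registry).name());
    try {
      inferCoder(PointDeserializer.class, null, registry);
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

One limitation of this sketch: it only sees `T` when the deserializer class directly declares `implements Deserializer<T>`. A deserializer that inherits the interface from a generic superclass yields no type, so the sketch falls through to the user coder or the error.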

It looks like there are two issues:

1. Documentation and messaging could be greatly improved:

        a. Review the class-level Javadoc.
        b. Fix the error messaging. If no coder is specified and none can be inferred, the error should say something like "Unable to automatically infer a Coder for the Kafka Deserializer %s: no coder registered for type %s".


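2. During coder inference, wrap the inferred coder in `NullableCoder` by default. Kafka records can carry a null key or value (the first traceback below fails with "cannot encode a null String"), so I think this is mandatory unless Kafka Deserializers have a specific way to indicate that values cannot be null.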
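The wrapping idea in item 2 can be sketched as follows, assuming a hypothetical minimal `SimpleCoder` interface rather than Beam's real `Coder` API. `Utf8Coder` imitates the `StringUtf8Coder` behaviour from the traceback by rejecting null, and `Nullable` writes a one-byte marker (0 for null, 1 for present) before delegating. Beam's `NullableCoder` follows the same marker-byte idea, but treat the exact encoding here as an assumption of the sketch.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class NullableCoderSketch {

  // Hypothetical minimal coder interface (not Beam's Coder API).
  interface SimpleCoder<T> {
    void encode(T value, DataOutputStream out) throws IOException;

    T decode(DataInputStream in) throws IOException;
  }

  // Like StringUtf8Coder in the traceback: refuses to encode null.
  static final class Utf8Coder implements SimpleCoder<String> {
    @Override
    public void encode(String value, DataOutputStream out) throws IOException {
      if (value == null) {
        throw new IOException("cannot encode a null String");
      }
      byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
      out.writeInt(bytes.length); // length prefix, then the UTF-8 bytes
      out.write(bytes);
    }

    @Override
    public String decode(DataInputStream in) throws IOException {
      byte[] bytes = new byte[in.readInt()];
      in.readFully(bytes);
      return new String(bytes, StandardCharsets.UTF_8);
    }
  }

  // Wrapper that writes a marker byte (0 = null, 1 = present) before delegating.
  static final class Nullable<T> implements SimpleCoder<T> {
    private final SimpleCoder<T> inner;

    Nullable(SimpleCoder<T> inner) {
      this.inner = inner;
    }

    @Override
    public void encode(T value, DataOutputStream out) throws IOException {
      if (value == null) {
        out.writeByte(0);
      } else {
        out.writeByte(1);
        inner.encode(value, out);
      }
    }

    @Override
    public T decode(DataInputStream in) throws IOException {
      return in.readByte() == 0 ? null : inner.decode(in);
    }
  }

  // Encodes a value into an in-memory buffer and decodes it back.
  static <T> T roundTrip(SimpleCoder<T> coder, T value) throws IOException {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    coder.encode(value, new DataOutputStream(buffer));
    return coder.decode(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
  }

  public static void main(String[] args) throws IOException {
    SimpleCoder<String> keyCoder = new Nullable<>(new Utf8Coder());
    System.out.println(roundTrip(keyCoder, "user-42"));
    System.out.println(roundTrip(keyCoder, null));
    try {
      roundTrip(new Utf8Coder(), null);
    } catch (IOException e) {
      System.out.println("bare coder failed: " + e.getMessage());
    }
  }
}
```

The wrapper costs one extra byte per key or value, which is usually negligible next to a pipeline failing on the first record with a null key.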

> KafkaIO broken with CoderException
> ----------------------------------
>
>                 Key: BEAM-2114
>                 URL: https://issues.apache.org/jira/browse/BEAM-2114
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-extensions
>            Reporter: Devon Meunier
>            Assignee: Daniel Halperin
>             Fix For: First stable release
>
>
> For a {{KafkaIO.Read<String, String>}}, I simply replaced {{withKeyCoder}} and {{withValueCoder}} with {{withKeyDeserializer}} and {{withValueDeserializer}} using {{StringDeserializer.class}} on Dataflow, and I received the following traceback:
> {code}
> java.lang.reflect.InvocationTargetException
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null String
> 	at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:115)
> 	at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:136)
> 	at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:139)
> 	at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:107)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	... 1 more
> Caused by: org.apache.beam.sdk.coders.CoderException: cannot encode a null String
> 	at org.apache.beam.sdk.coders.StringUtf8Coder.encode(StringUtf8Coder.java:75)
> 	at org.apache.beam.sdk.coders.StringUtf8Coder.encode(StringUtf8Coder.java:41)
> 	at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:88)
> 	at org.apache.beam.sdk.io.kafka.KafkaRecordCoder.encode(KafkaRecordCoder.java:60)
> 	at org.apache.beam.sdk.io.kafka.KafkaRecordCoder.encode(KafkaRecordCoder.java:36)
> 	at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:122)
> 	at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:106)
> 	at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:91)
> 	at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:106)
> 	at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:44)
> 	at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:113)
> 	... 8 more
> {code}
> Attempting to use {{readWithCoders(StringUtf8Coder.of(), StringUtf8Coder.of())}} instead yields:
> {code}
> java.lang.reflect.InvocationTargetException
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.beam.sdk.transforms.display.DisplayData$InternalBuilder$PopulateDisplayDataException: Error while populating display data for component: org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata
> 	at org.apache.beam.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:801)
> 	at org.apache.beam.sdk.transforms.display.DisplayData$InternalBuilder.access$100(DisplayData.java:733)
> 	at org.apache.beam.sdk.transforms.display.DisplayData.from(DisplayData.java:81)
> 	at org.apache.beam.runners.direct.DisplayDataValidator.evaluateDisplayData(DisplayDataValidator.java:47)
> 	at org.apache.beam.runners.direct.DisplayDataValidator.access$100(DisplayDataValidator.java:29)
> 	at org.apache.beam.runners.direct.DisplayDataValidator$Visitor.enterCompositeTransform(DisplayDataValidator.java:56)
> 	at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:479)
> 	at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:483)
> 	at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$400(TransformHierarchy.java:232)
> 	at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:207)
> 	at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:384)
> 	at org.apache.beam.runners.direct.DisplayDataValidator.validateTransforms(DisplayDataValidator.java:43)
> 	at org.apache.beam.runners.direct.DisplayDataValidator.validatePipeline(DisplayDataValidator.java:35)
> 	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:265)
> 	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
> 	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:277)
> 	at com.shopify.attribution.pipelines.StoreSessions.main(StoreSessions.java:75)
> 	... 6 more
> Caused by: java.lang.RuntimeException: Unknown value type: StringUtf8Coder
> 	at org.apache.beam.sdk.transforms.display.DisplayData.item(DisplayData.java:886)
> 	at org.apache.beam.sdk.io.kafka.KafkaIO$Read.populateDisplayData(KafkaIO.java:750)
> 	at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata.populateDisplayData(KafkaIO.java:785)
> 	at org.apache.beam.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:794)
> 	... 22 more
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)