Posted to commits@beam.apache.org by "Alexey Diomin (JIRA)" <ji...@apache.org> on 2017/01/11 11:42:58 UTC
[jira] [Commented] (BEAM-1255) java.io.NotSerializableException in
flink on UnboundedSource
[ https://issues.apache.org/jira/browse/BEAM-1255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15818082#comment-15818082 ]
Alexey Diomin commented on BEAM-1255:
-------------------------------------
This bug is related to the serialization of UnboundedSourceWrapper:
{code}
@Test
public void testSerialization() throws Exception {
  PipelineOptions options = PipelineOptionsFactory.create();
  TestCountingSource source = new TestCountingSource(1);
  UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
      new UnboundedSourceWrapper<>(options, source, 1);
  InstantiationUtil.serializeObject(flinkWrapper);
}
{code}
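The trace in the quoted report bottoms out at a TypeVariableImpl reached through the TypeDescriptor held by a SerializableCoder, which suggests the descriptor was captured with unresolved type variables (OutputT, CheckpointMarkT). The following is only a minimal sketch of that failure mode, independent of Beam and Flink; the class name TypeVariableSerializationDemo and the Holder class are hypothetical stand-ins, not code from the runner.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.lang.reflect.Type;
import java.util.List;

public class TypeVariableSerializationDemo {

  /** Hypothetical stand-in for a Serializable object that keeps a reflective Type in a field. */
  public static class Holder implements Serializable {
    private static final long serialVersionUID = 1L;
    final Type type;

    public Holder(Type type) {
      this.type = type;
    }
  }

  /** Returns true if plain Java serialization accepts the whole object graph. */
  public static boolean isJavaSerializable(Object o) {
    try {
      ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream());
      out.writeObject(o);
      return true;
    } catch (NotSerializableException e) {
      // Some object reachable from 'o' does not implement Serializable.
      return false;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    // The type parameter E of List<E> is an unresolved type variable; the JDK's
    // implementation class for it does not implement Serializable.
    Type unresolved = List.class.getTypeParameters()[0];
    System.out.println("type variable: " + isJavaSerializable(new Holder(unresolved)));

    // A concrete Class object is Serializable, so this graph is accepted.
    System.out.println("concrete class: " + isJavaSerializable(new Holder(String.class)));
  }
}
```

If this reading of the trace is right, any fix would need to keep unresolved type variables out of the serialized state of the wrapper, for example by not storing such a descriptor in a serialized field.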
> java.io.NotSerializableException in flink on UnboundedSource
> ------------------------------------------------------------
>
> Key: BEAM-1255
> URL: https://issues.apache.org/jira/browse/BEAM-1255
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Affects Versions: 0.5.0
> Reporter: Alexey Diomin
> Assignee: Maximilian Michels
>
> After the introduction of new Coders with TypeDescriptor, we have the following issue on the Flink runner:
> {code}
> Caused by: java.io.NotSerializableException: sun.reflect.generics.reflectiveObjects.TypeVariableImpl
> - element of array (index: 0)
> - array (class "[Ljava.lang.Object;", size: 2)
> - field (class "com.google.common.collect.ImmutableList$SerializedForm", name: "elements", type: "class [Ljava.lang.Object;")
> - object (class "com.google.common.collect.ImmutableList$SerializedForm", com.google.common.collect.ImmutableList$SerializedForm@30af5b6b)
> - field (class "com.google.common.reflect.Types$ParameterizedTypeImpl", name: "argumentsList", type: "class com.google.common.collect.ImmutableList")
> - object (class "com.google.common.reflect.Types$ParameterizedTypeImpl", org.apache.beam.sdk.io.UnboundedSource<OutputT, CheckpointMarkT>)
> - field (class "com.google.common.reflect.TypeToken", name: "runtimeType", type: "interface java.lang.reflect.Type")
> - object (class "com.google.common.reflect.TypeToken$SimpleTypeToken", org.apache.beam.sdk.io.UnboundedSource<OutputT, CheckpointMarkT>)
> - field (class "org.apache.beam.sdk.values.TypeDescriptor", name: "token", type: "class com.google.common.reflect.TypeToken")
> - object (class "org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper$1", org.apache.beam.sdk.io.UnboundedSource<OutputT, CheckpointMarkT>)
> - field (class "org.apache.beam.sdk.coders.SerializableCoder", name: "typeDescriptor", type: "class org.apache.beam.sdk.values.TypeDescriptor")
> - object (class "org.apache.beam.sdk.coders.SerializableCoder", SerializableCoder)
> - field (class "org.apache.beam.sdk.coders.KvCoder", name: "keyCoder", type: "interface org.apache.beam.sdk.coders.Coder")
> - object (class "org.apache.beam.sdk.coders.KvCoder", KvCoder(SerializableCoder,AvroCoder))
> - field (class "org.apache.beam.sdk.coders.IterableLikeCoder", name: "elementCoder", type: "interface org.apache.beam.sdk.coders.Coder")
> - object (class "org.apache.beam.sdk.coders.ListCoder", ListCoder(KvCoder(SerializableCoder,AvroCoder)))
> - field (class "org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper", name: "checkpointCoder", type: "class org.apache.beam.sdk.coders.ListCoder")
> - root object (class "org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper", org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper@b2c5e07)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1182)
> {code}
> The bug appeared after commit:
> 7b98fa08d14e8121e8885f00a9a9a878b73f81a6
> Pull request:
> https://github.com/apache/beam/pull/1537
> Code to reproduce the error:
> {code}
> import com.google.common.collect.ImmutableList;
> import org.apache.beam.runners.flink.FlinkPipelineOptions;
> import org.apache.beam.runners.flink.FlinkRunner;
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.kafka.KafkaIO;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> public class FlinkSerialisationError {
>   public static void main(String[] args) {
>     FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
>     options.setRunner(FlinkRunner.class);
>     options.setStreaming(true);
>     Pipeline pipeline = Pipeline.create(options);
>     pipeline.apply(
>         KafkaIO.read()
>             .withBootstrapServers("localhost:9092")
>             .withTopics(ImmutableList.of("test"))
>             // set ConsumerGroup
>             .withoutMetadata());
>     pipeline.run();
>   }
> }
> {code}