Posted to commits@beam.apache.org by "Aviem Zur (JIRA)" <ji...@apache.org> on 2017/05/01 16:18:04 UTC

[jira] [Commented] (BEAM-2111) java.lang.ClassCastException in spark in streaming mode

    [ https://issues.apache.org/jira/browse/BEAM-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15991017#comment-15991017 ] 

Aviem Zur commented on BEAM-2111:
---------------------------------

I was able to create a test that reproduces it; however, it is still quite involved. I am still trying to narrow down the reproducing pipeline so we can fix this.

[~echauchot] In the meantime you can work around this issue by changing {{Arrays#asList}} to {{Collections#singletonList}}.
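
Applied to a {{DoFn}} like the one in the test below, the change looks roughly like this (a minimal sketch; the actual call site in the Nexmark query code may differ):

{code}
// Before: Arrays.asList returns java.util.Arrays$ArrayList, which Kryo reads via
// chill's ArraysAsListSerializer and which fails on deserialization (see stack trace):
c.output(KV.of(Arrays.asList(1L), c.element()));

// After: Collections.singletonList avoids the failing serializer:
c.output(KV.of(Collections.singletonList(1L), c.element()));
{code}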

Test to reproduce:
{code}
    Instant instant = new Instant(0);

    CreateStream<Long> source1 =
        CreateStream.of(VarLongCoder.of(), Duration.standardSeconds(5))
            .emptyBatch()
            .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(5)))
            .nextBatch(
                TimestampedValue.of(1L, instant))
            .advanceNextBatchWatermarkToInfinity();

    pipeline.apply(source1)
        .apply(Window.<Long>into(FixedWindows.of(Duration.standardSeconds(1))))
        .apply(ParDo.of(new DoFn<Long, KV<List<Long>, Long>>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
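            // Arrays.asList(1L) is the trigger: it produces java.util.Arrays$ArrayList,
            // which Kryo deserializes via chill's failing ArraysAsListSerializer.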
            c.output(KV.of(Arrays.asList(1L), c.element()));
          }
        }))
        .apply(
            Combine
                .globally(new Combine.BinaryCombineFn<KV<List<Long>, Long>>() {
                  @Override
                  public KV<List<Long>, Long> apply(
                      KV<List<Long>, Long> left, KV<List<Long>, Long> right) {
                    return KV.of(Collections.singletonList(1L), 1L);
                  }
                })
                .withoutDefaults()
                .withFanout(5));

    pipeline.run();
{code}
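
For reference, in the stack trace below {{[J}} is the JVM type descriptor for {{long[]}} and {{[Ljava.lang.Object;}} is the descriptor for {{Object[]}}; a primitive array can never be cast to an object array, which is exactly the cast the serializer attempts. A standalone illustration (independent of Beam and chill):

{code}
Object arr = new long[] {1L};
System.out.println(arr.getClass().getName()); // prints "[J"
Object[] boxed = (Object[]) arr;              // ClassCastException: [J cannot be cast to [Ljava.lang.Object;
{code}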

> java.lang.ClassCastException in spark in streaming mode
> -------------------------------------------------------
>
>                 Key: BEAM-2111
>                 URL: https://issues.apache.org/jira/browse/BEAM-2111
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>            Reporter: Etienne Chauchot
>            Assignee: Aviem Zur
>
> The bug can be reproduced by running Nexmark query 5 (https://github.com/iemejia/beam/tree/BEAM-160-nexmark) in streaming mode using Spark.
> Run the main method of
> {code}org.apache.beam.integration.nexmark.drivers.NexmarkSparkDriver{code}
> with the VM options:
> {code} -Dspark.ui.enabled=false -DSPARK_LOCAL_IP=localhost -Dsun.io.serialization.extendedDebugInfo=true {code}
> and the program arguments:
> {code}--query=5  --streaming=true --numEventGenerators=4 --manageResources=false --monitorJobs=true --enforceEncodability=false --enforceImmutability=false{code}
> The stack trace is:
> {code}
>  com.esotericsoftware.kryo.KryoException: java.lang.RuntimeException: java.lang.ClassCastException: [J cannot be cast to [Ljava.lang.Object;
> Serialization trace:
> key (org.apache.beam.sdk.values.KV)
> value (org.apache.beam.sdk.values.KV)
> value (org.apache.beam.sdk.util.WindowedValue$TimestampedValueInSingleWindow)
> 	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
> 	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> 	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
> 	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
> 	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> 	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
> 	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
> 	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> 	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> 	at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
> 	at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
> 	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> 	at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:228)
> 	at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:181)
> 	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> 	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> 	at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> 	at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
> 	at org.apache.beam.runners.spark.translation.SparkProcessContext.processPartition(SparkProcessContext.java:64)
> 	at org.apache.beam.runners.spark.translation.MultiDoFnFunction.call(MultiDoFnFunction.java:114)
> 	at org.apache.beam.runners.spark.translation.MultiDoFnFunction.call(MultiDoFnFunction.java:52)
> 	at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:192)
> 	at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:192)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> 	at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:89)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: java.lang.ClassCastException: [J cannot be cast to [Ljava.lang.Object;
> 	at com.twitter.chill.java.ArraysAsListSerializer.read(ArraysAsListSerializer.java:69)
> 	at com.twitter.chill.java.ArraysAsListSerializer.read(ArraysAsListSerializer.java:41)
> 	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
> 	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
> 	... 56 more
> Caused by: java.lang.ClassCastException: [J cannot be cast to [Ljava.lang.Object;
> 	at com.twitter.chill.java.ArraysAsListSerializer.read(ArraysAsListSerializer.java:63)
> 	... 59 more
> {code}


