You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Konstantin Kulagin <kk...@gmail.com> on 2016/04/22 15:47:28 UTC

java.lang.ClassCastException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

Hi guys,

trying to run this example:

    StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<Tuple2<Long, String>> source = env.addSource(new
SourceFunction<Tuple2<Long, String>>() {
      @Override
      public void run(SourceContext<Tuple2<Long, String>> ctx) throws
Exception {
        LongStream.range(0, 33).forEach(l -> {
          ctx.collect(Tuple2.of(0L, "This is " + l));
        });
      }

      @Override
      public void cancel() {
      }
    });


    source.assignTimestampsAndWatermarks(new IngestionTimeExtractor<>()).
//    source.
    keyBy(0).window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(5))).

        apply(new WindowFunction<Tuple2<Long, String>, Void, Tuple,
GlobalWindow>() {
          @Override
          public void apply(Tuple tuple, GlobalWindow window,
Iterable<Tuple2<Long, String>> input, Collector<Void> out) throws
Exception {
            System.out.println("!!!!!!!!! " + Joiner.on(",").join(input));
          }
        });

    env.execute("yoyoyo");

Getting Caused by: java.lang.ClassCastException:
org.apache.flink.streaming.api.watermark.Watermark cannot be cast to
org.apache.flink.streaming.runtime.streamrecord.StreamRecord
	at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:42)
	at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
	at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:79)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.broadcastEmit(RecordWriter.java:109)
	at org.apache.flink.streaming.runtime.io.StreamRecordWriter.broadcastEmit(StreamRecordWriter.java:95)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:90)


- After googling I've found this:
https://issues.apache.org/jira/browse/FLINK-3688

- went to github, downloaded branch 1.0.2 which contains specified
change but having the same results.

What am I missing here?

Thanks!

Konstantin

Re: java.lang.ClassCastException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

Posted by Robert Metzger <rm...@apache.org>.
I've filed a JIRA to improve the error message:
https://issues.apache.org/jira/browse/FLINK-3918

On Fri, Apr 22, 2016 at 11:17 PM, Fabian Hueske <fh...@gmail.com> wrote:

> Hi Konstantin,
>
> this exception is thrown if you do not set the time characteristic to
> event time and assign timestamps.
> Please try to add
>
> > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
> after you obtained the StreamExecutionEnvironment.
>
> Best, Fabian
>
> 2016-04-22 15:47 GMT+02:00 Konstantin Kulagin <kk...@gmail.com>:
>
>> Hi guys,
>>
>> trying to run this example:
>>
>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>     DataStreamSource<Tuple2<Long, String>> source = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
>>       @Override
>>       public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
>>         LongStream.range(0, 33).forEach(l -> {
>>           ctx.collect(Tuple2.of(0L, "This is " + l));
>>         });
>>       }
>>
>>       @Override
>>       public void cancel() {
>>       }
>>     });
>>
>>
>>     source.assignTimestampsAndWatermarks(new IngestionTimeExtractor<>()).
>> //    source.
>>     keyBy(0).window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(5))).
>>
>>         apply(new WindowFunction<Tuple2<Long, String>, Void, Tuple, GlobalWindow>() {
>>           @Override
>>           public void apply(Tuple tuple, GlobalWindow window, Iterable<Tuple2<Long, String>> input, Collector<Void> out) throws Exception {
>>             System.out.println("!!!!!!!!! " + Joiner.on(",").join(input));
>>           }
>>         });
>>
>>     env.execute("yoyoyo");
>>
>> Getting Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>> 	at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:42)
>> 	at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
>> 	at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:79)
>> 	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.broadcastEmit(RecordWriter.java:109)
>> 	at org.apache.flink.streaming.runtime.io.StreamRecordWriter.broadcastEmit(StreamRecordWriter.java:95)
>> 	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:90)
>>
>>
>> - After googling I've found this: https://issues.apache.org/jira/browse/FLINK-3688
>>
>> - went to github, downloaded branch 1.0.2 which contains specified change but having the same results.
>>
>> What am I missing here?
>>
>> Thanks!
>>
>> Konstantin
>>
>>
>>
>

Re: java.lang.ClassCastException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Konstantin,

this exception is thrown if you do not set the time characteristic to event
time and assign timestamps.
Please try to add

> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

after you obtained the StreamExecutionEnvironment.

Best, Fabian

2016-04-22 15:47 GMT+02:00 Konstantin Kulagin <kk...@gmail.com>:

> Hi guys,
>
> trying to run this example:
>
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     DataStreamSource<Tuple2<Long, String>> source = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
>       @Override
>       public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
>         LongStream.range(0, 33).forEach(l -> {
>           ctx.collect(Tuple2.of(0L, "This is " + l));
>         });
>       }
>
>       @Override
>       public void cancel() {
>       }
>     });
>
>
>     source.assignTimestampsAndWatermarks(new IngestionTimeExtractor<>()).
> //    source.
>     keyBy(0).window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(5))).
>
>         apply(new WindowFunction<Tuple2<Long, String>, Void, Tuple, GlobalWindow>() {
>           @Override
>           public void apply(Tuple tuple, GlobalWindow window, Iterable<Tuple2<Long, String>> input, Collector<Void> out) throws Exception {
>             System.out.println("!!!!!!!!! " + Joiner.on(",").join(input));
>           }
>         });
>
>     env.execute("yoyoyo");
>
> Getting Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
> 	at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:42)
> 	at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
> 	at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:79)
> 	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.broadcastEmit(RecordWriter.java:109)
> 	at org.apache.flink.streaming.runtime.io.StreamRecordWriter.broadcastEmit(StreamRecordWriter.java:95)
> 	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:90)
>
>
> - After googling I've found this: https://issues.apache.org/jira/browse/FLINK-3688
>
> - went to github, downloaded branch 1.0.2 which contains specified change but having the same results.
>
> What am I missing here?
>
> Thanks!
>
> Konstantin
>
>
>