Posted to issues@flink.apache.org by "Aljoscha Krettek (JIRA)" <ji...@apache.org> on 2018/01/02 16:10:00 UTC

[jira] [Commented] (FLINK-8282) Transformation with TwoInputStreamOperator fails

    [ https://issues.apache.org/jira/browse/FLINK-8282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16308293#comment-16308293 ] 

Aljoscha Krettek commented on FLINK-8282:
-----------------------------------------

In my opinion, there is no clean separation of concerns between {{StreamTask}} and {{StreamOperator}}. {{AbstractStreamOperator}} does a lot of things that a stream operator shouldn't have to do, and it does them in a very specific way that {{StreamTask}} and other components rely on. If an operator doesn't behave exactly that way, things go haywire.

Off the top of my head, this includes everything that happens in {{setup()}} (metrics setup and configuration handling), everything that happens in the various state initialisation and snapshot/restore methods, and the latency marker handling.
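To make this implicit contract concrete, here is a toy Java model. All names ({{ToyOperator}}, {{ToyAbstractOperator}}, {{ToyTask}}) are hypothetical and are not Flink's API. The task only sees the operator interface, yet it assumes that {{setup()}} has wired a metric group, which only the base class does. An operator that implements the interface directly compiles fine but fails at runtime with a {{NullPointerException}}, which appears to be the same pattern as the "metrics setup" warning in the second stack trace of the issue.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an operator interface (NOT Flink's StreamOperator).
interface ToyOperator {
    void setup(List<String> taskMetrics);
    List<String> getMetricGroup();
    void processElement(int value);
}

// Plays the role of AbstractStreamOperator: setup() stores the metric group
// that the task reads later. Subclasses only implement processElement().
abstract class ToyAbstractOperator implements ToyOperator {
    private List<String> metrics;

    @Override
    public void setup(List<String> taskMetrics) {
        this.metrics = taskMetrics;
    }

    @Override
    public List<String> getMetricGroup() {
        return metrics;
    }
}

// Plays the role of the task: it only knows the interface, but silently
// depends on the base class having done its setup work.
final class ToyTask {
    private ToyTask() {}

    // Returns the number of metric updates recorded in the task's metric list.
    static int run(ToyOperator op, int[] input) {
        List<String> taskMetrics = new ArrayList<>();
        op.setup(taskMetrics);
        for (int v : input) {
            // Throws NullPointerException if setup() did not wire a metric group.
            op.getMetricGroup().add("numRecordsIn");
            op.processElement(v);
        }
        return taskMetrics.size();
    }
}
```

Calling {{ToyTask.run()}} with an anonymous {{ToyAbstractOperator}} subclass records one metric update per element, while an operator that implements {{ToyOperator}} directly and returns {{null}} from {{getMetricGroup()}} fails on the first element.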

There is actually this (somewhat old) issue: FLINK-4859.

> Transformation with TwoInputStreamOperator fails
> ------------------------------------------------
>
>                 Key: FLINK-8282
>                 URL: https://issues.apache.org/jira/browse/FLINK-8282
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.4.0
>            Reporter: Timo Walther
>
> The following program fails for multiple reasons (see the exceptions below). The {{TwoInputStreamOperator}} used in the transformation does not extend {{AbstractStreamOperator}}; I think this is the main reason it fails. Either we fix the exceptions or we check for {{AbstractStreamOperator}} first.
> {code}
> 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> 		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> 		DataStream<Integer> ds1 = env.addSource(new SourceFunction<Integer>() {
> 			@Override
> 			public void run(SourceContext<Integer> ctx) throws Exception {
> 				ctx.emitWatermark(new Watermark(100L));
> 				ctx.collect(12);
> 				while (true) Thread.yield();
> 			}
> 			@Override
> 			public void cancel() {
> 			}
> 		});
> 		DataStream<Integer> ds2 = env.addSource(new SourceFunction<Integer>() {
> 			@Override
> 			public void run(SourceContext<Integer> ctx) throws Exception {
> 				ctx.emitWatermark(new Watermark(200L));
> 				ctx.collect(12);
> 				while (true) Thread.yield();
> 			}
> 			@Override
> 			public void cancel() {
> 			}
> 		});
> 		ds1.connect(ds2.broadcast()).transform("test", Types.INT, new TwoInputStreamOperator<Integer, Integer, Integer>() {
> 			@Override
> 			public void processElement1(StreamRecord<Integer> element) throws Exception {
> 				System.out.println();
> 			}
> 			@Override
> 			public void processElement2(StreamRecord<Integer> element) throws Exception {
> 				System.out.println();
> 			}
> 			@Override
> 			public void processWatermark1(Watermark mark) throws Exception {
> 				System.out.println();
> 			}
> 			@Override
> 			public void processWatermark2(Watermark mark) throws Exception {
> 				System.out.println();
> 			}
> 			@Override
> 			public void processLatencyMarker1(LatencyMarker latencyMarker) throws Exception {
> 			}
> 			@Override
> 			public void processLatencyMarker2(LatencyMarker latencyMarker) throws Exception {
> 			}
> 			@Override
> 			public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<Integer>> output) {
> 			}
> 			@Override
> 			public void open() throws Exception {
> 			}
> 			@Override
> 			public void close() throws Exception {
> 			}
> 			@Override
> 			public void dispose() throws Exception {
> 			}
> 			@Override
> 			public OperatorSnapshotResult snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception {
> 				return null;
> 			}
> 			@Override
> 			public void initializeState(OperatorSubtaskState stateHandles) throws Exception {
> 			}
> 			@Override
> 			public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
> 			}
> 			@Override
> 			public void setKeyContextElement1(StreamRecord<?> record) throws Exception {
> 			}
> 			@Override
> 			public void setKeyContextElement2(StreamRecord<?> record) throws Exception {
> 			}
> 			@Override
> 			public ChainingStrategy getChainingStrategy() {
> 				return null;
> 			}
> 			@Override
> 			public void setChainingStrategy(ChainingStrategy strategy) {
> 			}
> 			@Override
> 			public MetricGroup getMetricGroup() {
> 				return null;
> 			}
> 			@Override
> 			public OperatorID getOperatorID() {
> 				return null;
> 			}
> 		}).print();
> 		// execute program
> 		env.execute("Streaming WordCount");
> {code}
> The exception is either:
> {code}
> 16:27:51,177 WARN  org.apache.flink.streaming.api.operators.AbstractStreamOperator  - Error while emitting latency marker.
> java.lang.RuntimeException: Buffer pool is destroyed.
> 	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:141)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:825)
> 	at org.apache.flink.streaming.api.operators.StreamSource$LatencyMarksEmitter$1.onProcessingTime(StreamSource.java:150)
> 	at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:294)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
> 	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:203)
> 	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:191)
> 	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:132)
> 	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.randomEmit(RecordWriter.java:107)
> 	at org.apache.flink.streaming.runtime.io.StreamRecordWriter.randomEmit(StreamRecordWriter.java:102)
> 	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:138)
> 	... 10 more
> {code}
> or
> {code}
> 16:33:53,826 WARN  org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor  - An exception occurred during the metrics setup.
> java.lang.NullPointerException
> 	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:211)
> 	at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:278)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:724)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}
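The second option from the description, checking for {{AbstractStreamOperator}} first, could look roughly like the fail-fast validation below. This is a toy sketch: {{ToyTwoInputOperator}}, {{ToyBaseOperator}} and {{ToyTransformations.checkOperator}} are hypothetical stand-ins, not Flink classes, and the assumption is that such a check would run when {{ConnectedStreams.transform()}} builds the transformation, before any task is deployed.

```java
import java.util.Objects;

// Hypothetical stand-in for a two-input operator interface (NOT Flink's API).
interface ToyTwoInputOperator<IN1, IN2, OUT> {
    OUT processElement1(IN1 value);
    OUT processElement2(IN2 value);
}

// Hypothetical stand-in for AbstractStreamOperator: in the real runtime, this is
// where setup, state handling and latency-marker handling would live.
abstract class ToyBaseOperator<IN1, IN2, OUT> implements ToyTwoInputOperator<IN1, IN2, OUT> {
}

final class ToyTransformations {
    private ToyTransformations() {}

    // Rejects operators that bypass the base class at graph-construction time,
    // instead of letting them fail later with indirect runtime exceptions.
    static <IN1, IN2, OUT> ToyTwoInputOperator<IN1, IN2, OUT> checkOperator(
            String name, ToyTwoInputOperator<IN1, IN2, OUT> operator) {
        Objects.requireNonNull(operator, "operator must not be null");
        if (!(operator instanceof ToyBaseOperator<?, ?, ?>)) {
            throw new IllegalArgumentException(
                    "Operator '" + name + "' (" + operator.getClass().getName()
                            + ") must extend ToyBaseOperator; the runtime relies on its "
                            + "setup, state and latency-marker handling.");
        }
        return operator;
    }
}
```

Failing early trades some flexibility (custom operators must inherit from the base class) for an error message that names the offending operator, instead of the indirect exceptions shown above.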



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)