You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/09/08 20:58:53 UTC

[03/15] flink git commit: [FLINK-2635] [streaming] Make input processors independent of batch reader interface.

[FLINK-2635] [streaming] Make input processors independent of batch reader interface.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b18e410b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b18e410b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b18e410b

Branch: refs/heads/master
Commit: b18e410bc336eb46497aaa75633c16ebe6139554
Parents: c09d14a
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 8 16:19:24 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 8 20:58:05 2015 +0200

----------------------------------------------------------------------
 .../runtime/io/StreamInputProcessor.java        | 22 +++++++++--------
 .../runtime/io/StreamTwoInputProcessor.java     | 25 +++++++-------------
 .../flink/streaming/api/PartitionerTest.java    |  7 +++---
 3 files changed, 25 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b18e410b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index cc91d63..8ce8a01 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
 import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
@@ -52,19 +52,21 @@ import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
  * 
  * @param <IN> The type of the record that can be read with this record reader.
  */
-public class StreamInputProcessor<IN> extends AbstractReader implements StreamingReader {
+public class StreamInputProcessor<IN> {
 	
 	private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers;
 
 	private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer;
 
+	private final CheckpointBarrierHandler barrierHandler;
+
 	// We need to keep track of the channel from which a buffer came, so that we can
 	// appropriately map the watermarks to input channels
 	private int currentChannel = -1;
 
 	private boolean isFinished;
 
-	private final CheckpointBarrierHandler barrierHandler;
+	
 
 	private final long[] watermarks;
 	private long lastEmittedWatermark;
@@ -77,8 +79,8 @@ public class StreamInputProcessor<IN> extends AbstractReader implements Streamin
 								CheckpointingMode checkpointMode,
 								IOManager ioManager,
 								boolean enableWatermarkMultiplexing) throws IOException {
-		
-		super(InputGateUtil.createInputGate(inputGates));
+
+		InputGate inputGate = InputGateUtil.createInputGate(inputGates);
 
 		if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
 			this.barrierHandler = new BarrierBuffer(inputGate, ioManager);
@@ -173,7 +175,9 @@ public class StreamInputProcessor<IN> extends AbstractReader implements Streamin
 				else {
 					// Event received
 					final AbstractEvent event = bufferOrEvent.getEvent();
-					handleEvent(event);
+					if (event.getClass() != EndOfPartitionEvent.class) {
+						throw new IOException("Unexpected event: " + event);
+					}
 				}
 			}
 			else {
@@ -185,15 +189,13 @@ public class StreamInputProcessor<IN> extends AbstractReader implements Streamin
 			}
 		}
 	}
-
-	@Override
+	
 	public void setReporter(AccumulatorRegistry.Reporter reporter) {
 		for (RecordDeserializer<?> deserializer : recordDeserializers) {
 			deserializer.setReporter(reporter);
 		}
 	}
-
-	@Override
+	
 	public void cleanup() throws IOException {
 		// clear the buffers first. this part should not ever fail
 		for (RecordDeserializer<?> deserializer : recordDeserializers) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b18e410b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index 7dffa71..6322cc8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -22,8 +22,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
-import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
 import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
@@ -41,8 +40,6 @@ import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordS
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -58,10 +55,7 @@ import java.util.Collection;
  * @param <IN1> The type of the records that arrive on the first input
  * @param <IN2> The type of the records that arrive on the second input
  */
-public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements ReaderBase, StreamingReader {
-
-	@SuppressWarnings("unused")
-	private static final Logger LOG = LoggerFactory.getLogger(StreamTwoInputProcessor.class);
+public class StreamTwoInputProcessor<IN1, IN2> {
 
 	private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers;
 
@@ -97,7 +91,7 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
 			IOManager ioManager,
 			boolean enableWatermarkMultiplexing) throws IOException {
 		
-		super(InputGateUtil.createInputGate(inputGates1, inputGates2));
+		final InputGate inputGate = InputGateUtil.createInputGate(inputGates1, inputGates2);
 
 		if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
 			this.barrierHandler = new BarrierBuffer(inputGate, ioManager);
@@ -157,8 +151,7 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
 		Arrays.fill(watermarks2, Long.MIN_VALUE);
 		lastEmittedWatermark2 = Long.MIN_VALUE;
 	}
-
-	@SuppressWarnings("unchecked")
+	
 	public boolean processInput(TwoInputStreamOperator<IN1, IN2, ?> streamOperator) throws Exception {
 		if (isFinished) {
 			return false;
@@ -216,7 +209,9 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
 				} else {
 					// Event received
 					final AbstractEvent event = bufferOrEvent.getEvent();
-					handleEvent(event);
+					if (event.getClass() != EndOfPartitionEvent.class) {
+						throw new IOException("Unexpected event: " + event);
+					}
 				}
 			}
 			else {
@@ -259,15 +254,13 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
 			}
 		}
 	}
-
-	@Override
+	
 	public void setReporter(AccumulatorRegistry.Reporter reporter) {
 		for (RecordDeserializer<?> deserializer : recordDeserializers) {
 			deserializer.setReporter(reporter);
 		}
 	}
-
-	@Override
+	
 	public void cleanup() throws IOException {
 		// clear the buffers first. this part should not ever fail
 		for (RecordDeserializer<?> deserializer : recordDeserializers) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b18e410b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java
index 987a8fb..a6c6936 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java
@@ -32,13 +32,12 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.NoOpIntMap;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.streaming.util.TestListResultSink;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
+
 import org.junit.Test;
 
 /**
@@ -147,7 +146,9 @@ public class PartitionerTest extends StreamingMultipleProgramsTestBase {
 
 		try {
 			env.execute();
-		} catch (Exception e) {
+		}
+		catch (Exception e) {
+			e.printStackTrace();
 			fail(e.getMessage());
 		}