You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/09/08 20:58:53 UTC
[03/15] flink git commit: [FLINK-2635] [streaming] Make input
processors independent of batch reader interface.
[FLINK-2635] [streaming] Make input processors independent of batch reader interface.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b18e410b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b18e410b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b18e410b
Branch: refs/heads/master
Commit: b18e410bc336eb46497aaa75633c16ebe6139554
Parents: c09d14a
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 8 16:19:24 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 8 20:58:05 2015 +0200
----------------------------------------------------------------------
.../runtime/io/StreamInputProcessor.java | 22 +++++++++--------
.../runtime/io/StreamTwoInputProcessor.java | 25 +++++++-------------
.../flink/streaming/api/PartitionerTest.java | 7 +++---
3 files changed, 25 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b18e410b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index cc91d63..8ce8a01 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
@@ -52,19 +52,21 @@ import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
*
* @param <IN> The type of the record that can be read with this record reader.
*/
-public class StreamInputProcessor<IN> extends AbstractReader implements StreamingReader {
+public class StreamInputProcessor<IN> {
private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers;
private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer;
+ private final CheckpointBarrierHandler barrierHandler;
+
// We need to keep track of the channel from which a buffer came, so that we can
// appropriately map the watermarks to input channels
private int currentChannel = -1;
private boolean isFinished;
- private final CheckpointBarrierHandler barrierHandler;
+
private final long[] watermarks;
private long lastEmittedWatermark;
@@ -77,8 +79,8 @@ public class StreamInputProcessor<IN> extends AbstractReader implements Streamin
CheckpointingMode checkpointMode,
IOManager ioManager,
boolean enableWatermarkMultiplexing) throws IOException {
-
- super(InputGateUtil.createInputGate(inputGates));
+
+ InputGate inputGate = InputGateUtil.createInputGate(inputGates);
if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
this.barrierHandler = new BarrierBuffer(inputGate, ioManager);
@@ -173,7 +175,9 @@ public class StreamInputProcessor<IN> extends AbstractReader implements Streamin
else {
// Event received
final AbstractEvent event = bufferOrEvent.getEvent();
- handleEvent(event);
+ if (event.getClass() != EndOfPartitionEvent.class) {
+ throw new IOException("Unexpected event: " + event);
+ }
}
}
else {
@@ -185,15 +189,13 @@ public class StreamInputProcessor<IN> extends AbstractReader implements Streamin
}
}
}
-
- @Override
+
public void setReporter(AccumulatorRegistry.Reporter reporter) {
for (RecordDeserializer<?> deserializer : recordDeserializers) {
deserializer.setReporter(reporter);
}
}
-
- @Override
+
public void cleanup() throws IOException {
// clear the buffers first. this part should not ever fail
for (RecordDeserializer<?> deserializer : recordDeserializers) {
http://git-wip-us.apache.org/repos/asf/flink/blob/b18e410b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index 7dffa71..6322cc8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -22,8 +22,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
-import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
@@ -41,8 +40,6 @@ import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordS
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
@@ -58,10 +55,7 @@ import java.util.Collection;
* @param <IN1> The type of the records that arrive on the first input
* @param <IN2> The type of the records that arrive on the second input
*/
-public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements ReaderBase, StreamingReader {
-
- @SuppressWarnings("unused")
- private static final Logger LOG = LoggerFactory.getLogger(StreamTwoInputProcessor.class);
+public class StreamTwoInputProcessor<IN1, IN2> {
private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers;
@@ -97,7 +91,7 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
IOManager ioManager,
boolean enableWatermarkMultiplexing) throws IOException {
- super(InputGateUtil.createInputGate(inputGates1, inputGates2));
+ final InputGate inputGate = InputGateUtil.createInputGate(inputGates1, inputGates2);
if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
this.barrierHandler = new BarrierBuffer(inputGate, ioManager);
@@ -157,8 +151,7 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
Arrays.fill(watermarks2, Long.MIN_VALUE);
lastEmittedWatermark2 = Long.MIN_VALUE;
}
-
- @SuppressWarnings("unchecked")
+
public boolean processInput(TwoInputStreamOperator<IN1, IN2, ?> streamOperator) throws Exception {
if (isFinished) {
return false;
@@ -216,7 +209,9 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
} else {
// Event received
final AbstractEvent event = bufferOrEvent.getEvent();
- handleEvent(event);
+ if (event.getClass() != EndOfPartitionEvent.class) {
+ throw new IOException("Unexpected event: " + event);
+ }
}
}
else {
@@ -259,15 +254,13 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
}
}
}
-
- @Override
+
public void setReporter(AccumulatorRegistry.Reporter reporter) {
for (RecordDeserializer<?> deserializer : recordDeserializers) {
deserializer.setReporter(reporter);
}
}
-
- @Override
+
public void cleanup() throws IOException {
// clear the buffers first. this part should not ever fail
for (RecordDeserializer<?> deserializer : recordDeserializers) {
http://git-wip-us.apache.org/repos/asf/flink/blob/b18e410b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java
index 987a8fb..a6c6936 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java
@@ -32,13 +32,12 @@ import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.NoOpIntMap;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.streaming.util.TestListResultSink;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
+
import org.junit.Test;
/**
@@ -147,7 +146,9 @@ public class PartitionerTest extends StreamingMultipleProgramsTestBase {
try {
env.execute();
- } catch (Exception e) {
+ }
+ catch (Exception e) {
+ e.printStackTrace();
fail(e.getMessage());
}