You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/10 15:00:05 UTC
[05/14] flink git commit: [FLINK-1638] [streaming] RegisterState
removed from datastream + CoRecordReader barrier test added
[FLINK-1638] [streaming] RegisterState removed from datastream + CoRecordReader barrier test added
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/490fa700
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/490fa700
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/490fa700
Branch: refs/heads/master
Commit: 490fa700c6310b0f30decd058f97a38955136566
Parents: 37390d6
Author: Gyula Fora <gy...@apache.org>
Authored: Mon Mar 9 09:12:51 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Mar 10 14:58:49 2015 +0100
----------------------------------------------------------------------
.../kafka/api/simple/PersistentKafkaSource.java | 11 ++-
.../flink/streaming/api/StreamConfig.java | 16 ----
.../apache/flink/streaming/api/StreamGraph.java | 26 +-----
.../api/StreamingJobGraphGenerator.java | 1 -
.../datastream/SingleOutputStreamOperator.java | 43 ----------
.../api/invokable/operator/co/CoInvokable.java | 6 +-
.../api/streamvertex/StreamVertex.java | 68 ++++++---------
.../streamvertex/StreamingRuntimeContext.java | 27 ++++--
.../flink/streaming/io/BarrierBuffer.java | 11 ++-
.../flink/streaming/io/CoRecordReader.java | 10 ++-
.../flink/streaming/io/BarrierBufferTest.java | 12 +--
.../flink/streaming/io/CoRecordReaderTest.java | 90 ++++++++++++++++++++
12 files changed, 169 insertions(+), 152 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
index fd428c0..0f980e8 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
@@ -45,17 +45,16 @@ public class PersistentKafkaSource<OUT> extends SimpleKafkaSource<OUT> {
this.initialOffset = initialOffset;
}
+ @SuppressWarnings("unchecked")
@Override
public void open(Configuration parameters) throws InterruptedException {
StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
- @SuppressWarnings("unchecked")
- OperatorState<Long> lastKafkaOffSet = (OperatorState<Long>) context.getState("kafka");
-
- if (lastKafkaOffSet.getState() == null) {
+
+ if (context.containsState("kafka")) {
+ kafkaOffSet = (OperatorState<Long>) context.getState("kafka");
+ } else {
kafkaOffSet = new OperatorState<Long>(initialOffset);
context.registerState("kafka", kafkaOffSet);
- } else {
- kafkaOffSet = lastKafkaOffSet;
}
super.open(parameters);
http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index d813a30..ea19a44 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -32,7 +32,6 @@ import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.api.streamvertex.StreamVertexException;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
-import org.apache.flink.runtime.state.OperatorState;
import org.apache.flink.util.InstantiationUtil;
public class StreamConfig implements Serializable {
@@ -55,7 +54,6 @@ public class StreamConfig implements Serializable {
private static final String SERIALIZEDUDF = "serializedudf";
private static final String USER_FUNCTION = "userfunction";
private static final String BUFFER_TIMEOUT = "bufferTimeout";
- private static final String OPERATOR_STATES = "operatorStates";
private static final String TYPE_SERIALIZER_IN_1 = "typeSerializer_in_1";
private static final String TYPE_SERIALIZER_IN_2 = "typeSerializer_in_2";
private static final String TYPE_SERIALIZER_OUT_1 = "typeSerializer_out_1";
@@ -331,20 +329,6 @@ public class StreamConfig implements Serializable {
return config.getInteger(INPUT_TYPE + inputNumber, 0);
}
- public void setOperatorStates(Map<String, OperatorState<?>> states) {
- config.setBytes(OPERATOR_STATES, SerializationUtils.serialize((Serializable) states));
- }
-
- @SuppressWarnings("unchecked")
- public Map<String, OperatorState<?>> getOperatorStates(ClassLoader cl) {
- try {
- return (Map<String, OperatorState<?>>) InstantiationUtil.readObjectFromConfig(
- this.config, OPERATOR_STATES, cl);
- } catch (Exception e) {
- throw new RuntimeException("Could not load operator state");
- }
- }
-
public void setChainedOutputs(List<Integer> chainedOutputs) {
config.setBytes(CHAINED_OUTPUTS,
SerializationUtils.serialize((Serializable) chainedOutputs));
http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
index 8334aa1..f0fbaab 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
@@ -46,7 +46,6 @@ import org.apache.flink.streaming.api.streamvertex.StreamIterationHead;
import org.apache.flink.streaming.api.streamvertex.StreamIterationTail;
import org.apache.flink.streaming.api.streamvertex.StreamVertex;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
-import org.apache.flink.runtime.state.OperatorState;
import org.apache.sling.commons.json.JSONArray;
import org.apache.sling.commons.json.JSONException;
import org.apache.sling.commons.json.JSONObject;
@@ -85,7 +84,6 @@ public class StreamGraph extends StreamingPlan {
private Map<Integer, Integer> iterationIDtoTailID;
private Map<Integer, Integer> iterationTailCount;
private Map<Integer, Long> iterationTimeouts;
- private Map<Integer, Map<String, OperatorState<?>>> operatorStates;
private Map<Integer, InputFormat<String, ?>> inputFormatLists;
private List<Map<Integer, ?>> containingMaps;
@@ -149,10 +147,8 @@ public class StreamGraph extends StreamingPlan {
containingMaps.add(iterationTailCount);
iterationTimeouts = new HashMap<Integer, Long>();
containingMaps.add(iterationTailCount);
- operatorStates = new HashMap<Integer, Map<String, OperatorState<?>>>();
- containingMaps.add(operatorStates);
inputFormatLists = new HashMap<Integer, InputFormat<String, ?>>();
- containingMaps.add(operatorStates);
+ containingMaps.add(inputFormatLists);
sources = new HashSet<Integer>();
}
@@ -419,22 +415,6 @@ public class StreamGraph extends StreamingPlan {
return this.bufferTimeouts.get(vertexID);
}
- public void addOperatorState(Integer vertexName, String stateName, OperatorState<?> state) {
- Map<String, OperatorState<?>> states = operatorStates.get(vertexName);
- if (states == null) {
- states = new HashMap<String, OperatorState<?>>();
- states.put(stateName, state);
- } else {
- if (states.containsKey(stateName)) {
- throw new RuntimeException("State has already been registered with this name: "
- + stateName);
- } else {
- states.put(stateName, state);
- }
- }
- operatorStates.put(vertexName, states);
- }
-
/**
* Sets a user defined {@link OutputSelector} for the given operator. Used
* for directed emits.
@@ -594,10 +574,6 @@ public class StreamGraph extends StreamingPlan {
return outputSelectors.get(vertexID);
}
- public Map<String, OperatorState<?>> getState(Integer vertexID) {
- return operatorStates.get(vertexID);
- }
-
public Integer getIterationID(Integer vertexID) {
return iterationIds.get(vertexID);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
index b50ac25..ecb6455 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
@@ -212,7 +212,6 @@ public class StreamingJobGraphGenerator {
config.setUserInvokable(streamGraph.getInvokable(vertexID));
config.setOutputSelectors(streamGraph.getOutputSelector(vertexID));
- config.setOperatorStates(streamGraph.getState(vertexID));
config.setNumberOfOutputs(nonChainableOutputs.size());
config.setOutputs(nonChainableOutputs);
http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index b0fc364..16284d4 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -17,16 +17,10 @@
package org.apache.flink.streaming.api.datastream;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy;
-import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
-import org.apache.flink.runtime.state.OperatorState;
/**
* The SingleOutputStreamOperator represents a user defined transformation
@@ -99,43 +93,6 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
return this;
}
- /**
- * This is a beta feature </br></br> Register an operator state for this
- * operator by the given name. This name can be used to retrieve the state
- * during runtime using {@link StreamingRuntimeContext#getState(String)}. To
- * obtain the {@link StreamingRuntimeContext} from the user-defined function
- * use the {@link RichFunction#getRuntimeContext()} method.
- *
- * @param name
- * The name of the operator state.
- * @param state
- * The state to be registered for this name.
- * @return The data stream with state registered.
- */
- public SingleOutputStreamOperator<OUT, O> registerState(String name, OperatorState<?> state) {
- streamGraph.addOperatorState(getId(), name, state);
- return this;
- }
-
- /**
- * This is a beta feature </br></br> Register operator states for this
- * operator provided in a map. The registered states can be retrieved during
- * runtime using {@link StreamingRuntimeContext#getState(String)}. To obtain
- * the {@link StreamingRuntimeContext} from the user-defined function use
- * the {@link RichFunction#getRuntimeContext()} method.
- *
- * @param states
- * The map containing the states that will be registered.
- * @return The data stream with states registered.
- */
- public SingleOutputStreamOperator<OUT, O> registerState(Map<String, OperatorState<?>> states) {
- for (Entry<String, OperatorState<?>> entry : states.entrySet()) {
- streamGraph.addOperatorState(getId(), entry.getKey(), entry.getValue());
- }
-
- return this;
- }
-
@SuppressWarnings("unchecked")
public SingleOutputStreamOperator<OUT, O> broadcast() {
return (SingleOutputStreamOperator<OUT, O>) super.broadcast();
http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
index 2b407c6..b036829 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -84,16 +84,14 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<IN1, OU
next = recordIterator.next(reuse1, reuse2);
} catch (IOException e) {
if (isRunning) {
- throw new RuntimeException("Could not read next record due to: "
- + StringUtils.stringifyException(e));
+ throw new RuntimeException("Could not read next record.", e);
} else {
// Task already cancelled do nothing
next = 0;
}
} catch (IllegalStateException e) {
if (isRunning) {
- throw new RuntimeException("Could not read next record due to: "
- + StringUtils.stringifyException(e));
+ throw new RuntimeException("Could not read next record.", e);
} else {
// Task already cancelled do nothing
next = 0;
http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index 5ff47d6..eb0d6ed 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.streamvertex;
import java.io.IOException;
+import java.util.HashMap;
import java.util.Map;
import org.apache.flink.runtime.event.task.TaskEvent;
@@ -64,8 +65,6 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
protected ClassLoader userClassLoader;
private EventListener<TaskEvent> superstepListener;
-
- private boolean onRecovery;
public StreamVertex() {
userInvokable = null;
@@ -89,57 +88,38 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
protected void initialize() {
this.userClassLoader = getUserCodeClassLoader();
this.configuration = new StreamConfig(getTaskConfiguration());
- if(!onRecovery)
- {
- this.states = configuration.getOperatorStates(userClassLoader);
- }
+ this.states = new HashMap<String, OperatorState<?>>();
this.context = createRuntimeContext(getEnvironment().getTaskName(), this.states);
}
- protected <T> void invokeUserFunction(StreamInvokable<?, T> userInvokable) throws Exception {
- userInvokable.setRuntimeContext(context);
- userInvokable.open(getTaskConfiguration());
-
- for (ChainableInvokable<?, ?> invokable : outputHandler.chainedInvokables) {
- invokable.setRuntimeContext(context);
- invokable.open(getTaskConfiguration());
- }
-
- userInvokable.invoke();
- userInvokable.close();
-
- for (ChainableInvokable<?, ?> invokable : outputHandler.chainedInvokables) {
- invokable.close();
- }
-
- }
-
@Override
public void broadcastBarrier(long id) {
- //Only called at input vertices
+ // Only called at input vertices
if (LOG.isDebugEnabled()) {
LOG.debug("Received barrier from jobmanager: " + id);
}
actOnBarrier(id);
}
+ /**
+ * This method is called to confirm that a barrier has been fully processed.
+ * It sends an acknowledgment to the jobmanager. In the current version if
+ * there is user state it also checkpoints the state to the jobmanager.
+ */
@Override
public void confirmBarrier(long barrierID) {
-
- if(configuration.getStateMonitoring() && states != null)
- {
+
+ if (configuration.getStateMonitoring() && !states.isEmpty()) {
getEnvironment().getJobManager().tell(
- new StateBarrierAck(getEnvironment().getJobID(),
- getEnvironment().getJobVertexId(), context.getIndexOfThisSubtask(),
- barrierID, states), ActorRef.noSender());
- }
- else
- {
+ new StateBarrierAck(getEnvironment().getJobID(), getEnvironment()
+ .getJobVertexId(), context.getIndexOfThisSubtask(), barrierID, states),
+ ActorRef.noSender());
+ } else {
getEnvironment().getJobManager().tell(
new BarrierAck(getEnvironment().getJobID(), getEnvironment().getJobVertexId(),
- context.getIndexOfThisSubtask(), barrierID), ActorRef.noSender());
+ context.getIndexOfThisSubtask(), barrierID), ActorRef.noSender());
}
-
+
}
public void setInputsOutputs() {
@@ -273,10 +253,17 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
return this.superstepListener;
}
+ /**
+ * Method to be called when a barrier is received from all the input
+ * channels. It should broadcast the barrier to the output operators,
+ * checkpoint the state and send an ack.
+ *
+ * @param id the id of the barrier that was received
+ */
private void actOnBarrier(long id) {
try {
outputHandler.broadcastBarrier(id);
- //TODO checkpoint state here
+ // TODO checkpoint state here
confirmBarrier(id);
if (LOG.isDebugEnabled()) {
LOG.debug("Superstep " + id + " processed: " + StreamVertex.this);
@@ -293,12 +280,13 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
return configuration.getOperatorName() + " (" + context.getIndexOfThisSubtask() + ")";
}
+ /**
+ * Re-injects the user states into the map
+ */
@Override
- public void injectStates(Map<String,OperatorState<?>> states) {
- onRecovery = true;
+ public void injectStates(Map<String, OperatorState<?>> states) {
this.states.putAll(states);
}
-
private class SuperstepEventListener implements EventListener<TaskEvent> {
http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
index 492d2a0..da083fb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.streamvertex;
import java.util.Map;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
import org.apache.flink.configuration.Configuration;
@@ -64,19 +65,35 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
throw new RuntimeException("No state has been registered for the name: " + name);
}
}
+ }
+ /**
+ * Returns whether there is a state stored by the given name
+ */
+ public boolean containsState(String name) {
+ return operatorStates.containsKey(name);
}
+ /**
+ * This is a beta feature <br><br> Register an operator state for this
+ * operator by the given name. This name can be used to retrieve the state
+ * during runtime using {@link StreamingRuntimeContext#getState(String)}. To
+ * obtain the {@link StreamingRuntimeContext} from the user-defined function
+ * use the {@link RichFunction#getRuntimeContext()} method.
+ *
+ * @param name
+ * The name of the operator state.
+ * @param state
+ * The state to be registered for this name.
+ * @throws RuntimeException if the state is null or the name is already registered
+ */
public void registerState(String name, OperatorState<?> state) {
if (state == null) {
throw new RuntimeException("Cannot register null state");
} else {
- if(operatorStates.containsKey(name))
- {
+ if (operatorStates.containsKey(name)) {
throw new RuntimeException("State is already registered");
- }
- else
- {
+ } else {
operatorStates.put(name, state);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
index 7dfccb0..42d4919 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
@@ -30,6 +30,15 @@ import org.apache.flink.streaming.api.streamvertex.StreamingSuperstep;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * Class encapsulating the functionality that is necessary to sync inputs on
+ * superstep barriers. Once a barrier is received from an input channel, we
+ * should not process further buffers from that channel until we have received the
+ * barrier from all other channels as well. To avoid back-pressuring the
+ * readers, we buffer up the new data received from the blocked channels until
+ * the blocks are released.
+ *
+ */
public class BarrierBuffer {
private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);
@@ -54,7 +63,7 @@ public class BarrierBuffer {
}
/**
- * Starts the next superstep
+ * Starts the next superstep in the buffer
*
* @param superstep
* The next superstep
http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
index 6c91f4d..c32db4e 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
@@ -62,8 +62,8 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
private boolean hasRequestedPartitions;
- private CoBarrierBuffer barrierBuffer1;
- private CoBarrierBuffer barrierBuffer2;
+ protected CoBarrierBuffer barrierBuffer1;
+ protected CoBarrierBuffer barrierBuffer2;
public CoRecordReader(InputGate inputgate1, InputGate inputgate2) {
super(new UnionInputGate(inputgate1, inputgate2));
@@ -195,7 +195,7 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
}
}
- private int getNextReaderIndexBlocking() throws InterruptedException {
+ protected int getNextReaderIndexBlocking() throws InterruptedException {
Integer nextIndex = 0;
@@ -230,6 +230,10 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
@Override
public void onEvent(InputGate bufferReader) {
+ addToAvailable(bufferReader);
+ }
+
+ protected void addToAvailable(InputGate bufferReader){
if (bufferReader == bufferReader1) {
availableRecordReaders.add(1);
} else if (bufferReader == bufferReader2) {
http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
index 203216b..1b4cc36 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
@@ -44,8 +44,6 @@ public class BarrierBufferTest {
input.add(createBuffer(0));
input.add(createBuffer(0));
input.add(createBuffer(0));
- input.add(createBuffer(2));
- input.add(createBuffer(2));
InputGate mockIG = new MockInputGate(1, input);
AbstractReader mockAR = new MockReader(mockIG);
@@ -55,8 +53,6 @@ public class BarrierBufferTest {
assertEquals(input.get(0), bb.getNextNonBlocked());
assertEquals(input.get(1), bb.getNextNonBlocked());
assertEquals(input.get(2), bb.getNextNonBlocked());
- assertEquals(input.get(3), bb.getNextNonBlocked());
- assertEquals(input.get(4), bb.getNextNonBlocked());
}
@@ -136,7 +132,7 @@ public class BarrierBufferTest {
}
- private static class MockInputGate implements InputGate {
+ protected static class MockInputGate implements InputGate {
private int numChannels;
private Queue<BufferOrEvent> boes;
@@ -175,7 +171,7 @@ public class BarrierBufferTest {
}
- private static class MockReader extends AbstractReader {
+ protected static class MockReader extends AbstractReader {
protected MockReader(InputGate inputGate) {
super(inputGate);
@@ -183,11 +179,11 @@ public class BarrierBufferTest {
}
- private static BufferOrEvent createSuperstep(long id, int channel) {
+ protected static BufferOrEvent createSuperstep(long id, int channel) {
return new BufferOrEvent(new StreamingSuperstep(id), channel);
}
- private static BufferOrEvent createBuffer(int channel) {
+ protected static BufferOrEvent createBuffer(int channel) {
return new BufferOrEvent(new Buffer(new MemorySegment(new byte[] { 1 }),
new BufferRecycler() {
http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/CoRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/CoRecordReaderTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/CoRecordReaderTest.java
new file mode 100644
index 0000000..1e57d14
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/CoRecordReaderTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.io;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.streaming.io.BarrierBufferTest.MockInputGate;
+import org.junit.Test;
+
+public class CoRecordReaderTest {
+
+ @Test
+ public void test() throws InterruptedException, IOException {
+
+ List<BufferOrEvent> input1 = new LinkedList<BufferOrEvent>();
+ input1.add(BarrierBufferTest.createBuffer(0));
+ input1.add(BarrierBufferTest.createSuperstep(1, 0));
+ input1.add(BarrierBufferTest.createBuffer(0));
+
+ InputGate ig1 = new MockInputGate(1, input1);
+
+ List<BufferOrEvent> input2 = new LinkedList<BufferOrEvent>();
+ input2.add(BarrierBufferTest.createBuffer(0));
+ input2.add(BarrierBufferTest.createBuffer(0));
+ input2.add(BarrierBufferTest.createSuperstep(1, 0));
+ input2.add(BarrierBufferTest.createBuffer(0));
+
+ InputGate ig2 = new MockInputGate(1, input2);
+
+ CoRecordReader<?, ?> coReader = new CoRecordReader<IOReadableWritable, IOReadableWritable>(
+ ig1, ig2);
+ BarrierBuffer b1 = coReader.barrierBuffer1;
+ BarrierBuffer b2 = coReader.barrierBuffer2;
+
+ coReader.addToAvailable(ig1);
+ coReader.addToAvailable(ig2);
+ coReader.addToAvailable(ig2);
+ coReader.addToAvailable(ig1);
+
+ assertEquals(1, coReader.getNextReaderIndexBlocking());
+ b1.getNextNonBlocked();
+
+ assertEquals(2, coReader.getNextReaderIndexBlocking());
+ b2.getNextNonBlocked();
+
+ assertEquals(2, coReader.getNextReaderIndexBlocking());
+ b2.getNextNonBlocked();
+
+ assertEquals(1, coReader.getNextReaderIndexBlocking());
+ b1.getNextNonBlocked();
+ b1.processSuperstep(input1.get(1));
+
+ coReader.addToAvailable(ig1);
+ coReader.addToAvailable(ig2);
+ coReader.addToAvailable(ig2);
+
+ assertEquals(2, coReader.getNextReaderIndexBlocking());
+ b2.getNextNonBlocked();
+ b2.processSuperstep(input2.get(2));
+
+ assertEquals(1, coReader.getNextReaderIndexBlocking());
+ b1.getNextNonBlocked();
+
+ assertEquals(2, coReader.getNextReaderIndexBlocking());
+ b2.getNextNonBlocked();
+ }
+
+}