You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/10 15:00:05 UTC

[05/14] flink git commit: [FLINK-1638] [streaming] RegisterState removed from datastream + CoRecordReader barrier test added

[FLINK-1638] [streaming] RegisterState removed from datastream + CoRecordReader barrier test added


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/490fa700
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/490fa700
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/490fa700

Branch: refs/heads/master
Commit: 490fa700c6310b0f30decd058f97a38955136566
Parents: 37390d6
Author: Gyula Fora <gy...@apache.org>
Authored: Mon Mar 9 09:12:51 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Mar 10 14:58:49 2015 +0100

----------------------------------------------------------------------
 .../kafka/api/simple/PersistentKafkaSource.java | 11 ++-
 .../flink/streaming/api/StreamConfig.java       | 16 ----
 .../apache/flink/streaming/api/StreamGraph.java | 26 +-----
 .../api/StreamingJobGraphGenerator.java         |  1 -
 .../datastream/SingleOutputStreamOperator.java  | 43 ----------
 .../api/invokable/operator/co/CoInvokable.java  |  6 +-
 .../api/streamvertex/StreamVertex.java          | 68 ++++++---------
 .../streamvertex/StreamingRuntimeContext.java   | 27 ++++--
 .../flink/streaming/io/BarrierBuffer.java       | 11 ++-
 .../flink/streaming/io/CoRecordReader.java      | 10 ++-
 .../flink/streaming/io/BarrierBufferTest.java   | 12 +--
 .../flink/streaming/io/CoRecordReaderTest.java  | 90 ++++++++++++++++++++
 12 files changed, 169 insertions(+), 152 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
index fd428c0..0f980e8 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
@@ -45,17 +45,16 @@ public class PersistentKafkaSource<OUT> extends SimpleKafkaSource<OUT> {
 		this.initialOffset = initialOffset;
 	}
 
+	@SuppressWarnings("unchecked")
 	@Override
 	public void open(Configuration parameters) throws InterruptedException {
 		StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
-		@SuppressWarnings("unchecked")
-		OperatorState<Long> lastKafkaOffSet = (OperatorState<Long>) context.getState("kafka");
-
-		if (lastKafkaOffSet.getState() == null) {
+		
+		if (context.containsState("kafka")) {
+			kafkaOffSet = (OperatorState<Long>) context.getState("kafka");
+		} else {
 			kafkaOffSet = new OperatorState<Long>(initialOffset);
 			context.registerState("kafka", kafkaOffSet);
-		} else {
-			kafkaOffSet = lastKafkaOffSet;
 		}
 
 		super.open(parameters);

http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index d813a30..ea19a44 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -32,7 +32,6 @@ import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.api.streamvertex.StreamVertexException;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
-import org.apache.flink.runtime.state.OperatorState;
 import org.apache.flink.util.InstantiationUtil;
 
 public class StreamConfig implements Serializable {
@@ -55,7 +54,6 @@ public class StreamConfig implements Serializable {
 	private static final String SERIALIZEDUDF = "serializedudf";
 	private static final String USER_FUNCTION = "userfunction";
 	private static final String BUFFER_TIMEOUT = "bufferTimeout";
-	private static final String OPERATOR_STATES = "operatorStates";
 	private static final String TYPE_SERIALIZER_IN_1 = "typeSerializer_in_1";
 	private static final String TYPE_SERIALIZER_IN_2 = "typeSerializer_in_2";
 	private static final String TYPE_SERIALIZER_OUT_1 = "typeSerializer_out_1";
@@ -331,20 +329,6 @@ public class StreamConfig implements Serializable {
 		return config.getInteger(INPUT_TYPE + inputNumber, 0);
 	}
 
-	public void setOperatorStates(Map<String, OperatorState<?>> states) {
-		config.setBytes(OPERATOR_STATES, SerializationUtils.serialize((Serializable) states));
-	}
-
-	@SuppressWarnings("unchecked")
-	public Map<String, OperatorState<?>> getOperatorStates(ClassLoader cl) {
-		try {
-			return (Map<String, OperatorState<?>>) InstantiationUtil.readObjectFromConfig(
-					this.config, OPERATOR_STATES, cl);
-		} catch (Exception e) {
-			throw new RuntimeException("Could not load operator state");
-		}
-	}
-
 	public void setChainedOutputs(List<Integer> chainedOutputs) {
 		config.setBytes(CHAINED_OUTPUTS,
 				SerializationUtils.serialize((Serializable) chainedOutputs));

http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
index 8334aa1..f0fbaab 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
@@ -46,7 +46,6 @@ import org.apache.flink.streaming.api.streamvertex.StreamIterationHead;
 import org.apache.flink.streaming.api.streamvertex.StreamIterationTail;
 import org.apache.flink.streaming.api.streamvertex.StreamVertex;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
-import org.apache.flink.runtime.state.OperatorState;
 import org.apache.sling.commons.json.JSONArray;
 import org.apache.sling.commons.json.JSONException;
 import org.apache.sling.commons.json.JSONObject;
@@ -85,7 +84,6 @@ public class StreamGraph extends StreamingPlan {
 	private Map<Integer, Integer> iterationIDtoTailID;
 	private Map<Integer, Integer> iterationTailCount;
 	private Map<Integer, Long> iterationTimeouts;
-	private Map<Integer, Map<String, OperatorState<?>>> operatorStates;
 	private Map<Integer, InputFormat<String, ?>> inputFormatLists;
 	private List<Map<Integer, ?>> containingMaps;
 
@@ -149,10 +147,8 @@ public class StreamGraph extends StreamingPlan {
 		containingMaps.add(iterationTailCount);
 		iterationTimeouts = new HashMap<Integer, Long>();
 		containingMaps.add(iterationTailCount);
-		operatorStates = new HashMap<Integer, Map<String, OperatorState<?>>>();
-		containingMaps.add(operatorStates);
 		inputFormatLists = new HashMap<Integer, InputFormat<String, ?>>();
-		containingMaps.add(operatorStates);
+		containingMaps.add(inputFormatLists);
 		sources = new HashSet<Integer>();
 	}
 
@@ -419,22 +415,6 @@ public class StreamGraph extends StreamingPlan {
 		return this.bufferTimeouts.get(vertexID);
 	}
 
-	public void addOperatorState(Integer vertexName, String stateName, OperatorState<?> state) {
-		Map<String, OperatorState<?>> states = operatorStates.get(vertexName);
-		if (states == null) {
-			states = new HashMap<String, OperatorState<?>>();
-			states.put(stateName, state);
-		} else {
-			if (states.containsKey(stateName)) {
-				throw new RuntimeException("State has already been registered with this name: "
-						+ stateName);
-			} else {
-				states.put(stateName, state);
-			}
-		}
-		operatorStates.put(vertexName, states);
-	}
-
 	/**
 	 * Sets a user defined {@link OutputSelector} for the given operator. Used
 	 * for directed emits.
@@ -594,10 +574,6 @@ public class StreamGraph extends StreamingPlan {
 		return outputSelectors.get(vertexID);
 	}
 
-	public Map<String, OperatorState<?>> getState(Integer vertexID) {
-		return operatorStates.get(vertexID);
-	}
-
 	public Integer getIterationID(Integer vertexID) {
 		return iterationIds.get(vertexID);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
index b50ac25..ecb6455 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
@@ -212,7 +212,6 @@ public class StreamingJobGraphGenerator {
 
 		config.setUserInvokable(streamGraph.getInvokable(vertexID));
 		config.setOutputSelectors(streamGraph.getOutputSelector(vertexID));
-		config.setOperatorStates(streamGraph.getState(vertexID));
 
 		config.setNumberOfOutputs(nonChainableOutputs.size());
 		config.setOutputs(nonChainableOutputs);

http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index b0fc364..16284d4 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -17,16 +17,10 @@
 
 package org.apache.flink.streaming.api.datastream;
 
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy;
-import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
-import org.apache.flink.runtime.state.OperatorState;
 
 /**
  * The SingleOutputStreamOperator represents a user defined transformation
@@ -99,43 +93,6 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 		return this;
 	}
 
-	/**
-	 * This is a beta feature </br></br> Register an operator state for this
-	 * operator by the given name. This name can be used to retrieve the state
-	 * during runtime using {@link StreamingRuntimeContext#getState(String)}. To
-	 * obtain the {@link StreamingRuntimeContext} from the user-defined function
-	 * use the {@link RichFunction#getRuntimeContext()} method.
-	 * 
-	 * @param name
-	 *            The name of the operator state.
-	 * @param state
-	 *            The state to be registered for this name.
-	 * @return The data stream with state registered.
-	 */
-	public SingleOutputStreamOperator<OUT, O> registerState(String name, OperatorState<?> state) {
-		streamGraph.addOperatorState(getId(), name, state);
-		return this;
-	}
-
-	/**
-	 * This is a beta feature </br></br> Register operator states for this
-	 * operator provided in a map. The registered states can be retrieved during
-	 * runtime using {@link StreamingRuntimeContext#getState(String)}. To obtain
-	 * the {@link StreamingRuntimeContext} from the user-defined function use
-	 * the {@link RichFunction#getRuntimeContext()} method.
-	 * 
-	 * @param states
-	 *            The map containing the states that will be registered.
-	 * @return The data stream with states registered.
-	 */
-	public SingleOutputStreamOperator<OUT, O> registerState(Map<String, OperatorState<?>> states) {
-		for (Entry<String, OperatorState<?>> entry : states.entrySet()) {
-			streamGraph.addOperatorState(getId(), entry.getKey(), entry.getValue());
-		}
-
-		return this;
-	}
-
 	@SuppressWarnings("unchecked")
 	public SingleOutputStreamOperator<OUT, O> broadcast() {
 		return (SingleOutputStreamOperator<OUT, O>) super.broadcast();

http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
index 2b407c6..b036829 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -84,16 +84,14 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<IN1, OU
 				next = recordIterator.next(reuse1, reuse2);
 			} catch (IOException e) {
 				if (isRunning) {
-					throw new RuntimeException("Could not read next record due to: "
-							+ StringUtils.stringifyException(e));
+					throw new RuntimeException("Could not read next record.", e);
 				} else {
 					// Task already cancelled do nothing
 					next = 0;
 				}
 			} catch (IllegalStateException e) {
 				if (isRunning) {
-					throw new RuntimeException("Could not read next record due to: "
-							+ StringUtils.stringifyException(e));
+					throw new RuntimeException("Could not read next record.", e);
 				} else {
 					// Task already cancelled do nothing
 					next = 0;

http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index 5ff47d6..eb0d6ed 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.api.streamvertex;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.flink.runtime.event.task.TaskEvent;
@@ -64,8 +65,6 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
 	protected ClassLoader userClassLoader;
 
 	private EventListener<TaskEvent> superstepListener;
-	
-	private boolean onRecovery;
 
 	public StreamVertex() {
 		userInvokable = null;
@@ -89,57 +88,38 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
 	protected void initialize() {
 		this.userClassLoader = getUserCodeClassLoader();
 		this.configuration = new StreamConfig(getTaskConfiguration());
-		if(!onRecovery)
-		{
-			this.states = configuration.getOperatorStates(userClassLoader);
-		}
+		this.states = new HashMap<String, OperatorState<?>>();
 		this.context = createRuntimeContext(getEnvironment().getTaskName(), this.states);
 	}
 
-	protected <T> void invokeUserFunction(StreamInvokable<?, T> userInvokable) throws Exception {
-		userInvokable.setRuntimeContext(context);
-		userInvokable.open(getTaskConfiguration());
-
-		for (ChainableInvokable<?, ?> invokable : outputHandler.chainedInvokables) {
-			invokable.setRuntimeContext(context);
-			invokable.open(getTaskConfiguration());
-		}
-
-		userInvokable.invoke();
-		userInvokable.close();
-
-		for (ChainableInvokable<?, ?> invokable : outputHandler.chainedInvokables) {
-			invokable.close();
-		}
-
-	}
-
 	@Override
 	public void broadcastBarrier(long id) {
-		//Only called at input vertices
+		// Only called at input vertices
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("Received barrier from jobmanager: " + id);
 		}
 		actOnBarrier(id);
 	}
 
+	/**
+	 * This method is called to confirm that a barrier has been fully processed.
+	 * It sends an acknowledgment to the jobmanager. In the current version if
+	 * there is user state it also checkpoints the state to the jobmanager.
+	 */
 	@Override
 	public void confirmBarrier(long barrierID) {
-		
-		if(configuration.getStateMonitoring() && states != null)
-		{
+
+		if (configuration.getStateMonitoring() && !states.isEmpty()) {
 			getEnvironment().getJobManager().tell(
-					new StateBarrierAck(getEnvironment().getJobID(), 
-							getEnvironment().getJobVertexId(), context.getIndexOfThisSubtask(), 
-							barrierID, states), ActorRef.noSender());
-		}
-		else
-		{
+					new StateBarrierAck(getEnvironment().getJobID(), getEnvironment()
+							.getJobVertexId(), context.getIndexOfThisSubtask(), barrierID, states),
+					ActorRef.noSender());
+		} else {
 			getEnvironment().getJobManager().tell(
 					new BarrierAck(getEnvironment().getJobID(), getEnvironment().getJobVertexId(),
-							context.getIndexOfThisSubtask(), barrierID), ActorRef.noSender());	
+							context.getIndexOfThisSubtask(), barrierID), ActorRef.noSender());
 		}
-		
+
 	}
 
 	public void setInputsOutputs() {
@@ -273,10 +253,17 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
 		return this.superstepListener;
 	}
 
+	/**
+	 * Method to be called when a barrier is received from all the input
+	 * channels. It should broadcast the barrier to the output operators,
+	 * checkpoint the state and send an ack.
+	 * 
+	 * @param id
+	 */
 	private void actOnBarrier(long id) {
 		try {
 			outputHandler.broadcastBarrier(id);
-			//TODO checkpoint state here
+			// TODO checkpoint state here
 			confirmBarrier(id);
 			if (LOG.isDebugEnabled()) {
 				LOG.debug("Superstep " + id + " processed: " + StreamVertex.this);
@@ -293,12 +280,13 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
 		return configuration.getOperatorName() + " (" + context.getIndexOfThisSubtask() + ")";
 	}
 
+	/**
+	 * Re-injects the user states into the map
+	 */
 	@Override
-	public void injectStates(Map<String,OperatorState<?>> states) {
-		onRecovery = true;
+	public void injectStates(Map<String, OperatorState<?>> states) {
 		this.states.putAll(states);
 	}
-	
 
 	private class SuperstepEventListener implements EventListener<TaskEvent> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
index 492d2a0..da083fb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.streamvertex;
 import java.util.Map;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
 import org.apache.flink.configuration.Configuration;
@@ -64,19 +65,35 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
 				throw new RuntimeException("No state has been registered for the name: " + name);
 			}
 		}
+	}
 
+	/**
+	 * Returns whether there is a state stored by the given name
+	 */
+	public boolean containsState(String name) {
+		return operatorStates.containsKey(name);
 	}
 
+	/**
+	 * This is a beta feature <br><br> Register an operator state for this
+	 * operator by the given name. This name can be used to retrieve the state
+	 * during runtime using {@link StreamingRuntimeContext#getState(String)}. To
+	 * obtain the {@link StreamingRuntimeContext} from the user-defined function
+	 * use the {@link RichFunction#getRuntimeContext()} method.
+	 * 
+	 * @param name
+	 *            The name of the operator state.
+	 * @param state
+	 *            The state to be registered for this name.
+	 * @throws RuntimeException if the state is null or the name is already registered.
+	 */
 	public void registerState(String name, OperatorState<?> state) {
 		if (state == null) {
 			throw new RuntimeException("Cannot register null state");
 		} else {
-			if(operatorStates.containsKey(name))
-			{
+			if (operatorStates.containsKey(name)) {
 				throw new RuntimeException("State is already registered");
-			}
-			else
-			{
+			} else {
 				operatorStates.put(name, state);
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
index 7dfccb0..42d4919 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
@@ -30,6 +30,15 @@ import org.apache.flink.streaming.api.streamvertex.StreamingSuperstep;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Class encapsulating the functionality that is necessary to sync inputs on
+ * superstep barriers. Once a barrier is received from an input channel, we
+ * should not process further buffers from that channel until we received the
+ * barrier from all other channels as well. To avoid back-pressuring the
+ * readers, we buffer up the new data received from the blocked channels until
+ * the blocks are released.
+ * 
+ */
 public class BarrierBuffer {
 
 	private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);
@@ -54,7 +63,7 @@ public class BarrierBuffer {
 	}
 
 	/**
-	 * Starts the next superstep
+	 * Starts the next superstep in the buffer
 	 * 
 	 * @param superstep
 	 *            The next superstep

http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
index 6c91f4d..c32db4e 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
@@ -62,8 +62,8 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
 
 	private boolean hasRequestedPartitions;
 
-	private CoBarrierBuffer barrierBuffer1;
-	private CoBarrierBuffer barrierBuffer2;
+	protected CoBarrierBuffer barrierBuffer1;
+	protected CoBarrierBuffer barrierBuffer2;
 
 	public CoRecordReader(InputGate inputgate1, InputGate inputgate2) {
 		super(new UnionInputGate(inputgate1, inputgate2));
@@ -195,7 +195,7 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
 		}
 	}
 
-	private int getNextReaderIndexBlocking() throws InterruptedException {
+	protected int getNextReaderIndexBlocking() throws InterruptedException {
 
 		Integer nextIndex = 0;
 
@@ -230,6 +230,10 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
 
 	@Override
 	public void onEvent(InputGate bufferReader) {
+		addToAvailable(bufferReader);
+	}
+	
+	protected void addToAvailable(InputGate bufferReader){
 		if (bufferReader == bufferReader1) {
 			availableRecordReaders.add(1);
 		} else if (bufferReader == bufferReader2) {

http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
index 203216b..1b4cc36 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
@@ -44,8 +44,6 @@ public class BarrierBufferTest {
 		input.add(createBuffer(0));
 		input.add(createBuffer(0));
 		input.add(createBuffer(0));
-		input.add(createBuffer(2));
-		input.add(createBuffer(2));
 
 		InputGate mockIG = new MockInputGate(1, input);
 		AbstractReader mockAR = new MockReader(mockIG);
@@ -55,8 +53,6 @@ public class BarrierBufferTest {
 		assertEquals(input.get(0), bb.getNextNonBlocked());
 		assertEquals(input.get(1), bb.getNextNonBlocked());
 		assertEquals(input.get(2), bb.getNextNonBlocked());
-		assertEquals(input.get(3), bb.getNextNonBlocked());
-		assertEquals(input.get(4), bb.getNextNonBlocked());
 
 	}
 
@@ -136,7 +132,7 @@ public class BarrierBufferTest {
 
 	}
 
-	private static class MockInputGate implements InputGate {
+	protected static class MockInputGate implements InputGate {
 
 		private int numChannels;
 		private Queue<BufferOrEvent> boes;
@@ -175,7 +171,7 @@ public class BarrierBufferTest {
 
 	}
 
-	private static class MockReader extends AbstractReader {
+	protected static class MockReader extends AbstractReader {
 
 		protected MockReader(InputGate inputGate) {
 			super(inputGate);
@@ -183,11 +179,11 @@ public class BarrierBufferTest {
 
 	}
 
-	private static BufferOrEvent createSuperstep(long id, int channel) {
+	protected static BufferOrEvent createSuperstep(long id, int channel) {
 		return new BufferOrEvent(new StreamingSuperstep(id), channel);
 	}
 
-	private static BufferOrEvent createBuffer(int channel) {
+	protected static BufferOrEvent createBuffer(int channel) {
 		return new BufferOrEvent(new Buffer(new MemorySegment(new byte[] { 1 }),
 				new BufferRecycler() {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/490fa700/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/CoRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/CoRecordReaderTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/CoRecordReaderTest.java
new file mode 100644
index 0000000..1e57d14
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/CoRecordReaderTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.io;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.streaming.io.BarrierBufferTest.MockInputGate;
+import org.junit.Test;
+
+public class CoRecordReaderTest {
+
+	@Test
+	public void test() throws InterruptedException, IOException {
+
+		List<BufferOrEvent> input1 = new LinkedList<BufferOrEvent>();
+		input1.add(BarrierBufferTest.createBuffer(0));
+		input1.add(BarrierBufferTest.createSuperstep(1, 0));
+		input1.add(BarrierBufferTest.createBuffer(0));
+
+		InputGate ig1 = new MockInputGate(1, input1);
+
+		List<BufferOrEvent> input2 = new LinkedList<BufferOrEvent>();
+		input2.add(BarrierBufferTest.createBuffer(0));
+		input2.add(BarrierBufferTest.createBuffer(0));
+		input2.add(BarrierBufferTest.createSuperstep(1, 0));
+		input2.add(BarrierBufferTest.createBuffer(0));
+
+		InputGate ig2 = new MockInputGate(1, input2);
+
+		CoRecordReader<?, ?> coReader = new CoRecordReader<IOReadableWritable, IOReadableWritable>(
+				ig1, ig2);
+		BarrierBuffer b1 = coReader.barrierBuffer1;
+		BarrierBuffer b2 = coReader.barrierBuffer2;
+
+		coReader.addToAvailable(ig1);
+		coReader.addToAvailable(ig2);
+		coReader.addToAvailable(ig2);
+		coReader.addToAvailable(ig1);
+
+		assertEquals(1, coReader.getNextReaderIndexBlocking());
+		b1.getNextNonBlocked();
+
+		assertEquals(2, coReader.getNextReaderIndexBlocking());
+		b2.getNextNonBlocked();
+
+		assertEquals(2, coReader.getNextReaderIndexBlocking());
+		b2.getNextNonBlocked();
+
+		assertEquals(1, coReader.getNextReaderIndexBlocking());
+		b1.getNextNonBlocked();
+		b1.processSuperstep(input1.get(1));
+
+		coReader.addToAvailable(ig1);
+		coReader.addToAvailable(ig2);
+		coReader.addToAvailable(ig2);
+
+		assertEquals(2, coReader.getNextReaderIndexBlocking());
+		b2.getNextNonBlocked();
+		b2.processSuperstep(input2.get(2));
+
+		assertEquals(1, coReader.getNextReaderIndexBlocking());
+		b1.getNextNonBlocked();
+
+		assertEquals(2, coReader.getNextReaderIndexBlocking());
+		b2.getNextNonBlocked();
+	}
+
+}