You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/10 15:00:06 UTC

[06/14] flink git commit: [FLINK-1638] [streaming] Barrier sync added to CoRecordReader, barrier tests

[FLINK-1638] [streaming] Barrier sync added to CoRecordReader, barrier tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5327d56d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5327d56d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5327d56d

Branch: refs/heads/master
Commit: 5327d56dc6f6f49a07054d89efcf30c894c85eca
Parents: c9a3992
Author: Gyula Fora <gy...@apache.org>
Authored: Thu Mar 5 22:04:49 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Mar 10 14:58:49 2015 +0100

----------------------------------------------------------------------
 .../io/network/api/reader/BarrierBuffer.java    | 143 -------------
 .../reader/StreamingAbstractRecordReader.java   | 122 -----------
 .../connectors/kafka/api/KafkaSource.java       |   5 +-
 .../api/invokable/operator/co/CoInvokable.java  |  11 +-
 .../flink/streaming/io/BarrierBuffer.java       | 155 ++++++++++++++
 .../flink/streaming/io/CoRecordReader.java      | 108 ++++++++--
 .../io/StreamingAbstractRecordReader.java       | 123 ++++++++++++
 .../io/StreamingMutableRecordReader.java        |   1 -
 .../streaming/state/PartitionableState.java     |   8 +-
 .../streaming/api/WindowCrossJoinTest.java      |   4 +-
 .../flink/streaming/io/BarrierBufferTest.java   | 200 +++++++++++++++++++
 11 files changed, 589 insertions(+), 291 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BarrierBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BarrierBuffer.java
deleted file mode 100644
index ee317cd..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BarrierBuffer.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.api.reader;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.Set;
-
-import org.apache.flink.runtime.event.task.StreamingSuperstep;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class BarrierBuffer {
-
-	private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);
-
-	private Queue<BufferOrEvent> bufferOrEvents = new LinkedList<BufferOrEvent>();
-	private Queue<BufferOrEvent> unprocessed = new LinkedList<BufferOrEvent>();
-
-	private Set<Integer> blockedChannels = new HashSet<Integer>();
-	private int totalNumberOfInputChannels;
-
-	private StreamingSuperstep currentSuperstep;
-	private boolean receivedSuperstep;
-
-	private boolean blockAll = false;
-
-	private AbstractReader reader;
-
-	private InputGate inputGate;
-
-	public BarrierBuffer(InputGate inputGate, AbstractReader reader) {
-		this.inputGate = inputGate;
-		totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
-		this.reader = reader;
-	}
-
-	private void startSuperstep(StreamingSuperstep superstep) {
-		this.currentSuperstep = superstep;
-		this.receivedSuperstep = true;
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Superstep started with id: " + superstep.getId());
-		}
-	}
-
-	private void store(BufferOrEvent bufferOrEvent) {
-		bufferOrEvents.add(bufferOrEvent);
-	}
-
-	private BufferOrEvent getNonProcessed() {
-		return unprocessed.poll();
-	}
-
-	private boolean isBlocked(int channelIndex) {
-		return blockAll || blockedChannels.contains(channelIndex);
-	}
-	
-	private boolean containsNonprocessed() {
-		return !unprocessed.isEmpty();
-	}
-
-	private boolean receivedSuperstep() {
-		return receivedSuperstep;
-	}
-
-	public BufferOrEvent getNextNonBlocked() throws IOException,
-			InterruptedException {
-		BufferOrEvent bufferOrEvent = null;
-
-		if (containsNonprocessed()) {
-			bufferOrEvent = getNonProcessed();
-		} else {
-			while (bufferOrEvent == null) {
-				BufferOrEvent nextBufferOrEvent = inputGate.getNextBufferOrEvent();
-				if (isBlocked(nextBufferOrEvent.getChannelIndex())) {
-					store(nextBufferOrEvent);
-				} else {
-					bufferOrEvent = nextBufferOrEvent;
-				}
-			}
-		}
-		return bufferOrEvent;
-	}
-
-	private void blockChannel(int channelIndex) {
-		if (!blockedChannels.contains(channelIndex)) {
-			blockedChannels.add(channelIndex);
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Channel blocked with index: " + channelIndex);
-			}
-			if (blockedChannels.size() == totalNumberOfInputChannels) {
-				reader.publish(currentSuperstep);
-				unprocessed.addAll(bufferOrEvents);
-				bufferOrEvents.clear();
-				blockedChannels.clear();
-				receivedSuperstep = false;
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("All barriers received, blocks released");
-				}
-			}
-
-		} else {
-			throw new RuntimeException("Tried to block an already blocked channel");
-		}
-	}
-
-	public String toString() {
-		return blockedChannels.toString();
-	}
-
-	public void processSuperstep(BufferOrEvent bufferOrEvent) {
-		int channelIndex = bufferOrEvent.getChannelIndex();
-		if (isBlocked(channelIndex)) {
-			store(bufferOrEvent);
-		} else {
-			StreamingSuperstep superstep = (StreamingSuperstep) bufferOrEvent.getEvent();
-			if (!receivedSuperstep()) {
-				startSuperstep(superstep);
-			}
-			blockChannel(channelIndex);
-		}
-	}
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/StreamingAbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/StreamingAbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/StreamingAbstractRecordReader.java
deleted file mode 100644
index ea2d7a6..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/StreamingAbstractRecordReader.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.api.reader;
-
-import java.io.IOException;
-
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.event.task.StreamingSuperstep;
-import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
-import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.io.network.serialization.SpillingAdaptiveSpanningRecordDeserializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A record-oriented reader.
- * <p>
- * This abstract base class is used by both the mutable and immutable record
- * readers.
- * 
- * @param <T>
- *            The type of the record that can be read with this record reader.
- */
-public abstract class StreamingAbstractRecordReader<T extends IOReadableWritable> extends AbstractReader implements
-		ReaderBase {
-
-	private static final Logger LOG = LoggerFactory.getLogger(StreamingAbstractRecordReader.class);
-
-	private final RecordDeserializer<T>[] recordDeserializers;
-
-	private RecordDeserializer<T> currentRecordDeserializer;
-
-	private boolean isFinished;
-
-	private final BarrierBuffer barrierBuffer;
-
-	protected StreamingAbstractRecordReader(InputGate inputGate) {
-		super(inputGate);
-		barrierBuffer = new BarrierBuffer(inputGate, this);
-
-		// Initialize one deserializer per input channel
-		this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate
-				.getNumberOfInputChannels()];
-		for (int i = 0; i < recordDeserializers.length; i++) {
-			recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<T>();
-		}
-	}
-
-	protected boolean getNextRecord(T target) throws IOException, InterruptedException {
-		if (isFinished) {
-			return false;
-		}
-
-		while (true) {
-			if (currentRecordDeserializer != null) {
-				DeserializationResult result = currentRecordDeserializer.getNextRecord(target);
-
-				if (result.isBufferConsumed()) {
-					currentRecordDeserializer.getCurrentBuffer().recycle();
-					currentRecordDeserializer = null;
-				}
-
-				if (result.isFullRecord()) {
-					return true;
-				}
-			}
-
-			final BufferOrEvent bufferOrEvent = barrierBuffer.getNextNonBlocked();
-
-			if (bufferOrEvent.isBuffer()) {
-				currentRecordDeserializer = recordDeserializers[bufferOrEvent.getChannelIndex()];
-				currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
-			} else {
-				// Event received
-				final AbstractEvent event = bufferOrEvent.getEvent();
-
-				if (event instanceof StreamingSuperstep) {
-					barrierBuffer.processSuperstep(bufferOrEvent);
-				} else {
-					if (handleEvent(event)) {
-						if (inputGate.isFinished()) {
-							isFinished = true;
-							return false;
-						} else if (hasReachedEndOfSuperstep()) {
-							return false;
-						} // else: More data is coming...
-					}
-				}
-			}
-		}
-	}
-
-	public void clearBuffers() {
-		for (RecordDeserializer<?> deserializer : recordDeserializers) {
-			Buffer buffer = deserializer.getCurrentBuffer();
-			if (buffer != null && !buffer.isRecycled()) {
-				buffer.recycle();
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
index 4349081..0c6cd4a 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
@@ -78,8 +78,9 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
 	}
 
 	public KafkaSource(String zookeeperHost, String topicId,
-					   	DeserializationSchema<OUT> deserializationSchema, long zookeeperSyncTimeMillis){
-		this(zookeeperHost, topicId, DEFAULT_GROUP_ID, deserializationSchema, ZOOKEEPER_DEFAULT_SYNC_TIME);
+			DeserializationSchema<OUT> deserializationSchema, long zookeeperSyncTimeMillis) {
+		this(zookeeperHost, topicId, DEFAULT_GROUP_ID, deserializationSchema,
+				ZOOKEEPER_DEFAULT_SYNC_TIME);
 	}
 
 	public KafkaSource(String zookeeperHost, String topicId,

http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
index b41dbbb..2b407c6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -84,7 +84,16 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<IN1, OU
 				next = recordIterator.next(reuse1, reuse2);
 			} catch (IOException e) {
 				if (isRunning) {
-					throw e;
+					throw new RuntimeException("Could not read next record due to: "
+							+ StringUtils.stringifyException(e));
+				} else {
+					// Task already cancelled do nothing
+					next = 0;
+				}
+			} catch (IllegalStateException e) {
+				if (isRunning) {
+					throw new RuntimeException("Could not read next record due to: "
+							+ StringUtils.stringifyException(e));
 				} else {
 					// Task already cancelled do nothing
 					next = 0;

http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
new file mode 100644
index 0000000..3ff718a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.io;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Set;
+
+import org.apache.flink.runtime.event.task.StreamingSuperstep;
+import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BarrierBuffer {
+
+	private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);
+
+	private Queue<BufferOrEvent> nonprocessed = new LinkedList<BufferOrEvent>();
+	private Queue<BufferOrEvent> blockedNonprocessed = new LinkedList<BufferOrEvent>();
+
+	private Set<Integer> blockedChannels = new HashSet<Integer>();
+	private int totalNumberOfInputChannels;
+
+	private StreamingSuperstep currentSuperstep;
+	private boolean superstepStarted;
+
+	private AbstractReader reader;
+
+	private InputGate inputGate;
+
+	public BarrierBuffer(InputGate inputGate, AbstractReader reader) {
+		this.inputGate = inputGate;
+		totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
+		this.reader = reader;
+	}
+
+	protected void startSuperstep(StreamingSuperstep superstep) {
+		this.currentSuperstep = superstep;
+		this.superstepStarted = true;
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Superstep started with id: " + superstep.getId());
+		}
+	}
+
+	protected void store(BufferOrEvent bufferOrEvent) {
+		nonprocessed.add(bufferOrEvent);
+	}
+
+	protected BufferOrEvent getNonProcessed() {
+		BufferOrEvent nextNonprocessed = null;
+		while (nextNonprocessed == null && !nonprocessed.isEmpty()) {
+			nextNonprocessed = nonprocessed.poll();
+			if (isBlocked(nextNonprocessed.getChannelIndex())) {
+				blockedNonprocessed.add(nextNonprocessed);
+				nextNonprocessed = null;
+			}
+		}
+		return nextNonprocessed;
+	}
+
+	protected boolean isBlocked(int channelIndex) {
+		return blockedChannels.contains(channelIndex);
+	}
+
+	protected boolean isAllBlocked() {
+		return blockedChannels.size() == totalNumberOfInputChannels;
+	}
+
+	public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
+		// If there are non-processed buffers from the previously blocked ones,
+		// we get the next
+		BufferOrEvent bufferOrEvent = getNonProcessed();
+
+		if (bufferOrEvent != null) {
+			return bufferOrEvent;
+		} else {
+			// If no non-processed, get new from input
+			while (true) {
+				// We read the next buffer from the inputgate
+				bufferOrEvent = inputGate.getNextBufferOrEvent();
+				if (isBlocked(bufferOrEvent.getChannelIndex())) {
+					// If channel blocked we just store it
+					store(bufferOrEvent);
+				} else {
+					return bufferOrEvent;
+				}
+			}
+		}
+	}
+
+	protected void blockChannel(int channelIndex) {
+		if (!blockedChannels.contains(channelIndex)) {
+			blockedChannels.add(channelIndex);
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Channel blocked with index: " + channelIndex);
+			}
+			if (isAllBlocked()) {
+				actOnAllBlocked();
+			}
+
+		} else {
+			throw new RuntimeException("Tried to block an already blocked channel");
+		}
+	}
+
+	protected void releaseBlocks() {
+		nonprocessed.addAll(blockedNonprocessed);
+		blockedChannels.clear();
+		blockedNonprocessed.clear();
+		superstepStarted = false;
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("All barriers received, blocks released");
+		}
+	}
+
+	protected void actOnAllBlocked() {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Publishing barrier to the vertex");
+		}
+		reader.publish(currentSuperstep);
+		releaseBlocks();
+	}
+
+	public String toString() {
+		return blockedChannels.toString();
+	}
+
+	public void processSuperstep(BufferOrEvent bufferOrEvent) {
+		StreamingSuperstep superstep = (StreamingSuperstep) bufferOrEvent.getEvent();
+		if (!superstepStarted) {
+			startSuperstep(superstep);
+		}
+		blockChannel(bufferOrEvent.getChannelIndex());
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
index 79f09c4..6a1f624 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
@@ -18,10 +18,12 @@
 package org.apache.flink.streaming.io;
 
 import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingDeque;
 
 import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.event.task.StreamingSuperstep;
 import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
 import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
 import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
@@ -44,7 +46,9 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
 
 	private final InputGate bufferReader2;
 
-	private final BlockingQueue<Integer> availableRecordReaders = new LinkedBlockingQueue<Integer>();
+	private final LinkedBlockingDeque<Integer> availableRecordReaders = new LinkedBlockingDeque<Integer>();
+
+	private LinkedList<Integer> processed = new LinkedList<Integer>();
 
 	private AdaptiveSpanningRecordDeserializer[] reader1RecordDeserializers;
 
@@ -59,15 +63,20 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
 
 	private boolean hasRequestedPartitions;
 
-	public CoRecordReader(InputGate bufferReader1, InputGate bufferReader2) {
-		super(new UnionInputGate(bufferReader1, bufferReader2));
+	private CoBarrierBuffer barrierBuffer1;
+	private CoBarrierBuffer barrierBuffer2;
+
+	private Queue<Integer> unprocessedIndices = new LinkedList<Integer>();
+
+	public CoRecordReader(InputGate inputgate1, InputGate inputgate2) {
+		super(new UnionInputGate(inputgate1, inputgate2));
 
-		this.bufferReader1 = bufferReader1;
-		this.bufferReader2 = bufferReader2;
+		this.bufferReader1 = inputgate1;
+		this.bufferReader2 = inputgate2;
 
-		this.reader1RecordDeserializers = new AdaptiveSpanningRecordDeserializer[bufferReader1
+		this.reader1RecordDeserializers = new AdaptiveSpanningRecordDeserializer[inputgate1
 				.getNumberOfInputChannels()];
-		this.reader2RecordDeserializers = new AdaptiveSpanningRecordDeserializer[bufferReader2
+		this.reader2RecordDeserializers = new AdaptiveSpanningRecordDeserializer[inputgate2
 				.getNumberOfInputChannels()];
 
 		for (int i = 0; i < reader1RecordDeserializers.length; i++) {
@@ -78,8 +87,14 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
 			reader2RecordDeserializers[i] = new AdaptiveSpanningRecordDeserializer<T2>();
 		}
 
-		bufferReader1.registerListener(this);
-		bufferReader2.registerListener(this);
+		inputgate1.registerListener(this);
+		inputgate2.registerListener(this);
+
+		barrierBuffer1 = new CoBarrierBuffer(inputgate1, this);
+		barrierBuffer2 = new CoBarrierBuffer(inputgate2, this);
+
+		barrierBuffer1.setOtherBarrierBuffer(barrierBuffer2);
+		barrierBuffer2.setOtherBarrierBuffer(barrierBuffer1);
 	}
 
 	public void requestPartitionsOnce() throws IOException, InterruptedException {
@@ -94,15 +109,16 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
 	@SuppressWarnings("unchecked")
 	protected int getNextRecord(T1 target1, T2 target2) throws IOException, InterruptedException {
 
-		requestPartitionsOnce();
+		requestPartitionsOnce();	
 
 		while (true) {
 			if (currentReaderIndex == 0) {
 				if ((bufferReader1.isFinished() && bufferReader2.isFinished())) {
 					return 0;
 				}
-
+				
 				currentReaderIndex = getNextReaderIndexBlocking();
+
 			}
 
 			if (currentReaderIndex == 1) {
@@ -123,12 +139,17 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
 						}
 					} else {
 
-						final BufferOrEvent boe = bufferReader1.getNextBufferOrEvent();
+						final BufferOrEvent boe = barrierBuffer1.getNextNonBlocked();
 
 						if (boe.isBuffer()) {
 							reader1currentRecordDeserializer = reader1RecordDeserializers[boe
 									.getChannelIndex()];
 							reader1currentRecordDeserializer.setNextBuffer(boe.getBuffer());
+						} else if (boe.getEvent() instanceof StreamingSuperstep) {
+							barrierBuffer1.processSuperstep(boe);
+							currentReaderIndex = 0;
+
+							break;
 						} else if (handleEvent(boe.getEvent())) {
 							currentReaderIndex = 0;
 
@@ -153,12 +174,17 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
 							return 2;
 						}
 					} else {
-						final BufferOrEvent boe = bufferReader2.getNextBufferOrEvent();
+						final BufferOrEvent boe = barrierBuffer2.getNextNonBlocked();
 
 						if (boe.isBuffer()) {
 							reader2currentRecordDeserializer = reader2RecordDeserializers[boe
 									.getChannelIndex()];
 							reader2currentRecordDeserializer.setNextBuffer(boe.getBuffer());
+						} else if (boe.getEvent() instanceof StreamingSuperstep) {
+							barrierBuffer2.processSuperstep(boe);
+							currentReaderIndex = 0;
+
+							break;
 						} else if (handleEvent(boe.getEvent())) {
 							currentReaderIndex = 0;
 
@@ -173,7 +199,32 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
 	}
 
 	private int getNextReaderIndexBlocking() throws InterruptedException {
-		return availableRecordReaders.take();
+
+		Integer nextIndex = 0;
+
+		while (processed.contains(nextIndex = availableRecordReaders.take())) {
+			processed.remove(nextIndex);
+		}
+
+		if (nextIndex == 1) {
+			if (barrierBuffer1.isAllBlocked()) {
+				availableRecordReaders.addFirst(1);
+				processed.add(2);
+				return 2;
+			} else {
+				return 1;
+			}
+		} else {
+			if (barrierBuffer2.isAllBlocked()) {
+				availableRecordReaders.addFirst(2);
+				processed.add(1);
+				return 1;
+			} else {
+				return 2;
+			}
+
+		}
+
 	}
 
 	// ------------------------------------------------------------------------
@@ -183,8 +234,10 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
 	@Override
 	public void onEvent(InputGate bufferReader) {
 		if (bufferReader == bufferReader1) {
+			System.out.println("Added 1");
 			availableRecordReaders.add(1);
 		} else if (bufferReader == bufferReader2) {
+			System.out.println("Added 2");
 			availableRecordReaders.add(2);
 		}
 	}
@@ -203,4 +256,27 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
 			}
 		}
 	}
+
+	private class CoBarrierBuffer extends BarrierBuffer {
+
+		private CoBarrierBuffer otherBuffer;
+
+		public CoBarrierBuffer(InputGate inputGate, AbstractReader reader) {
+			super(inputGate, reader);
+		}
+
+		public void setOtherBarrierBuffer(CoBarrierBuffer other) {
+			this.otherBuffer = other;
+		}
+
+		@Override
+		protected void actOnAllBlocked() {
+			if (otherBuffer.isAllBlocked()) {
+				super.actOnAllBlocked();
+				otherBuffer.releaseBlocks();
+			}
+		}
+
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java
new file mode 100644
index 0000000..811c48a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.io;
+
+import java.io.IOException;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.task.StreamingSuperstep;
+import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
+import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
+import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.io.network.serialization.SpillingAdaptiveSpanningRecordDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A record-oriented reader.
+ * <p>
+ * This abstract base class is used by both the mutable and immutable record
+ * readers.
+ * 
+ * @param <T>
+ *            The type of the record that can be read with this record reader.
+ */
+public abstract class StreamingAbstractRecordReader<T extends IOReadableWritable> extends AbstractReader implements
+		ReaderBase {
+
+	private static final Logger LOG = LoggerFactory.getLogger(StreamingAbstractRecordReader.class);
+
+	private final RecordDeserializer<T>[] recordDeserializers;
+
+	private RecordDeserializer<T> currentRecordDeserializer;
+
+	private boolean isFinished;
+
+	private final BarrierBuffer barrierBuffer;
+
+	protected StreamingAbstractRecordReader(InputGate inputGate) {
+		super(inputGate);
+		barrierBuffer = new BarrierBuffer(inputGate, this);
+
+		// Initialize one deserializer per input channel
+		this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate
+				.getNumberOfInputChannels()];
+		for (int i = 0; i < recordDeserializers.length; i++) {
+			recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<T>();
+		}
+	}
+
+	protected boolean getNextRecord(T target) throws IOException, InterruptedException {
+		if (isFinished) {
+			return false;
+		}
+
+		while (true) {
+			if (currentRecordDeserializer != null) {
+				DeserializationResult result = currentRecordDeserializer.getNextRecord(target);
+
+				if (result.isBufferConsumed()) {
+					currentRecordDeserializer.getCurrentBuffer().recycle();
+					currentRecordDeserializer = null;
+				}
+
+				if (result.isFullRecord()) {
+					return true;
+				}
+			}
+
+			final BufferOrEvent bufferOrEvent = barrierBuffer.getNextNonBlocked();
+
+			if (bufferOrEvent.isBuffer()) {
+				currentRecordDeserializer = recordDeserializers[bufferOrEvent.getChannelIndex()];
+				currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
+			} else {
+				// Event received
+				final AbstractEvent event = bufferOrEvent.getEvent();
+
+				if (event instanceof StreamingSuperstep) {
+					barrierBuffer.processSuperstep(bufferOrEvent);
+				} else {
+					if (handleEvent(event)) {
+						if (inputGate.isFinished()) {
+							isFinished = true;
+							return false;
+						} else if (hasReachedEndOfSuperstep()) {
+							return false;
+						} // else: More data is coming...
+					}
+				}
+			}
+		}
+	}
+
+	public void clearBuffers() {
+		for (RecordDeserializer<?> deserializer : recordDeserializers) {
+			Buffer buffer = deserializer.getCurrentBuffer();
+			if (buffer != null && !buffer.isRecycled()) {
+				buffer.recycle();
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java
index ffa436b..e868879 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.io;
 import java.io.IOException;
 
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.api.reader.StreamingAbstractRecordReader;
 import org.apache.flink.runtime.io.network.api.reader.MutableReader;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java
index a5e67ab..1c67c9e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java
@@ -22,18 +22,18 @@ import org.apache.flink.runtime.state.OperatorState;
 /**
  * Base class for representing operator states that can be repartitioned for
  * state and load balancing.
- *
+ * 
  * @param <T>
  *            The type of the operator state.
  */
 public abstract class PartitionableState<T> extends OperatorState<T> {
 
-	public PartitionableState(T initialState) {
+	private static final long serialVersionUID = 1L;
+
+	PartitionableState(T initialState) {
 		super(initialState);
 	}
 
-	private static final long serialVersionUID = 1L;
-
 	/**
 	 * Repartitions(divides) the current state into the given number of new
 	 * partitions. The created partitions will be used to redistribute then

http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
index e14e281..bd97917 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
@@ -143,7 +143,7 @@ public class WindowCrossJoinTest implements Serializable {
 		public void invoke(Tuple2<Tuple2<Integer, String>, Tuple1<Integer>> value) {
 			joinResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(value.f0, value.f1.f0));
 		}
-		
+
 		@Override
 		public void cancel() {
 		}
@@ -157,7 +157,7 @@ public class WindowCrossJoinTest implements Serializable {
 		public void invoke(Tuple2<Tuple2<Integer, String>, Tuple1<Integer>> value) {
 			crossResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(value.f0, value.f1.f0));
 		}
-		
+
 		@Override
 		public void cancel() {
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
new file mode 100644
index 0000000..e7a03d9
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.io;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.event.task.StreamingSuperstep;
+import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.util.event.EventListener;
+import org.junit.Test;
+
+public class BarrierBufferTest {
+
+	@Test
+	public void testWithoutBarriers() throws IOException, InterruptedException {
+
+		List<BufferOrEvent> input = new LinkedList<BufferOrEvent>();
+		input.add(createBuffer(0));
+		input.add(createBuffer(0));
+		input.add(createBuffer(0));
+		input.add(createBuffer(2));
+		input.add(createBuffer(2));
+
+		InputGate mockIG = new MockInputGate(1, input);
+		AbstractReader mockAR = new MockReader(mockIG);
+
+		BarrierBuffer bb = new BarrierBuffer(mockIG, mockAR);
+
+		assertEquals(input.get(0), bb.getNextNonBlocked());
+		assertEquals(input.get(1), bb.getNextNonBlocked());
+		assertEquals(input.get(2), bb.getNextNonBlocked());
+		assertEquals(input.get(3), bb.getNextNonBlocked());
+		assertEquals(input.get(4), bb.getNextNonBlocked());
+
+	}
+
+	@Test
+	public void testOneChannelBarrier() throws IOException, InterruptedException {
+
+		List<BufferOrEvent> input = new LinkedList<BufferOrEvent>();
+		input.add(createBuffer(0));
+		input.add(createBuffer(0));
+		input.add(createSuperstep(1, 0));
+		input.add(createBuffer(0));
+		input.add(createBuffer(0));
+		input.add(createSuperstep(2, 0));
+		input.add(createBuffer(0));
+
+		InputGate mockIG = new MockInputGate(1, input);
+		AbstractReader mockAR = new MockReader(mockIG);
+
+		BarrierBuffer bb = new BarrierBuffer(mockIG, mockAR);
+		BufferOrEvent nextBoe;
+
+		assertEquals(input.get(0), nextBoe = bb.getNextNonBlocked());
+		assertEquals(input.get(1), nextBoe = bb.getNextNonBlocked());
+		assertEquals(input.get(2), nextBoe = bb.getNextNonBlocked());
+		bb.processSuperstep(nextBoe);
+		assertEquals(input.get(3), nextBoe = bb.getNextNonBlocked());
+		assertEquals(input.get(4), nextBoe = bb.getNextNonBlocked());
+		assertEquals(input.get(5), nextBoe = bb.getNextNonBlocked());
+		bb.processSuperstep(nextBoe);
+		assertEquals(input.get(6), nextBoe = bb.getNextNonBlocked());
+
+	}
+
+	@Test
+	public void testMultiChannelBarrier() throws IOException, InterruptedException {
+
+		List<BufferOrEvent> input = new LinkedList<BufferOrEvent>();
+		input.add(createBuffer(0));
+		input.add(createBuffer(1));
+		input.add(createSuperstep(1, 0));
+		input.add(createSuperstep(2, 0));
+		input.add(createBuffer(0));
+		input.add(createSuperstep(3, 0));
+		input.add(createBuffer(0));
+		input.add(createBuffer(1));
+		input.add(createSuperstep(1, 1));
+		input.add(createBuffer(0));
+		input.add(createBuffer(1));
+		input.add(createSuperstep(2, 1));
+		input.add(createSuperstep(3, 1));
+
+		InputGate mockIG1 = new MockInputGate(2, input);
+		AbstractReader mockAR1 = new MockReader(mockIG1);
+
+		BarrierBuffer bb = new BarrierBuffer(mockIG1, mockAR1);
+		BufferOrEvent nextBoe;
+
+		assertEquals(input.get(0), nextBoe = bb.getNextNonBlocked());
+		assertEquals(input.get(1), nextBoe = bb.getNextNonBlocked());
+		assertEquals(input.get(2), nextBoe = bb.getNextNonBlocked());
+		bb.processSuperstep(nextBoe);
+		assertEquals(input.get(7), nextBoe = bb.getNextNonBlocked());
+		assertEquals(input.get(8), nextBoe = bb.getNextNonBlocked());
+		bb.processSuperstep(nextBoe);
+		assertEquals(input.get(3), nextBoe = bb.getNextNonBlocked());
+		bb.processSuperstep(nextBoe);
+		assertEquals(input.get(10), nextBoe = bb.getNextNonBlocked());
+		assertEquals(input.get(11), nextBoe = bb.getNextNonBlocked());
+		bb.processSuperstep(nextBoe);
+		assertEquals(input.get(4), nextBoe = bb.getNextNonBlocked());
+		assertEquals(input.get(5), nextBoe = bb.getNextNonBlocked());
+		bb.processSuperstep(nextBoe);
+		assertEquals(input.get(12), nextBoe = bb.getNextNonBlocked());
+		bb.processSuperstep(nextBoe);
+		assertEquals(input.get(6), nextBoe = bb.getNextNonBlocked());
+		assertEquals(input.get(9), nextBoe = bb.getNextNonBlocked());
+
+	}
+
+	private static class MockInputGate implements InputGate {
+
+		private int numChannels;
+		private Queue<BufferOrEvent> boes;
+
+		public MockInputGate(int numChannels, List<BufferOrEvent> boes) {
+			this.numChannels = numChannels;
+			this.boes = new LinkedList<BufferOrEvent>(boes);
+		}
+
+		@Override
+		public int getNumberOfInputChannels() {
+			return numChannels;
+		}
+
+		@Override
+		public boolean isFinished() {
+			return boes.isEmpty();
+		}
+
+		@Override
+		public void requestPartitions() throws IOException, InterruptedException {
+		}
+
+		@Override
+		public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
+			return boes.remove();
+		}
+
+		@Override
+		public void sendTaskEvent(TaskEvent event) throws IOException {
+		}
+
+		@Override
+		public void registerListener(EventListener<InputGate> listener) {
+		}
+
+	}
+
+	private static class MockReader extends AbstractReader {
+
+		protected MockReader(InputGate inputGate) {
+			super(inputGate);
+		}
+
+	}
+
+	private static BufferOrEvent createSuperstep(long id, int channel) {
+		return new BufferOrEvent(new StreamingSuperstep(id), channel);
+	}
+
+	private static BufferOrEvent createBuffer(int channel) {
+		return new BufferOrEvent(new Buffer(new MemorySegment(new byte[] { 1 }),
+				new BufferRecycler() {
+
+					@Override
+					public void recycle(MemorySegment memorySegment) {
+					}
+				}), channel);
+	}
+
+}