You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/10 15:00:06 UTC
[06/14] flink git commit: [FLINK-1638] [streaming] Barrier sync added
to CoRecordReader, barrier tests
[FLINK-1638] [streaming] Barrier sync added to CoRecordReader, barrier tests
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5327d56d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5327d56d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5327d56d
Branch: refs/heads/master
Commit: 5327d56dc6f6f49a07054d89efcf30c894c85eca
Parents: c9a3992
Author: Gyula Fora <gy...@apache.org>
Authored: Thu Mar 5 22:04:49 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Mar 10 14:58:49 2015 +0100
----------------------------------------------------------------------
.../io/network/api/reader/BarrierBuffer.java | 143 -------------
.../reader/StreamingAbstractRecordReader.java | 122 -----------
.../connectors/kafka/api/KafkaSource.java | 5 +-
.../api/invokable/operator/co/CoInvokable.java | 11 +-
.../flink/streaming/io/BarrierBuffer.java | 155 ++++++++++++++
.../flink/streaming/io/CoRecordReader.java | 108 ++++++++--
.../io/StreamingAbstractRecordReader.java | 123 ++++++++++++
.../io/StreamingMutableRecordReader.java | 1 -
.../streaming/state/PartitionableState.java | 8 +-
.../streaming/api/WindowCrossJoinTest.java | 4 +-
.../flink/streaming/io/BarrierBufferTest.java | 200 +++++++++++++++++++
11 files changed, 589 insertions(+), 291 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BarrierBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BarrierBuffer.java
deleted file mode 100644
index ee317cd..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BarrierBuffer.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.api.reader;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.Set;
-
-import org.apache.flink.runtime.event.task.StreamingSuperstep;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class BarrierBuffer {
-
- private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);
-
- private Queue<BufferOrEvent> bufferOrEvents = new LinkedList<BufferOrEvent>();
- private Queue<BufferOrEvent> unprocessed = new LinkedList<BufferOrEvent>();
-
- private Set<Integer> blockedChannels = new HashSet<Integer>();
- private int totalNumberOfInputChannels;
-
- private StreamingSuperstep currentSuperstep;
- private boolean receivedSuperstep;
-
- private boolean blockAll = false;
-
- private AbstractReader reader;
-
- private InputGate inputGate;
-
- public BarrierBuffer(InputGate inputGate, AbstractReader reader) {
- this.inputGate = inputGate;
- totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
- this.reader = reader;
- }
-
- private void startSuperstep(StreamingSuperstep superstep) {
- this.currentSuperstep = superstep;
- this.receivedSuperstep = true;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Superstep started with id: " + superstep.getId());
- }
- }
-
- private void store(BufferOrEvent bufferOrEvent) {
- bufferOrEvents.add(bufferOrEvent);
- }
-
- private BufferOrEvent getNonProcessed() {
- return unprocessed.poll();
- }
-
- private boolean isBlocked(int channelIndex) {
- return blockAll || blockedChannels.contains(channelIndex);
- }
-
- private boolean containsNonprocessed() {
- return !unprocessed.isEmpty();
- }
-
- private boolean receivedSuperstep() {
- return receivedSuperstep;
- }
-
- public BufferOrEvent getNextNonBlocked() throws IOException,
- InterruptedException {
- BufferOrEvent bufferOrEvent = null;
-
- if (containsNonprocessed()) {
- bufferOrEvent = getNonProcessed();
- } else {
- while (bufferOrEvent == null) {
- BufferOrEvent nextBufferOrEvent = inputGate.getNextBufferOrEvent();
- if (isBlocked(nextBufferOrEvent.getChannelIndex())) {
- store(nextBufferOrEvent);
- } else {
- bufferOrEvent = nextBufferOrEvent;
- }
- }
- }
- return bufferOrEvent;
- }
-
- private void blockChannel(int channelIndex) {
- if (!blockedChannels.contains(channelIndex)) {
- blockedChannels.add(channelIndex);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Channel blocked with index: " + channelIndex);
- }
- if (blockedChannels.size() == totalNumberOfInputChannels) {
- reader.publish(currentSuperstep);
- unprocessed.addAll(bufferOrEvents);
- bufferOrEvents.clear();
- blockedChannels.clear();
- receivedSuperstep = false;
- if (LOG.isDebugEnabled()) {
- LOG.debug("All barriers received, blocks released");
- }
- }
-
- } else {
- throw new RuntimeException("Tried to block an already blocked channel");
- }
- }
-
- public String toString() {
- return blockedChannels.toString();
- }
-
- public void processSuperstep(BufferOrEvent bufferOrEvent) {
- int channelIndex = bufferOrEvent.getChannelIndex();
- if (isBlocked(channelIndex)) {
- store(bufferOrEvent);
- } else {
- StreamingSuperstep superstep = (StreamingSuperstep) bufferOrEvent.getEvent();
- if (!receivedSuperstep()) {
- startSuperstep(superstep);
- }
- blockChannel(channelIndex);
- }
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/StreamingAbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/StreamingAbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/StreamingAbstractRecordReader.java
deleted file mode 100644
index ea2d7a6..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/StreamingAbstractRecordReader.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.api.reader;
-
-import java.io.IOException;
-
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.event.task.StreamingSuperstep;
-import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
-import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.io.network.serialization.SpillingAdaptiveSpanningRecordDeserializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A record-oriented reader.
- * <p>
- * This abstract base class is used by both the mutable and immutable record
- * readers.
- *
- * @param <T>
- * The type of the record that can be read with this record reader.
- */
-public abstract class StreamingAbstractRecordReader<T extends IOReadableWritable> extends AbstractReader implements
- ReaderBase {
-
- private static final Logger LOG = LoggerFactory.getLogger(StreamingAbstractRecordReader.class);
-
- private final RecordDeserializer<T>[] recordDeserializers;
-
- private RecordDeserializer<T> currentRecordDeserializer;
-
- private boolean isFinished;
-
- private final BarrierBuffer barrierBuffer;
-
- protected StreamingAbstractRecordReader(InputGate inputGate) {
- super(inputGate);
- barrierBuffer = new BarrierBuffer(inputGate, this);
-
- // Initialize one deserializer per input channel
- this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate
- .getNumberOfInputChannels()];
- for (int i = 0; i < recordDeserializers.length; i++) {
- recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<T>();
- }
- }
-
- protected boolean getNextRecord(T target) throws IOException, InterruptedException {
- if (isFinished) {
- return false;
- }
-
- while (true) {
- if (currentRecordDeserializer != null) {
- DeserializationResult result = currentRecordDeserializer.getNextRecord(target);
-
- if (result.isBufferConsumed()) {
- currentRecordDeserializer.getCurrentBuffer().recycle();
- currentRecordDeserializer = null;
- }
-
- if (result.isFullRecord()) {
- return true;
- }
- }
-
- final BufferOrEvent bufferOrEvent = barrierBuffer.getNextNonBlocked();
-
- if (bufferOrEvent.isBuffer()) {
- currentRecordDeserializer = recordDeserializers[bufferOrEvent.getChannelIndex()];
- currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
- } else {
- // Event received
- final AbstractEvent event = bufferOrEvent.getEvent();
-
- if (event instanceof StreamingSuperstep) {
- barrierBuffer.processSuperstep(bufferOrEvent);
- } else {
- if (handleEvent(event)) {
- if (inputGate.isFinished()) {
- isFinished = true;
- return false;
- } else if (hasReachedEndOfSuperstep()) {
- return false;
- } // else: More data is coming...
- }
- }
- }
- }
- }
-
- public void clearBuffers() {
- for (RecordDeserializer<?> deserializer : recordDeserializers) {
- Buffer buffer = deserializer.getCurrentBuffer();
- if (buffer != null && !buffer.isRecycled()) {
- buffer.recycle();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
index 4349081..0c6cd4a 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
@@ -78,8 +78,9 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
}
public KafkaSource(String zookeeperHost, String topicId,
- DeserializationSchema<OUT> deserializationSchema, long zookeeperSyncTimeMillis){
- this(zookeeperHost, topicId, DEFAULT_GROUP_ID, deserializationSchema, ZOOKEEPER_DEFAULT_SYNC_TIME);
+ DeserializationSchema<OUT> deserializationSchema, long zookeeperSyncTimeMillis) {
+ this(zookeeperHost, topicId, DEFAULT_GROUP_ID, deserializationSchema,
+ ZOOKEEPER_DEFAULT_SYNC_TIME);
}
public KafkaSource(String zookeeperHost, String topicId,
http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
index b41dbbb..2b407c6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -84,7 +84,16 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<IN1, OU
next = recordIterator.next(reuse1, reuse2);
} catch (IOException e) {
if (isRunning) {
- throw e;
+ throw new RuntimeException("Could not read next record due to: "
+ + StringUtils.stringifyException(e));
+ } else {
+ // Task already cancelled do nothing
+ next = 0;
+ }
+ } catch (IllegalStateException e) {
+ if (isRunning) {
+ throw new RuntimeException("Could not read next record due to: "
+ + StringUtils.stringifyException(e));
} else {
// Task already cancelled do nothing
next = 0;
http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
new file mode 100644
index 0000000..3ff718a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.io;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Set;
+
+import org.apache.flink.runtime.event.task.StreamingSuperstep;
+import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BarrierBuffer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);
+
+ private Queue<BufferOrEvent> nonprocessed = new LinkedList<BufferOrEvent>();
+ private Queue<BufferOrEvent> blockedNonprocessed = new LinkedList<BufferOrEvent>();
+
+ private Set<Integer> blockedChannels = new HashSet<Integer>();
+ private int totalNumberOfInputChannels;
+
+ private StreamingSuperstep currentSuperstep;
+ private boolean superstepStarted;
+
+ private AbstractReader reader;
+
+ private InputGate inputGate;
+
+ public BarrierBuffer(InputGate inputGate, AbstractReader reader) {
+ this.inputGate = inputGate;
+ totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
+ this.reader = reader;
+ }
+
+ protected void startSuperstep(StreamingSuperstep superstep) {
+ this.currentSuperstep = superstep;
+ this.superstepStarted = true;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Superstep started with id: " + superstep.getId());
+ }
+ }
+
+ protected void store(BufferOrEvent bufferOrEvent) {
+ nonprocessed.add(bufferOrEvent);
+ }
+
+ protected BufferOrEvent getNonProcessed() {
+ BufferOrEvent nextNonprocessed = null;
+ while (nextNonprocessed == null && !nonprocessed.isEmpty()) {
+ nextNonprocessed = nonprocessed.poll();
+ if (isBlocked(nextNonprocessed.getChannelIndex())) {
+ blockedNonprocessed.add(nextNonprocessed);
+ nextNonprocessed = null;
+ }
+ }
+ return nextNonprocessed;
+ }
+
+ protected boolean isBlocked(int channelIndex) {
+ return blockedChannels.contains(channelIndex);
+ }
+
+ protected boolean isAllBlocked() {
+ return blockedChannels.size() == totalNumberOfInputChannels;
+ }
+
+ public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
+ // If there are non-processed buffers from the previously blocked ones,
+ // we get the next
+ BufferOrEvent bufferOrEvent = getNonProcessed();
+
+ if (bufferOrEvent != null) {
+ return bufferOrEvent;
+ } else {
+ // If no non-processed, get new from input
+ while (true) {
+ // We read the next buffer from the inputgate
+ bufferOrEvent = inputGate.getNextBufferOrEvent();
+ if (isBlocked(bufferOrEvent.getChannelIndex())) {
+ // If channel blocked we just store it
+ store(bufferOrEvent);
+ } else {
+ return bufferOrEvent;
+ }
+ }
+ }
+ }
+
+ protected void blockChannel(int channelIndex) {
+ if (!blockedChannels.contains(channelIndex)) {
+ blockedChannels.add(channelIndex);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Channel blocked with index: " + channelIndex);
+ }
+ if (isAllBlocked()) {
+ actOnAllBlocked();
+ }
+
+ } else {
+ throw new RuntimeException("Tried to block an already blocked channel");
+ }
+ }
+
+ protected void releaseBlocks() {
+ nonprocessed.addAll(blockedNonprocessed);
+ blockedChannels.clear();
+ blockedNonprocessed.clear();
+ superstepStarted = false;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("All barriers received, blocks released");
+ }
+ }
+
+ protected void actOnAllBlocked() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Publishing barrier to the vertex");
+ }
+ reader.publish(currentSuperstep);
+ releaseBlocks();
+ }
+
+ public String toString() {
+ return blockedChannels.toString();
+ }
+
+ public void processSuperstep(BufferOrEvent bufferOrEvent) {
+ StreamingSuperstep superstep = (StreamingSuperstep) bufferOrEvent.getEvent();
+ if (!superstepStarted) {
+ startSuperstep(superstep);
+ }
+ blockChannel(bufferOrEvent.getChannelIndex());
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
index 79f09c4..6a1f624 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
@@ -18,10 +18,12 @@
package org.apache.flink.streaming.io;
import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingDeque;
import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.event.task.StreamingSuperstep;
import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
@@ -44,7 +46,9 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
private final InputGate bufferReader2;
- private final BlockingQueue<Integer> availableRecordReaders = new LinkedBlockingQueue<Integer>();
+ private final LinkedBlockingDeque<Integer> availableRecordReaders = new LinkedBlockingDeque<Integer>();
+
+ private LinkedList<Integer> processed = new LinkedList<Integer>();
private AdaptiveSpanningRecordDeserializer[] reader1RecordDeserializers;
@@ -59,15 +63,20 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
private boolean hasRequestedPartitions;
- public CoRecordReader(InputGate bufferReader1, InputGate bufferReader2) {
- super(new UnionInputGate(bufferReader1, bufferReader2));
+ private CoBarrierBuffer barrierBuffer1;
+ private CoBarrierBuffer barrierBuffer2;
+
+ private Queue<Integer> unprocessedIndices = new LinkedList<Integer>();
+
+ public CoRecordReader(InputGate inputgate1, InputGate inputgate2) {
+ super(new UnionInputGate(inputgate1, inputgate2));
- this.bufferReader1 = bufferReader1;
- this.bufferReader2 = bufferReader2;
+ this.bufferReader1 = inputgate1;
+ this.bufferReader2 = inputgate2;
- this.reader1RecordDeserializers = new AdaptiveSpanningRecordDeserializer[bufferReader1
+ this.reader1RecordDeserializers = new AdaptiveSpanningRecordDeserializer[inputgate1
.getNumberOfInputChannels()];
- this.reader2RecordDeserializers = new AdaptiveSpanningRecordDeserializer[bufferReader2
+ this.reader2RecordDeserializers = new AdaptiveSpanningRecordDeserializer[inputgate2
.getNumberOfInputChannels()];
for (int i = 0; i < reader1RecordDeserializers.length; i++) {
@@ -78,8 +87,14 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
reader2RecordDeserializers[i] = new AdaptiveSpanningRecordDeserializer<T2>();
}
- bufferReader1.registerListener(this);
- bufferReader2.registerListener(this);
+ inputgate1.registerListener(this);
+ inputgate2.registerListener(this);
+
+ barrierBuffer1 = new CoBarrierBuffer(inputgate1, this);
+ barrierBuffer2 = new CoBarrierBuffer(inputgate2, this);
+
+ barrierBuffer1.setOtherBarrierBuffer(barrierBuffer2);
+ barrierBuffer2.setOtherBarrierBuffer(barrierBuffer1);
}
public void requestPartitionsOnce() throws IOException, InterruptedException {
@@ -94,15 +109,16 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
@SuppressWarnings("unchecked")
protected int getNextRecord(T1 target1, T2 target2) throws IOException, InterruptedException {
- requestPartitionsOnce();
+ requestPartitionsOnce();
while (true) {
if (currentReaderIndex == 0) {
if ((bufferReader1.isFinished() && bufferReader2.isFinished())) {
return 0;
}
-
+
currentReaderIndex = getNextReaderIndexBlocking();
+
}
if (currentReaderIndex == 1) {
@@ -123,12 +139,17 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
}
} else {
- final BufferOrEvent boe = bufferReader1.getNextBufferOrEvent();
+ final BufferOrEvent boe = barrierBuffer1.getNextNonBlocked();
if (boe.isBuffer()) {
reader1currentRecordDeserializer = reader1RecordDeserializers[boe
.getChannelIndex()];
reader1currentRecordDeserializer.setNextBuffer(boe.getBuffer());
+ } else if (boe.getEvent() instanceof StreamingSuperstep) {
+ barrierBuffer1.processSuperstep(boe);
+ currentReaderIndex = 0;
+
+ break;
} else if (handleEvent(boe.getEvent())) {
currentReaderIndex = 0;
@@ -153,12 +174,17 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
return 2;
}
} else {
- final BufferOrEvent boe = bufferReader2.getNextBufferOrEvent();
+ final BufferOrEvent boe = barrierBuffer2.getNextNonBlocked();
if (boe.isBuffer()) {
reader2currentRecordDeserializer = reader2RecordDeserializers[boe
.getChannelIndex()];
reader2currentRecordDeserializer.setNextBuffer(boe.getBuffer());
+ } else if (boe.getEvent() instanceof StreamingSuperstep) {
+ barrierBuffer2.processSuperstep(boe);
+ currentReaderIndex = 0;
+
+ break;
} else if (handleEvent(boe.getEvent())) {
currentReaderIndex = 0;
@@ -173,7 +199,32 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
}
private int getNextReaderIndexBlocking() throws InterruptedException {
- return availableRecordReaders.take();
+
+ Integer nextIndex = 0;
+
+ while (processed.contains(nextIndex = availableRecordReaders.take())) {
+ processed.remove(nextIndex);
+ }
+
+ if (nextIndex == 1) {
+ if (barrierBuffer1.isAllBlocked()) {
+ availableRecordReaders.addFirst(1);
+ processed.add(2);
+ return 2;
+ } else {
+ return 1;
+ }
+ } else {
+ if (barrierBuffer2.isAllBlocked()) {
+ availableRecordReaders.addFirst(2);
+ processed.add(1);
+ return 1;
+ } else {
+ return 2;
+ }
+
+ }
+
}
// ------------------------------------------------------------------------
@@ -183,8 +234,10 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
@Override
public void onEvent(InputGate bufferReader) {
if (bufferReader == bufferReader1) {
+ System.out.println("Added 1");
availableRecordReaders.add(1);
} else if (bufferReader == bufferReader2) {
+ System.out.println("Added 2");
availableRecordReaders.add(2);
}
}
@@ -203,4 +256,27 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
}
}
}
+
+ private class CoBarrierBuffer extends BarrierBuffer {
+
+ private CoBarrierBuffer otherBuffer;
+
+ public CoBarrierBuffer(InputGate inputGate, AbstractReader reader) {
+ super(inputGate, reader);
+ }
+
+ public void setOtherBarrierBuffer(CoBarrierBuffer other) {
+ this.otherBuffer = other;
+ }
+
+ @Override
+ protected void actOnAllBlocked() {
+ if (otherBuffer.isAllBlocked()) {
+ super.actOnAllBlocked();
+ otherBuffer.releaseBlocks();
+ }
+ }
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java
new file mode 100644
index 0000000..811c48a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.io;
+
+import java.io.IOException;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.task.StreamingSuperstep;
+import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
+import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
+import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.io.network.serialization.SpillingAdaptiveSpanningRecordDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A record-oriented reader.
+ * <p>
+ * This abstract base class is used by both the mutable and immutable record
+ * readers.
+ *
+ * @param <T>
+ * The type of the record that can be read with this record reader.
+ */
+public abstract class StreamingAbstractRecordReader<T extends IOReadableWritable> extends AbstractReader implements
+ ReaderBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StreamingAbstractRecordReader.class);
+
+ private final RecordDeserializer<T>[] recordDeserializers;
+
+ private RecordDeserializer<T> currentRecordDeserializer;
+
+ private boolean isFinished;
+
+ private final BarrierBuffer barrierBuffer;
+
+ protected StreamingAbstractRecordReader(InputGate inputGate) {
+ super(inputGate);
+ barrierBuffer = new BarrierBuffer(inputGate, this);
+
+ // Initialize one deserializer per input channel
+ this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate
+ .getNumberOfInputChannels()];
+ for (int i = 0; i < recordDeserializers.length; i++) {
+ recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<T>();
+ }
+ }
+
+ protected boolean getNextRecord(T target) throws IOException, InterruptedException {
+ if (isFinished) {
+ return false;
+ }
+
+ while (true) {
+ if (currentRecordDeserializer != null) {
+ DeserializationResult result = currentRecordDeserializer.getNextRecord(target);
+
+ if (result.isBufferConsumed()) {
+ currentRecordDeserializer.getCurrentBuffer().recycle();
+ currentRecordDeserializer = null;
+ }
+
+ if (result.isFullRecord()) {
+ return true;
+ }
+ }
+
+ final BufferOrEvent bufferOrEvent = barrierBuffer.getNextNonBlocked();
+
+ if (bufferOrEvent.isBuffer()) {
+ currentRecordDeserializer = recordDeserializers[bufferOrEvent.getChannelIndex()];
+ currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
+ } else {
+ // Event received
+ final AbstractEvent event = bufferOrEvent.getEvent();
+
+ if (event instanceof StreamingSuperstep) {
+ barrierBuffer.processSuperstep(bufferOrEvent);
+ } else {
+ if (handleEvent(event)) {
+ if (inputGate.isFinished()) {
+ isFinished = true;
+ return false;
+ } else if (hasReachedEndOfSuperstep()) {
+ return false;
+ } // else: More data is coming...
+ }
+ }
+ }
+ }
+ }
+
+ public void clearBuffers() {
+ for (RecordDeserializer<?> deserializer : recordDeserializers) {
+ Buffer buffer = deserializer.getCurrentBuffer();
+ if (buffer != null && !buffer.isRecycled()) {
+ buffer.recycle();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java
index ffa436b..e868879 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.io;
import java.io.IOException;
import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.api.reader.StreamingAbstractRecordReader;
import org.apache.flink.runtime.io.network.api.reader.MutableReader;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java
index a5e67ab..1c67c9e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java
@@ -22,18 +22,18 @@ import org.apache.flink.runtime.state.OperatorState;
/**
* Base class for representing operator states that can be repartitioned for
* state state and load balancing.
- *
+ *
* @param <T>
* The type of the operator state.
*/
public abstract class PartitionableState<T> extends OperatorState<T> {
- public PartitionableState(T initialState) {
+ private static final long serialVersionUID = 1L;
+
+ PartitionableState(T initialState) {
super(initialState);
}
- private static final long serialVersionUID = 1L;
-
/**
* Repartitions(divides) the current state into the given number of new
* partitions. The created partitions will be used to redistribute then
http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
index e14e281..bd97917 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
@@ -143,7 +143,7 @@ public class WindowCrossJoinTest implements Serializable {
public void invoke(Tuple2<Tuple2<Integer, String>, Tuple1<Integer>> value) {
joinResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(value.f0, value.f1.f0));
}
-
+
@Override
public void cancel() {
}
@@ -157,7 +157,7 @@ public class WindowCrossJoinTest implements Serializable {
public void invoke(Tuple2<Tuple2<Integer, String>, Tuple1<Integer>> value) {
crossResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(value.f0, value.f1.f0));
}
-
+
@Override
public void cancel() {
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
new file mode 100644
index 0000000..e7a03d9
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.io;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.event.task.StreamingSuperstep;
+import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.util.event.EventListener;
+import org.junit.Test;
+
+public class BarrierBufferTest {
+
+ @Test
+ public void testWithoutBarriers() throws IOException, InterruptedException {
+
+ List<BufferOrEvent> input = new LinkedList<BufferOrEvent>();
+ input.add(createBuffer(0));
+ input.add(createBuffer(0));
+ input.add(createBuffer(0));
+ input.add(createBuffer(2));
+ input.add(createBuffer(2));
+
+ InputGate mockIG = new MockInputGate(1, input);
+ AbstractReader mockAR = new MockReader(mockIG);
+
+ BarrierBuffer bb = new BarrierBuffer(mockIG, mockAR);
+
+ assertEquals(input.get(0), bb.getNextNonBlocked());
+ assertEquals(input.get(1), bb.getNextNonBlocked());
+ assertEquals(input.get(2), bb.getNextNonBlocked());
+ assertEquals(input.get(3), bb.getNextNonBlocked());
+ assertEquals(input.get(4), bb.getNextNonBlocked());
+
+ }
+
+ @Test
+ public void testOneChannelBarrier() throws IOException, InterruptedException {
+
+ List<BufferOrEvent> input = new LinkedList<BufferOrEvent>();
+ input.add(createBuffer(0));
+ input.add(createBuffer(0));
+ input.add(createSuperstep(1, 0));
+ input.add(createBuffer(0));
+ input.add(createBuffer(0));
+ input.add(createSuperstep(2, 0));
+ input.add(createBuffer(0));
+
+ InputGate mockIG = new MockInputGate(1, input);
+ AbstractReader mockAR = new MockReader(mockIG);
+
+ BarrierBuffer bb = new BarrierBuffer(mockIG, mockAR);
+ BufferOrEvent nextBoe;
+
+ assertEquals(input.get(0), nextBoe = bb.getNextNonBlocked());
+ assertEquals(input.get(1), nextBoe = bb.getNextNonBlocked());
+ assertEquals(input.get(2), nextBoe = bb.getNextNonBlocked());
+ bb.processSuperstep(nextBoe);
+ assertEquals(input.get(3), nextBoe = bb.getNextNonBlocked());
+ assertEquals(input.get(4), nextBoe = bb.getNextNonBlocked());
+ assertEquals(input.get(5), nextBoe = bb.getNextNonBlocked());
+ bb.processSuperstep(nextBoe);
+ assertEquals(input.get(6), nextBoe = bb.getNextNonBlocked());
+
+ }
+
+ @Test
+ public void testMultiChannelBarrier() throws IOException, InterruptedException {
+
+ List<BufferOrEvent> input = new LinkedList<BufferOrEvent>();
+ input.add(createBuffer(0));
+ input.add(createBuffer(1));
+ input.add(createSuperstep(1, 0));
+ input.add(createSuperstep(2, 0));
+ input.add(createBuffer(0));
+ input.add(createSuperstep(3, 0));
+ input.add(createBuffer(0));
+ input.add(createBuffer(1));
+ input.add(createSuperstep(1, 1));
+ input.add(createBuffer(0));
+ input.add(createBuffer(1));
+ input.add(createSuperstep(2, 1));
+ input.add(createSuperstep(3, 1));
+
+ InputGate mockIG1 = new MockInputGate(2, input);
+ AbstractReader mockAR1 = new MockReader(mockIG1);
+
+ BarrierBuffer bb = new BarrierBuffer(mockIG1, mockAR1);
+ BufferOrEvent nextBoe;
+
+ assertEquals(input.get(0), nextBoe = bb.getNextNonBlocked());
+ assertEquals(input.get(1), nextBoe = bb.getNextNonBlocked());
+ assertEquals(input.get(2), nextBoe = bb.getNextNonBlocked());
+ bb.processSuperstep(nextBoe);
+ assertEquals(input.get(7), nextBoe = bb.getNextNonBlocked());
+ assertEquals(input.get(8), nextBoe = bb.getNextNonBlocked());
+ bb.processSuperstep(nextBoe);
+ assertEquals(input.get(3), nextBoe = bb.getNextNonBlocked());
+ bb.processSuperstep(nextBoe);
+ assertEquals(input.get(10), nextBoe = bb.getNextNonBlocked());
+ assertEquals(input.get(11), nextBoe = bb.getNextNonBlocked());
+ bb.processSuperstep(nextBoe);
+ assertEquals(input.get(4), nextBoe = bb.getNextNonBlocked());
+ assertEquals(input.get(5), nextBoe = bb.getNextNonBlocked());
+ bb.processSuperstep(nextBoe);
+ assertEquals(input.get(12), nextBoe = bb.getNextNonBlocked());
+ bb.processSuperstep(nextBoe);
+ assertEquals(input.get(6), nextBoe = bb.getNextNonBlocked());
+ assertEquals(input.get(9), nextBoe = bb.getNextNonBlocked());
+
+ }
+
+ private static class MockInputGate implements InputGate {
+
+ private int numChannels;
+ private Queue<BufferOrEvent> boes;
+
+ public MockInputGate(int numChannels, List<BufferOrEvent> boes) {
+ this.numChannels = numChannels;
+ this.boes = new LinkedList<BufferOrEvent>(boes);
+ }
+
+ @Override
+ public int getNumberOfInputChannels() {
+ return numChannels;
+ }
+
+ @Override
+ public boolean isFinished() {
+ return boes.isEmpty();
+ }
+
+ @Override
+ public void requestPartitions() throws IOException, InterruptedException {
+ }
+
+ @Override
+ public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
+ return boes.remove();
+ }
+
+ @Override
+ public void sendTaskEvent(TaskEvent event) throws IOException {
+ }
+
+ @Override
+ public void registerListener(EventListener<InputGate> listener) {
+ }
+
+ }
+
+ private static class MockReader extends AbstractReader {
+
+ protected MockReader(InputGate inputGate) {
+ super(inputGate);
+ }
+
+ }
+
+ private static BufferOrEvent createSuperstep(long id, int channel) {
+ return new BufferOrEvent(new StreamingSuperstep(id), channel);
+ }
+
+ private static BufferOrEvent createBuffer(int channel) {
+ return new BufferOrEvent(new Buffer(new MemorySegment(new byte[] { 1 }),
+ new BufferRecycler() {
+
+ @Override
+ public void recycle(MemorySegment memorySegment) {
+ }
+ }), channel);
+ }
+
+}