You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/04/15 11:38:50 UTC
[09/19] flink git commit: [streaming] Major internal renaming and
restructure
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java
deleted file mode 100644
index 3c8824b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.io;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-
-public class IndexedMutableReader<T extends IOReadableWritable> extends
- StreamingMutableRecordReader<T> {
-
- InputGate reader;
-
- public IndexedMutableReader(InputGate reader) {
- super(reader);
- this.reader = reader;
- }
-
- public int getNumberOfInputChannels() {
- return reader.getNumberOfInputChannels();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedReaderIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedReaderIterator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedReaderIterator.java
deleted file mode 100644
index 18cdd4e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedReaderIterator.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.io;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.operators.util.ReaderIterator;
-import org.apache.flink.runtime.plugable.DeserializationDelegate;
-
-public class IndexedReaderIterator<T> extends ReaderIterator<T> {
-
- public IndexedReaderIterator(
- IndexedMutableReader<DeserializationDelegate<T>> reader,
- TypeSerializer<T> serializer) {
-
- super(reader, serializer);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/InputGateFactory.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/InputGateFactory.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/InputGateFactory.java
deleted file mode 100644
index 3e6edb9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/InputGateFactory.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.io;
-
-import java.util.Collection;
-
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
-
-public class InputGateFactory {
-
- public static InputGate createInputGate(Collection<InputGate> inputGates) {
- return createInputGate(inputGates.toArray(new InputGate[inputGates.size()]));
- }
-
- public static InputGate createInputGate(InputGate[] inputGates) {
- if (inputGates.length <= 0) {
- throw new RuntimeException("No such input gate.");
- }
-
- if (inputGates.length < 2) {
- return inputGates[0];
- } else {
- return new UnionInputGate(inputGates);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/RecordWriterFactory.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/RecordWriterFactory.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/RecordWriterFactory.java
deleted file mode 100644
index e859225..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/RecordWriterFactory.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.io;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class RecordWriterFactory {
- private static final Logger LOG = LoggerFactory.getLogger(RecordWriterFactory.class);
-
- public static <OUT extends IOReadableWritable> RecordWriter<OUT> createRecordWriter(ResultPartitionWriter bufferWriter, ChannelSelector<OUT> channelSelector, long bufferTimeout) {
-
- RecordWriter<OUT> output;
-
- if (bufferTimeout >= 0) {
- output = new StreamRecordWriter<OUT>(bufferWriter, channelSelector, bufferTimeout);
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("StreamRecordWriter initiated with {} bufferTimeout.", bufferTimeout);
- }
- } else {
- output = new RecordWriter<OUT>(bufferWriter, channelSelector);
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("RecordWriter initiated.");
- }
- }
-
- return output;
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillReader.java
deleted file mode 100644
index ce16b8c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillReader.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.io;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
-
-public class SpillReader {
-
- private FileChannel spillingChannel;
- private File spillFile;
-
- /**
- * Reads the next buffer from the spilled file.
- */
- public Buffer readNextBuffer(int bufferSize) throws IOException {
- try {
- Buffer buffer = new Buffer(new MemorySegment(new byte[bufferSize]),
- new BufferRecycler() {
-
- @Override
- public void recycle(MemorySegment memorySegment) {
- memorySegment.free();
- }
- });
-
- spillingChannel.read(buffer.getMemorySegment().wrap(0, bufferSize));
-
- return buffer;
- } catch (Exception e) {
- close();
- throw new IOException(e);
- }
- }
-
- @SuppressWarnings("resource")
- public void setSpillFile(File nextSpillFile) throws IOException {
- // We can close and delete the file now
- close();
- if (spillFile != null) {
- spillFile.delete();
- }
- this.spillFile = nextSpillFile;
- this.spillingChannel = new RandomAccessFile(spillFile, "rw").getChannel();
- }
-
- public File getSpillFile() {
- return spillFile;
- }
-
- public void close() throws IOException {
- if (this.spillingChannel != null && this.spillingChannel.isOpen()) {
- this.spillingChannel.close();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillingBufferOrEvent.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillingBufferOrEvent.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillingBufferOrEvent.java
deleted file mode 100644
index ef3410e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillingBufferOrEvent.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.io;
-
-import java.io.IOException;
-
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-
-public class SpillingBufferOrEvent {
-
- private BufferOrEvent boe;
- private boolean isSpilled = false;
-
- private SpillReader spillReader;
-
- private int channelIndex;
- private int bufferSize;
-
- public SpillingBufferOrEvent(BufferOrEvent boe, BufferSpiller spiller, SpillReader reader)
- throws IOException {
-
- this.boe = boe;
- this.channelIndex = boe.getChannelIndex();
- this.spillReader = reader;
-
- if (boe.isBuffer()) {
- this.bufferSize = boe.getBuffer().getSize();
- spiller.spill(boe.getBuffer());
- this.boe = null;
- this.isSpilled = true;
- }
- }
-
- /**
- * If the buffer wasn't spilled simply returns the instance from the field,
- * otherwise reads it from the spill reader
- */
- public BufferOrEvent getBufferOrEvent() throws IOException {
- if (isSpilled) {
- boe = new BufferOrEvent(spillReader.readNextBuffer(bufferSize), channelIndex);
- this.isSpilled = false;
- return boe;
- } else {
- return boe;
- }
- }
-
- public boolean isSpilled() {
- return isSpilled;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
deleted file mode 100644
index a1fdc09..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.io;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
-
-import java.io.IOException;
-
-public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
-
- private long timeout;
-
- private OutputFlusher outputFlusher;
-
- public StreamRecordWriter(ResultPartitionWriter writer) {
- this(writer, new RoundRobinChannelSelector<T>(), 1000);
- }
-
- public StreamRecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector) {
- this(writer, channelSelector, 1000);
- }
-
- public StreamRecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector,
- long timeout) {
- super(writer, channelSelector);
-
- this.timeout = timeout;
- this.outputFlusher = new OutputFlusher();
-
- outputFlusher.start();
- }
-
- public void close() {
- try {
- if (outputFlusher != null) {
- outputFlusher.terminate();
- outputFlusher.join();
- }
-
- flush();
- } catch (IOException e) {
- throw new RuntimeException(e);
- } catch (InterruptedException e) {
- // Do nothing here
- }
- }
-
- private class OutputFlusher extends Thread {
- private volatile boolean running = true;
-
- public void terminate() {
- running = false;
- }
-
- @Override
- public void run() {
- while (running) {
- try {
- flush();
- Thread.sleep(timeout);
- } catch (InterruptedException e) {
- // Do nothing here
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java
deleted file mode 100644
index 8e939c6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.io;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
-import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
-import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
-import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
-import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.streaming.api.streamvertex.StreamingSuperstep;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A record-oriented reader.
- * <p>
- * This abstract base class is used by both the mutable and immutable record
- * readers.
- *
- * @param <T>
- * The type of the record that can be read with this record reader.
- */
-public abstract class StreamingAbstractRecordReader<T extends IOReadableWritable> extends
- AbstractReader implements ReaderBase, StreamingReader {
-
- @SuppressWarnings("unused")
- private static final Logger LOG = LoggerFactory.getLogger(StreamingAbstractRecordReader.class);
-
- private final RecordDeserializer<T>[] recordDeserializers;
-
- private RecordDeserializer<T> currentRecordDeserializer;
-
- private boolean isFinished;
-
- private final BarrierBuffer barrierBuffer;
-
- @SuppressWarnings("unchecked")
- protected StreamingAbstractRecordReader(InputGate inputGate) {
- super(inputGate);
- barrierBuffer = new BarrierBuffer(inputGate, this);
-
- // Initialize one deserializer per input channel
- this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate
- .getNumberOfInputChannels()];
- for (int i = 0; i < recordDeserializers.length; i++) {
- recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<T>();
- }
- }
-
- protected boolean getNextRecord(T target) throws IOException, InterruptedException {
- if (isFinished) {
- return false;
- }
-
- while (true) {
- if (currentRecordDeserializer != null) {
- DeserializationResult result = currentRecordDeserializer.getNextRecord(target);
-
- if (result.isBufferConsumed()) {
- currentRecordDeserializer.getCurrentBuffer().recycle();
- currentRecordDeserializer = null;
- }
-
- if (result.isFullRecord()) {
- return true;
- }
- }
-
- final BufferOrEvent bufferOrEvent = barrierBuffer.getNextNonBlocked();
-
- if (bufferOrEvent.isBuffer()) {
- currentRecordDeserializer = recordDeserializers[bufferOrEvent.getChannelIndex()];
- currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
- } else {
- // Event received
- final AbstractEvent event = bufferOrEvent.getEvent();
-
- if (event instanceof StreamingSuperstep) {
- barrierBuffer.processSuperstep(bufferOrEvent);
- } else {
- if (handleEvent(event)) {
- if (inputGate.isFinished()) {
- if (!barrierBuffer.isEmpty()) {
- throw new RuntimeException(
- "BarrierBuffer should be empty at this point");
- }
- isFinished = true;
- return false;
- } else if (hasReachedEndOfSuperstep()) {
- return false;
- } // else: More data is coming...
- }
- }
- }
- }
- }
-
- public void clearBuffers() {
- for (RecordDeserializer<?> deserializer : recordDeserializers) {
- Buffer buffer = deserializer.getCurrentBuffer();
- if (buffer != null && !buffer.isRecycled()) {
- buffer.recycle();
- }
- }
- }
-
- public void cleanup() throws IOException {
- barrierBuffer.cleanup();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java
deleted file mode 100644
index e868879..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.io;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.api.reader.MutableReader;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-
-public class StreamingMutableRecordReader<T extends IOReadableWritable> extends
- StreamingAbstractRecordReader<T> implements MutableReader<T> {
-
- public StreamingMutableRecordReader(InputGate inputGate) {
- super(inputGate);
- }
-
- @Override
- public boolean next(final T target) throws IOException, InterruptedException {
- return getNextRecord(target);
- }
-
- @Override
- public void clearBuffers() {
- super.clearBuffers();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingReader.java
deleted file mode 100644
index 74b986a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingReader.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.io;
-
-import java.io.IOException;
-
-public interface StreamingReader {
-
- public void cleanup() throws IOException;
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/BroadcastPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/BroadcastPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/BroadcastPartitioner.java
deleted file mode 100644
index 3813c8a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/BroadcastPartitioner.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.partitioner;
-
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-
-/**
- * Partitioner that selects all the output channels.
- *
- * @param <T>
- * Type of the Tuple
- */
-public class BroadcastPartitioner<T> extends StreamPartitioner<T> {
- private static final long serialVersionUID = 1L;
-
- int[] returnArray;
- boolean set;
- int setNumber;
-
- public BroadcastPartitioner() {
- super(PartitioningStrategy.BROADCAST);
- }
-
- @Override
- public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
- int numberOfOutputChannels) {
- if (set && setNumber == numberOfOutputChannels) {
- return returnArray;
- } else {
- this.returnArray = new int[numberOfOutputChannels];
- for (int i = 0; i < numberOfOutputChannels; i++) {
- returnArray[i] = i;
- }
- set = true;
- setNumber = numberOfOutputChannels;
- return returnArray;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/DistributePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/DistributePartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/DistributePartitioner.java
deleted file mode 100644
index 7299397..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/DistributePartitioner.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.partitioner;
-
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-
-/**
- * Partitioner that distributes the data equally by cycling through the output
- * channels.
- *
- * @param <T>
- * Type of the Tuple
- */
-public class DistributePartitioner<T> extends StreamPartitioner<T> {
- private static final long serialVersionUID = 1L;
-
- private int[] returnArray = new int[] {-1};
-
- public DistributePartitioner(boolean forward) {
- super(forward ? PartitioningStrategy.FORWARD : PartitioningStrategy.DISTRIBUTE);
- }
-
- @Override
- public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
- int numberOfOutputChannels) {
- this.returnArray[0] = (this.returnArray[0] + 1) % numberOfOutputChannels;
-
- return this.returnArray;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/FieldsPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/FieldsPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/FieldsPartitioner.java
deleted file mode 100644
index 6bc036e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/FieldsPartitioner.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.partitioner;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-
-/**
- * Partitioner that selects the same (one) channel for two Tuples having a
- * specified fields equal.
- *
- * @param <T>
- * Type of the Tuple
- */
-public class FieldsPartitioner<T> extends StreamPartitioner<T> {
- private static final long serialVersionUID = 1L;
-
- private int[] returnArray = new int[1];;
- KeySelector<T, ?> keySelector;
-
- public FieldsPartitioner(KeySelector<T, ?> keySelector) {
- super(PartitioningStrategy.GROUPBY);
- this.keySelector = keySelector;
- }
-
- @Override
- public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
- int numberOfOutputChannels) {
- returnArray[0] = Math.abs(record.getInstance().getKey(keySelector).hashCode()
- % numberOfOutputChannels);
-
- return returnArray;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/GlobalPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/GlobalPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/GlobalPartitioner.java
deleted file mode 100644
index c73ca71..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/GlobalPartitioner.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.partitioner;
-
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-
-//Group to the partitioner with the lowest id
-public class GlobalPartitioner<T> extends StreamPartitioner<T> {
- private static final long serialVersionUID = 1L;
-
- private int[] returnArray = new int[] { 0 };
-
- public GlobalPartitioner() {
- super(PartitioningStrategy.GLOBAL);
- }
-
- @Override
- public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
- int numberOfOutputChannels) {
- return returnArray;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/ShufflePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/ShufflePartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/ShufflePartitioner.java
deleted file mode 100644
index 318de3f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/ShufflePartitioner.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.partitioner;
-
-import java.util.Random;
-
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-
-/**
- * Partitioner that distributes the data equally by selecting one output channel
- * randomly.
- *
- * @param <T>
- * Type of the Tuple
- */
-public class ShufflePartitioner<T> extends StreamPartitioner<T> {
- private static final long serialVersionUID = 1L;
-
- private Random random = new Random();
-
- private int[] returnArray = new int[1];
-
- public ShufflePartitioner() {
- super(PartitioningStrategy.SHUFFLE);
- }
-
- @Override
- public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
- int numberOfOutputChannels) {
- returnArray[0] = random.nextInt(numberOfOutputChannels);
- return returnArray;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/StreamPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/StreamPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/StreamPartitioner.java
deleted file mode 100644
index 19a8dba..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/StreamPartitioner.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.partitioner;
-
-import java.io.Serializable;
-
-import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-
-public abstract class StreamPartitioner<T> implements
- ChannelSelector<SerializationDelegate<StreamRecord<T>>>, Serializable {
-
- public enum PartitioningStrategy {
-
- FORWARD, DISTRIBUTE, SHUFFLE, BROADCAST, GLOBAL, GROUPBY;
-
- }
-
- private static final long serialVersionUID = 1L;
- private PartitioningStrategy strategy;
-
- public StreamPartitioner(PartitioningStrategy strategy) {
- this.strategy = strategy;
- }
-
- public PartitioningStrategy getStrategy() {
- return strategy;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
new file mode 100644
index 0000000..bc153f9
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Set;
+
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.streaming.runtime.tasks.StreamingSuperstep;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class encapsulating the functionality that is necessary to sync inputs on
+ * superstep barriers. Once a barrier is received from an input channel, whe
+ * should not process further buffers from that channel until we received the
+ * barrier from all other channels as well. To avoid back-pressuring the
+ * readers, we buffer up the new data received from the blocked channels until
+ * the blocks are released.
+ *
+ */
+public class BarrierBuffer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);
+
+ private Queue<SpillingBufferOrEvent> nonprocessed = new LinkedList<SpillingBufferOrEvent>();
+ private Queue<SpillingBufferOrEvent> blockedNonprocessed = new LinkedList<SpillingBufferOrEvent>();
+
+ private Set<Integer> blockedChannels = new HashSet<Integer>();
+ private int totalNumberOfInputChannels;
+
+ private StreamingSuperstep currentSuperstep;
+ private boolean superstepStarted;
+
+ private AbstractReader reader;
+
+ private InputGate inputGate;
+
+ private SpillReader spillReader;
+ private BufferSpiller bufferSpiller;
+
+ private boolean inputFinished = false;
+
+ private BufferOrEvent endOfStreamEvent = null;
+
+ public BarrierBuffer(InputGate inputGate, AbstractReader reader) {
+ this.inputGate = inputGate;
+ totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
+ this.reader = reader;
+ try {
+ this.bufferSpiller = new BufferSpiller();
+ this.spillReader = new SpillReader();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ /**
+ * Starts the next superstep in the buffer
+ *
+ * @param superstep
+ * The next superstep
+ */
+ protected void startSuperstep(StreamingSuperstep superstep) {
+ this.currentSuperstep = superstep;
+ this.superstepStarted = true;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Superstep started with id: " + superstep.getId());
+ }
+ }
+
+ /**
+ * Get then next non-blocked non-processed BufferOrEvent. Returns null if
+ * not available.
+ *
+ * @throws IOException
+ */
+ protected BufferOrEvent getNonProcessed() throws IOException {
+ SpillingBufferOrEvent nextNonprocessed;
+
+ while ((nextNonprocessed = nonprocessed.poll()) != null) {
+ BufferOrEvent boe = nextNonprocessed.getBufferOrEvent();
+ if (isBlocked(boe.getChannelIndex())) {
+ blockedNonprocessed.add(new SpillingBufferOrEvent(boe, bufferSpiller, spillReader));
+ } else {
+ return boe;
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * Checks whether a given channel index is blocked for this inputgate
+ *
+ * @param channelIndex
+ * The channel index to check
+ */
+ protected boolean isBlocked(int channelIndex) {
+ return blockedChannels.contains(channelIndex);
+ }
+
+ /**
+ * Checks whether all channels are blocked meaning that barriers are
+ * received from all channels
+ */
+ protected boolean isAllBlocked() {
+ return blockedChannels.size() == totalNumberOfInputChannels;
+ }
+
+ /**
+ * Returns the next non-blocked BufferOrEvent. This is a blocking operator.
+ */
+ public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
+ // If there are non-processed buffers from the previously blocked ones,
+ // we get the next
+ BufferOrEvent bufferOrEvent = getNonProcessed();
+
+ if (bufferOrEvent != null) {
+ return bufferOrEvent;
+ } else if (blockedNonprocessed.isEmpty() && inputFinished) {
+ return endOfStreamEvent;
+ } else {
+ // If no non-processed, get new from input
+ while (true) {
+ if (!inputFinished) {
+ // We read the next buffer from the inputgate
+ bufferOrEvent = inputGate.getNextBufferOrEvent();
+
+ if (!bufferOrEvent.isBuffer()
+ && bufferOrEvent.getEvent() instanceof EndOfPartitionEvent) {
+ if (inputGate.isFinished()) {
+ // store the event for later if the channel is
+ // closed
+ endOfStreamEvent = bufferOrEvent;
+ inputFinished = true;
+ }
+
+ } else {
+ if (isBlocked(bufferOrEvent.getChannelIndex())) {
+ // If channel blocked we just store it
+ blockedNonprocessed.add(new SpillingBufferOrEvent(bufferOrEvent,
+ bufferSpiller, spillReader));
+ } else {
+ return bufferOrEvent;
+ }
+ }
+ } else {
+ actOnAllBlocked();
+ return getNextNonBlocked();
+ }
+ }
+ }
+ }
+
+ /**
+ * Blocks the given channel index, from which a barrier has been received.
+ *
+ * @param channelIndex
+ * The channel index to block.
+ */
+ protected void blockChannel(int channelIndex) {
+ if (!blockedChannels.contains(channelIndex)) {
+ blockedChannels.add(channelIndex);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Channel blocked with index: " + channelIndex);
+ }
+ if (isAllBlocked()) {
+ actOnAllBlocked();
+ }
+
+ } else {
+ throw new RuntimeException("Tried to block an already blocked channel");
+ }
+ }
+
+ /**
+ * Releases the blocks on all channels.
+ *
+ * @throws IOException
+ */
+ protected void releaseBlocks() {
+ if (!nonprocessed.isEmpty()) {
+ // sanity check
+ throw new RuntimeException("Error in barrier buffer logic");
+ }
+ nonprocessed = blockedNonprocessed;
+ blockedNonprocessed = new LinkedList<SpillingBufferOrEvent>();
+
+ try {
+ spillReader.setSpillFile(bufferSpiller.getSpillFile());
+ bufferSpiller.resetSpillFile();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ blockedChannels.clear();
+ superstepStarted = false;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("All barriers received, blocks released");
+ }
+ }
+
+ /**
+ * Method that is executed once the barrier has been received from all
+ * channels.
+ */
+ protected void actOnAllBlocked() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Publishing barrier to the vertex");
+ }
+
+ if (currentSuperstep != null) {
+ reader.publish(currentSuperstep);
+ }
+
+ releaseBlocks();
+ }
+
+ /**
+ * Processes a streaming superstep event
+ *
+ * @param bufferOrEvent
+ * The BufferOrEvent containing the event
+ */
+ public void processSuperstep(BufferOrEvent bufferOrEvent) {
+ StreamingSuperstep superstep = (StreamingSuperstep) bufferOrEvent.getEvent();
+ if (!superstepStarted) {
+ startSuperstep(superstep);
+ }
+ blockChannel(bufferOrEvent.getChannelIndex());
+ }
+
+ public void cleanup() throws IOException {
+ bufferSpiller.close();
+ File spillfile1 = bufferSpiller.getSpillFile();
+ if (spillfile1 != null) {
+ spillfile1.delete();
+ }
+
+ spillReader.close();
+ File spillfile2 = spillReader.getSpillFile();
+ if (spillfile2 != null) {
+ spillfile2.delete();
+ }
+ }
+
+ public String toString() {
+ return nonprocessed.toString() + blockedNonprocessed.toString();
+ }
+
+ public boolean isEmpty() {
+ return nonprocessed.isEmpty() && blockedNonprocessed.isEmpty();
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java
new file mode 100644
index 0000000..3905558
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.flink.runtime.iterative.concurrent.Broker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+@SuppressWarnings("rawtypes")
+public class BlockingQueueBroker extends Broker<BlockingQueue<StreamRecord>> {
+ /**
+ * Singleton instance
+ */
+ private static final BlockingQueueBroker INSTANCE = new BlockingQueueBroker();
+
+ private BlockingQueueBroker() {
+ }
+
+ /**
+ * retrieve singleton instance
+ */
+ public static Broker<BlockingQueue<StreamRecord>> instance() {
+ return INSTANCE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
new file mode 100644
index 0000000..0d57d05
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.util.Random;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.util.StringUtils;
+
+public class BufferSpiller {
+
+ protected static Random rnd = new Random();
+
+ private File spillFile;
+ protected FileChannel spillingChannel;
+ private String tempDir;
+
+ public BufferSpiller() throws IOException {
+ String tempDirString = GlobalConfiguration.getString(
+ ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
+ ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH);
+ String[] tempDirs = tempDirString.split(",|" + File.pathSeparator);
+
+ tempDir = tempDirs[rnd.nextInt(tempDirs.length)];
+
+ createSpillingChannel();
+ }
+
+ /**
+ * Dumps the contents of the buffer to disk and recycles the buffer.
+ */
+ public void spill(Buffer buffer) throws IOException {
+ try {
+ spillingChannel.write(buffer.getNioBuffer());
+ buffer.recycle();
+ } catch (IOException e) {
+ close();
+ throw new IOException(e);
+ }
+
+ }
+
+ @SuppressWarnings("resource")
+ private void createSpillingChannel() throws IOException {
+ this.spillFile = new File(tempDir, randomString(rnd) + ".buffer");
+ this.spillingChannel = new RandomAccessFile(spillFile, "rw").getChannel();
+ }
+
+ private static String randomString(Random random) {
+ final byte[] bytes = new byte[20];
+ random.nextBytes(bytes);
+ return StringUtils.byteToHexString(bytes);
+ }
+
+ public void close() throws IOException {
+ if (spillingChannel != null && spillingChannel.isOpen()) {
+ spillingChannel.close();
+ }
+ }
+
+ public void resetSpillFile() throws IOException {
+ close();
+ createSpillingChannel();
+ }
+
+ public File getSpillFile() {
+ return spillFile;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoReaderIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoReaderIterator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoReaderIterator.java
new file mode 100644
index 0000000..4358810
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoReaderIterator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
+import org.apache.flink.runtime.plugable.ReusingDeserializationDelegate;
+
+/**
+ * A CoReaderIterator wraps a {@link CoRecordReader} producing records of two
+ * input types.
+ */
+public class CoReaderIterator<T1, T2> {
+
+ private final CoRecordReader<DeserializationDelegate<T1>, DeserializationDelegate<T2>> reader; // the
+ // source
+
+ protected final ReusingDeserializationDelegate<T1> delegate1;
+ protected final ReusingDeserializationDelegate<T2> delegate2;
+
+ public CoReaderIterator(
+ CoRecordReader<DeserializationDelegate<T1>, DeserializationDelegate<T2>> reader,
+ TypeSerializer<T1> serializer1, TypeSerializer<T2> serializer2) {
+ this.reader = reader;
+ this.delegate1 = new ReusingDeserializationDelegate<T1>(serializer1);
+ this.delegate2 = new ReusingDeserializationDelegate<T2>(serializer2);
+ }
+
+ public int next(T1 target1, T2 target2) throws IOException {
+ this.delegate1.setInstance(target1);
+ this.delegate2.setInstance(target2);
+
+ try {
+ return this.reader.getNextRecord(this.delegate1, this.delegate2);
+
+ } catch (InterruptedException e) {
+ throw new IOException("Reader interrupted.", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java
new file mode 100644
index 0000000..cfd27f7
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
+import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
+import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
+import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
+import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.streaming.runtime.tasks.StreamingSuperstep;
+
+/**
+ * A CoRecordReader wraps {@link MutableRecordReader}s of two different input
+ * types to read records effectively.
+ */
+@SuppressWarnings("rawtypes")
+public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadableWritable> extends
+ AbstractReader implements EventListener<InputGate>, StreamingReader {
+
+ private final InputGate bufferReader1;
+
+ private final InputGate bufferReader2;
+
+ private final LinkedBlockingDeque<Integer> availableRecordReaders = new LinkedBlockingDeque<Integer>();
+
+ private LinkedList<Integer> processed = new LinkedList<Integer>();
+
+ private AdaptiveSpanningRecordDeserializer[] reader1RecordDeserializers;
+
+ private RecordDeserializer<T1> reader1currentRecordDeserializer;
+
+ private AdaptiveSpanningRecordDeserializer[] reader2RecordDeserializers;
+
+ private RecordDeserializer<T2> reader2currentRecordDeserializer;
+
+ // 0 => none, 1 => reader (T1), 2 => reader (T2)
+ private int currentReaderIndex;
+
+ private boolean hasRequestedPartitions;
+
+ protected CoBarrierBuffer barrierBuffer1;
+ protected CoBarrierBuffer barrierBuffer2;
+
+ public CoRecordReader(InputGate inputgate1, InputGate inputgate2) {
+ super(new UnionInputGate(inputgate1, inputgate2));
+
+ this.bufferReader1 = inputgate1;
+ this.bufferReader2 = inputgate2;
+
+ this.reader1RecordDeserializers = new AdaptiveSpanningRecordDeserializer[inputgate1
+ .getNumberOfInputChannels()];
+ this.reader2RecordDeserializers = new AdaptiveSpanningRecordDeserializer[inputgate2
+ .getNumberOfInputChannels()];
+
+ for (int i = 0; i < reader1RecordDeserializers.length; i++) {
+ reader1RecordDeserializers[i] = new AdaptiveSpanningRecordDeserializer<T1>();
+ }
+
+ for (int i = 0; i < reader2RecordDeserializers.length; i++) {
+ reader2RecordDeserializers[i] = new AdaptiveSpanningRecordDeserializer<T2>();
+ }
+
+ inputgate1.registerListener(this);
+ inputgate2.registerListener(this);
+
+ barrierBuffer1 = new CoBarrierBuffer(inputgate1, this);
+ barrierBuffer2 = new CoBarrierBuffer(inputgate2, this);
+
+ barrierBuffer1.setOtherBarrierBuffer(barrierBuffer2);
+ barrierBuffer2.setOtherBarrierBuffer(barrierBuffer1);
+ }
+
+ public void requestPartitionsOnce() throws IOException, InterruptedException {
+ if (!hasRequestedPartitions) {
+ bufferReader1.requestPartitions();
+ bufferReader2.requestPartitions();
+
+ hasRequestedPartitions = true;
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ protected int getNextRecord(T1 target1, T2 target2) throws IOException, InterruptedException {
+
+ requestPartitionsOnce();
+
+ while (true) {
+ if (currentReaderIndex == 0) {
+ if ((bufferReader1.isFinished() && bufferReader2.isFinished())) {
+ return 0;
+ }
+
+ currentReaderIndex = getNextReaderIndexBlocking();
+
+ }
+
+ if (currentReaderIndex == 1) {
+ while (true) {
+ if (reader1currentRecordDeserializer != null) {
+ RecordDeserializer.DeserializationResult result = reader1currentRecordDeserializer
+ .getNextRecord(target1);
+
+ if (result.isBufferConsumed()) {
+ reader1currentRecordDeserializer.getCurrentBuffer().recycle();
+ reader1currentRecordDeserializer = null;
+
+ currentReaderIndex = 0;
+ }
+
+ if (result.isFullRecord()) {
+ return 1;
+ }
+ } else {
+
+ final BufferOrEvent boe = barrierBuffer1.getNextNonBlocked();
+
+ if (boe.isBuffer()) {
+ reader1currentRecordDeserializer = reader1RecordDeserializers[boe
+ .getChannelIndex()];
+ reader1currentRecordDeserializer.setNextBuffer(boe.getBuffer());
+ } else if (boe.getEvent() instanceof StreamingSuperstep) {
+ barrierBuffer1.processSuperstep(boe);
+ currentReaderIndex = 0;
+
+ break;
+ } else if (handleEvent(boe.getEvent())) {
+ currentReaderIndex = 0;
+
+ break;
+ }
+ }
+ }
+ } else if (currentReaderIndex == 2) {
+ while (true) {
+ if (reader2currentRecordDeserializer != null) {
+ RecordDeserializer.DeserializationResult result = reader2currentRecordDeserializer
+ .getNextRecord(target2);
+
+ if (result.isBufferConsumed()) {
+ reader2currentRecordDeserializer.getCurrentBuffer().recycle();
+ reader2currentRecordDeserializer = null;
+
+ currentReaderIndex = 0;
+ }
+
+ if (result.isFullRecord()) {
+ return 2;
+ }
+ } else {
+ final BufferOrEvent boe = barrierBuffer2.getNextNonBlocked();
+
+ if (boe.isBuffer()) {
+ reader2currentRecordDeserializer = reader2RecordDeserializers[boe
+ .getChannelIndex()];
+ reader2currentRecordDeserializer.setNextBuffer(boe.getBuffer());
+ } else if (boe.getEvent() instanceof StreamingSuperstep) {
+ barrierBuffer2.processSuperstep(boe);
+ currentReaderIndex = 0;
+
+ break;
+ } else if (handleEvent(boe.getEvent())) {
+ currentReaderIndex = 0;
+
+ break;
+ }
+ }
+ }
+ } else {
+ throw new IllegalStateException("Bug: unexpected current reader index.");
+ }
+ }
+ }
+
+ protected int getNextReaderIndexBlocking() throws InterruptedException {
+
+ Integer nextIndex = 0;
+
+ while (processed.contains(nextIndex = availableRecordReaders.take())) {
+ processed.remove(nextIndex);
+ }
+
+ if (nextIndex == 1) {
+ if (barrierBuffer1.isAllBlocked()) {
+ availableRecordReaders.addFirst(1);
+ processed.add(2);
+ return 2;
+ } else {
+ return 1;
+ }
+ } else {
+ if (barrierBuffer2.isAllBlocked()) {
+ availableRecordReaders.addFirst(2);
+ processed.add(1);
+ return 1;
+ } else {
+ return 2;
+ }
+
+ }
+
+ }
+
+ // ------------------------------------------------------------------------
+ // Data availability notifications
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void onEvent(InputGate bufferReader) {
+ addToAvailable(bufferReader);
+ }
+
+ protected void addToAvailable(InputGate bufferReader) {
+ if (bufferReader == bufferReader1) {
+ availableRecordReaders.add(1);
+ } else if (bufferReader == bufferReader2) {
+ availableRecordReaders.add(2);
+ }
+ }
+
+ public void clearBuffers() {
+ for (RecordDeserializer<?> deserializer : reader1RecordDeserializers) {
+ Buffer buffer = deserializer.getCurrentBuffer();
+ if (buffer != null && !buffer.isRecycled()) {
+ buffer.recycle();
+ }
+ }
+ for (RecordDeserializer<?> deserializer : reader2RecordDeserializers) {
+ Buffer buffer = deserializer.getCurrentBuffer();
+ if (buffer != null && !buffer.isRecycled()) {
+ buffer.recycle();
+ }
+ }
+ }
+
+ private class CoBarrierBuffer extends BarrierBuffer {
+
+ private CoBarrierBuffer otherBuffer;
+
+ public CoBarrierBuffer(InputGate inputGate, AbstractReader reader) {
+ super(inputGate, reader);
+ }
+
+ public void setOtherBarrierBuffer(CoBarrierBuffer other) {
+ this.otherBuffer = other;
+ }
+
+ @Override
+ protected void actOnAllBlocked() {
+ if (otherBuffer.isAllBlocked()) {
+ super.actOnAllBlocked();
+ otherBuffer.releaseBlocks();
+ }
+ }
+
+ }
+
+ public void cleanup() throws IOException {
+ try {
+ barrierBuffer1.cleanup();
+ } finally {
+ barrierBuffer2.cleanup();
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedMutableReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedMutableReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedMutableReader.java
new file mode 100644
index 0000000..7f2a9c5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedMutableReader.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+
+public class IndexedMutableReader<T extends IOReadableWritable> extends
+ StreamingMutableRecordReader<T> {
+
+ InputGate reader;
+
+ public IndexedMutableReader(InputGate reader) {
+ super(reader);
+ this.reader = reader;
+ }
+
+ public int getNumberOfInputChannels() {
+ return reader.getNumberOfInputChannels();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedReaderIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedReaderIterator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedReaderIterator.java
new file mode 100644
index 0000000..2050e27
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedReaderIterator.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.util.ReaderIterator;
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
+
+public class IndexedReaderIterator<T> extends ReaderIterator<T> {
+
+ public IndexedReaderIterator(
+ IndexedMutableReader<DeserializationDelegate<T>> reader,
+ TypeSerializer<T> serializer) {
+
+ super(reader, serializer);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateFactory.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateFactory.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateFactory.java
new file mode 100644
index 0000000..7883251
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import java.util.Collection;
+
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
+
+public class InputGateFactory {
+
+ public static InputGate createInputGate(Collection<InputGate> inputGates) {
+ return createInputGate(inputGates.toArray(new InputGate[inputGates.size()]));
+ }
+
+ public static InputGate createInputGate(InputGate[] inputGates) {
+ if (inputGates.length <= 0) {
+ throw new RuntimeException("No such input gate.");
+ }
+
+ if (inputGates.length < 2) {
+ return inputGates[0];
+ } else {
+ return new UnionInputGate(inputGates);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterFactory.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterFactory.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterFactory.java
new file mode 100644
index 0000000..3a7ba3e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RecordWriterFactory {
+ private static final Logger LOG = LoggerFactory.getLogger(RecordWriterFactory.class);
+
+ public static <OUT extends IOReadableWritable> RecordWriter<OUT> createRecordWriter(ResultPartitionWriter bufferWriter, ChannelSelector<OUT> channelSelector, long bufferTimeout) {
+
+ RecordWriter<OUT> output;
+
+ if (bufferTimeout >= 0) {
+ output = new StreamRecordWriter<OUT>(bufferWriter, channelSelector, bufferTimeout);
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("StreamRecordWriter initiated with {} bufferTimeout.", bufferTimeout);
+ }
+ } else {
+ output = new RecordWriter<OUT>(bufferWriter, channelSelector);
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("RecordWriter initiated.");
+ }
+ }
+
+ return output;
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/SpillReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/SpillReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/SpillReader.java
new file mode 100644
index 0000000..356b491
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/SpillReader.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+
+public class SpillReader {
+
+ private FileChannel spillingChannel;
+ private File spillFile;
+
+ /**
+ * Reads the next buffer from the spilled file.
+ */
+ public Buffer readNextBuffer(int bufferSize) throws IOException {
+ try {
+ Buffer buffer = new Buffer(new MemorySegment(new byte[bufferSize]),
+ new BufferRecycler() {
+
+ @Override
+ public void recycle(MemorySegment memorySegment) {
+ memorySegment.free();
+ }
+ });
+
+ spillingChannel.read(buffer.getMemorySegment().wrap(0, bufferSize));
+
+ return buffer;
+ } catch (Exception e) {
+ close();
+ throw new IOException(e);
+ }
+ }
+
+ @SuppressWarnings("resource")
+ public void setSpillFile(File nextSpillFile) throws IOException {
+ // We can close and delete the file now
+ close();
+ if (spillFile != null) {
+ spillFile.delete();
+ }
+ this.spillFile = nextSpillFile;
+ this.spillingChannel = new RandomAccessFile(spillFile, "rw").getChannel();
+ }
+
+ public File getSpillFile() {
+ return spillFile;
+ }
+
+ public void close() throws IOException {
+ if (this.spillingChannel != null && this.spillingChannel.isOpen()) {
+ this.spillingChannel.close();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEvent.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEvent.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEvent.java
new file mode 100644
index 0000000..368e373
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEvent.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import java.io.IOException;
+
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+
+public class SpillingBufferOrEvent {
+
+ private BufferOrEvent boe;
+ private boolean isSpilled = false;
+
+ private SpillReader spillReader;
+
+ private int channelIndex;
+ private int bufferSize;
+
+ public SpillingBufferOrEvent(BufferOrEvent boe, BufferSpiller spiller, SpillReader reader)
+ throws IOException {
+
+ this.boe = boe;
+ this.channelIndex = boe.getChannelIndex();
+ this.spillReader = reader;
+
+ if (boe.isBuffer()) {
+ this.bufferSize = boe.getBuffer().getSize();
+ spiller.spill(boe.getBuffer());
+ this.boe = null;
+ this.isSpilled = true;
+ }
+ }
+
+ /**
+ * If the buffer wasn't spilled simply returns the instance from the field,
+ * otherwise reads it from the spill reader
+ */
+ public BufferOrEvent getBufferOrEvent() throws IOException {
+ if (isSpilled) {
+ boe = new BufferOrEvent(spillReader.readNextBuffer(bufferSize), channelIndex);
+ this.isSpilled = false;
+ return boe;
+ } else {
+ return boe;
+ }
+ }
+
+ public boolean isSpilled() {
+ return isSpilled;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
new file mode 100644
index 0000000..eee30b6
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
+
+import java.io.IOException;
+
+public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
+
+ private long timeout;
+
+ private OutputFlusher outputFlusher;
+
+ public StreamRecordWriter(ResultPartitionWriter writer) {
+ this(writer, new RoundRobinChannelSelector<T>(), 1000);
+ }
+
+ public StreamRecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector) {
+ this(writer, channelSelector, 1000);
+ }
+
+ public StreamRecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector,
+ long timeout) {
+ super(writer, channelSelector);
+
+ this.timeout = timeout;
+ this.outputFlusher = new OutputFlusher();
+
+ outputFlusher.start();
+ }
+
+ public void close() {
+ try {
+ if (outputFlusher != null) {
+ outputFlusher.terminate();
+ outputFlusher.join();
+ }
+
+ flush();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ // Do nothing here
+ }
+ }
+
+ private class OutputFlusher extends Thread {
+ private volatile boolean running = true;
+
+ public void terminate() {
+ running = false;
+ }
+
+ @Override
+ public void run() {
+ while (running) {
+ try {
+ flush();
+ Thread.sleep(timeout);
+ } catch (InterruptedException e) {
+ // Do nothing here
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+}