You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:49 UTC

[33/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java
deleted file mode 100644
index 2e415f4..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.windows;
-
-/**
- * A {@code Window} is a grouping of elements into finite buckets. Windows have a maximum timestamp
- * which means that, at some point, all elements that go into one window will have arrived.
- *
- * <p>
- * Subclasses should implement {@code equals()} and {@code hashCode()} so that logically
- * same windows are treated the same.
- */
-public abstract class Window {
-
-	public abstract long maxTimestamp();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
deleted file mode 100644
index 863f7ac..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ /dev/null
@@ -1,320 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
-
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The barrier buffer is {@link CheckpointBarrierHandler} that blocks inputs with barriers until
- * all inputs have received the barrier for a given checkpoint.
- * 
- * <p>To avoid back-pressuring the input streams (which may cause distributed deadlocks), the
- * BarrierBuffer continues receiving buffers from the blocked channels and stores them internally until 
- * the blocks are released.</p>
- */
-public class BarrierBuffer implements CheckpointBarrierHandler {
-
-	private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);
-	
-	/** The gate that the buffer draws its input from */
-	private final InputGate inputGate;
-
-	/** Flags that indicate whether a channel is currently blocked/buffered */
-	private final boolean[] blockedChannels;
-	
-	/** The total number of channels that this buffer handles data from */
-	private final int totalNumberOfInputChannels;
-	
-	/** To utility to write blocked data to a file channel */
-	private final BufferSpiller bufferSpiller;
-
-	/** The pending blocked buffer/event sequences. Must be consumed before requesting
-	 * further data from the input gate. */
-	private final ArrayDeque<BufferSpiller.SpilledBufferOrEventSequence> queuedBuffered;
-
-	/** The sequence of buffers/events that has been unblocked and must now be consumed
-	 * before requesting further data from the input gate */
-	private BufferSpiller.SpilledBufferOrEventSequence currentBuffered;
-
-	/** Handler that receives the checkpoint notifications */
-	private EventListener<CheckpointBarrier> checkpointHandler;
-
-	/** The ID of the checkpoint for which we expect barriers */
-	private long currentCheckpointId = -1L;
-
-	/** The number of received barriers (= number of blocked/buffered channels) */
-	private int numBarriersReceived;
-	
-	/** The number of already closed channels */
-	private int numClosedChannels;
-	
-	/** Flag to indicate whether we have drawn all available input */
-	private boolean endOfStream;
-
-	/**
-	 * 
-	 * @param inputGate The input gate to draw the buffers and events from.
-	 * @param ioManager The I/O manager that gives access to the temp directories.
-	 * 
-	 * @throws IOException Thrown, when the spilling to temp files cannot be initialized.
-	 */
-	public BarrierBuffer(InputGate inputGate, IOManager ioManager) throws IOException {
-		this.inputGate = inputGate;
-		this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
-		this.blockedChannels = new boolean[this.totalNumberOfInputChannels];
-		
-		this.bufferSpiller = new BufferSpiller(ioManager, inputGate.getPageSize());
-		this.queuedBuffered = new ArrayDeque<BufferSpiller.SpilledBufferOrEventSequence>();
-	}
-
-	// ------------------------------------------------------------------------
-	//  Buffer and barrier handling
-	// ------------------------------------------------------------------------
-
-	@Override
-	public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
-		while (true) {
-			// process buffered BufferOrEvents before grabbing new ones
-			BufferOrEvent next;
-			if (currentBuffered == null) {
-				next = inputGate.getNextBufferOrEvent();
-			}
-			else {
-				next = currentBuffered.getNext();
-				if (next == null) {
-					completeBufferedSequence();
-					return getNextNonBlocked();
-				}
-			}
-			
-			if (next != null) {
-				if (isBlocked(next.getChannelIndex())) {
-					// if the channel is blocked we, we just store the BufferOrEvent
-					bufferSpiller.add(next);
-				}
-				else if (next.isBuffer()) {
-					return next;
-				}
-				else if (next.getEvent().getClass() == CheckpointBarrier.class) {
-					if (!endOfStream) {
-						// process barriers only if there is a chance of the checkpoint completing
-						processBarrier((CheckpointBarrier) next.getEvent(), next.getChannelIndex());
-					}
-				}
-				else {
-					if (next.getEvent().getClass() == EndOfPartitionEvent.class) {
-						numClosedChannels++;
-						// no chance to complete this checkpoint
-						releaseBlocks();
-					}
-					return next;
-				}
-			}
-			else if (!endOfStream) {
-				// end of stream. we feed the data that is still buffered
-				endOfStream = true;
-				releaseBlocks();
-				return getNextNonBlocked();
-			}
-			else {
-				return null;
-			}
-		}
-	}
-	
-	private void completeBufferedSequence() throws IOException {
-		currentBuffered.cleanup();
-		currentBuffered = queuedBuffered.pollFirst();
-		if (currentBuffered != null) {
-			currentBuffered.open();
-		}
-	}
-	
-	private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws IOException {
-		final long barrierId = receivedBarrier.getId();
-
-		if (numBarriersReceived > 0) {
-			// subsequent barrier of a checkpoint.
-			if (barrierId == currentCheckpointId) {
-				// regular case
-				onBarrier(channelIndex);
-			}
-			else if (barrierId > currentCheckpointId) {
-				// we did not complete the current checkpoint
-				LOG.warn("Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
-						"Skipping current checkpoint.", barrierId, currentCheckpointId);
-
-				releaseBlocks();
-				currentCheckpointId = barrierId;
-				onBarrier(channelIndex);
-			}
-			else {
-				// ignore trailing barrier from aborted checkpoint
-				return;
-			}
-			
-		}
-		else if (barrierId > currentCheckpointId) {
-			// first barrier of a new checkpoint
-			currentCheckpointId = barrierId;
-			onBarrier(channelIndex);
-		}
-		else {
-			// trailing barrier from previous (skipped) checkpoint
-			return;
-		}
-
-		// check if we have all barriers
-		if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Received all barrier, triggering checkpoint {} at {}",
-						receivedBarrier.getId(), receivedBarrier.getTimestamp());
-			}
-
-			if (checkpointHandler != null) {
-				checkpointHandler.onEvent(receivedBarrier);
-			}
-			
-			releaseBlocks();
-		}
-	}
-	
-	@Override
-	public void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler) {
-		if (this.checkpointHandler == null) {
-			this.checkpointHandler = checkpointHandler;
-		}
-		else {
-			throw new IllegalStateException("BarrierBuffer already has a registered checkpoint handler");
-		}
-	}
-	
-	@Override
-	public boolean isEmpty() {
-		return currentBuffered == null;
-	}
-
-	@Override
-	public void cleanup() throws IOException {
-		bufferSpiller.close();
-		if (currentBuffered != null) {
-			currentBuffered.cleanup();
-		}
-		for (BufferSpiller.SpilledBufferOrEventSequence seq : queuedBuffered) {
-			seq.cleanup();
-		}
-	}
-	
-	/**
-	 * Checks whether the channel with the given index is blocked.
-	 * 
-	 * @param channelIndex The channel index to check.
-	 * @return True if the channel is blocked, false if not.
-	 */
-	private boolean isBlocked(int channelIndex) {
-		return blockedChannels[channelIndex];
-	}
-	
-	/**
-	 * Blocks the given channel index, from which a barrier has been received.
-	 * 
-	 * @param channelIndex The channel index to block.
-	 */
-	private void onBarrier(int channelIndex) throws IOException {
-		if (!blockedChannels[channelIndex]) {
-			blockedChannels[channelIndex] = true;
-			numBarriersReceived++;
-			
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Received barrier from channel " + channelIndex);
-			}
-		}
-		else {
-			throw new IOException("Stream corrupt: Repeated barrier for same checkpoint and input stream");
-		}
-	}
-
-	/**
-	 * Releases the blocks on all channels. Makes sure the just written data
-	 * is the next to be consumed.
-	 */
-	private void releaseBlocks() throws IOException {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Releasing blocks");
-		}
-
-		for (int i = 0; i < blockedChannels.length; i++) {
-			blockedChannels[i] = false;
-		}
-		numBarriersReceived = 0;
-
-		if (currentBuffered == null) {
-			// common case: no more buffered data
-			currentBuffered = bufferSpiller.rollOver();
-			if (currentBuffered != null) {
-				currentBuffered.open();
-			}
-		}
-		else {
-			// uncommon case: buffered data pending
-			// push back the pending data, if we have any
-			
-			// since we did not fully drain the previous sequence, we need to allocate a new buffer for this one
-			BufferSpiller.SpilledBufferOrEventSequence bufferedNow = bufferSpiller.rollOverWithNewBuffer();
-			if (bufferedNow != null) {
-				bufferedNow.open();
-				queuedBuffered.addFirst(currentBuffered);
-				currentBuffered = bufferedNow;
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	// For Testing
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Gets the ID defining the current pending, or just completed, checkpoint.
-	 * 
-	 * @return The ID of the pending of completed checkpoint. 
-	 */
-	public long getCurrentCheckpointId() {
-		return this.currentCheckpointId;
-	}
-	
-	// ------------------------------------------------------------------------
-	// Utilities 
-	// ------------------------------------------------------------------------
-	
-	@Override
-	public String toString() {
-		return String.format("last checkpoint: %d, current barriers: %d, closed channels: %d",
-				currentCheckpointId, numBarriersReceived, numClosedChannels);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
deleted file mode 100644
index 119fb23..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
-
-/**
- * The BarrierTracker keeps track of what checkpoint barriers have been received from
- * which input channels. Once it has observed all checkpoint barriers for a checkpoint ID,
- * it notifies its listener of a completed checkpoint.
- * 
- * <p>Unlike the {@link BarrierBuffer}, the BarrierTracker does not block the input
- * channels that have sent barriers, so it cannot be used to gain "exactly-once" processing
- * guarantees. It can, however, be used to gain "at least once" processing guarantees.</p>
- * 
- * <p>NOTE: This implementation strictly assumes that newer checkpoints have higher checkpoint IDs.</p>
- */
-public class BarrierTracker implements CheckpointBarrierHandler {
-
-	/** The tracker tracks a maximum number of checkpoints, for which some, but not all
-	 * barriers have yet arrived. */
-	private static final int MAX_CHECKPOINTS_TO_TRACK = 50;
-	
-	/** The input gate, to draw the buffers and events from */
-	private final InputGate inputGate;
-	
-	/** The number of channels. Once that many barriers have been received for a checkpoint,
-	 * the checkpoint is considered complete. */
-	private final int totalNumberOfInputChannels;
-
-	/** All checkpoints for which some (but not all) barriers have been received,
-	 * and that are not yet known to be subsumed by newer checkpoints */
-	private final ArrayDeque<CheckpointBarrierCount> pendingCheckpoints;
-	
-	/** The listener to be notified on complete checkpoints */
-	private EventListener<CheckpointBarrier> checkpointHandler;
-	
-	/** The highest checkpoint ID encountered so far */
-	private long latestPendingCheckpointID = -1;
-	
-	
-	public BarrierTracker(InputGate inputGate) {
-		this.inputGate = inputGate;
-		this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
-		this.pendingCheckpoints = new ArrayDeque<CheckpointBarrierCount>();
-	}
-
-	@Override
-	public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
-		while (true) {
-			BufferOrEvent next = inputGate.getNextBufferOrEvent();
-			if (next == null) {
-				return null;
-			}
-			else if (next.isBuffer() || next.getEvent().getClass() != CheckpointBarrier.class) {
-				return next;
-			}
-			else {
-				processBarrier((CheckpointBarrier) next.getEvent());
-			}
-		}
-	}
-
-	@Override
-	public void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler) {
-		if (this.checkpointHandler == null) {
-			this.checkpointHandler = checkpointHandler;
-		}
-		else {
-			throw new IllegalStateException("BarrierTracker already has a registered checkpoint handler");
-		}
-	}
-
-	@Override
-	public void cleanup() {
-		pendingCheckpoints.clear();
-	}
-
-	@Override
-	public boolean isEmpty() {
-		return pendingCheckpoints.isEmpty();
-	}
-
-	private void processBarrier(CheckpointBarrier receivedBarrier) {
-		// fast path for single channel trackers
-		if (totalNumberOfInputChannels == 1) {
-			if (checkpointHandler != null) {
-				checkpointHandler.onEvent(receivedBarrier);
-			}
-			return;
-		}
-		
-		// general path for multiple input channels
-		final long barrierId = receivedBarrier.getId();
-
-		// find the checkpoint barrier in the queue of bending barriers
-		CheckpointBarrierCount cbc = null;
-		int pos = 0;
-		
-		for (CheckpointBarrierCount next : pendingCheckpoints) {
-			if (next.checkpointId == barrierId) {
-				cbc = next;
-				break;
-			}
-			pos++;
-		}
-		
-		if (cbc != null) {
-			// add one to the count to that barrier and check for completion
-			int numBarriersNew = cbc.incrementBarrierCount();
-			if (numBarriersNew == totalNumberOfInputChannels) {
-				// checkpoint can be triggered
-				// first, remove this checkpoint and all all prior pending
-				// checkpoints (which are now subsumed)
-				for (int i = 0; i <= pos; i++) {
-					pendingCheckpoints.pollFirst();
-				}
-				
-				// notify the listener
-				if (checkpointHandler != null) {
-					checkpointHandler.onEvent(receivedBarrier);
-				}
-			}
-		}
-		else {
-			// first barrier for that checkpoint ID
-			// add it only if it is newer than the latest checkpoint.
-			// if it is not newer than the latest checkpoint ID, then there cannot be a
-			// successful checkpoint for that ID anyways
-			if (barrierId > latestPendingCheckpointID) {
-				latestPendingCheckpointID = barrierId;
-				pendingCheckpoints.addLast(new CheckpointBarrierCount(barrierId));
-				
-				// make sure we do not track too many checkpoints
-				if (pendingCheckpoints.size() > MAX_CHECKPOINTS_TO_TRACK) {
-					pendingCheckpoints.pollFirst();
-				}
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Simple class for a checkpoint ID with a barrier counter.
-	 */
-	private static final class CheckpointBarrierCount {
-		
-		private final long checkpointId;
-		
-		private int barrierCount;
-		
-		private CheckpointBarrierCount(long checkpointId) {
-			this.checkpointId = checkpointId;
-			this.barrierCount = 1;
-		}
-
-		public int incrementBarrierCount() {
-			return ++barrierCount;
-		}
-		
-		@Override
-		public int hashCode() {
-			return (int) ((checkpointId >>> 32) ^ checkpointId) + 17 * barrierCount; 
-		}
-
-		@Override
-		public boolean equals(Object obj) {
-			if (obj instanceof  CheckpointBarrierCount) {
-				CheckpointBarrierCount that = (CheckpointBarrierCount) obj;
-				return this.checkpointId == that.checkpointId && this.barrierCount == that.barrierCount;
-			}
-			else {
-				return false;
-			}
-		}
-
-		@Override
-		public String toString() {
-			return String.format("checkpointID=%d, count=%d", checkpointId, barrierCount);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java
deleted file mode 100644
index be3c9af..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import java.util.concurrent.BlockingQueue;
-
-import org.apache.flink.runtime.iterative.concurrent.Broker;
-
-public class BlockingQueueBroker extends Broker<BlockingQueue<?>> {
-	
-	/** Singleton instance */
-	public static final BlockingQueueBroker INSTANCE = new BlockingQueueBroker();
-
-	/** Cannot instantiate */
-	private BlockingQueueBroker() {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
deleted file mode 100644
index cabed14..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
+++ /dev/null
@@ -1,410 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.channels.FileChannel;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.event.AbstractEvent;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.util.StringUtils;
-
-/**
- * The buffer spiller takes the buffers and events from a data stream and adds them to a spill file.
- * After a number of elements have been spilled, the spiller can "roll over": It presents the spilled
- * elements as a readable sequence, and opens a new spill file.
- * 
- * <p>This implementation buffers data effectively in the OS cache, which gracefully extends to the
- * disk. Most data is written and re-read milliseconds later. The file is deleted after the read.
- * Consequently, in most cases, the data will never actually hit the physical disks.</p>
- * 
- * <p>IMPORTANT: The SpilledBufferOrEventSequences created by this spiller all reuse the same
- * reading memory (to reduce overhead) and can consequently not be read concurrently.</p>
- */
-public class BufferSpiller {
-
-	/** The counter that selects the next directory to spill into */
-	private static final AtomicInteger DIRECTORY_INDEX = new AtomicInteger(0);
-	
-	/** The size of the buffer with which data is read back in */
-	private static final int READ_BUFFER_SIZE = 1024 * 1024;
-	
-	/** The directories to spill to */
-	private final File tempDir;
-	
-	/** The name prefix for spill files */
-	private final String spillFilePrefix;
-	
-	/** The buffer used for bulk reading data (used in the SpilledBufferOrEventSequence) */
-	private final ByteBuffer readBuffer;
-	
-	/** The buffer that encodes the spilled header */
-	private final ByteBuffer headBuffer;
-	
-	/** The reusable array that holds header and contents buffers */
-	private final ByteBuffer[] sources;
-	
-	/** The file that we currently spill to */
-	private File currentSpillFile;
-	
-	/** The channel of the file we currently spill to */
-	private FileChannel currentChannel;
-
-	/** The page size, to let this reader instantiate properly sized memory segments */
-	private final int pageSize;
-	
-	/** A counter, to created numbered spill files */
-	private int fileCounter;
-	
-	/** A flag to check whether the spiller has written since the last roll over */
-	private boolean hasWritten;
-	
-	/**
-	 * Creates a new buffer spiller, spilling to one of the I/O manager's temp directories.
-	 * 
-	 * @param ioManager The I/O manager for access to teh temp directories.
-	 * @param pageSize The page size used to re-create spilled buffers.
-	 * @throws IOException Thrown if the temp files for spilling cannot be initialized.
-	 */
-	public BufferSpiller(IOManager ioManager, int pageSize) throws IOException {
-		this.pageSize = pageSize;
-		
-		this.readBuffer = ByteBuffer.allocateDirect(READ_BUFFER_SIZE);
-		this.readBuffer.order(ByteOrder.LITTLE_ENDIAN);
-		
-		this.headBuffer = ByteBuffer.allocateDirect(16);
-		this.headBuffer.order(ByteOrder.LITTLE_ENDIAN);
-		
-		this.sources = new ByteBuffer[] { this.headBuffer, null };
-		
-		File[] tempDirs = ioManager.getSpillingDirectories();
-		this.tempDir = tempDirs[DIRECTORY_INDEX.getAndIncrement() % tempDirs.length];
-		
-		byte[] rndBytes = new byte[32];
-		new Random().nextBytes(rndBytes);
-		this.spillFilePrefix = StringUtils.byteToHexString(rndBytes) + '.';
-		
-		// prepare for first contents
-		createSpillingChannel();
-	}
-
-	/**
-	 * Adds a buffer or event to the sequence of spilled buffers and events.
-	 * 
-	 * @param boe The buffer or event to add and spill.
-	 * @throws IOException Thrown, if the buffer of event could not be spilled.
-	 */
-	public void add(BufferOrEvent boe) throws IOException {
-		hasWritten = true;
-		try {
-			ByteBuffer contents;
-			if (boe.isBuffer()) {
-				Buffer buf = boe.getBuffer();
-				contents = buf.getMemorySegment().wrap(0, buf.getSize());
-			}
-			else {
-				contents = EventSerializer.toSerializedEvent(boe.getEvent());
-			}
-			
-			headBuffer.clear();
-			headBuffer.putInt(boe.getChannelIndex());
-			headBuffer.putInt(contents.remaining());
-			headBuffer.put((byte) (boe.isBuffer() ? 0 : 1));
-			headBuffer.flip();
-			
-			sources[1] = contents;
-			currentChannel.write(sources);
-		}
-		finally {
-			if (boe.isBuffer()) {
-				boe.getBuffer().recycle();
-			}
-		}
-	}
-
-	/**
-	 * Starts a new sequence of spilled buffers and event and returns the current sequence of spilled buffers
-	 * for reading. This method returns {@code null}, if nothing was added since the creation of the spiller, or the
-	 * last call to this method.
-	 * 
-	 * <p>NOTE: The SpilledBufferOrEventSequences created by this method all reuse the same
-	 * reading memory (to reduce overhead) and can consequently not be read concurrently with each other.
-	 * To create a sequence that can be read concurrently with the previous SpilledBufferOrEventSequence, use the
-	 * {@link #rollOverWithNewBuffer()} method.</p>
-	 * 
-	 * @return The readable sequence of spilled buffers and events, or 'null', if nothing was added.
-	 * @throws IOException Thrown, if the readable sequence could not be created, or no new spill
-	 *                     file could be created.
-	 */
-	public SpilledBufferOrEventSequence rollOver() throws IOException {
-		return rollOverInternal(false);
-	}
-
-	/**
-	 * Starts a new sequence of spilled buffers and event and returns the current sequence of spilled buffers
-	 * for reading. This method returns {@code null}, if nothing was added since the creation of the spiller, or the
-	 * last call to this method.
-	 * 
-	 * <p>The SpilledBufferOrEventSequence returned by this method is safe for concurrent consumption with
-	 * any previously returned sequence.</p>
-	 *
-	 * @return The readable sequence of spilled buffers and events, or 'null', if nothing was added.
-	 * @throws IOException Thrown, if the readable sequence could not be created, or no new spill
-	 *                     file could be created.
-	 */
-	public SpilledBufferOrEventSequence rollOverWithNewBuffer() throws IOException {
-		return rollOverInternal(true);
-	}
-	
-	private SpilledBufferOrEventSequence rollOverInternal(boolean newBuffer) throws IOException {
-		if (!hasWritten) {
-			return null;
-		}
-		
-		ByteBuffer buf;
-		if (newBuffer) {
-			buf = ByteBuffer.allocateDirect(READ_BUFFER_SIZE);
-			buf.order(ByteOrder.LITTLE_ENDIAN);
-		} else {
-			buf = readBuffer;
-		}
-		
-		// create a reader for the spilled data
-		currentChannel.position(0L);
-		SpilledBufferOrEventSequence seq = 
-				new SpilledBufferOrEventSequence(currentSpillFile, currentChannel, buf, pageSize);
-		
-		// create ourselves a new spill file
-		createSpillingChannel();
-		
-		hasWritten = false;
-		return seq;
-	}
-
-	/**
-	 * Cleans up the current spilling channel and file.
-	 * 
-	 * Does not clean up the SpilledBufferOrEventSequences generated by calls to 
-	 * {@link #rollOver()}.
-	 * 
-	 * @throws IOException Thrown if channel closing or file deletion fail.
-	 */
-	public void close() throws IOException {
-		currentChannel.close();
-		if (!currentSpillFile.delete()) {
-			throw new IOException("Cannot delete spill file");
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  For testing
-	// ------------------------------------------------------------------------
-
-	File getCurrentSpillFile() {
-		return currentSpillFile;
-	}
-	
-	FileChannel getCurrentChannel() {
-		return currentChannel;
-	}
-	
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-	
-	@SuppressWarnings("resource")
-	private void createSpillingChannel() throws IOException {
-		currentSpillFile = new File(tempDir, spillFilePrefix + (fileCounter++) +".buffer");
-		currentChannel = new RandomAccessFile(currentSpillFile, "rw").getChannel();
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * This class represents a sequence of spilled buffers and events, created by the
-	 * {@link BufferSpiller}. The sequence of buffers and events can be read back using the
-	 * method {@link #getNext()}.
-	 */
-	public static class SpilledBufferOrEventSequence {
-		
-		/** Header is "channel index" (4 bytes) + length (4 bytes) + buffer/event (1 byte) */
-		private static final int HEADER_LENGTH = 9;
-
-		/** The file containing the data */
-		private final File file;
-		
-		/** The file channel to draw the data from */
-		private final FileChannel fileChannel;
-		
-		/** The byte buffer for bulk reading */
-		private final ByteBuffer buffer;
-
-		/** The page size to instantiate properly sized memory segments */
-		private final int pageSize;
-
-		/** Flag to track whether the sequence has been opened already */
-		private boolean opened = false;
-
-		/**
-		 * Create a reader that reads a sequence of spilled buffers and events.
-		 * 
-		 * @param file The file with the data.
-		 * @param fileChannel The file channel to read the data from.
-		 * @param buffer The buffer used for bulk reading.
-		 * @param pageSize The page size to use for the created memory segments.
-		 */
-		SpilledBufferOrEventSequence(File file, FileChannel fileChannel, ByteBuffer buffer, int pageSize) {
-			this.file = file;
-			this.fileChannel = fileChannel;
-			this.buffer = buffer;
-			this.pageSize = pageSize;
-		}
-
-		/**
-		 * Initializes the sequence for reading.
-		 * This method needs to be called before the first call to {@link #getNext()}. Otherwise
-		 * the results of {@link #getNext()} are not predictable.
-		 */
-		public void open() {
-			if (!opened) {
-				opened = true;
-				buffer.position(0);
-				buffer.limit(0);
-			}
-		}
-
-		/**
-		 * Gets the next BufferOrEvent from the spilled sequence, or {@code null}, if the
-		 * sequence is exhausted.
-		 *         
-		 * @return The next BufferOrEvent from the spilled sequence, or {@code null} (end of sequence).
-		 * @throws IOException Thrown, if the reads failed, of if the byte stream is corrupt.
-		 */
-		public BufferOrEvent getNext() throws IOException {
-			if (buffer.remaining() < HEADER_LENGTH) {
-				buffer.compact();
-				
-				while (buffer.position() < HEADER_LENGTH) {
-					if (fileChannel.read(buffer) == -1) {
-						if (buffer.position() == 0) {
-							// no trailing data
-							return null;
-						} else {
-							throw new IOException("Found trailing incomplete buffer or event");
-						}
-					}
-				}
-				
-				buffer.flip();
-			}
-			
-			final int channel = buffer.getInt();
-			final int length = buffer.getInt();
-			final boolean isBuffer = buffer.get() == 0;
-			
-			
-			if (isBuffer) {
-				// deserialize buffer
-				if (length > pageSize) {
-					throw new IOException(String.format(
-							"Spilled buffer (%d bytes) is larger than page size of (%d bytes)", length, pageSize));
-				}
-
-				MemorySegment seg = MemorySegmentFactory.allocateUnpooledSegment(pageSize);
-				
-				int segPos = 0;
-				int bytesRemaining = length;
-				
-				while (true) {
-					int toCopy = Math.min(buffer.remaining(), bytesRemaining);
-					if (toCopy > 0) {
-						seg.put(segPos, buffer, toCopy);
-						segPos += toCopy;
-						bytesRemaining -= toCopy;
-					}
-					
-					if (bytesRemaining == 0) {
-						break;
-					}
-					else {
-						buffer.clear();
-						if (fileChannel.read(buffer) == -1) {
-							throw new IOException("Found trailing incomplete buffer");
-						}
-						buffer.flip();
-					}
-				}
-				
-				
-				Buffer buf = new Buffer(seg, FreeingBufferRecycler.INSTANCE);
-				buf.setSize(length);
-				
-				return new BufferOrEvent(buf, channel);
-			}
-			else {
-				// deserialize event
-				if (length > buffer.capacity() - HEADER_LENGTH) {
-					throw new IOException("Event is too large");
-				}
-
-				if (buffer.remaining() < length) {
-					buffer.compact();
-
-					while (buffer.position() < length) {
-						if (fileChannel.read(buffer) == -1) {
-							throw new IOException("Found trailing incomplete event");
-						}
-					}
-
-					buffer.flip();
-				}
-
-				int oldLimit = buffer.limit();
-				buffer.limit(buffer.position() + length);
-				AbstractEvent evt = EventSerializer.fromSerializedEvent(buffer, getClass().getClassLoader());
-				buffer.limit(oldLimit);
-				
-				return new BufferOrEvent(evt, channel);
-			}
-		}
-
-		/**
-		 * Cleans up all file resources held by this spilled sequence.
-		 * 
-		 * @throws IOException Thrown, if file channel closing or file deletion fail. 
-		 */
-		public void cleanup() throws IOException {
-			fileChannel.close();
-			if (!file.delete()) {
-				throw new IOException("Cannot remove temp file for stream alignment writer");
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
deleted file mode 100644
index 791fd40..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-
-import java.io.IOException;
-
-/**
- * The CheckpointBarrierHandler reacts to checkpoint barrier arriving from the input channels.
- * Different implementations may either simply track barriers, or block certain inputs on
- * barriers.
- */
-public interface CheckpointBarrierHandler {
-
-	/**
-	 * Returns the next {@link BufferOrEvent} that the operator may consume.
-	 * This call blocks until the next BufferOrEvent is available, ir until the stream
-	 * has been determined to be finished.
-	 * 
-	 * @return The next BufferOrEvent, or {@code null}, if the stream is finished.
-	 * @throws java.io.IOException Thrown, if the network or local disk I/O fails.
-	 * @throws java.lang.InterruptedException Thrown, if the thread is interrupted while blocking during
-	 *                                        waiting for the next BufferOrEvent to become available.
-	 */
-	BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException;
-
-	/**
-	 * Registers the given event handler to be notified on successful checkpoints.
-	 * 
-	 * @param checkpointHandler The handler to register.
-	 */
-	void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler);
-
-	/**
-	 * Cleans up all internally held resources.
-	 * 
-	 * @throws IOException Thrown, if the cleanup of I/O resources failed.
-	 */
-	void cleanup() throws IOException;
-
-	/**
-	 * Checks if the barrier handler has buffered any data internally.
-	 * @return True, if no data is buffered internally, false otherwise.
-	 */
-	boolean isEmpty();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java
deleted file mode 100644
index 01e997d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-
-import java.util.ArrayList;
-
-public class CollectorWrapper<OUT> implements Output<StreamRecord<OUT>> {
-
-	private OutputSelectorWrapper<OUT> outputSelectorWrapper;
-
-	private ArrayList<Output<StreamRecord<OUT>>> allOutputs;
-
-	public CollectorWrapper(OutputSelectorWrapper<OUT> outputSelectorWrapper) {
-		this.outputSelectorWrapper = outputSelectorWrapper;
-		allOutputs = new ArrayList<Output<StreamRecord<OUT>>>();
-	}
-	
-	public void addCollector(Output<StreamRecord<OUT>> output, StreamEdge edge) {
-		outputSelectorWrapper.addCollector(output, edge);
-		allOutputs.add(output);
-	}
-
-	@Override
-	public void collect(StreamRecord<OUT> record) {
-		for (Collector<StreamRecord<OUT>> output : outputSelectorWrapper.getSelectedOutputs(record.getValue())) {
-			output.collect(record);
-		}
-	}
-
-	@Override
-	public void emitWatermark(Watermark mark) {
-		for (Output<?> output : allOutputs) {
-			output.emitWatermark(mark);
-		}
-	}
-
-	@Override
-	public void close() {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateUtil.java
deleted file mode 100644
index f11e9a1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateUtil.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
-
-/**
- * Utility for dealing with input gates. This will either just return
- * the single {@link InputGate} that was passed in or create a {@link UnionInputGate} if several
- * {@link InputGate input gates} are given.
- */
-public class InputGateUtil {
-
-	public static InputGate createInputGate(Collection<InputGate> inputGates1, Collection<InputGate> inputGates2) {
-		List<InputGate> gates = new ArrayList<InputGate>(inputGates1.size() + inputGates2.size());
-		gates.addAll(inputGates1);
-		gates.addAll(inputGates2);
-		return createInputGate(gates.toArray(new InputGate[gates.size()]));
-	}
-
-	public static InputGate createInputGate(InputGate[] inputGates) {
-		if (inputGates.length <= 0) {
-			throw new RuntimeException("No such input gate.");
-		}
-
-		if (inputGates.length < 2) {
-			return inputGates[0];
-		} else {
-			return new UnionInputGate(inputGates);
-		}
-	}
-
-	/**
-	 * Private constructor to prevent instantiation.
-	 */
-	private InputGateUtil() {
-		throw new RuntimeException();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
deleted file mode 100644
index 34e5800..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.event.AbstractEvent;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Implementation of {@link Output} that sends data using a {@link RecordWriter}.
- */
-public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
-
-	private StreamRecordWriter<SerializationDelegate<StreamElement>> recordWriter;
-	
-	private SerializationDelegate<StreamElement> serializationDelegate;
-
-	
-	@SuppressWarnings("unchecked")
-	public RecordWriterOutput(
-			StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
-			TypeSerializer<OUT> outSerializer,
-			boolean enableWatermarkMultiplexing) {
-
-		checkNotNull(recordWriter);
-		
-		// generic hack: cast the writer to generic Object type so we can use it 
-		// with multiplexed records and watermarks
-		this.recordWriter = (StreamRecordWriter<SerializationDelegate<StreamElement>>) 
-				(StreamRecordWriter<?>) recordWriter;
-
-		TypeSerializer<StreamElement> outRecordSerializer;
-		if (enableWatermarkMultiplexing) {
-			outRecordSerializer = new MultiplexingStreamRecordSerializer<OUT>(outSerializer);
-		} else {
-			outRecordSerializer = (TypeSerializer<StreamElement>)
-					(TypeSerializer<?>) new StreamRecordSerializer<OUT>(outSerializer);
-		}
-
-		if (outSerializer != null) {
-			serializationDelegate = new SerializationDelegate<StreamElement>(outRecordSerializer);
-		}
-	}
-
-	@Override
-	public void collect(StreamRecord<OUT> record) {
-		serializationDelegate.setInstance(record);
-
-		try {
-			recordWriter.emit(serializationDelegate);
-		}
-		catch (Exception e) {
-			throw new RuntimeException(e.getMessage(), e);
-		}
-	}
-
-	@Override
-	public void emitWatermark(Watermark mark) {
-		serializationDelegate.setInstance(mark);
-		
-		try {
-			recordWriter.broadcastEmit(serializationDelegate);
-		}
-		catch (Exception e) {
-			throw new RuntimeException(e.getMessage(), e);
-		}
-	}
-
-	public void broadcastEvent(AbstractEvent barrier) throws IOException, InterruptedException {
-		recordWriter.broadcastEvent(barrier);
-	}
-	
-	
-	public void flush() throws IOException {
-		recordWriter.flush();
-	}
-	
-	@Override
-	public void close() {
-		recordWriter.close();
-	}
-
-	public void clearBuffers() {
-		recordWriter.clearBuffers();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
deleted file mode 100644
index e131cda..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.event.AbstractEvent;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
-import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
-import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.plugable.DeserializationDelegate;
-import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-
-/**
- * Input reader for {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}.
- *
- * <p>
- * This also keeps track of {@link Watermark} events and forwards them to event subscribers
- * once the {@link Watermark} from all inputs advances.
- *
- * <p>
- * Forwarding elements or watermarks must be protected by synchronizing on the given lock
- * object. This ensures that we don't call methods on a {@link OneInputStreamOperator} concurrently
- * with the timer callback or other things.
- * 
- * @param <IN> The type of the record that can be read with this record reader.
- */
-public class StreamInputProcessor<IN> {
-	
-	private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers;
-
-	private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer;
-
-	private final CheckpointBarrierHandler barrierHandler;
-
-	// We need to keep track of the channel from which a buffer came, so that we can
-	// appropriately map the watermarks to input channels
-	private int currentChannel = -1;
-
-	private boolean isFinished;
-
-	
-
-	private final long[] watermarks;
-	private long lastEmittedWatermark;
-
-	private final DeserializationDelegate<StreamElement> deserializationDelegate;
-
-	@SuppressWarnings("unchecked")
-	public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer,
-								EventListener<CheckpointBarrier> checkpointListener,
-								CheckpointingMode checkpointMode,
-								IOManager ioManager,
-								boolean enableWatermarkMultiplexing) throws IOException {
-
-		InputGate inputGate = InputGateUtil.createInputGate(inputGates);
-
-		if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
-			this.barrierHandler = new BarrierBuffer(inputGate, ioManager);
-		}
-		else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
-			this.barrierHandler = new BarrierTracker(inputGate);
-		}
-		else {
-			throw new IllegalArgumentException("Unrecognized CheckpointingMode: " + checkpointMode);
-		}
-		
-		if (checkpointListener != null) {
-			this.barrierHandler.registerCheckpointEventHandler(checkpointListener);
-		}
-		
-		if (enableWatermarkMultiplexing) {
-			MultiplexingStreamRecordSerializer<IN> ser = new MultiplexingStreamRecordSerializer<IN>(inputSerializer);
-			this.deserializationDelegate = new NonReusingDeserializationDelegate<StreamElement>(ser);
-		} else {
-			StreamRecordSerializer<IN> ser = new StreamRecordSerializer<IN>(inputSerializer);
-			this.deserializationDelegate = (NonReusingDeserializationDelegate<StreamElement>)
-					(NonReusingDeserializationDelegate<?>) new NonReusingDeserializationDelegate<StreamRecord<IN>>(ser);
-		}
-		
-		// Initialize one deserializer per input channel
-		this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
-		
-		for (int i = 0; i < recordDeserializers.length; i++) {
-			recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<StreamElement>>();
-		}
-
-		watermarks = new long[inputGate.getNumberOfInputChannels()];
-		for (int i = 0; i < inputGate.getNumberOfInputChannels(); i++) {
-			watermarks[i] = Long.MIN_VALUE;
-		}
-		lastEmittedWatermark = Long.MIN_VALUE;
-	}
-
-	@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
-	public boolean processInput(OneInputStreamOperator<IN, ?> streamOperator, final Object lock) throws Exception {
-		if (isFinished) {
-			return false;
-		}
-
-		while (true) {
-			if (currentRecordDeserializer != null) {
-				DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
-
-				if (result.isBufferConsumed()) {
-					currentRecordDeserializer.getCurrentBuffer().recycle();
-					currentRecordDeserializer = null;
-				}
-
-				if (result.isFullRecord()) {
-					StreamElement recordOrWatermark = deserializationDelegate.getInstance();
-
-					if (recordOrWatermark.isWatermark()) {
-						long watermarkMillis = recordOrWatermark.asWatermark().getTimestamp();
-						if (watermarkMillis > watermarks[currentChannel]) {
-							watermarks[currentChannel] = watermarkMillis;
-							long newMinWatermark = Long.MAX_VALUE;
-							for (long watermark : watermarks) {
-								newMinWatermark = Math.min(watermark, newMinWatermark);
-							}
-							if (newMinWatermark > lastEmittedWatermark) {
-								lastEmittedWatermark = newMinWatermark;
-								synchronized (lock) {
-									streamOperator.processWatermark(new Watermark(lastEmittedWatermark));
-								}
-							}
-						}
-						continue;
-					} else {
-						// now we can do the actual processing
-						StreamRecord<IN> record = recordOrWatermark.asRecord();
-						synchronized (lock) {
-							streamOperator.setKeyContextElement(record);
-							streamOperator.processElement(record);
-						}
-						return true;
-					}
-				}
-			}
-
-			final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
-			if (bufferOrEvent != null) {
-				if (bufferOrEvent.isBuffer()) {
-					currentChannel = bufferOrEvent.getChannelIndex();
-					currentRecordDeserializer = recordDeserializers[currentChannel];
-					currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
-				}
-				else {
-					// Event received
-					final AbstractEvent event = bufferOrEvent.getEvent();
-					if (event.getClass() != EndOfPartitionEvent.class) {
-						throw new IOException("Unexpected event: " + event);
-					}
-				}
-			}
-			else {
-				isFinished = true;
-				if (!barrierHandler.isEmpty()) {
-					throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
-				}
-				return false;
-			}
-		}
-	}
-	
-	public void setReporter(AccumulatorRegistry.Reporter reporter) {
-		for (RecordDeserializer<?> deserializer : recordDeserializers) {
-			deserializer.setReporter(reporter);
-		}
-	}
-	
-	public void cleanup() throws IOException {
-		// clear the buffers first. this part should not ever fail
-		for (RecordDeserializer<?> deserializer : recordDeserializers) {
-			Buffer buffer = deserializer.getCurrentBuffer();
-			if (buffer != null && !buffer.isRecycled()) {
-				buffer.recycle();
-			}
-		}
-		
-		// cleanup the barrier handler resources
-		barrierHandler.cleanup();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
deleted file mode 100644
index 8dcaad8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-/**
- * This record writer keeps data in buffers at most for a certain timeout. It spawns a separate thread
- * that flushes the outputs in a defined interval, to make sure data does not linger in the buffers for too long.
- * 
- * @param <T> The type of elements written.
- */
-public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
-
-	/** Default name for teh output flush thread, if no name with a task reference is given */
-	private static final String DEFAULT_OUTPUT_FLUSH_THREAD_NAME = "OutputFlusher";
-	
-	
-	/** The thread that periodically flushes the output, to give an upper latency bound */
-	private final OutputFlusher outputFlusher;
-	
-	/** Flag indicating whether the output should be flushed after every element */
-	private final boolean flushAlways;
-
-	/** The exception encountered in the flushing thread */
-	private Throwable flusherException;
-	
-	
-	
-	public StreamRecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector, long timeout) {
-		this(writer, channelSelector, timeout, null);
-	}
-	
-	public StreamRecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector,
-								long timeout, String taskName) {
-		
-		super(writer, channelSelector);
-		
-		checkArgument(timeout >= -1);
-		
-		if (timeout == -1) {
-			flushAlways = false;
-			outputFlusher = null;
-		}
-		else if (timeout == 0) {
-			flushAlways = true;
-			outputFlusher = null;
-		}
-		else {
-			flushAlways = false;
-			String threadName = taskName == null ?
-								DEFAULT_OUTPUT_FLUSH_THREAD_NAME : "Output Timeout Flusher - " + taskName;
-			
-			outputFlusher = new OutputFlusher(threadName, timeout);
-			outputFlusher.start();
-		}
-	}
-	
-	@Override
-	public void emit(T record) throws IOException, InterruptedException {
-		checkErroneous();
-		super.emit(record);
-		if (flushAlways) {
-			flush();
-		}
-	}
-
-	@Override
-	public void broadcastEmit(T record) throws IOException, InterruptedException {
-		checkErroneous();
-		super.broadcastEmit(record);
-		if (flushAlways) {
-			flush();
-		}
-	}
-
-	/**
-	 * Closes the writer. This stops the flushing thread (if there is one).
-	 */
-	public void close() {
-		// make sure we terminate the thread in any case
-		if (outputFlusher != null) {
-			outputFlusher.terminate();
-			try {
-				outputFlusher.join();
-			}
-			catch (InterruptedException e) {
-				// ignore on close
-			}
-		}
-	}
-
-	/**
-	 * Notifies the writer that the output flusher thread encountered an exception.
-	 * 
-	 * @param t The exception to report.
-	 */
-	private void notifyFlusherException(Throwable t) {
-		if (this.flusherException == null) {
-			this.flusherException = t;
-		}
-	}
-	
-	private void checkErroneous() throws IOException {
-		if (flusherException != null) {
-			throw new IOException("An exception happened while flushing the outputs", flusherException);
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * A dedicated thread that periodically flushes the output buffers, to set upper latency bounds.
-	 * 
-	 * The thread is daemonic, because it is only a utility thread.
-	 */
-	private class OutputFlusher extends Thread {
-		
-		private final long timeout;
-		
-		private volatile boolean running = true;
-
-		
-		OutputFlusher(String name, long timeout) {
-			super(name);
-			setDaemon(true);
-			this.timeout = timeout;
-		}
-		
-		public void terminate() {
-			running = false;
-			interrupt();
-		}
-
-		@Override
-		public void run() {
-			try {
-				while (running) {
-					try {
-						Thread.sleep(timeout);
-					}
-					catch (InterruptedException e) {
-						// propagate this if we are still running, because it should not happen
-						// in that case
-						if (running) {
-							throw new Exception(e);
-						}
-					}
-					
-					// any errors here should let the thread come to a halt and be
-					// recognized by the writer 
-					flush();
-				}
-			}
-			catch (Throwable t) {
-				notifyFlusherException(t);
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
deleted file mode 100644
index 882037e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.event.AbstractEvent;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
-import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
-import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.plugable.DeserializationDelegate;
-import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-
-/**
- * Input reader for {@link org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask}.
- *
- * <p>
- * This also keeps track of {@link org.apache.flink.streaming.api.watermark.Watermark} events and forwards them to event subscribers
- * once the {@link org.apache.flink.streaming.api.watermark.Watermark} from all inputs advances.
- *
- * <p>
- * Forwarding elements or watermarks must be protected by synchronizing on the given lock
- * object. This ensures that we don't call methods on a {@link TwoInputStreamOperator} concurrently
- * with the timer callback or other things.
- *
- * @param <IN1> The type of the records that arrive on the first input
- * @param <IN2> The type of the records that arrive on the second input
- */
-public class StreamTwoInputProcessor<IN1, IN2> {
-
-	private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers;
-
-	private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer;
-
-	// We need to keep track of the channel from which a buffer came, so that we can
-	// appropriately map the watermarks to input channels
-	private int currentChannel = -1;
-
-	private boolean isFinished;
-
-	private final CheckpointBarrierHandler barrierHandler;
-
-	private final long[] watermarks1;
-	private long lastEmittedWatermark1;
-
-	private final long[] watermarks2;
-	private long lastEmittedWatermark2;
-
-	private final int numInputChannels1;
-
-	private final DeserializationDelegate<StreamElement> deserializationDelegate1;
-	private final DeserializationDelegate<StreamElement> deserializationDelegate2;
-
-	@SuppressWarnings({"unchecked", "rawtypes"})
-	public StreamTwoInputProcessor(
-			Collection<InputGate> inputGates1,
-			Collection<InputGate> inputGates2,
-			TypeSerializer<IN1> inputSerializer1,
-			TypeSerializer<IN2> inputSerializer2,
-			EventListener<CheckpointBarrier> checkpointListener,
-			CheckpointingMode checkpointMode,
-			IOManager ioManager,
-			boolean enableWatermarkMultiplexing) throws IOException {
-		
-		final InputGate inputGate = InputGateUtil.createInputGate(inputGates1, inputGates2);
-
-		if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
-			this.barrierHandler = new BarrierBuffer(inputGate, ioManager);
-		}
-		else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
-			this.barrierHandler = new BarrierTracker(inputGate);
-		}
-		else {
-			throw new IllegalArgumentException("Unrecognized CheckpointingMode: " + checkpointMode);
-		}
-		
-		if (checkpointListener != null) {
-			this.barrierHandler.registerCheckpointEventHandler(checkpointListener);
-		}
-		
-		if (enableWatermarkMultiplexing) {
-			MultiplexingStreamRecordSerializer<IN1> ser = new MultiplexingStreamRecordSerializer<IN1>(inputSerializer1);
-			this.deserializationDelegate1 = new NonReusingDeserializationDelegate<StreamElement>(ser);
-		}
-		else {
-			StreamRecordSerializer<IN1> ser = new StreamRecordSerializer<IN1>(inputSerializer1);
-			this.deserializationDelegate1 = (DeserializationDelegate<StreamElement>)
-					(DeserializationDelegate<?>) new NonReusingDeserializationDelegate<StreamRecord<IN1>>(ser);
-		}
-		
-		if (enableWatermarkMultiplexing) {
-			MultiplexingStreamRecordSerializer<IN2> ser = new MultiplexingStreamRecordSerializer<IN2>(inputSerializer2);
-			this.deserializationDelegate2 = new NonReusingDeserializationDelegate<StreamElement>(ser);
-		}
-		else {
-			StreamRecordSerializer<IN2> ser = new StreamRecordSerializer<IN2>(inputSerializer2);
-			this.deserializationDelegate2 = (DeserializationDelegate<StreamElement>)
-					(DeserializationDelegate<?>) new NonReusingDeserializationDelegate<StreamRecord<IN2>>(ser);
-		}
-
-		// Initialize one deserializer per input channel
-		this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
-		
-		for (int i = 0; i < recordDeserializers.length; i++) {
-			recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<StreamElement>>();
-		}
-
-		// determine which unioned channels belong to input 1 and which belong to input 2
-		int numInputChannels1 = 0;
-		for (InputGate gate: inputGates1) {
-			numInputChannels1 += gate.getNumberOfInputChannels();
-		}
-		
-		this.numInputChannels1 = numInputChannels1;
-		int numInputChannels2 = inputGate.getNumberOfInputChannels() - numInputChannels1;
-
-		watermarks1 = new long[numInputChannels1];
-		Arrays.fill(watermarks1, Long.MIN_VALUE);
-		lastEmittedWatermark1 = Long.MIN_VALUE;
-
-		watermarks2 = new long[numInputChannels2];
-		Arrays.fill(watermarks2, Long.MIN_VALUE);
-		lastEmittedWatermark2 = Long.MIN_VALUE;
-	}
-
-	@SuppressWarnings("unchecked")
-	public boolean processInput(TwoInputStreamOperator<IN1, IN2, ?> streamOperator, Object lock) throws Exception {
-		if (isFinished) {
-			return false;
-		}
-
-		while (true) {
-			if (currentRecordDeserializer != null) {
-				DeserializationResult result;
-				if (currentChannel < numInputChannels1) {
-					result = currentRecordDeserializer.getNextRecord(deserializationDelegate1);
-				} else {
-					result = currentRecordDeserializer.getNextRecord(deserializationDelegate2);
-				}
-
-				if (result.isBufferConsumed()) {
-					currentRecordDeserializer.getCurrentBuffer().recycle();
-					currentRecordDeserializer = null;
-				}
-
-				if (result.isFullRecord()) {
-					if (currentChannel < numInputChannels1) {
-						StreamElement recordOrWatermark = deserializationDelegate1.getInstance();
-						if (recordOrWatermark.isWatermark()) {
-							handleWatermark(streamOperator, (Watermark) recordOrWatermark, currentChannel, lock);
-							continue;
-						}
-						else {
-							synchronized (lock) {
-								streamOperator.processElement1(recordOrWatermark.<IN1>asRecord());
-							}
-							return true;
-
-						}
-					}
-					else {
-						StreamElement recordOrWatermark = deserializationDelegate2.getInstance();
-						if (recordOrWatermark.isWatermark()) {
-							handleWatermark(streamOperator, recordOrWatermark.asWatermark(), currentChannel, lock);
-							continue;
-						}
-						else {
-							synchronized (lock) {
-								streamOperator.processElement2(recordOrWatermark.<IN2>asRecord());
-							}
-							return true;
-						}
-					}
-				}
-			}
-
-			final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
-			if (bufferOrEvent != null) {
-
-				if (bufferOrEvent.isBuffer()) {
-					currentChannel = bufferOrEvent.getChannelIndex();
-					currentRecordDeserializer = recordDeserializers[currentChannel];
-					currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
-	
-				} else {
-					// Event received
-					final AbstractEvent event = bufferOrEvent.getEvent();
-					if (event.getClass() != EndOfPartitionEvent.class) {
-						throw new IOException("Unexpected event: " + event);
-					}
-				}
-			}
-			else {
-				isFinished = true;
-				if (!barrierHandler.isEmpty()) {
-					throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
-				}
-				return false;
-			}
-		}
-	}
-
-	private void handleWatermark(TwoInputStreamOperator<IN1, IN2, ?> operator, Watermark mark, int channelIndex, Object lock) throws Exception {
-		if (channelIndex < numInputChannels1) {
-			long watermarkMillis = mark.getTimestamp();
-			if (watermarkMillis > watermarks1[channelIndex]) {
-				watermarks1[channelIndex] = watermarkMillis;
-				long newMinWatermark = Long.MAX_VALUE;
-				for (long wm : watermarks1) {
-					newMinWatermark = Math.min(wm, newMinWatermark);
-				}
-				if (newMinWatermark > lastEmittedWatermark1) {
-					lastEmittedWatermark1 = newMinWatermark;
-					synchronized (lock) {
-						operator.processWatermark1(new Watermark(lastEmittedWatermark1));
-					}
-				}
-			}
-		} else {
-			channelIndex = channelIndex - numInputChannels1;
-			long watermarkMillis = mark.getTimestamp();
-			if (watermarkMillis > watermarks2[channelIndex]) {
-				watermarks2[channelIndex] = watermarkMillis;
-				long newMinWatermark = Long.MAX_VALUE;
-				for (long wm : watermarks2) {
-					newMinWatermark = Math.min(wm, newMinWatermark);
-				}
-				if (newMinWatermark > lastEmittedWatermark2) {
-					lastEmittedWatermark2 = newMinWatermark;
-					synchronized (lock) {
-						operator.processWatermark2(new Watermark(lastEmittedWatermark2));
-					}
-				}
-			}
-		}
-	}
-	
-	public void setReporter(AccumulatorRegistry.Reporter reporter) {
-		for (RecordDeserializer<?> deserializer : recordDeserializers) {
-			deserializer.setReporter(reporter);
-		}
-	}
-	
-	public void cleanup() throws IOException {
-		// clear the buffers first. this part should not ever fail
-		for (RecordDeserializer<?> deserializer : recordDeserializers) {
-			Buffer buffer = deserializer.getCurrentBuffer();
-			if (buffer != null && !buffer.isRecycled()) {
-				buffer.recycle();
-			}
-		}
-
-		// cleanup the barrier handler resources
-		barrierHandler.cleanup();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingReader.java
deleted file mode 100644
index 793e87e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingReader.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
-
-import java.io.IOException;
-
-public interface StreamingReader extends ReaderBase {
-
-	void cleanup() throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java
deleted file mode 100644
index 7020758..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators;
-
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-
-/**
- * An operator that can sort a stream based on timestamps. Arriving elements will be put into
- * buckets based on their timestamp. Sorting and emission of sorted elements happens once
- * the watermark passes the end of a bucket.
- *
- * @param <T> The type of the elements on which this operator works.
- */
-public class BucketStreamSortOperator<T> extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
-	private static final long serialVersionUID = 1L;
-
-	private long granularity;
-
-	private transient Map<Long, List<StreamRecord<T>>> buckets;
-
-	/**
-	 * Creates a new sorting operator that creates buckets with the given interval.
-	 *
-	 * @param interval The size (in time) of one bucket.
-	 */
-	public BucketStreamSortOperator(long interval) {
-		this.granularity = interval;
-	}
-
-	@Override
-	public void open() throws Exception {
-		super.open();
-		buckets = new HashMap<>();
-
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	public void processElement(StreamRecord<T> record) throws Exception {
-		long bucketId = record.getTimestamp() - (record.getTimestamp() % granularity);
-		List<StreamRecord<T>> bucket = buckets.get(bucketId);
-		if (bucket == null) {
-			bucket = new ArrayList<>();
-			buckets.put(bucketId, bucket);
-		}
-		bucket.add(record);
-	}
-
-	@Override
-	public void processWatermark(Watermark mark) throws Exception {
-		long maxBucketId = mark.getTimestamp() - (mark.getTimestamp() % granularity);
-		Set<Long> toRemove = new HashSet<>();
-		for (Map.Entry<Long, List<StreamRecord<T>>> bucket: buckets.entrySet()) {
-			if (bucket.getKey() < maxBucketId) {
-				Collections.sort(bucket.getValue(), new Comparator<StreamRecord<T>>() {
-					@Override
-					public int compare(StreamRecord<T> o1, StreamRecord<T> o2) {
-						return (int) (o1.getTimestamp() - o2.getTimestamp());
-					}
-				});
-				for (StreamRecord<T> r: bucket.getValue()) {
-					output.collect(r);
-				}
-				toRemove.add(bucket.getKey());
-			}
-		}
-
-		for (Long l: toRemove) {
-			buckets.remove(l);
-		}
-
-		output.emitWatermark(mark);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
deleted file mode 100644
index 6e51a49..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators;
-
-import org.apache.flink.streaming.api.functions.TimestampExtractor;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * A {@link org.apache.flink.streaming.api.operators.StreamOperator} for extracting timestamps
- * from user elements and assigning them as the internal timestamp of the {@link StreamRecord}.
- *
- * @param <T> The type of the input elements
- */
-public class ExtractTimestampsOperator<T>
-		extends AbstractUdfStreamOperator<T, TimestampExtractor<T>>
-		implements OneInputStreamOperator<T, T>, Triggerable {
-
-	private static final long serialVersionUID = 1L;
-
-	transient long watermarkInterval;
-
-	transient long currentWatermark;
-
-	public ExtractTimestampsOperator(TimestampExtractor<T> extractor) {
-		super(extractor);
-		chainingStrategy = ChainingStrategy.ALWAYS;
-	}
-
-	@Override
-	public void open() throws Exception {
-		super.open();
-		watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
-		if (watermarkInterval > 0) {
-			registerTimer(System.currentTimeMillis() + watermarkInterval, this);
-		}
-
-		currentWatermark = Long.MIN_VALUE;
-	}
-
-	@Override
-	public void close() throws Exception {
-		super.close();
-
-		// emit a final +Inf watermark, just like the sources
-		output.emitWatermark(new Watermark(Long.MAX_VALUE));
-	}
-
-	@Override
-	public void processElement(StreamRecord<T> element) throws Exception {
-		long newTimestamp = userFunction.extractTimestamp(element.getValue(), element.getTimestamp());
-		output.collect(element.replace(element.getValue(), newTimestamp));
-		long watermark = userFunction.extractWatermark(element.getValue(), newTimestamp);
-		if (watermark > currentWatermark) {
-			currentWatermark = watermark;
-			output.emitWatermark(new Watermark(currentWatermark));
-		}
-	}
-
-	@Override
-	public void trigger(long timestamp) throws Exception {
-		// register next timer
-		registerTimer(System.currentTimeMillis() + watermarkInterval, this);
-		long lastWatermark = currentWatermark;
-		currentWatermark = userFunction.getCurrentWatermark();
-
-		if (currentWatermark > lastWatermark) {
-			// emit watermark
-			output.emitWatermark(new Watermark(currentWatermark));
-		}
-	}
-
-	@Override
-	public void processWatermark(Watermark mark) throws Exception {
-		// ignore them, since we are basically a watermark source
-	}
-}