Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:47 UTC

[31/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
deleted file mode 100644
index 5de6cd1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ /dev/null
@@ -1,539 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang.SerializationUtils;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
-import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Window operator for non-keyed windows.
- *
- * @see org.apache.flink.streaming.runtime.operators.windowing.WindowOperator
- *
- * @param <IN> The type of the incoming elements.
- * @param <OUT> The type of elements emitted by the {@code WindowFunction}.
- * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
- */
-public class NonKeyedWindowOperator<IN, OUT, W extends Window>
-		extends AbstractUdfStreamOperator<OUT, AllWindowFunction<IN, OUT, W>>
-		implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable, OutputTypeConfigurable<OUT> {
-
-	private static final long serialVersionUID = 1L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(WindowOperator.class);
-
-	// ------------------------------------------------------------------------
-	// Configuration values and stuff from the user
-	// ------------------------------------------------------------------------
-
-	private final WindowAssigner<? super IN, W> windowAssigner;
-
-	private final Trigger<? super IN, ? super W> trigger;
-
-	private final WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory;
-
-	/**
-	 * If this is true, the current processing time is set as the timestamp of incoming elements.
-	 * This is for use with a {@link org.apache.flink.streaming.api.windowing.evictors.TimeEvictor}
-	 * if eviction should happen based on processing time.
-	 */
-	private boolean setProcessingTime = false;
-
-	/**
-	 * This is used to copy the incoming element because it can be put into several window
-	 * buffers.
-	 */
-	private TypeSerializer<IN> inputSerializer;
-
-	/**
-	 * For serializing the window in checkpoints.
-	 */
-	private final TypeSerializer<W> windowSerializer;
-
-	// ------------------------------------------------------------------------
-	// State that is not checkpointed
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Processing time timers that are currently in-flight.
-	 */
-	private transient Map<Long, Set<Context>> processingTimeTimers;
-
-	/**
-	 * Current waiting watermark callbacks.
-	 */
-	private transient Map<Long, Set<Context>> watermarkTimers;
-
-	/**
-	 * This is given to the {@code WindowFunction} for emitting elements with a given timestamp.
-	 */
-	protected transient TimestampedCollector<OUT> timestampedCollector;
-
-	// ------------------------------------------------------------------------
-	// State that needs to be checkpointed
-	// ------------------------------------------------------------------------
-
-	/**
-	 * The windows (panes) that are currently in-flight. Each pane has a {@code WindowBuffer}
-	 * and a {@code TriggerContext} that stores the {@code Trigger} for that pane.
-	 */
-	protected transient Map<W, Context> windows;
-
-	/**
-	 * Creates a new {@code NonKeyedWindowOperator} based on the given policies and user functions.
-	 */
-	public NonKeyedWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
-			TypeSerializer<W> windowSerializer,
-			WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory,
-			AllWindowFunction<IN, OUT, W> windowFunction,
-			Trigger<? super IN, ? super W> trigger) {
-
-		super(windowFunction);
-
-		this.windowAssigner = requireNonNull(windowAssigner);
-		this.windowSerializer = windowSerializer;
-
-		this.windowBufferFactory = requireNonNull(windowBufferFactory);
-		this.trigger = requireNonNull(trigger);
-
-		setChainingStrategy(ChainingStrategy.ALWAYS);
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	public final void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
-		inputSerializer = (TypeSerializer<IN>) type.createSerializer(executionConfig);
-	}
-
-	@Override
-	public final void open() throws Exception {
-		super.open();
-		timestampedCollector = new TimestampedCollector<>(output);
-
-		if (inputSerializer == null) {
-			throw new IllegalStateException("Input serializer was not set.");
-		}
-
-		windowBufferFactory.setRuntimeContext(getRuntimeContext());
-		windowBufferFactory.open(getUserFunctionParameters());
-
-		// these could already be initialized from restoreState()
-		if (watermarkTimers == null) {
-			watermarkTimers = new HashMap<>();
-		}
-		if (processingTimeTimers == null) {
-			processingTimeTimers = new HashMap<>();
-		}
-		if (windows == null) {
-			windows = new HashMap<>();
-		}
-
-		// re-register timers that this window context had set
-		for (Context context: windows.values()) {
-			if (context.processingTimeTimer > 0) {
-				Set<Context> triggers = processingTimeTimers.get(context.processingTimeTimer);
-				if (triggers == null) {
-					getRuntimeContext().registerTimer(context.processingTimeTimer, NonKeyedWindowOperator.this);
-					triggers = new HashSet<>();
-					processingTimeTimers.put(context.processingTimeTimer, triggers);
-				}
-				triggers.add(context);
-			}
-			if (context.watermarkTimer > 0) {
-				Set<Context> triggers = watermarkTimers.get(context.watermarkTimer);
-				if (triggers == null) {
-					triggers = new HashSet<>();
-					watermarkTimers.put(context.watermarkTimer, triggers);
-				}
-				triggers.add(context);
-			}
-
-		}
-	}
-
-	@Override
-	public final void close() throws Exception {
-		super.close();
-		// emit the elements that we still keep
-		for (Context window: windows.values()) {
-			emitWindow(window);
-		}
-		windows.clear();
-		windowBufferFactory.close();
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	public final void processElement(StreamRecord<IN> element) throws Exception {
-		if (setProcessingTime) {
-			element.replace(element.getValue(), System.currentTimeMillis());
-		}
-
-		Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(), element.getTimestamp());
-
-		for (W window: elementWindows) {
-			Context context = windows.get(window);
-			if (context == null) {
-				WindowBuffer<IN> windowBuffer = windowBufferFactory.create();
-				context = new Context(window, windowBuffer);
-				windows.put(window, context);
-			}
-			StreamRecord<IN> elementCopy = new StreamRecord<>(inputSerializer.copy(element.getValue()), element.getTimestamp());
-			context.windowBuffer.storeElement(elementCopy);
-			Trigger.TriggerResult triggerResult = trigger.onElement(elementCopy.getValue(), elementCopy.getTimestamp(), window, context);
-			processTriggerResult(triggerResult, window);
-		}
-	}
-
-	protected void emitWindow(Context context) throws Exception {
-		timestampedCollector.setTimestamp(context.window.maxTimestamp());
-
-		userFunction.apply(
-				context.window,
-				context.windowBuffer.getUnpackedElements(),
-				timestampedCollector);
-	}
-
-	private void processTriggerResult(Trigger.TriggerResult triggerResult, W window) throws Exception {
-		switch (triggerResult) {
-			case FIRE: {
-				Context context = windows.get(window);
-				if (context == null) {
-					LOG.debug("Window {} already gone.", window);
-					return;
-				}
-
-
-				emitWindow(context);
-				break;
-			}
-
-			case FIRE_AND_PURGE: {
-				Context context = windows.remove(window);
-				if (context == null) {
-					LOG.debug("Window {} already gone.", window);
-					return;
-				}
-
-				emitWindow(context);
-				break;
-			}
-
-			case CONTINUE:
-				// ignore
-		}
-	}
-
-	@Override
-	public final void processWatermark(Watermark mark) throws Exception {
-		Set<Long> toRemove = new HashSet<>();
-
-		for (Map.Entry<Long, Set<Context>> triggers: watermarkTimers.entrySet()) {
-			if (triggers.getKey() <= mark.getTimestamp()) {
-				for (Context context: triggers.getValue()) {
-					Trigger.TriggerResult triggerResult = context.onEventTime(triggers.getKey());
-					processTriggerResult(triggerResult, context.window);
-				}
-				toRemove.add(triggers.getKey());
-			}
-		}
-
-		for (Long l: toRemove) {
-			watermarkTimers.remove(l);
-		}
-		output.emitWatermark(mark);
-	}
-
-	@Override
-	public final void trigger(long time) throws Exception {
-		Set<Long> toRemove = new HashSet<>();
-
-		for (Map.Entry<Long, Set<Context>> triggers: processingTimeTimers.entrySet()) {
-			if (triggers.getKey() < time) {
-				for (Context context: triggers.getValue()) {
-					Trigger.TriggerResult triggerResult = context.onProcessingTime(time);
-					processTriggerResult(triggerResult, context.window);
-				}
-				toRemove.add(triggers.getKey());
-			}
-		}
-
-		for (Long l: toRemove) {
-			processingTimeTimers.remove(l);
-		}
-	}
-
-	/**
-	 * A context object that is given to {@code Trigger} functions to allow them to register
-	 * timer/watermark callbacks.
-	 */
-	protected class Context implements Trigger.TriggerContext {
-		protected W window;
-
-		protected WindowBuffer<IN> windowBuffer;
-
-		protected HashMap<String, Serializable> state;
-
-		// These are used to allow only one in-flight timer of each type at a time. If the trigger
-		// registers another timer, this value is overwritten; the timer is not removed from the
-		// set of in-flight timers, to improve performance. When a trigger fires it is simply
-		// checked against the last timer that was set.
-		protected long watermarkTimer;
-		protected long processingTimeTimer;
-
-		public Context(
-				W window,
-				WindowBuffer<IN> windowBuffer) {
-			this.window = window;
-			this.windowBuffer = windowBuffer;
-			state = new HashMap<>();
-
-			this.watermarkTimer = -1;
-			this.processingTimeTimer = -1;
-		}
-
-
-		@SuppressWarnings("unchecked")
-		protected Context(DataInputView in) throws Exception {
-			this.window = windowSerializer.deserialize(in);
-			this.watermarkTimer = in.readLong();
-			this.processingTimeTimer = in.readLong();
-
-			int stateSize = in.readInt();
-			byte[] stateData = new byte[stateSize];
-			in.read(stateData);
-			ByteArrayInputStream bais = new ByteArrayInputStream(stateData);
-			state = (HashMap<String, Serializable>) SerializationUtils.deserialize(bais);
-
-			this.windowBuffer = windowBufferFactory.create();
-			int numElements = in.readInt();
-			MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
-			for (int i = 0; i < numElements; i++) {
-				windowBuffer.storeElement(recordSerializer.deserialize(in).<IN>asRecord());
-			}
-		}
-
-		protected void writeToState(StateBackend.CheckpointStateOutputView out) throws IOException {
-			windowSerializer.serialize(window, out);
-			out.writeLong(watermarkTimer);
-			out.writeLong(processingTimeTimer);
-
-			ByteArrayOutputStream baos = new ByteArrayOutputStream();
-			SerializationUtils.serialize(state, baos);
-			out.writeInt(baos.size());
-			out.write(baos.toByteArray(), 0, baos.size());
-
-			MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
-			out.writeInt(windowBuffer.size());
-			for (StreamRecord<IN> element: windowBuffer.getElements()) {
-				recordSerializer.serialize(element, out);
-			}
-		}
-
-		@SuppressWarnings("unchecked")
-		public <S extends Serializable> OperatorState<S> getKeyValueState(final String name, final S defaultState) {
-			return new OperatorState<S>() {
-				@Override
-				public S value() throws IOException {
-					Serializable value = state.get(name);
-					if (value == null) {
-						state.put(name, defaultState);
-						value = defaultState;
-					}
-					return (S) value;
-				}
-
-				@Override
-				public void update(S value) throws IOException {
-					state.put(name, value);
-				}
-			};
-		}
-
-		@Override
-		public void registerProcessingTimeTimer(long time) {
-			if (this.processingTimeTimer == time) {
-				// we already have set a trigger for that time
-				return;
-			}
-			Set<Context> triggers = processingTimeTimers.get(time);
-			if (triggers == null) {
-				getRuntimeContext().registerTimer(time, NonKeyedWindowOperator.this);
-				triggers = new HashSet<>();
-				processingTimeTimers.put(time, triggers);
-			}
-			this.processingTimeTimer = time;
-			triggers.add(this);
-		}
-
-		@Override
-		public void registerEventTimeTimer(long time) {
-			if (watermarkTimer == time) {
-				// we already have set a trigger for that time
-				return;
-			}
-			Set<Context> triggers = watermarkTimers.get(time);
-			if (triggers == null) {
-				triggers = new HashSet<>();
-				watermarkTimers.put(time, triggers);
-			}
-			this.watermarkTimer = time;
-			triggers.add(this);
-		}
-
-		public Trigger.TriggerResult onProcessingTime(long time) throws Exception {
-			if (time == processingTimeTimer) {
-				return trigger.onProcessingTime(time, this);
-			} else {
-				return Trigger.TriggerResult.CONTINUE;
-			}
-		}
-
-		public Trigger.TriggerResult onEventTime(long time) throws Exception {
-			if (time == watermarkTimer) {
-				return trigger.onEventTime(time, this);
-			} else {
-				return Trigger.TriggerResult.CONTINUE;
-			}
-		}
-	}
-
-	/**
-	 * When this flag is enabled, the current processing time is set as the timestamp of elements
-	 * upon arrival. This must be used, for example, when using the
-	 * {@link org.apache.flink.streaming.api.windowing.evictors.TimeEvictor} with processing
-	 * time semantics.
-	 */
-	public NonKeyedWindowOperator<IN, OUT, W> enableSetProcessingTime(boolean setProcessingTime) {
-		this.setProcessingTime = setProcessingTime;
-		return this;
-	}
-
-	@Override
-	public final void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
-		if (userFunction instanceof OutputTypeConfigurable) {
-			@SuppressWarnings("unchecked")
-			OutputTypeConfigurable<OUT> typeConfigurable = (OutputTypeConfigurable<OUT>) userFunction;
-			typeConfigurable.setOutputType(outTypeInfo, executionConfig);
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Checkpointing
-	// ------------------------------------------------------------------------
-
-	@Override
-	public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
-		StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
-
-		// we write the panes with the key/value maps into the stream
-		StateBackend.CheckpointStateOutputView out = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
-
-		int numWindows = windows.size();
-		out.writeInt(numWindows);
-		for (Context context: windows.values()) {
-			context.writeToState(out);
-		}
-
-		taskState.setOperatorState(out.closeAndGetHandle());
-		return taskState;
-	}
-
-	@Override
-	public void restoreState(StreamTaskState taskState) throws Exception {
-		super.restoreState(taskState);
-
-
-		@SuppressWarnings("unchecked")
-		StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState();
-		DataInputView in = inputState.getState(getUserCodeClassloader());
-
-		int numWindows = in.readInt();
-		this.windows = new HashMap<>(numWindows);
-		this.processingTimeTimers = new HashMap<>();
-		this.watermarkTimers = new HashMap<>();
-
-		for (int j = 0; j < numWindows; j++) {
-			Context context = new Context(in);
-			windows.put(context.window, context);
-		}
-	}
-
-
-	// ------------------------------------------------------------------------
-	// Getters for testing
-	// ------------------------------------------------------------------------
-
-	@VisibleForTesting
-	public boolean isSetProcessingTime() {
-		return setProcessingTime;
-	}
-
-	@VisibleForTesting
-	public Trigger<? super IN, ? super W> getTrigger() {
-		return trigger;
-	}
-
-	@VisibleForTesting
-	public WindowAssigner<? super IN, W> getWindowAssigner() {
-		return windowAssigner;
-	}
-
-	@VisibleForTesting
-	public WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> getWindowBufferFactory() {
-		return windowBufferFactory;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
deleted file mode 100644
index 2491c57..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ /dev/null
@@ -1,625 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang.SerializationUtils;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
-import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * An operator that implements the logic for windowing based on a {@link WindowAssigner} and
- * {@link Trigger}.
- *
- * <p>
- * When an element arrives it gets assigned a key using a {@link KeySelector} and it gets
- * assigned to zero or more windows using a {@link WindowAssigner}. Based on this, the element
- * is put into panes. A pane is the bucket of elements that have the same key and the same
- * {@code Window}. An element can be in multiple panes if it was assigned to multiple windows by the
- * {@code WindowAssigner}.
- *
- * <p>
- * Each pane gets its own instance of the provided {@code Trigger}. This trigger determines when
- * the contents of the pane should be processed to emit results. When a trigger fires,
- * the given {@link WindowFunction} is invoked to produce the results that are emitted for
- * the pane to which the {@code Trigger} belongs.
- *
- * <p>
- * This operator also needs a {@link WindowBufferFactory} to create a buffer for storing the
- * elements of each pane.
- *
- * @param <K> The type of key returned by the {@code KeySelector}.
- * @param <IN> The type of the incoming elements.
- * @param <OUT> The type of elements emitted by the {@code WindowFunction}.
- * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
- */
-public class WindowOperator<K, IN, OUT, W extends Window>
-		extends AbstractUdfStreamOperator<OUT, WindowFunction<IN, OUT, K, W>>
-		implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable, OutputTypeConfigurable<OUT> {
-
-	private static final long serialVersionUID = 1L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(WindowOperator.class);
-
-	// ------------------------------------------------------------------------
-	// Configuration values and user functions
-	// ------------------------------------------------------------------------
-
-	private final WindowAssigner<? super IN, W> windowAssigner;
-
-	private final KeySelector<IN, K> keySelector;
-
-	private final Trigger<? super IN, ? super W> trigger;
-
-	private final WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory;
-
-	/**
-	 * If this is true, the current processing time is set as the timestamp of incoming elements.
-	 * This is for use with a {@link org.apache.flink.streaming.api.windowing.evictors.TimeEvictor}
-	 * if eviction should happen based on processing time.
-	 */
-	private boolean setProcessingTime = false;
-
-	/**
-	 * This is used to copy the incoming element because it can be put into several window
-	 * buffers.
-	 */
-	private TypeSerializer<IN> inputSerializer;
-
-	/**
-	 * For serializing the key in checkpoints.
-	 */
-	private final TypeSerializer<K> keySerializer;
-
-	/**
-	 * For serializing the window in checkpoints.
-	 */
-	private final TypeSerializer<W> windowSerializer;
-
-	// ------------------------------------------------------------------------
-	// State that is not checkpointed
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Processing time timers that are currently in-flight.
-	 */
-	private transient Map<Long, Set<Context>> processingTimeTimers;
-
-	/**
-	 * Current waiting watermark callbacks.
-	 */
-	private transient Map<Long, Set<Context>> watermarkTimers;
-
-	/**
-	 * This is given to the {@code WindowFunction} for emitting elements with a given timestamp.
-	 */
-	protected transient TimestampedCollector<OUT> timestampedCollector;
-
-	// ------------------------------------------------------------------------
-	// State that needs to be checkpointed
-	// ------------------------------------------------------------------------
-
-	/**
-	 * The windows (panes) that are currently in-flight. Each pane has a {@code WindowBuffer}
-	 * and a {@code TriggerContext} that stores the {@code Trigger} for that pane.
-	 */
-	protected transient Map<K, Map<W, Context>> windows;
-
-	/**
-	 * Creates a new {@code WindowOperator} based on the given policies and user functions.
-	 */
-	public WindowOperator(WindowAssigner<? super IN, W> windowAssigner,
-			TypeSerializer<W> windowSerializer,
-			KeySelector<IN, K> keySelector,
-			TypeSerializer<K> keySerializer,
-			WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory,
-			WindowFunction<IN, OUT, K, W> windowFunction,
-			Trigger<? super IN, ? super W> trigger) {
-
-		super(windowFunction);
-
-		this.windowAssigner = requireNonNull(windowAssigner);
-		this.windowSerializer = windowSerializer;
-		this.keySelector = requireNonNull(keySelector);
-		this.keySerializer = requireNonNull(keySerializer);
-
-		this.windowBufferFactory = requireNonNull(windowBufferFactory);
-		this.trigger = requireNonNull(trigger);
-
-		setChainingStrategy(ChainingStrategy.ALWAYS);
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	public final void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
-		inputSerializer = (TypeSerializer<IN>) type.createSerializer(executionConfig);
-	}
-
-	@Override
-	public final void open() throws Exception {
-		super.open();
-
-		timestampedCollector = new TimestampedCollector<>(output);
-
-		if (inputSerializer == null) {
-			throw new IllegalStateException("Input serializer was not set.");
-		}
-
-		windowBufferFactory.setRuntimeContext(getRuntimeContext());
-		windowBufferFactory.open(getUserFunctionParameters());
-
-
-		// these could already be initialized from restoreState()
-		if (watermarkTimers == null) {
-			watermarkTimers = new HashMap<>();
-		}
-		if (processingTimeTimers == null) {
-			processingTimeTimers = new HashMap<>();
-		}
-		if (windows == null) {
-			windows = new HashMap<>();
-		}
-
-		// re-register timers that this window context had set
-		for (Map.Entry<K, Map<W, Context>> entry: windows.entrySet()) {
-			Map<W, Context> keyWindows = entry.getValue();
-			for (Context context: keyWindows.values()) {
-				if (context.processingTimeTimer > 0) {
-					Set<Context> triggers = processingTimeTimers.get(context.processingTimeTimer);
-					if (triggers == null) {
-						getRuntimeContext().registerTimer(context.processingTimeTimer, WindowOperator.this);
-						triggers = new HashSet<>();
-						processingTimeTimers.put(context.processingTimeTimer, triggers);
-					}
-					triggers.add(context);
-				}
-				if (context.watermarkTimer > 0) {
-					Set<Context> triggers = watermarkTimers.get(context.watermarkTimer);
-					if (triggers == null) {
-						triggers = new HashSet<>();
-						watermarkTimers.put(context.watermarkTimer, triggers);
-					}
-					triggers.add(context);
-				}
-
-			}
-		}
-	}
-
-	@Override
-	public final void close() throws Exception {
-		super.close();
-		// emit the elements that we still keep
-		for (Map.Entry<K, Map<W, Context>> entry: windows.entrySet()) {
-			Map<W, Context> keyWindows = entry.getValue();
-			for (Context window: keyWindows.values()) {
-				emitWindow(window);
-			}
-		}
-		windows.clear();
-		windowBufferFactory.close();
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	public final void processElement(StreamRecord<IN> element) throws Exception {
-		if (setProcessingTime) {
-			element.replace(element.getValue(), System.currentTimeMillis());
-		}
-
-		Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(), element.getTimestamp());
-
-		K key = keySelector.getKey(element.getValue());
-
-		Map<W, Context> keyWindows = windows.get(key);
-		if (keyWindows == null) {
-			keyWindows = new HashMap<>();
-			windows.put(key, keyWindows);
-		}
-
-		for (W window: elementWindows) {
-			Context context = keyWindows.get(window);
-			if (context == null) {
-				WindowBuffer<IN> windowBuffer = windowBufferFactory.create();
-				context = new Context(key, window, windowBuffer);
-				keyWindows.put(window, context);
-			}
-			StreamRecord<IN> elementCopy = new StreamRecord<>(inputSerializer.copy(element.getValue()), element.getTimestamp());
-			context.windowBuffer.storeElement(elementCopy);
-			Trigger.TriggerResult triggerResult = trigger.onElement(elementCopy.getValue(), elementCopy.getTimestamp(), window, context);
-			processTriggerResult(triggerResult, key, window);
-		}
-	}
-
-	protected void emitWindow(Context context) throws Exception {
-		timestampedCollector.setTimestamp(context.window.maxTimestamp());
-
-		userFunction.apply(context.key,
-				context.window,
-				context.windowBuffer.getUnpackedElements(),
-				timestampedCollector);
-	}
-
-	private void processTriggerResult(Trigger.TriggerResult triggerResult, K key, W window) throws Exception {
-		switch (triggerResult) {
-			case FIRE: {
-				Map<W, Context> keyWindows = windows.get(key);
-				if (keyWindows == null) {
-					LOG.debug("Window {} for key {} already gone.", window, key);
-					return;
-				}
-				Context context = keyWindows.get(window);
-				if (context == null) {
-					LOG.debug("Window {} for key {} already gone.", window, key);
-					return;
-				}
-
-
-				emitWindow(context);
-				break;
-			}
-
-			case FIRE_AND_PURGE: {
-				Map<W, Context> keyWindows = windows.get(key);
-				if (keyWindows == null) {
-					LOG.debug("Window {} for key {} already gone.", window, key);
-					return;
-				}
-				Context context = keyWindows.remove(window);
-				if (context == null) {
-					LOG.debug("Window {} for key {} already gone.", window, key);
-					return;
-				}
-				if (keyWindows.isEmpty()) {
-					windows.remove(key);
-				}
-
-				emitWindow(context);
-				break;
-			}
-
-			case CONTINUE:
-				// ignore
-		}
-	}
-
-	@Override
-	public final void processWatermark(Watermark mark) throws Exception {
-		Set<Long> toRemove = new HashSet<>();
-
-		for (Map.Entry<Long, Set<Context>> triggers: watermarkTimers.entrySet()) {
-			if (triggers.getKey() <= mark.getTimestamp()) {
-				for (Context context: triggers.getValue()) {
-					Trigger.TriggerResult triggerResult = context.onEventTime(triggers.getKey());
-					processTriggerResult(triggerResult, context.key, context.window);
-				}
-				toRemove.add(triggers.getKey());
-			}
-		}
-
-		for (Long l: toRemove) {
-			watermarkTimers.remove(l);
-		}
-		output.emitWatermark(mark);
-	}
-
-	@Override
-	public final void trigger(long time) throws Exception {
-		Set<Long> toRemove = new HashSet<>();
-
-		for (Map.Entry<Long, Set<Context>> triggers: processingTimeTimers.entrySet()) {
-			if (triggers.getKey() < time) {
-				for (Context context: triggers.getValue()) {
-					Trigger.TriggerResult triggerResult = context.onProcessingTime(time);
-					processTriggerResult(triggerResult, context.key, context.window);
-				}
-				toRemove.add(triggers.getKey());
-			}
-		}
-
-		for (Long l: toRemove) {
-			processingTimeTimers.remove(l);
-		}
-	}
-
-	/**
-	 * A context object that is given to {@code Trigger} functions to allow them to register
-	 * timer/watermark callbacks.
-	 */
-	protected class Context implements Trigger.TriggerContext {
-		protected K key;
-		protected W window;
-
-		protected WindowBuffer<IN> windowBuffer;
-
-		protected HashMap<String, Serializable> state;
-
-		// These are used to allow only one in-flight timer of each type at a time. If the trigger
-		// registers another timer, this value is overwritten; the timer is not removed from the
-		// set of in-flight timers, to improve performance. When a trigger fires it is simply
-		// checked against the last timer that was set.
-		protected long watermarkTimer;
-		protected long processingTimeTimer;
-
-		public Context(K key,
-				W window,
-				WindowBuffer<IN> windowBuffer) {
-			this.key = key;
-			this.window = window;
-			this.windowBuffer = windowBuffer;
-			state = new HashMap<>();
-
-			this.watermarkTimer = -1;
-			this.processingTimeTimer = -1;
-		}
-
-		/**
-		 * Constructs a new {@code Context} by reading from a {@link DataInputView} that
-		 * contains a serialized context that we wrote in
-		 * {@link #writeToState(StateBackend.CheckpointStateOutputView)}
-		 */
-		@SuppressWarnings("unchecked")
-		protected Context(DataInputView in) throws Exception {
-			this.key = keySerializer.deserialize(in);
-			this.window = windowSerializer.deserialize(in);
-			this.watermarkTimer = in.readLong();
-			this.processingTimeTimer = in.readLong();
-
-			int stateSize = in.readInt();
-			byte[] stateData = new byte[stateSize];
-			in.read(stateData);
-			ByteArrayInputStream bais = new ByteArrayInputStream(stateData);
-			state = (HashMap<String, Serializable>) SerializationUtils.deserialize(bais);
-
-			this.windowBuffer = windowBufferFactory.create();
-			int numElements = in.readInt();
-			MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
-			for (int i = 0; i < numElements; i++) {
-				windowBuffer.storeElement(recordSerializer.deserialize(in).<IN>asRecord());
-			}
-		}
-
-		/**
-		 * Writes the {@code Context} to the given state checkpoint output.
-		 */
-		protected void writeToState(StateBackend.CheckpointStateOutputView out) throws IOException {
-			keySerializer.serialize(key, out);
-			windowSerializer.serialize(window, out);
-			out.writeLong(watermarkTimer);
-			out.writeLong(processingTimeTimer);
-
-			ByteArrayOutputStream baos = new ByteArrayOutputStream();
-			SerializationUtils.serialize(state, baos);
-			out.writeInt(baos.size());
-			out.write(baos.toByteArray(), 0, baos.size());
-
-			MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
-			out.writeInt(windowBuffer.size());
-			for (StreamRecord<IN> element: windowBuffer.getElements()) {
-				recordSerializer.serialize(element, out);
-			}
-		}
-
-		@SuppressWarnings("unchecked")
-		public <S extends Serializable> OperatorState<S> getKeyValueState(final String name, final S defaultState) {
-			return new OperatorState<S>() {
-				@Override
-				public S value() throws IOException {
-					Serializable value = state.get(name);
-					if (value == null) {
-						state.put(name, defaultState);
-						value = defaultState;
-					}
-					return (S) value;
-				}
-
-				@Override
-				public void update(S value) throws IOException {
-					state.put(name, value);
-				}
-			};
-		}
-
-		@Override
-		public void registerProcessingTimeTimer(long time) {
-			if (this.processingTimeTimer == time) {
-				// we already have set a trigger for that time
-				return;
-			}
-			Set<Context> triggers = processingTimeTimers.get(time);
-			if (triggers == null) {
-				getRuntimeContext().registerTimer(time, WindowOperator.this);
-				triggers = new HashSet<>();
-				processingTimeTimers.put(time, triggers);
-			}
-			this.processingTimeTimer = time;
-			triggers.add(this);
-		}
-
-		@Override
-		public void registerEventTimeTimer(long time) {
-			if (watermarkTimer == time) {
-				// we already have set a trigger for that time
-				return;
-			}
-			Set<Context> triggers = watermarkTimers.get(time);
-			if (triggers == null) {
-				triggers = new HashSet<>();
-				watermarkTimers.put(time, triggers);
-			}
-			this.watermarkTimer = time;
-			triggers.add(this);
-		}
-
-		public Trigger.TriggerResult onProcessingTime(long time) throws Exception {
-			if (time == processingTimeTimer) {
-				return trigger.onProcessingTime(time, this);
-			} else {
-				return Trigger.TriggerResult.CONTINUE;
-			}
-		}
-
-		public Trigger.TriggerResult onEventTime(long time) throws Exception {
-			if (time == watermarkTimer) {
-				return trigger.onEventTime(time, this);
-			} else {
-				return Trigger.TriggerResult.CONTINUE;
-			}
-		}
-	}
-
-	/**
-	 * When this flag is enabled, the current processing time is set as the timestamp of elements
-	 * upon arrival. This must be used, for example, when using the
-	 * {@link org.apache.flink.streaming.api.windowing.evictors.TimeEvictor} with processing
-	 * time semantics.
-	 */
-	public WindowOperator<K, IN, OUT, W> enableSetProcessingTime(boolean setProcessingTime) {
-		this.setProcessingTime = setProcessingTime;
-		return this;
-	}
-
-	@Override
-	public final void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
-		if (userFunction instanceof OutputTypeConfigurable) {
-			@SuppressWarnings("unchecked")
-			OutputTypeConfigurable<OUT> typeConfigurable = (OutputTypeConfigurable<OUT>) userFunction;
-			typeConfigurable.setOutputType(outTypeInfo, executionConfig);
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Checkpointing
-	// ------------------------------------------------------------------------
-
-	@Override
-	public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
-		StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
-
-		// we write the panes with the key/value maps into the stream
-		StateBackend.CheckpointStateOutputView out = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
-
-		int numKeys = windows.size();
-		out.writeInt(numKeys);
-
-		for (Map.Entry<K, Map<W, Context>> keyWindows: windows.entrySet()) {
-			int numWindows = keyWindows.getValue().size();
-			out.writeInt(numWindows);
-			for (Context context: keyWindows.getValue().values()) {
-				context.writeToState(out);
-			}
-		}
-
-		taskState.setOperatorState(out.closeAndGetHandle());
-		return taskState;
-	}
-
-	@Override
-	public void restoreState(StreamTaskState taskState) throws Exception {
-		super.restoreState(taskState);
-
-
-		@SuppressWarnings("unchecked")
-		StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState();
-		DataInputView in = inputState.getState(getUserCodeClassloader());
-
-		int numKeys = in.readInt();
-		this.windows = new HashMap<>(numKeys);
-		this.processingTimeTimers = new HashMap<>();
-		this.watermarkTimers = new HashMap<>();
-
-		for (int i = 0; i < numKeys; i++) {
-			int numWindows = in.readInt();
-			for (int j = 0; j < numWindows; j++) {
-				Context context = new Context(in);
-				Map<W, Context> keyWindows = windows.get(context.key);
-				if (keyWindows == null) {
-					keyWindows = new HashMap<>(numWindows);
-					windows.put(context.key, keyWindows);
-				}
-				keyWindows.put(context.window, context);
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	// Getters for testing
-	// ------------------------------------------------------------------------
-
-	@VisibleForTesting
-	public boolean isSetProcessingTime() {
-		return setProcessingTime;
-	}
-
-	@VisibleForTesting
-	public Trigger<? super IN, ? super W> getTrigger() {
-		return trigger;
-	}
-
-	@VisibleForTesting
-	public KeySelector<IN, K> getKeySelector() {
-		return keySelector;
-	}
-
-	@VisibleForTesting
-	public WindowAssigner<? super IN, W> getWindowAssigner() {
-		return windowAssigner;
-	}
-
-	@VisibleForTesting
-	public WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> getWindowBufferFactory() {
-		return windowBufferFactory;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
deleted file mode 100644
index 28365e1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators.windowing.buffers;
-
-/**
- * A {@code WindowBuffer} that can also evict elements from the buffer. The order in which
- * the elements are added is preserved. Elements can only be evicted starting from the beginning of
- * the buffer.
- *
- * @param <T> The type of elements that this {@code WindowBuffer} can store.
- */
-
-public interface EvictingWindowBuffer<T> extends WindowBuffer<T> {
-
-	/**
-	 * Removes the given number of elements, starting from the beginning.
-	 * @param count The number of elements to remove.
-	 */
-	void removeElements(int count);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java
deleted file mode 100644
index f9f8b26..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators.windowing.buffers;
-
-import com.google.common.base.Function;
-import com.google.common.collect.FluentIterable;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.util.ArrayDeque;
-
-/**
- * An {@link EvictingWindowBuffer} that stores elements on the Java Heap.
- *
- * @param <T> The type of elements that this {@code WindowBuffer} can store.
- */
-public class HeapWindowBuffer<T> implements EvictingWindowBuffer<T> {
-	private static final long serialVersionUID = 1L;
-
-	private ArrayDeque<StreamRecord<T>> elements;
-
-	protected HeapWindowBuffer() {
-		this.elements = new ArrayDeque<>();
-	}
-
-	@Override
-	public void storeElement(StreamRecord<T> element) {
-		elements.add(element);
-	}
-
-	@Override
-	public void removeElements(int count) {
-		// TODO determine if this can be done in a better way
-		for (int i = 0; i < count; i++) {
-			elements.removeFirst();
-		}
-	}
-
-	@Override
-	public Iterable<StreamRecord<T>> getElements() {
-		return elements;
-	}
-
-	@Override
-	public Iterable<T> getUnpackedElements() {
-		return FluentIterable.from(elements).transform(new Function<StreamRecord<T>, T>() {
-			@Override
-			public T apply(StreamRecord<T> record) {
-				return record.getValue();
-			}
-		});
-	}
-
-	@Override
-	public int size() {
-		return elements.size();
-	}
-
-	public static class Factory<T> implements WindowBufferFactory<T, HeapWindowBuffer<T>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void setRuntimeContext(RuntimeContext ctx) {}
-
-		@Override
-		public void open(Configuration config) {}
-
-		@Override
-		public void close() {}
-
-		@Override
-		public HeapWindowBuffer<T> create() {
-			return new HeapWindowBuffer<>();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java
deleted file mode 100644
index 37be8f0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators.windowing.buffers;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.util.Collections;
-
-/**
- * A {@link WindowBuffer} that stores elements on the Java Heap. This buffer uses a
- * {@link ReduceFunction} to pre-aggregate elements that are added to the buffer.
- *
- * @param <T> The type of elements that this {@code WindowBuffer} can store.
- */
-
-public class PreAggregatingHeapWindowBuffer<T> implements WindowBuffer<T> {
-	private static final long serialVersionUID = 1L;
-
-	private final ReduceFunction<T> reduceFunction;
-	private transient StreamRecord<T> data;
-
-	protected PreAggregatingHeapWindowBuffer(ReduceFunction<T> reduceFunction) {
-		this.reduceFunction = reduceFunction;
-	}
-
-	@Override
-	public void storeElement(StreamRecord<T> element) throws Exception {
-		if (data == null) {
-			data = new StreamRecord<>(element.getValue(), element.getTimestamp());
-		} else {
-			data.replace(reduceFunction.reduce(data.getValue(), element.getValue()));
-		}
-	}
-
-	@Override
-	public Iterable<StreamRecord<T>> getElements() {
-		return Collections.singleton(data);
-	}
-
-	@Override
-	public Iterable<T> getUnpackedElements() {
-		return Collections.singleton(data.getValue());
-	}
-
-	@Override
-	public int size() {
-		return 1;
-	}
-
-	public static class Factory<T> implements WindowBufferFactory<T, PreAggregatingHeapWindowBuffer<T>> {
-		private static final long serialVersionUID = 1L;
-
-		private final ReduceFunction<T> reduceFunction;
-
-		public Factory(ReduceFunction<T> reduceFunction) {
-			this.reduceFunction = reduceFunction;
-		}
-
-		@Override
-		public void setRuntimeContext(RuntimeContext ctx) {
-			FunctionUtils.setFunctionRuntimeContext(reduceFunction, ctx);
-		}
-
-		@Override
-		public void open(Configuration config) throws Exception {
-			FunctionUtils.openFunction(reduceFunction, config);
-		}
-
-		@Override
-		public void close() throws Exception {
-			FunctionUtils.closeFunction(reduceFunction);
-		}
-
-		@Override
-		public PreAggregatingHeapWindowBuffer<T> create() {
-			return new PreAggregatingHeapWindowBuffer<>(reduceFunction);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java
deleted file mode 100644
index b111667..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators.windowing.buffers;
-
-
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.io.Serializable;
-
-/**
- * A {@code WindowBuffer} is used by
- * {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator} to store
- * the elements of one pane.
- *
- * <p>
- * A pane is the bucket of elements that have the same key (assigned by the
- * {@link org.apache.flink.api.java.functions.KeySelector}) and same {@link Window}. An element can
- * be in multiple panes if it was assigned to multiple windows by the
- * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. These panes all
- * have their own instance of the {@code Evictor}.
- *
- * @param <T> The type of elements that this {@code WindowBuffer} can store.
- */
-public interface WindowBuffer<T> extends Serializable {
-
-	/**
-	 * Adds the element to the buffer.
-	 *
-	 * @param element The element to add.
-	 */
-	void storeElement(StreamRecord<T> element) throws Exception;
-
-	/**
-	 * Returns all elements that are currently in the buffer.
-	 */
-	Iterable<StreamRecord<T>> getElements();
-
-	/**
-	 * Returns all elements that are currently in the buffer. This will unwrap the contained
-	 * elements from their {@link StreamRecord}.
-	 */
-	Iterable<T> getUnpackedElements();
-
-	/**
-	 * Returns the number of elements that are currently in the buffer.
-	 */
-	int size();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java
deleted file mode 100644
index 4bcdf09..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators.windowing.buffers;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.configuration.Configuration;
-
-import java.io.Serializable;
-
-/**
- * A factory for {@link WindowBuffer WindowBuffers}.
- *
- * @param <T> The type of elements that the created {@code WindowBuffer} can store.
- * @param <B> The type of the created {@code WindowBuffer}
- */
-public interface WindowBufferFactory<T, B extends WindowBuffer<T>> extends Serializable {
-
-	/**
-	 * Sets the {@link RuntimeContext} that is used to initialize any user functions
-	 * inside the created buffers.
-	 */
-	void setRuntimeContext(RuntimeContext ctx);
-
-	/**
-	 * Calls {@code open()} on any user functions inside the buffer.
-	 */
-	void open(Configuration config) throws Exception;
-
-	/**
-	 * Calls {@code close()} on any user functions inside the buffer.
-	 */
-	void close() throws Exception;
-
-	/**
-	 * Creates a new {@code WindowBuffer}.
-	 */
-	B create();
-}

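A matching factory is equally small. The following sketch builds on the hypothetical ListWindowBuffer above; since a plain list buffer contains no user functions, the lifecycle methods are no-ops.

package org.apache.flink.streaming.runtime.operators.windowing.buffers;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;

/** Illustrative factory for the hypothetical ListWindowBuffer sketched above. */
public class ListWindowBufferFactory<T> implements WindowBufferFactory<T, ListWindowBuffer<T>> {

	private static final long serialVersionUID = 1L;

	@Override
	public void setRuntimeContext(RuntimeContext ctx) {
		// the buffer holds no user functions, so there is nothing to initialize
	}

	@Override
	public void open(Configuration config) throws Exception {
		// nothing to open
	}

	@Override
	public void close() throws Exception {
		// nothing to close
	}

	@Override
	public ListWindowBuffer<T> create() {
		return new ListWindowBuffer<>();
	}
}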
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/package-info.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/package-info.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/package-info.java
deleted file mode 100644
index 55749a1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * This package contains the operators that implement the various window operations
- * on data streams. 
- */
-package org.apache.flink.streaming.runtime.operators.windowing;

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitioner.java
deleted file mode 100644
index f3d851c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitioner.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.partitioner;
-
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * Partitioner that selects all the output channels.
- *
- * @param <T> Type of the elements in the Stream being broadcast
- */
-public class BroadcastPartitioner<T> extends StreamPartitioner<T> {
-	private static final long serialVersionUID = 1L;
-
-	int[] returnArray;
-	boolean set;
-	int setNumber;
-
-	@Override
-	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
-			int numberOfOutputChannels) {
-		if (set && setNumber == numberOfOutputChannels) {
-			return returnArray;
-		} else {
-			this.returnArray = new int[numberOfOutputChannels];
-			for (int i = 0; i < numberOfOutputChannels; i++) {
-				returnArray[i] = i;
-			}
-			set = true;
-			setNumber = numberOfOutputChannels;
-			return returnArray;
-		}
-	}
-
-	@Override
-	public StreamPartitioner<T> copy() {
-		return this;
-	}
-
-	@Override
-	public String toString() {
-		return "BROADCAST";
-	}
-}

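For orientation, this is the partitioner behind the broadcast() shorthand of the streaming DataStream API; assuming that shorthand, a minimal job that replicates every element to all parallel instances of the downstream mapper could look like this sketch.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BroadcastExample {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<String> input = env.fromElements("a", "b", "c");

		// every element is sent to all parallel instances of the mapper
		input.broadcast()
			.map(new MapFunction<String, String>() {
				@Override
				public String map(String value) {
					return value.toUpperCase();
				}
			})
			.print();

		env.execute("Broadcast partitioning sketch");
	}
}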
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
deleted file mode 100644
index 7bb9480..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.partitioner;
-
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * Partitioner that selects the channel with a user-defined partitioner function on a key.
- *
- * @param <K>
- *            Type of the key
- * @param <T>
- *            Type of the data
- */
-public class CustomPartitionerWrapper<K, T> extends StreamPartitioner<T> {
-	private static final long serialVersionUID = 1L;
-
-	private int[] returnArray = new int[1];
-	Partitioner<K> partitioner;
-	KeySelector<T, K> keySelector;
-
-	public CustomPartitionerWrapper(Partitioner<K> partitioner, KeySelector<T, K> keySelector) {
-		this.partitioner = partitioner;
-		this.keySelector = keySelector;
-	}
-
-	@Override
-	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record, int numberOfOutputChannels) {
-
-		K key = null;
-		try {
-			key = keySelector.getKey(record.getInstance().getValue());
-		} catch (Exception e) {
-			throw new RuntimeException("Could not extract key from " + record.getInstance(), e);
-		}
-
-		returnArray[0] = partitioner.partition(key,
-				numberOfOutputChannels);
-
-		return returnArray;
-	}
-
-	@Override
-	public StreamPartitioner<T> copy() {
-		return this;
-	}
-
-	@Override
-	public String toString() {
-		return "CUSTOM";
-	}
-}

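This wrapper is the runtime side of custom partitioning; assuming the partitionCustom(Partitioner, KeySelector) method of the DataStream API is what installs it, a usage sketch with an illustrative modulo partitioner could look as follows.

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CustomPartitioningExample {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<Tuple2<Integer, String>> input = env.fromElements(
				Tuple2.of(1, "one"), Tuple2.of(2, "two"), Tuple2.of(3, "three"));

		// route each record to the channel given by its integer key modulo the downstream parallelism
		input.partitionCustom(
				new Partitioner<Integer>() {
					@Override
					public int partition(Integer key, int numPartitions) {
						return key % numPartitions;
					}
				},
				new KeySelector<Tuple2<Integer, String>, Integer>() {
					@Override
					public Integer getKey(Tuple2<Integer, String> value) {
						return value.f0;
					}
				})
			.print();

		env.execute("Custom partitioning sketch");
	}
}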
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java
deleted file mode 100644
index 4fb460c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.partitioner;
-
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * Partitioner that forwards elements only to the locally running downstream operation.
- * 
- * @param <T> Type of the elements in the Stream
- */
-public class ForwardPartitioner<T> extends StreamPartitioner<T> {
-	private static final long serialVersionUID = 1L;
-
-	private int[] returnArray = new int[] {0};
-
-	@Override
-	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record, int numberOfOutputChannels) {
-		return returnArray;
-	}
-	
-	public StreamPartitioner<T> copy() {
-		return this;
-	}
-	
-	@Override
-	public String toString() {
-		return "FORWARD";
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java
deleted file mode 100644
index b19fb41..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.partitioner;
-
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * Partitioner that sends all elements to the downstream operator with subtask ID 0.
- *
- * @param <T> Type of the elements in the Stream being partitioned
- */
-public class GlobalPartitioner<T> extends StreamPartitioner<T> {
-	private static final long serialVersionUID = 1L;
-
-	private int[] returnArray = new int[] { 0 };
-
-	@Override
-	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
-			int numberOfOutputChannels) {
-		return returnArray;
-	}
-
-	@Override
-	public StreamPartitioner<T> copy() {
-		return this;
-	}
-
-	@Override
-	public String toString() {
-		return "GLOBAL";
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/HashPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/HashPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/HashPartitioner.java
deleted file mode 100644
index a3f5158..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/HashPartitioner.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.partitioner;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * Partitioner that selects the target channel based on the hash value of a key from a
- * {@link KeySelector}.
- *
- * @param <T> Type of the elements in the Stream being partitioned
- */
-public class HashPartitioner<T> extends StreamPartitioner<T> {
-	private static final long serialVersionUID = 1L;
-
-	private int[] returnArray = new int[1];
-	KeySelector<T, ?> keySelector;
-
-	public HashPartitioner(KeySelector<T, ?> keySelector) {
-		this.keySelector = keySelector;
-	}
-
-	@Override
-	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
-			int numberOfOutputChannels) {
-		Object key;
-		try {
-			key = keySelector.getKey(record.getInstance().getValue());
-		} catch (Exception e) {
-			throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
-		}
-		returnArray[0] = Math.abs(key.hashCode() % numberOfOutputChannels);
-
-		return returnArray;
-	}
-
-	@Override
-	public StreamPartitioner<T> copy() {
-		return this;
-	}
-
-	@Override
-	public String toString() {
-		return "HASH";
-	}
-}

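The channel computation above is deterministic per key, which is what keeps all records with the same key in the same downstream parallel instance. A standalone arithmetic sketch of the same computation (plain Java, not Flink code):

public class HashChannelSketch {

	public static void main(String[] args) {
		int numberOfOutputChannels = 4;
		String key = "user-17";

		// same computation as HashPartitioner#selectChannels:
		// take the key's hash code modulo the number of channels, then drop the sign
		int channel = Math.abs(key.hashCode() % numberOfOutputChannels);

		System.out.println("key '" + key + "' is always routed to channel " + channel);
	}
}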
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
deleted file mode 100644
index 2dfff0e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.partitioner;
-
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * Partitioner that distributes the data equally by cycling through the output
- * channels.
- * 
- * @param <T> Type of the elements in the Stream being rebalanced
- */
-public class RebalancePartitioner<T> extends StreamPartitioner<T> {
-	private static final long serialVersionUID = 1L;
-
-	private int[] returnArray = new int[] {-1};
-
-	@Override
-	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
-			int numberOfOutputChannels) {
-		this.returnArray[0] = (this.returnArray[0] + 1) % numberOfOutputChannels;
-		return this.returnArray;
-	}
-	
-	public StreamPartitioner<T> copy() {
-		return this;
-	}
-	
-	@Override
-	public String toString() {
-		return "REBALANCE";
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java
deleted file mode 100644
index 93c6f9c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.partitioner;
-
-import java.util.Random;
-
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * Partitioner that distributes the data equally by selecting one output channel
- * randomly.
- * 
- * @param <T>
- *            Type of the elements in the Stream being shuffled
- */
-public class ShufflePartitioner<T> extends StreamPartitioner<T> {
-	private static final long serialVersionUID = 1L;
-
-	private Random random = new Random();
-
-	private int[] returnArray = new int[1];
-
-	@Override
-	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
-			int numberOfOutputChannels) {
-		returnArray[0] = random.nextInt(numberOfOutputChannels);
-		return returnArray;
-	}
-
-	@Override
-	public StreamPartitioner<T> copy() {
-		return new ShufflePartitioner<T>();
-	}
-
-	@Override
-	public String toString() {
-		return "SHUFFLE";
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
deleted file mode 100644
index 4ef360d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.partitioner;
-
-import java.io.Serializable;
-
-import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-public abstract class StreamPartitioner<T> implements
-		ChannelSelector<SerializationDelegate<StreamRecord<T>>>, Serializable {
-	private static final long serialVersionUID = 1L;
-
-	public abstract StreamPartitioner<T> copy();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
deleted file mode 100644
index d4363cd..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.streamrecord;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.streaming.api.watermark.Watermark;
-
-import java.io.IOException;
-
-/**
- * Serializer for {@link StreamRecord} and {@link org.apache.flink.streaming.api.watermark.Watermark}.
- * This does not behave like a normal {@link TypeSerializer}; instead, it is only used at the
- * {@link org.apache.flink.streaming.runtime.tasks.StreamTask} level for transmitting
- * {@link StreamRecord StreamRecords} and {@link org.apache.flink.streaming.api.watermark.Watermark Watermarks}.
- * Because this serializer can handle both, its methods are typed to {@link StreamElement} and the
- * result of deserialization has to be cast to the correct subtype.
- *
- * @param <T> The type of value in the {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord}
- */
-public final class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<StreamElement> {
-
-	private static final long serialVersionUID = 1L;
-
-	private static final long IS_WATERMARK = Long.MIN_VALUE;
-	
-	private final TypeSerializer<T> typeSerializer;
-
-	
-	public MultiplexingStreamRecordSerializer(TypeSerializer<T> serializer) {
-		if (serializer instanceof MultiplexingStreamRecordSerializer || serializer instanceof StreamRecordSerializer) {
-			throw new RuntimeException("StreamRecord serializer given to MultiplexingStreamRecordSerializer as value TypeSerializer: " + serializer);
-		}
-		this.typeSerializer = Preconditions.checkNotNull(serializer);
-	}
-	
-	
-	@Override
-	public boolean isImmutableType() {
-		return false;
-	}
-
-	@Override
-	public TypeSerializer<StreamElement> duplicate() {
-		TypeSerializer<T> copy = typeSerializer.duplicate();
-		return (copy == typeSerializer) ? this : new MultiplexingStreamRecordSerializer<T>(copy);
-	}
-
-	@Override
-	public StreamRecord<T> createInstance() {
-		return new StreamRecord<T>(typeSerializer.createInstance(), 0L);
-	}
-
-	@Override
-	public StreamElement copy(StreamElement from) {
-		// the timestamp is a primitive long, so it is simply copied into the new record
-		if (from.isRecord()) {
-			StreamRecord<T> fromRecord = from.asRecord();
-			return new StreamRecord<T>(typeSerializer.copy(fromRecord.getValue()), fromRecord.getTimestamp());
-		}
-		else if (from.isWatermark()) {
-			// is immutable
-			return from;
-		}
-		else {
-			throw new RuntimeException("Cannot copy " + from);
-		}
-	}
-
-	@Override
-	public StreamElement copy(StreamElement from, StreamElement reuse) {
-		if (from.isRecord() && reuse.isRecord()) {
-			StreamRecord<T> fromRecord = from.asRecord();
-			StreamRecord<T> reuseRecord = reuse.asRecord();
-
-			T valueCopy = typeSerializer.copy(fromRecord.getValue(), reuseRecord.getValue());
-			reuseRecord.replace(valueCopy, fromRecord.getTimestamp());
-			return reuse;
-		}
-		else if (from.isWatermark()) {
-			// is immutable
-			return from;
-		}
-		else {
-			throw new RuntimeException("Cannot copy " + from + " -> " + reuse);
-		}
-	}
-
-	@Override
-	public int getLength() {
-		return -1;
-	}
-
-	@Override
-	public void serialize(StreamElement value, DataOutputView target) throws IOException {
-		if (value.isRecord()) {
-			StreamRecord<T> record = value.asRecord();
-			target.writeLong(record.getTimestamp());
-			typeSerializer.serialize(record.getValue(), target);
-		}
-		else if (value.isWatermark()) {
-			target.writeLong(IS_WATERMARK);
-			target.writeLong(value.asWatermark().getTimestamp());
-		}
-		else {
-			throw new RuntimeException();
-		}
-	}
-	
-	@Override
-	public StreamElement deserialize(DataInputView source) throws IOException {
-		long millis = source.readLong();
-
-		if (millis == IS_WATERMARK) {
-			return new Watermark(source.readLong());
-		}
-		else {
-			T element = typeSerializer.deserialize(source);
-			return new StreamRecord<T>(element, millis);
-		}
-	}
-
-	@Override
-	public StreamElement deserialize(StreamElement reuse, DataInputView source) throws IOException {
-		long millis = source.readLong();
-
-		if (millis == IS_WATERMARK) {
-			return new Watermark(source.readLong());
-		}
-		else {
-			StreamRecord<T> reuseRecord = reuse.asRecord();
-			T element = typeSerializer.deserialize(reuseRecord.getValue(), source);
-			reuseRecord.replace(element, millis);
-			return reuse;
-		}
-	}
-
-	@Override
-	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		long millis = source.readLong();
-		target.writeLong(millis);
-
-		if (millis == IS_WATERMARK) {
-			target.writeLong(source.readLong());
-		} else {
-			typeSerializer.copy(source, target);
-		}
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof MultiplexingStreamRecordSerializer) {
-			MultiplexingStreamRecordSerializer<?> other = (MultiplexingStreamRecordSerializer<?>) obj;
-
-			return other.canEqual(this) && typeSerializer.equals(other.typeSerializer);
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public boolean canEqual(Object obj) {
-		return obj instanceof MultiplexingStreamRecordSerializer;
-	}
-
-	@Override
-	public int hashCode() {
-		return typeSerializer.hashCode();
-	}
-}
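To summarize the wire format that serialize() and deserialize() above implement: a record is framed as its timestamp (a long) followed by the value bytes, while a watermark is framed as the sentinel Long.MIN_VALUE followed by the watermark's timestamp. The following standalone sketch mirrors that framing with plain java.io streams purely for illustration; it does not use the Flink DataOutputView/DataInputView types.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class StreamElementFramingSketch {

	private static final long IS_WATERMARK = Long.MIN_VALUE;

	public static void main(String[] args) throws IOException {
		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
		DataOutputStream out = new DataOutputStream(bytes);

		// a record: [timestamp][value bytes]; here the value is just a UTF string
		out.writeLong(12345L);
		out.writeUTF("some record value");

		// a watermark: [sentinel][watermark timestamp]
		out.writeLong(IS_WATERMARK);
		out.writeLong(99999L);

		DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
		while (in.available() > 0) {
			long millis = in.readLong();
			if (millis == IS_WATERMARK) {
				System.out.println("watermark @ " + in.readLong());
			} else {
				System.out.println("record @ " + millis + ": " + in.readUTF());
			}
		}
	}
}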