Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:51 UTC

[35/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
deleted file mode 100644
index fbecbd1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ /dev/null
@@ -1,300 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.streaming.api.functions.source.EventTimeSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-/**
- * {@link StreamOperator} for streaming sources.
- */
-public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction<T>> implements StreamOperator<T> {
-
-	private static final long serialVersionUID = 1L;
-	private transient SourceFunction.SourceContext<T> ctx;
-
-	public StreamSource(SourceFunction<T> sourceFunction) {
-		super(sourceFunction);
-
-		this.chainingStrategy = ChainingStrategy.HEAD;
-	}
-
-	public void run(final Object lockingObject, final Output<StreamRecord<T>> collector) throws Exception {
-		final ExecutionConfig executionConfig = getExecutionConfig();
-		
-		if (userFunction instanceof EventTimeSourceFunction) {
-			ctx = new ManualWatermarkContext<T>(lockingObject, collector);
-		} else if (executionConfig.getAutoWatermarkInterval() > 0) {
-			ctx = new AutomaticWatermarkContext<T>(lockingObject, collector, executionConfig);
-		} else if (executionConfig.areTimestampsEnabled()) {
-			ctx = new NonWatermarkContext<T>(lockingObject, collector);
-		} else {
-			ctx = new NonTimestampContext<T>(lockingObject, collector);
-		}
-
-		userFunction.run(ctx);
-
-		// This will mostly emit a final +Inf Watermark to make the Watermark logic work
-		// when some sources finish before others do
-		ctx.close();
-	}
-
-	public void cancel() {
-		userFunction.cancel();
-		ctx.close();
-	}
-
-	/**
-	 * {@link SourceFunction.SourceContext} to be used for sources that don't emit watermarks.
-	 * Unlike {@link NonWatermarkContext}, this context also does not attach timestamps to
-	 * emitted elements (technically, it always sets the timestamp to 0).
-	 */
-	public static class NonTimestampContext<T> implements SourceFunction.SourceContext<T> {
-
-		private final Object lockingObject;
-		private final Output<StreamRecord<T>> output;
-		private final StreamRecord<T> reuse;
-
-		public NonTimestampContext(Object lockingObjectParam, Output<StreamRecord<T>> outputParam) {
-			this.lockingObject = lockingObjectParam;
-			this.output = outputParam;
-			this.reuse = new StreamRecord<T>(null);
-		}
-
-		@Override
-		public void collect(T element) {
-			output.collect(reuse.replace(element, 0));
-		}
-
-		@Override
-		public void collectWithTimestamp(T element, long timestamp) {
-			throw new UnsupportedOperationException("Automatic-Timestamp sources cannot emit" +
-					" elements with a timestamp. See interface EventTimeSourceFunction" +
-					" if you want to manually assign timestamps to elements.");
-		}
-
-		@Override
-		public void emitWatermark(Watermark mark) {
-			throw new UnsupportedOperationException("Automatic-Timestamp sources cannot emit" +
-					" watermarks. See interface EventTimeSourceFunction" +
-					" if you want to manually emit watermarks.");
-		}
-
-		@Override
-		public Object getCheckpointLock() {
-			return lockingObject;
-		}
-
-		@Override
-		public void close() {}
-	}
-
-	/**
-	 * {@link SourceFunction.SourceContext} to be used for sources that don't emit watermarks.
-	 */
-	public static class NonWatermarkContext<T> implements SourceFunction.SourceContext<T> {
-
-		private final Object lockingObject;
-		private final Output<StreamRecord<T>> output;
-		private final StreamRecord<T> reuse;
-
-		public NonWatermarkContext(Object lockingObjectParam, Output<StreamRecord<T>> outputParam) {
-			this.lockingObject = lockingObjectParam;
-			this.output = outputParam;
-			this.reuse = new StreamRecord<T>(null);
-		}
-
-		@Override
-		public void collect(T element) {
-			long currentTime = System.currentTimeMillis();
-			output.collect(reuse.replace(element, currentTime));
-		}
-
-		@Override
-		public void collectWithTimestamp(T element, long timestamp) {
-			throw new UnsupportedOperationException("Automatic-Timestamp sources cannot emit" +
-					" elements with a timestamp. See interface EventTimeSourceFunction" +
-					" if you want to manually assign timestamps to elements.");
-		}
-
-		@Override
-		public void emitWatermark(Watermark mark) {
-			throw new UnsupportedOperationException("Automatic-Timestamp sources cannot emit" +
-					" watermarks. See interface EventTimeSourceFunction" +
-					" if you want to manually emit watermarks.");
-		}
-
-		@Override
-		public Object getCheckpointLock() {
-			return lockingObject;
-		}
-
-		@Override
-		public void close() {}
-	}
-
-	/**
-	 * {@link SourceFunction.SourceContext} to be used for sources with automatic timestamps
-	 * and watermark emission.
-	 */
-	public static class AutomaticWatermarkContext<T> implements SourceFunction.SourceContext<T> {
-
-		private final ScheduledExecutorService scheduleExecutor;
-		private final ScheduledFuture<?> watermarkTimer;
-		private final long watermarkInterval;
-
-		private final Object lockingObject;
-		private final Output<StreamRecord<T>> output;
-		private final StreamRecord<T> reuse;
-
-		private volatile long lastWatermarkTime;
-
-		public AutomaticWatermarkContext(Object lockingObjectParam,
-				Output<StreamRecord<T>> outputParam,
-				ExecutionConfig executionConfig) {
-			this.lockingObject = lockingObjectParam;
-			this.output = outputParam;
-			this.reuse = new StreamRecord<T>(null);
-
-			watermarkInterval = executionConfig.getAutoWatermarkInterval();
-
-			scheduleExecutor = Executors.newScheduledThreadPool(1);
-
-			watermarkTimer = scheduleExecutor.scheduleAtFixedRate(new Runnable() {
-				@Override
-				public void run() {
-					long currentTime = System.currentTimeMillis();
-					// align the watermarks across all machines. this will ensure that we
-					// don't have watermarks that creep along at different intervals because
-					// the machine clocks are out of sync
-					long watermarkTime = currentTime - (currentTime % watermarkInterval);
-					if (currentTime > watermarkTime && watermarkTime - lastWatermarkTime >= watermarkInterval) {
-						synchronized (lockingObject) {
-							if (currentTime > watermarkTime && watermarkTime - lastWatermarkTime >= watermarkInterval) {
-								output.emitWatermark(new Watermark(watermarkTime));
-								lastWatermarkTime = watermarkTime;
-							}
-						}
-					}
-				}
-			}, 0, watermarkInterval, TimeUnit.MILLISECONDS);
-
-		}
-
-		@Override
-		public void collect(T element) {
-			synchronized (lockingObject) {
-				long currentTime = System.currentTimeMillis();
-				output.collect(reuse.replace(element, currentTime));
-
-				long watermarkTime = currentTime - (currentTime % watermarkInterval);
-				if (currentTime > watermarkTime && watermarkTime - lastWatermarkTime >= watermarkInterval) {
-					output.emitWatermark(new Watermark(watermarkTime));
-					lastWatermarkTime = watermarkTime;
-				}
-			}
-		}
-
-		@Override
-		public void collectWithTimestamp(T element, long timestamp) {
-			throw new UnsupportedOperationException("Automatic-Timestamp sources cannot emit" +
-					" elements with a timestamp. See interface EventTimeSourceFunction" +
-					" if you want to manually assign timestamps to elements.");
-		}
-
-		@Override
-		public void emitWatermark(Watermark mark) {
-			throw new UnsupportedOperationException("Automatic-Timestamp sources cannot emit" +
-					" watermarks. See interface EventTimeSourceFunction" +
-					" if you want to manually emit watermarks.");
-		}
-
-		@Override
-		public Object getCheckpointLock() {
-			return lockingObject;
-		}
-
-		@Override
-		public void close() {
-			watermarkTimer.cancel(true);
-			scheduleExecutor.shutdownNow();
-			// emit one last +Inf watermark to make downstream watermark processing work
-			// when some sources close early
-			synchronized (lockingObject) {
-				output.emitWatermark(new Watermark(Long.MAX_VALUE));
-			}
-		}
-	}
-
-	/**
-	 * {@link SourceFunction.SourceContext} to be used for sources with manual timestamp
-	 * assignment and manual watermark emission.
-	 */
-	public static class ManualWatermarkContext<T> implements SourceFunction.SourceContext<T> {
-
-		private final Object lockingObject;
-		private final Output<StreamRecord<T>> output;
-		private final StreamRecord<T> reuse;
-
-		public ManualWatermarkContext(Object lockingObject, Output<StreamRecord<T>> output) {
-			this.lockingObject = lockingObject;
-			this.output = output;
-			this.reuse = new StreamRecord<T>(null);
-		}
-
-		@Override
-		public void collect(T element) {
-			throw new UnsupportedOperationException("Manual-Timestamp sources can only emit" +
-					" elements with a timestamp. Please use collectWithTimestamp().");
-		}
-
-		@Override
-		public void collectWithTimestamp(T element, long timestamp) {
-			synchronized (lockingObject) {
-				output.collect(reuse.replace(element, timestamp));
-			}
-		}
-
-		@Override
-		public void emitWatermark(Watermark mark) {
-			output.emitWatermark(mark);
-		}
-
-		@Override
-		public Object getCheckpointLock() {
-			return lockingObject;
-		}
-
-		@Override
-		public void close() {
-			// emit one last +Inf watermark to make downstream watermark processing work
-			// when some sources close early
-			synchronized (lockingObject) {
-				output.emitWatermark(new Watermark(Long.MAX_VALUE));
-			}
-		}
-	}
-}
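
For orientation, a minimal sketch (not part of this commit) of a source that would be driven through the ManualWatermarkContext above: implementing the EventTimeSourceFunction marker interface makes StreamSource choose that context, so the function assigns timestamps and emits watermarks itself. The class name and the generated values are illustrative only.

import org.apache.flink.streaming.api.functions.source.EventTimeSourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

public class ExampleEventTimeSource implements EventTimeSourceFunction<Long> {

	private static final long serialVersionUID = 1L;

	private volatile boolean running = true;

	@Override
	public void run(SourceContext<Long> ctx) throws Exception {
		long value = 0;
		while (running && value < 100) {
			long timestamp = System.currentTimeMillis();
			// emit under the checkpoint lock, as the contexts above expect
			synchronized (ctx.getCheckpointLock()) {
				ctx.collectWithTimestamp(value, timestamp);
				ctx.emitWatermark(new Watermark(timestamp - 1));
			}
			value++;
		}
	}

	@Override
	public void cancel() {
		running = false;
	}
}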

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
deleted file mode 100644
index 87a9abd..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
-import org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext;
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Implementation of the {@link org.apache.flink.api.common.functions.RuntimeContext},
- * for streaming operators.
- */
-public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
-
-	/** The operator to which this function belongs */
-	private final AbstractStreamOperator<?> operator;
-	
-	/** The task environment running the operator */
-	private final Environment taskEnvironment;
-	
-	/** The key/value state, if the user-function requests it */
-	private HashMap<String, OperatorState<?>> keyValueStates;
-	
-	/** Type of the values stored in the state, to make sure repeated requests of the state are consistent */
-	private HashMap<String, TypeInformation<?>> stateTypeInfos;
-	
-	
-	public StreamingRuntimeContext(AbstractStreamOperator<?> operator,
-									Environment env, Map<String, Accumulator<?, ?>> accumulators) {
-		super(env.getTaskName(),
-				env.getNumberOfSubtasks(),
-				env.getIndexInSubtaskGroup(),
-				env.getUserClassLoader(),
-				operator.getExecutionConfig(),
-				accumulators,
-				env.getDistributedCacheEntries());
-		
-		this.operator = operator;
-		this.taskEnvironment = env;
-	}
-
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * Returns the input split provider associated with the operator.
-	 * 
-	 * @return The input split provider.
-	 */
-	public InputSplitProvider getInputSplitProvider() {
-		return taskEnvironment.getInputSplitProvider();
-	}
-
-	/**
-	 * Register a timer callback. At the specified time the {@link Triggerable} will be invoked.
-	 * This call is guaranteed to not happen concurrently with method calls on the operator.
-	 *
-	 * @param time The absolute time in milliseconds.
-	 * @param target The target to be triggered.
-	 */
-	public void registerTimer(long time, Triggerable target) {
-		operator.registerTimer(time, target);
-	}
-	
-	// ------------------------------------------------------------------------
-	//  broadcast variables
-	// ------------------------------------------------------------------------
-
-	@Override
-	public <RT> List<RT> getBroadcastVariable(String name) {
-		throw new UnsupportedOperationException("Broadcast variables can only be used in DataSet programs");
-	}
-
-	@Override
-	public <T, C> C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer<T, C> initializer) {
-		throw new UnsupportedOperationException("Broadcast variables can only be used in DataSet programs");
-	}
-
-	// ------------------------------------------------------------------------
-	//  key/value state
-	// ------------------------------------------------------------------------
-
-	@Override
-	public <S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) {
-		requireNonNull(stateType, "The state type class must not be null");
-
-		TypeInformation<S> typeInfo;
-		try {
-			typeInfo = TypeExtractor.getForClass(stateType);
-		}
-		catch (Exception e) {
-			throw new RuntimeException("Cannot analyze type '" + stateType.getName() + 
-					"' from the class alone, due to generic type parameters. " +
-					"Please specify the TypeInformation directly.", e);
-		}
-		
-		return getKeyValueState(name, typeInfo, defaultState);
-	}
-
-	@Override
-	public <S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) {
-		requireNonNull(name, "The name of the state must not be null");
-		requireNonNull(stateType, "The state type information must not be null");
-		
-		OperatorState<?> previousState;
-		
-		// check if this is a repeated call to access the state 
-		if (this.stateTypeInfos != null && this.keyValueStates != null &&
-				(previousState = this.keyValueStates.get(name)) != null) {
-			
-			// repeated call
-			TypeInformation<?> previousType;
-			if (stateType.equals((previousType = this.stateTypeInfos.get(name)))) {
-				// valid case, same type requested again
-				@SuppressWarnings("unchecked")
-				OperatorState<S> previous = (OperatorState<S>) previousState;
-				return previous;
-			}
-			else {
-				// invalid case, different type requested this time
-				throw new IllegalStateException("Cannot initialize key/value state for type " + stateType +
-						" ; The key/value state has already been created and initialized for a different type: " +
-						previousType);
-			}
-		}
-		else {
-			// first time access to the key/value state
-			if (this.stateTypeInfos == null) {
-				this.stateTypeInfos = new HashMap<>();
-			}
-			if (this.keyValueStates == null) {
-				this.keyValueStates = new HashMap<>();
-			}
-			
-			try {
-				OperatorState<S> state = operator.createKeyValueState(name, stateType, defaultState);
-				this.keyValueStates.put(name, state);
-				this.stateTypeInfos.put(name, stateType);
-				return state;
-			}
-			catch (RuntimeException e) {
-				throw e;
-			}
-			catch (Exception e) {
-				throw new RuntimeException("Cannot initialize the key/value state", e);
-			}
-		}
-	}
-}
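
A rough usage sketch (not from this diff) of the key/value state accessor above, as a user function would reach it through getRuntimeContext(); the class name and state name are made up, and the snippet assumes the function runs on a keyed stream so that partitioned state is available.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class RunningCount extends RichFlatMapFunction<Long, Long> {

	private static final long serialVersionUID = 1L;

	private transient OperatorState<Long> count;

	@Override
	public void open(Configuration parameters) throws Exception {
		// resolved by StreamingRuntimeContext#getKeyValueState at runtime
		count = getRuntimeContext().getKeyValueState("count", Long.class, 0L);
	}

	@Override
	public void flatMap(Long value, Collector<Long> out) throws Exception {
		long newCount = count.value() + 1;
		count.update(newCount);
		out.collect(newCount);
	}
}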

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java
deleted file mode 100644
index 62514fc..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
-
-/**
- * Wrapper around an {@link Output} for user functions that expect a {@link Collector}.
- * Before giving the {@link TimestampedCollector} to a user function you must set
- * the {@link Instant timestamp} that should be attached to emitted elements. Most operators
- * would set the {@link Instant timestamp} of the incoming {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord} here.
- *
- * @param <T> The type of the elements that can be emitted.
- */
-public class TimestampedCollector<T> implements Collector<T> {
-	
-	private final Output<StreamRecord<T>> output;
-
-	private final StreamRecord<T> reuse;
-	
-	private long timestamp;
-	
-
-	/**
-	 * Creates a new {@link TimestampedCollector} that wraps the given {@link Output}.
-	 */
-	public TimestampedCollector(Output<StreamRecord<T>> output) {
-		this.output = output;
-		this.reuse = new StreamRecord<T>(null);
-	}
-
-	@Override
-	public void collect(T record) {
-		output.collect(reuse.replace(record, timestamp));
-	}
-
-	/**
-	 * Sets the {@link Instant timestamp} that is attached to elements that get emitted using
-	 * {@link #collect}
-	 * @param timestamp The timestamp in milliseconds
-	 */
-	public void setTimestamp(long timestamp) {
-		this.timestamp = timestamp;
-	}
-
-	@Override
-	public void close() {
-		output.close();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/TwoInputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/TwoInputStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/TwoInputStreamOperator.java
deleted file mode 100644
index afc6d1b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/TwoInputStreamOperator.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * Interface for stream operators with two inputs. Use
- * {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator} as a base class if
- * you want to implement a custom operator.
- * 
- * @param <IN1> The type of the first input of the operator
- * @param <IN2> The type of the second input of the operator
- * @param <OUT> The output type of the operator
- */
-public interface TwoInputStreamOperator<IN1, IN2, OUT> extends StreamOperator<OUT> {
-
-	/**
-	 * Processes one element that arrived on the first input of this two-input operator.
-	 * This method is guaranteed to not be called concurrently with other methods of the operator.
-	 */
-	public void processElement1(StreamRecord<IN1> element) throws Exception;
-
-	/**
-	 * Processes one element that arrived on the second input of this two-input operator.
-	 * This method is guaranteed to not be called concurrently with other methods of the operator.
-	 */
-	public void processElement2(StreamRecord<IN2> element) throws Exception;
-
-	/**
-	 * Processes a {@link Watermark} that arrived on the first input of this two-input operator.
-	 * This method is guaranteed to not be called concurrently with other methods of the operator.
-	 *
-	 * @see org.apache.flink.streaming.api.watermark.Watermark
-	 */
-	public void processWatermark1(Watermark mark) throws Exception;
-
-	/**
-	 * Processes a {@link Watermark} that arrived on the second input of this two-input operator.
-	 * This method is guaranteed to not be called concurrently with other methods of the operator.
-	 *
-	 * @see org.apache.flink.streaming.api.watermark.Watermark
-	 */
-	public void processWatermark2(Watermark mark) throws Exception;
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
deleted file mode 100644
index 806cef2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.co;
-
-import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-public class CoStreamFlatMap<IN1, IN2, OUT>
-		extends AbstractUdfStreamOperator<OUT, CoFlatMapFunction<IN1, IN2, OUT>>
-		implements TwoInputStreamOperator<IN1, IN2, OUT> {
-
-	private static final long serialVersionUID = 1L;
-
-	private transient TimestampedCollector<OUT> collector;
-
-	// We keep track of watermarks from both inputs, the combined input is the minimum
-	// Once the minimum advances we emit a new watermark for downstream operators
-	private long combinedWatermark = Long.MIN_VALUE;
-	private long input1Watermark = Long.MIN_VALUE;
-	private long input2Watermark = Long.MIN_VALUE;
-
-	public CoStreamFlatMap(CoFlatMapFunction<IN1, IN2, OUT> flatMapper) {
-		super(flatMapper);
-	}
-
-	@Override
-	public void open() throws Exception {
-		super.open();
-		collector = new TimestampedCollector<OUT>(output);
-	}
-
-	@Override
-	public void processElement1(StreamRecord<IN1> element) throws Exception {
-		collector.setTimestamp(element.getTimestamp());
-		userFunction.flatMap1(element.getValue(), collector);
-
-	}
-
-	@Override
-	public void processElement2(StreamRecord<IN2> element) throws Exception {
-		collector.setTimestamp(element.getTimestamp());
-		userFunction.flatMap2(element.getValue(), collector);
-	}
-
-	@Override
-	public void processWatermark1(Watermark mark) throws Exception {
-		input1Watermark = mark.getTimestamp();
-		long newMin = Math.min(input1Watermark, input2Watermark);
-		if (newMin > combinedWatermark) {
-			combinedWatermark = newMin;
-			output.emitWatermark(new Watermark(combinedWatermark));
-		}
-	}
-
-	@Override
-	public void processWatermark2(Watermark mark) throws Exception {
-		input2Watermark = mark.getTimestamp();
-		long newMin = Math.min(input1Watermark, input2Watermark);
-		if (newMin > combinedWatermark) {
-			combinedWatermark = newMin;
-			output.emitWatermark(new Watermark(combinedWatermark));
-		}
-	}
-
-	protected TimestampedCollector<OUT> getCollector() {
-		return collector;
-	}
-}
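
As a point of reference (not part of the diff), a CoFlatMapFunction of the kind this operator wraps might be used roughly as below; env stands for a StreamExecutionEnvironment, and the gating logic is made up.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Long> data = env.generateSequence(0, 1000);
DataStream<String> control = env.fromElements("enable", "disable", "enable");

DataStream<Long> gated = data
	.connect(control)
	.flatMap(new CoFlatMapFunction<Long, String, Long>() {

		private boolean enabled = true;

		@Override
		public void flatMap1(Long value, Collector<Long> out) {
			// first input: forward the value only while the gate is open
			if (enabled) {
				out.collect(value);
			}
		}

		@Override
		public void flatMap2(String command, Collector<Long> out) {
			// second input: control messages open or close the gate
			enabled = "enable".equals(command);
		}
	});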

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java
deleted file mode 100644
index e34e727..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.co;
-
-import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-public class CoStreamMap<IN1, IN2, OUT>
-		extends AbstractUdfStreamOperator<OUT, CoMapFunction<IN1, IN2, OUT>>
-		implements TwoInputStreamOperator<IN1, IN2, OUT> {
-
-	private static final long serialVersionUID = 1L;
-
-	// We keep track of watermarks from both inputs, the combined input is the minimum
-	// Once the minimum advances we emit a new watermark for downstream operators
-	private long combinedWatermark = Long.MIN_VALUE;
-	private long input1Watermark = Long.MIN_VALUE;
-	private long input2Watermark = Long.MIN_VALUE;
-
-	public CoStreamMap(CoMapFunction<IN1, IN2, OUT> mapper) {
-		super(mapper);
-	}
-
-	@Override
-	public void processElement1(StreamRecord<IN1> element) throws Exception {
-		output.collect(element.replace(userFunction.map1(element.getValue())));
-	}
-
-	@Override
-	public void processElement2(StreamRecord<IN2> element) throws Exception {
-		output.collect(element.replace(userFunction.map2(element.getValue())));
-	}
-
-	@Override
-	public void processWatermark1(Watermark mark) throws Exception {
-		input1Watermark = mark.getTimestamp();
-		long newMin = Math.min(input1Watermark, input2Watermark);
-		if (newMin > combinedWatermark) {
-			combinedWatermark = newMin;
-			output.emitWatermark(new Watermark(combinedWatermark));
-		}
-	}
-
-	@Override
-	public void processWatermark2(Watermark mark) throws Exception {
-		input2Watermark = mark.getTimestamp();
-		long newMin = Math.min(input1Watermark, input2Watermark);
-		if (newMin > combinedWatermark) {
-			combinedWatermark = newMin;
-			output.emitWatermark(new Watermark(combinedWatermark));
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/CoFeedbackTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/CoFeedbackTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/CoFeedbackTransformation.java
deleted file mode 100644
index f9c95f5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/CoFeedbackTransformation.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.transformations;
-
-import com.google.common.collect.Lists;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * This represents a feedback point in a topology. The type of the feedback elements need not match
- * the type of the upstream {@code StreamTransformation} because the only allowed operations
- * after a {@code CoFeedbackTransformation} are
- * {@link org.apache.flink.streaming.api.transformations.TwoInputTransformation TwoInputTransformations}.
- * The upstream {@code StreamTransformation} will be connected to the first input of the Co-Transform
- * while the feedback edges will be connected to the second input.
- *
- * <p>
- * Both the partitioning of the input and that of the feedback edges are preserved. They can also have
- * differing partitioning strategies. This requires, however, that the parallelism of the feedback
- * {@code StreamTransformations} matches the parallelism of the input
- * {@code StreamTransformation}.
- *
- * <p>
- * The upstream {@code StreamTransformation} is not wired to this {@code CoFeedbackTransformation}.
- * It is instead directly wired to the {@code TwoInputTransformation} after this
- * {@code CoFeedbackTransformation}.
- *
- * <p>
- * This is different from Iterations in batch processing.
- * @see org.apache.flink.streaming.api.transformations.FeedbackTransformation
- *
- * @param <F> The type of the feedback elements.
- *
- */
-public class CoFeedbackTransformation<F> extends StreamTransformation<F> {
-
-	private final List<StreamTransformation<F>> feedbackEdges;
-
-	private final Long waitTime;
-
-	/**
-	 * Creates a new {@code CoFeedbackTransformation} from the given input.
-	 *
-	 * @param parallelism The parallelism of the upstream {@code StreamTransformation} and the
-	 *                    feedback edges.
-	 * @param feedbackType The type of the feedback edges
-	 * @param waitTime The wait time of the feedback operator. After the time expires
-	 *                          the operation will close and not receive any more feedback elements.
-	 */
-	public CoFeedbackTransformation(int parallelism,
-			TypeInformation<F> feedbackType,
-			Long waitTime) {
-		super("CoFeedback", feedbackType, parallelism);
-		this.waitTime = waitTime;
-		this.feedbackEdges = Lists.newArrayList();
-	}
-
-	/**
-	 * Adds a feedback edge. The parallelism of the {@code StreamTransformation} must match
-	 * the parallelism of the upstream {@code StreamTransformation}.
-	 *
-	 * @param transform The new feedback {@code StreamTransformation}.
-	 */
-	public void addFeedbackEdge(StreamTransformation<F> transform) {
-
-		if (transform.getParallelism() != this.getParallelism()) {
-			throw new UnsupportedOperationException(
-					"Parallelism of the feedback stream must match the parallelism of the original" +
-							" stream. Parallelism of original stream: " + this.getParallelism() +
-							"; parallelism of feedback stream: " + transform.getParallelism());
-		}
-
-		feedbackEdges.add(transform);
-	}
-
-	/**
-	 * Returns the list of feedback {@code StreamTransformations}.
-	 */
-	public List<StreamTransformation<F>> getFeedbackEdges() {
-		return feedbackEdges;
-	}
-
-	/**
-	 * Returns the wait time. This is the amount of time that the feedback operator keeps listening
-	 * for feedback elements. Once the time expires the operation will close and will not receive
-	 * further elements.
-	 */
-	public Long getWaitTime() {
-		return waitTime;
-	}
-
-	@Override
-	public final void setChainingStrategy(ChainingStrategy strategy) {
-		throw new UnsupportedOperationException("Cannot set chaining strategy on CoFeedbackTransformation.");
-	}
-
-	@Override
-	public Collection<StreamTransformation<?>> getTransitivePredecessors() {
-		return Collections.<StreamTransformation<?>>singleton(this);
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/FeedbackTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/FeedbackTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/FeedbackTransformation.java
deleted file mode 100644
index 87c7f16..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/FeedbackTransformation.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.transformations;
-
-import com.google.common.collect.Lists;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-
-import java.util.Collection;
-import java.util.List;
-
-/**
- * This represents a feedback point in a topology.
- *
- * <p>
- * This is different from how iterations work in batch processing. Once a feedback point is defined
- * you can connect one or several {@code StreamTransformations} as feedback edges. Operations
- * downstream from the feedback point will receive elements from the input of this feedback point
- * and from the feedback edges.
- *
- * <p>
- * Both the partitioning of the input and that of the feedback edges are preserved. They can also have
- * differing partitioning strategies. This requires, however, that the parallelism of the feedback
- * {@code StreamTransformations} matches the parallelism of the input
- * {@code StreamTransformation}.
- *
- * <p>
- * The type of the input {@code StreamTransformation} and the feedback {@code StreamTransformation}
- * must match.
- *
- * @param <T> The type of the input elements and the feedback elements.
- */
-public class FeedbackTransformation<T> extends StreamTransformation<T> {
-
-	private final StreamTransformation<T> input;
-
-	private final List<StreamTransformation<T>> feedbackEdges;
-
-	private final Long waitTime;
-
-	/**
-	 * Creates a new {@code FeedbackTransformation} from the given input.
-	 *
-	 * @param input The input {@code StreamTransformation}
-	 * @param waitTime The wait time of the feedback operator. After the time expires
-	 *                          the operation will close and not receive any more feedback elements.
-	 */
-	public FeedbackTransformation(StreamTransformation<T> input, Long waitTime) {
-		super("Feedback", input.getOutputType(), input.getParallelism());
-		this.input = input;
-		this.waitTime = waitTime;
-		this.feedbackEdges = Lists.newArrayList();
-	}
-
-	/**
-	 * Returns the input {@code StreamTransformation} of this {@code FeedbackTransformation}.
-	 */
-	public StreamTransformation<T> getInput() {
-		return input;
-	}
-
-	/**
-	 * Adds a feedback edge. The parallelism of the {@code StreamTransformation} must match
-	 * the parallelism of the input {@code StreamTransformation} of this
-	 * {@code FeedbackTransformation}
-	 *
-	 * @param transform The new feedback {@code StreamTransformation}.
-	 */
-	public void addFeedbackEdge(StreamTransformation<T> transform) {
-
-		if (transform.getParallelism() != this.getParallelism()) {
-			throw new UnsupportedOperationException(
-					"Parallelism of the feedback stream must match the parallelism of the original" +
-							" stream. Parallelism of original stream: " + this.getParallelism() +
-							"; parallelism of feedback stream: " + transform.getParallelism());
-		}
-
-		feedbackEdges.add(transform);
-	}
-
-	/**
-	 * Returns the list of feedback {@code StreamTransformations}.
-	 */
-	public List<StreamTransformation<T>> getFeedbackEdges() {
-		return feedbackEdges;
-	}
-
-	/**
-	 * Returns the wait time. This is the amount of time that the feedback operator keeps listening
-	 * for feedback elements. Once the time expires the operation will close and will not receive
-	 * further elements.
-	 */
-	public Long getWaitTime() {
-		return waitTime;
-	}
-
-	@Override
-	public final void setChainingStrategy(ChainingStrategy strategy) {
-		throw new UnsupportedOperationException("Cannot set chaining strategy on FeedbackTransformation.");
-	}
-
-	@Override
-	public Collection<StreamTransformation<?>> getTransitivePredecessors() {
-		List<StreamTransformation<?>> result = Lists.newArrayList();
-		result.add(this);
-		result.addAll(input.getTransitivePredecessors());
-		return result;
-	}
-}
-
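
For orientation (not part of the commit), feedback points of this kind are what DataStream#iterate creates under the hood. A rough sketch, where env is a StreamExecutionEnvironment and the 5000 ms wait time and the decrement logic are arbitrary choices:

DataStream<Long> input = env.generateSequence(1, 10);

// iterate() introduces the feedback point; 5000 is the wait time described above, in milliseconds
IterativeStream<Long> iteration = input.iterate(5000L);

DataStream<Long> decremented = iteration.map(new MapFunction<Long, Long>() {
	@Override
	public Long map(Long value) {
		return value - 1;
	}
});

// values that are still positive are fed back into the loop as a feedback edge
iteration.closeWith(decremented.filter(new FilterFunction<Long>() {
	@Override
	public boolean filter(Long value) {
		return value > 0;
	}
}));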

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java
deleted file mode 100644
index 031c481..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.transformations;
-
-import com.google.common.collect.Lists;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-
-import java.util.Collection;
-import java.util.List;
-
-/**
- * This Transformation represents the application of a
- * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator} to one input
- * {@link org.apache.flink.streaming.api.transformations.StreamTransformation}.
- *
- * @param <IN> The type of the elements in the input {@code StreamTransformation}
- * @param <OUT> The type of the elements that result from this {@code OneInputTransformation}
- */
-public class OneInputTransformation<IN, OUT> extends StreamTransformation<OUT> {
-
-	private final StreamTransformation<IN> input;
-
-	private final OneInputStreamOperator<IN, OUT> operator;
-
-	private KeySelector<IN, ?> stateKeySelector;
-	
-	private TypeInformation<?> stateKeyType;
-
-	/**
-	 * Creates a new {@code OneInputTransformation} from the given input and operator.
-	 *
-	 * @param input The input {@code StreamTransformation}
-	 * @param name The name of the {@code StreamTransformation}; this will be shown in visualizations and in the log
-	 * @param operator The {@code OneInputStreamOperator}
-	 * @param outputType The type of the elements produced by this {@code OneInputTransformation}
-	 * @param parallelism The parallelism of this {@code OneInputTransformation}
-	 */
-	public OneInputTransformation(
-			StreamTransformation<IN> input,
-			String name,
-			OneInputStreamOperator<IN, OUT> operator,
-			TypeInformation<OUT> outputType,
-			int parallelism) {
-		super(name, outputType, parallelism);
-		this.input = input;
-		this.operator = operator;
-	}
-
-	/**
-	 * Returns the input {@code StreamTransformation} of this {@code OneInputTransformation}.
-	 */
-	public StreamTransformation<IN> getInput() {
-		return input;
-	}
-
-	/**
-	 * Returns the {@code TypeInformation} for the elements of the input.
-	 */
-	public TypeInformation<IN> getInputType() {
-		return input.getOutputType();
-	}
-
-	/**
-	 * Returns the {@code OneInputStreamOperator} of this Transformation.
-	 */
-	public OneInputStreamOperator<IN, OUT> getOperator() {
-		return operator;
-	}
-
-	/**
-	 * Sets the {@link KeySelector} that must be used for partitioning keyed state of this operation.
-	 *
-	 * @param stateKeySelector The {@code KeySelector} to set
-	 */
-	public void setStateKeySelector(KeySelector<IN, ?> stateKeySelector) {
-		this.stateKeySelector = stateKeySelector;
-	}
-
-	/**
-	 * Returns the {@code KeySelector} that must be used for partitioning keyed state in this
-	 * Operation.
-	 *
-	 * @see #setStateKeySelector
-	 */
-	public KeySelector<IN, ?> getStateKeySelector() {
-		return stateKeySelector;
-	}
-
-	public void setStateKeyType(TypeInformation<?> stateKeyType) {
-		this.stateKeyType = stateKeyType;
-	}
-
-	public TypeInformation<?> getStateKeyType() {
-		return stateKeyType;
-	}
-
-	@Override
-	public Collection<StreamTransformation<?>> getTransitivePredecessors() {
-		List<StreamTransformation<?>> result = Lists.newArrayList();
-		result.add(this);
-		result.addAll(input.getTransitivePredecessors());
-		return result;
-	}
-
-	@Override
-	public final void setChainingStrategy(ChainingStrategy strategy) {
-		operator.setChainingStrategy(strategy);
-	}
-}
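
To illustrate (hypothetically, not taken from this commit) how the DataStream API assembles such a transformation when, for example, map() is called; the input stream, names, and parallelism below are placeholders, and env stands for a StreamExecutionEnvironment:

DataStream<String> words = env.fromElements("streams", "and", "transformations");

StreamMap<String, Integer> lengthOperator = new StreamMap<>(new MapFunction<String, Integer>() {
	@Override
	public Integer map(String value) {
		return value.length();
	}
});

OneInputTransformation<String, Integer> lengths = new OneInputTransformation<>(
		words.getTransformation(),     // the input StreamTransformation
		"Map",                         // name shown in visualizations and in the log
		lengthOperator,                // the OneInputStreamOperator
		BasicTypeInfo.INT_TYPE_INFO,   // output type
		env.getParallelism());         // parallelism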

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/PartitionTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/PartitionTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/PartitionTransformation.java
deleted file mode 100644
index fa85349..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/PartitionTransformation.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.transformations;
-
-import com.google.common.collect.Lists;
-
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
-
-import java.util.Collection;
-import java.util.List;
-
-/**
- * This transformation represents a change of partitioning of the input elements.
- *
- * <p>
- * This does not create a physical operation, it only affects how upstream operations are
- * connected to downstream operations.
- *
- * @param <T> The type of the elements that result from this {@code PartitionTransformation}
- */
-public class PartitionTransformation<T> extends StreamTransformation<T> {
-	
-	private final StreamTransformation<T> input;
-	private final StreamPartitioner<T> partitioner;
-
-	/**
-	 * Creates a new {@code PartitionTransformation} from the given input and
-	 * {@link StreamPartitioner}.
-	 *
-	 * @param input The input {@code StreamTransformation}
-	 * @param partitioner The {@code StreamPartitioner}
-	 */
-	public PartitionTransformation(StreamTransformation<T> input, StreamPartitioner<T> partitioner) {
-		super("Partition", input.getOutputType(), input.getParallelism());
-		this.input = input;
-		this.partitioner = partitioner;
-	}
-
-	/**
-	 * Returns the input {@code StreamTransformation} of this {@code PartitionTransformation}.
-	 */
-	public StreamTransformation<T> getInput() {
-		return input;
-	}
-
-	/**
-	 * Returns the {@code StreamPartitioner} that must be used for partitioning the elements
-	 * of the input {@code StreamTransformation}.
-	 */
-	public StreamPartitioner<T> getPartitioner() {
-		return partitioner;
-	}
-
-	@Override
-	public Collection<StreamTransformation<?>> getTransitivePredecessors() {
-		List<StreamTransformation<?>> result = Lists.newArrayList();
-		result.add(this);
-		result.addAll(input.getTransitivePredecessors());
-		return result;
-	}
-
-	@Override
-	public final void setChainingStrategy(ChainingStrategy strategy) {
-		throw new UnsupportedOperationException("Cannot set chaining strategy on PartitionTransformation.");
-	}
-}
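
For reference (not part of this diff), partition transformations are produced by the repartitioning calls on DataStream; none of them adds a physical operator, they only change how records are routed to the next operation. A small sketch with illustrative contents, assuming env is a StreamExecutionEnvironment:

DataStream<String> words = env.fromElements("repartitioning", "without", "operators");

// each call yields a PartitionTransformation with a different StreamPartitioner
DataStream<String> rebalanced = words.rebalance();   // round-robin redistribution
DataStream<String> shuffled = words.shuffle();       // random redistribution
DataStream<String> broadcast = words.broadcast();    // every downstream task receives all records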

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SelectTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SelectTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SelectTransformation.java
deleted file mode 100644
index a66b65a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SelectTransformation.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.transformations;
-
-import com.google.common.collect.Lists;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-
-import java.util.Collection;
-import java.util.List;
-
-/**
- * This transformation represents a selection of only certain upstream elements. This must
- * follow a {@link org.apache.flink.streaming.api.transformations.SplitTransformation} that
- * splits elements into several logical streams with assigned names.
- *
- * <p>
- * This does not create a physical operation, it only affects how upstream operations are
- * connected to downstream operations.
- *
- * @param <T> The type of the elements that result from this {@code SelectTransformation}
- */
-public class SelectTransformation<T> extends StreamTransformation<T> {
-	
-	private final StreamTransformation<T> input;
-	private final List<String> selectedNames;
-
-	/**
-	 * Creates a new {@code SelectTransformation} from the given input that only selects
-	 * the streams with the selected names.
-	 *
-	 * @param input The input {@code StreamTransformation}
-	 * @param selectedNames The names from the upstream {@code SplitTransformation} that this
-	 *                      {@code SelectTransformation} selects.
-	 */
-	public SelectTransformation(StreamTransformation<T> input,
-			List<String> selectedNames) {
-		super("Select", input.getOutputType(), input.getParallelism());
-		this.input = input;
-		this.selectedNames = selectedNames;
-	}
-
-	/**
-	 * Returns the input {@code StreamTransformation}.
-	 */
-	public StreamTransformation<T> getInput() {
-		return input;
-	}
-
-	/**
-	 * Returns the names of the split streams that this {@code SelectTransformation} selects.
-	 */
-	public List<String> getSelectedNames() {
-		return selectedNames;
-	}
-
-	@Override
-	public Collection<StreamTransformation<?>> getTransitivePredecessors() {
-		List<StreamTransformation<?>> result = Lists.newArrayList();
-		result.add(this);
-		result.addAll(input.getTransitivePredecessors());
-		return result;
-	}
-
-	@Override
-	public final void setChainingStrategy(ChainingStrategy strategy) {
-		throw new UnsupportedOperationException("Cannot set chaining strategy on Select Transformation.");
-	}
-
-}
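
As context (not in the commit), a SelectTransformation always follows a split; a rough sketch, with the routing rule and stream names made up, env standing for a StreamExecutionEnvironment, and the usual java.util and DataStream API imports assumed:

DataStream<Long> numbers = env.generateSequence(0, 10);

SplitStream<Long> parity = numbers.split(new OutputSelector<Long>() {
	@Override
	public Iterable<String> select(Long value) {
		// assign each element to one of the named logical streams
		return Collections.singletonList(value % 2 == 0 ? "even" : "odd");
	}
});

// each select() call becomes a SelectTransformation over the split stream
DataStream<Long> evens = parity.select("even");
DataStream<Long> odds = parity.select("odd");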

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SinkTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SinkTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SinkTransformation.java
deleted file mode 100644
index 84ad6db..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SinkTransformation.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.transformations;
-
-import com.google.common.collect.Lists;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.StreamSink;
-
-import java.util.Collection;
-import java.util.List;
-
-/**
- * This Transformation represents a Sink.
- *
- * @param <T> The type of the elements in the input of this {@code SinkTransformation}
- */
-public class SinkTransformation<T> extends StreamTransformation<Object> {
-
-	private final StreamTransformation<T> input;
-
-	private final StreamSink<T> operator;
-
-	// We need this because sinks can also have state that is partitioned by key
-	private KeySelector<T, ?> stateKeySelector;
-	
-	private TypeInformation<?> stateKeyType;
-
-	/**
-	 * Creates a new {@code SinkTransformation} from the given input {@code StreamTransformation}.
-	 *
-	 * @param input The input {@code StreamTransformation}
-	 * @param name The name of the {@code StreamTransformation}, this will be shown in Visualizations and the Log
-	 * @param operator The sink operator
-	 * @param parallelism The parallelism of this {@code SinkTransformation}
-	 */
-	public SinkTransformation(
-			StreamTransformation<T> input,
-			String name,
-			StreamSink<T> operator,
-			int parallelism) {
-		super(name, TypeExtractor.getForClass(Object.class), parallelism);
-		this.input = input;
-		this.operator = operator;
-	}
-
-	/**
-	 * Returns the input {@code StreamTransformation} of this {@code SinkTransformation}.
-	 */
-	public StreamTransformation<T> getInput() {
-		return input;
-	}
-
-	/**
-	 * Returns the {@link StreamSink} that is the operator of this {@code SinkTransformation}.
-	 */
-	public StreamSink<T> getOperator() {
-		return operator;
-	}
-
-	/**
-	 * Sets the {@link KeySelector} that must be used for partitioning keyed state of this Sink.
-	 *
-	 * @param stateKeySelector The {@code KeySelector} to set
-	 */
-	public void setStateKeySelector(KeySelector<T, ?> stateKeySelector) {
-		this.stateKeySelector = stateKeySelector;
-	}
-
-	/**
-	 * Returns the {@code KeySelector} that must be used for partitioning keyed state in this
-	 * Sink.
-	 *
-	 * @see #setStateKeySelector
-	 */
-	public KeySelector<T, ?> getStateKeySelector() {
-		return stateKeySelector;
-	}
-
-	public void setStateKeyType(TypeInformation<?> stateKeyType) {
-		this.stateKeyType = stateKeyType;
-	}
-
-	public TypeInformation<?> getStateKeyType() {
-		return stateKeyType;
-	}
-
-	@Override
-	public Collection<StreamTransformation<?>> getTransitivePredecessors() {
-		List<StreamTransformation<?>> result = Lists.newArrayList();
-		result.add(this);
-		result.addAll(input.getTransitivePredecessors());
-		return result;
-	}
-
-	@Override
-	public final void setChainingStrategy(ChainingStrategy strategy) {
-		operator.setChainingStrategy(strategy);
-	}
-}
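
A hedged usage sketch (not from the diff): addSink(...) wraps the given SinkFunction in a
StreamSink operator and registers a SinkTransformation; its output type is fixed to Object
because a sink emits nothing downstream. The println body is purely illustrative:

    numbers.addSink(new SinkFunction<Integer>() {
        @Override
        public void invoke(Integer value) throws Exception {
            System.out.println(value);  // side effect only; no elements are forwarded
        }
    });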

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java
deleted file mode 100644
index 9835606..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.transformations;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.StreamSource;
-
-import java.util.Collection;
-import java.util.Collections;
-
-/**
- * This represents a Source. This does not actually transform anything since it has no inputs, but
- * it is the root {@code StreamTransformation} of any topology.
- *
- * @param <T> The type of the elements that this source produces
- */
-public class SourceTransformation<T> extends StreamTransformation<T> {
-
-	private final StreamSource<T> operator;
-
-	/**
-	 * Creates a new {@code SourceTransformation} from the given operator.
-	 *
-	 * @param name The name of the {@code SourceTransformation}, this will be shown in Visualizations and the Log
-	 * @param operator The {@code StreamSource} that is the operator of this Transformation
-	 * @param outputType The type of the elements produced by this {@code SourceTransformation}
-	 * @param parallelism The parallelism of this {@code SourceTransformation}
-	 */
-	public SourceTransformation(
-			String name,
-			StreamSource<T> operator,
-			TypeInformation<T> outputType,
-			int parallelism) {
-		super(name, outputType, parallelism);
-		this.operator = operator;
-	}
-
-	/**
-	 * Returns the {@code StreamSource}, the operator of this {@code SourceTransformation}.
-	 */
-	public StreamSource<T> getOperator() {
-		return operator;
-	}
-
-	@Override
-	public Collection<StreamTransformation<?>> getTransitivePredecessors() {
-		return Collections.<StreamTransformation<?>>singleton(this);
-	}
-
-	@Override
-	public final void setChainingStrategy(ChainingStrategy strategy) {
-		operator.setChainingStrategy(strategy);
-	}
-}
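
For context, a rough sketch of the user-facing side (not part of the diff): addSource(...) wraps a
SourceFunction in a StreamSource operator and registers a SourceTransformation as a root of the
topology. The 0.10-era SourceFunction signatures are assumed and the counting logic is illustrative:

    DataStream<Long> counts = env.addSource(new SourceFunction<Long>() {
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            long i = 0;
            while (running) {
                ctx.collect(i++);  // emit elements through the source context
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    });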

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SplitTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SplitTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SplitTransformation.java
deleted file mode 100644
index 96c1c9e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SplitTransformation.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.transformations;
-
-import com.google.common.collect.Lists;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-
-import java.util.Collection;
-import java.util.List;
-
-/**
- * This transformation represents a split of one
- * {@link org.apache.flink.streaming.api.datastream.DataStream} into several {@code DataStreams}
- * using an {@link org.apache.flink.streaming.api.collector.selector.OutputSelector}.
- *
- * <p>
- * This does not create a physical operation, it only affects how upstream operations are
- * connected to downstream operations.
- *
- * @param <T> The type of the elements that result from this {@code SplitTransformation}
- */
-public class SplitTransformation<T> extends StreamTransformation<T> {
-
-	private final StreamTransformation<T> input;
-
-	private final OutputSelector<T> outputSelector;
-
-	/**
-	 * Creates a new {@code SplitTransformation} from the given input and {@code OutputSelector}.
-	 *
-	 * @param input The input {@code StreamTransformation}
-	 * @param outputSelector The output selector
-	 */
-	public SplitTransformation(StreamTransformation<T> input,
-			OutputSelector<T> outputSelector) {
-		super("Split", input.getOutputType(), input.getParallelism());
-		this.input = input;
-		this.outputSelector = outputSelector;
-	}
-
-	/**
-	 * Returns the input {@code StreamTransformation}.
-	 */
-	public StreamTransformation<T> getInput() {
-		return input;
-	}
-
-	/**
-	 * Returns the {@code OutputSelector}.
-	 */
-	public OutputSelector<T> getOutputSelector() {
-		return outputSelector;
-	}
-
-	@Override
-	public Collection<StreamTransformation<?>> getTransitivePredecessors() {
-		List<StreamTransformation<?>> result = Lists.newArrayList();
-		result.add(this);
-		result.addAll(input.getTransitivePredecessors());
-		return result;
-	}
-
-	@Override
-	public final void setChainingStrategy(ChainingStrategy strategy) {
-		throw new UnsupportedOperationException("Cannot set chaining strategy on Split Transformation.");
-	}
-}
-
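
A small sketch of the selector side (illustrative, not from this commit): an OutputSelector may
route one element to several named outputs, since the split only tags elements and the actual
routing is resolved on the connecting edges rather than in a physical operator. The names "all"
and "even" are placeholders:

    OutputSelector<Integer> selector = new OutputSelector<Integer>() {
        @Override
        public Iterable<String> select(Integer value) {
            List<String> names = new ArrayList<>();
            names.add("all");               // every element is tagged "all"
            if (value % 2 == 0) {
                names.add("even");          // even elements are additionally tagged "even"
            }
            return names;
        }
    };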

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
deleted file mode 100644
index 4e6dc42..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
+++ /dev/null
@@ -1,321 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.transformations;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.flink.api.common.functions.InvalidTypesException;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.MissingTypeInfo;
-import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-
-import java.util.Collection;
-
-/**
- * A {@code StreamTransformation} represents the operation that creates a
- * {@link org.apache.flink.streaming.api.datastream.DataStream}. Every
- * {@link org.apache.flink.streaming.api.datastream.DataStream} has an underlying
- * {@code StreamTransformation} that is the origin of said DataStream.
- *
- * <p>
- * API operations such as {@link org.apache.flink.streaming.api.datastream.DataStream#map} create
- * a tree of {@code StreamTransformation}s underneath. When the stream program is to be executed, this
- * graph is translated to a {@link StreamGraph} using
- * {@link org.apache.flink.streaming.api.graph.StreamGraphGenerator}.
- *
- * <p>
- * A {@code StreamTransformation} does not necessarily correspond to a physical operation
- * at runtime. Some operations are only logical concepts. Examples of this are union,
- * split/select data stream, partitioning.
- *
- * <p>
- * The following graph of {@code StreamTransformations}:
- *
- * <pre>
- *   Source              Source        
- *      +                   +           
- *      |                   |           
- *      v                   v           
- *  Rebalance          HashPartition    
- *      +                   +           
- *      |                   |           
- *      |                   |           
- *      +------>Union<------+           
- *                +                     
- *                |                     
- *                v                     
- *              Split                   
- *                +                     
- *                |                     
- *                v                     
- *              Select                  
- *                +                     
- *                v                     
- *               Map                    
- *                +                     
- *                |                     
- *                v                     
- *              Sink 
- * </pre>
- *
- * Would result in this graph of operations at runtime:
- *
- * <pre>
- *  Source              Source
- *    +                   +
- *    |                   |
- *    |                   |
- *    +------->Map<-------+
- *              +
- *              |
- *              v
- *             Sink
- * </pre>
- *
- * The information about partitioning, union, split/select end up being encoded in the edges
- * that connect the sources to the map operation.
- *
- * @param <T> The type of the elements that result from this {@code StreamTransformation}
- */
-public abstract class StreamTransformation<T> {
-
-	// This is used to assign a unique ID to every StreamTransformation
-	protected static Integer idCounter = 0;
-	public static int getNewNodeId() {
-		idCounter++;
-		return idCounter;
-	}
-
-	protected final int id;
-
-	protected String name;
-
-	protected TypeInformation<T> outputType;
-	// This is used to handle MissingTypeInfo. As long as the outputType has not been queried
-	// it can still be changed using setOutputType(). Afterwards an exception is thrown when
-	// trying to change the output type.
-	protected boolean typeUsed;
-
-	private int parallelism;
-
-	protected long bufferTimeout = -1;
-
-	protected StreamGraph.ResourceStrategy resourceStrategy = StreamGraph.ResourceStrategy.DEFAULT;
-
-	/**
-	 * Creates a new {@code StreamTransformation} with the given name, output type and parallelism.
-	 *
-	 * @param name The name of the {@code StreamTransformation}, this will be shown in Visualizations and the Log
-	 * @param outputType The output type of this {@code StreamTransformation}
-	 * @param parallelism The parallelism of this {@code StreamTransformation}
-	 */
-	public StreamTransformation(String name, TypeInformation<T> outputType, int parallelism) {
-		this.id = getNewNodeId();
-		this.name = Preconditions.checkNotNull(name);
-		this.outputType = outputType;
-		this.parallelism = parallelism;
-	}
-
-	/**
-	 * Returns the unique ID of this {@code StreamTransformation}.
-	 */
-	public int getId() {
-		return id;
-	}
-
-	/**
-	 * Changes the name of this {@code StreamTransformation}.
-	 */
-	public void setName(String name) {
-		this.name = name;
-	}
-
-	/**
-	 * Returns the name of this {@code StreamTransformation}.
-	 */
-	public String getName() {
-		return name;
-	}
-
-	/**
-	 * Returns the parallelism of this {@code StreamTransformation}.
-	 */
-	public int getParallelism() {
-		return parallelism;
-	}
-
-	/**
-	 * Sets the parallelism of this {@code StreamTransformation}.
-	 * @param parallelism The new parallelism to set on this {@code StreamTransformation}
-	 */
-	public void setParallelism(int parallelism) {
-		Preconditions.checkArgument(parallelism > 0, "Parallelism must be bigger than zero.");
-		this.parallelism = parallelism;
-	}
-
-	/**
-	 * Tries to fill in the type information. Type information can be filled in
-	 * later when the program uses a type hint. This method checks whether the
-	 * type information has ever been accessed before and does not allow
-	 * modifications if the type was accessed already. This ensures consistency
-	 * by making sure different parts of the operation do not assume different
-	 * type information.
-	 *
-	 * @param outputType The type information to fill in.
-	 *
-	 * @throws IllegalStateException Thrown, if the type information has been accessed before.
-	 */
-	public void setOutputType(TypeInformation<T> outputType) {
-		if (typeUsed) {
-			throw new IllegalStateException(
-					"TypeInformation cannot be filled in for the type after it has been used. "
-							+ "Please make sure that the type info hints are the first call after"
-							+ " the transformation function, "
-							+ "before any access to types or semantic properties, etc.");
-		}
-		this.outputType = outputType;
-	}
-
-	/**
-	 * Returns the output type of this {@code StreamTransformation} as a {@link TypeInformation}. Once
-	 * this has been called, the output type can no longer be changed using {@link #setOutputType}.
-	 *
-	 * @return The output type of this {@code StreamTransformation}
-	 */
-	public TypeInformation<T> getOutputType() {
-		if (outputType instanceof MissingTypeInfo) {
-			MissingTypeInfo typeInfo = (MissingTypeInfo) this.outputType;
-			throw new InvalidTypesException(
-					"The return type of function '"
-							+ typeInfo.getFunctionName()
-							+ "' could not be determined automatically, due to type erasure. "
-							+ "You can give type information hints by using the returns(...) "
-							+ "method on the result of the transformation call, or by letting "
-							+ "your function implement the 'ResultTypeQueryable' "
-							+ "interface.", typeInfo.getTypeException());
-		}
-		typeUsed = true;
-		return this.outputType;
-	}
-
-	/**
-	 * Sets the chaining strategy of this {@code StreamTransformation}.
-	 */
-	public abstract void setChainingStrategy(ChainingStrategy strategy);
-
-	/**
-	 * Set the buffer timeout of this {@code StreamTransformation}. The timeout is used when
-	 * sending elements over the network. The timeout specifies how long a network buffer
-	 * should be kept waiting before sending. A higher timeout means that more elements will
-	 * be sent in one buffer, which increases throughput. The latency, however, is negatively
-	 * affected by a higher timeout.
-	 */
-	public void setBufferTimeout(long bufferTimeout) {
-		this.bufferTimeout = bufferTimeout;
-	}
-
-	/**
-	 * Returns the buffer timeout of this {@code StreamTransformation}.
-	 *
-	 * <p>
-	 * {@see #setBufferTimeout}
-	 * See {@link #setBufferTimeout}.
-	public long getBufferTimeout() {
-		return bufferTimeout;
-	}
-
-	/**
-	 * Sets the {@link org.apache.flink.streaming.api.graph.StreamGraph.ResourceStrategy} of this
-	 * {@code StreamTransformation}. The resource strategy is used when scheduling operations on actual
-	 * workers when transforming the StreamTopology to an
-	 * {@link org.apache.flink.runtime.executiongraph.ExecutionGraph}.
-	 */
-	public void setResourceStrategy(StreamGraph.ResourceStrategy resourceStrategy) {
-		this.resourceStrategy = resourceStrategy;
-	}
-
-	/**
-	 * Returns the {@code ResourceStrategy} of this {@code StreamTransformation}.
-	 *
-	 * <p>
-	 * {@see #setResourceStrategy}
-	 * See {@link #setResourceStrategy}.
-	public StreamGraph.ResourceStrategy getResourceStrategy() {
-		return resourceStrategy;
-	}
-
-	/**
-	 * Returns all transitive predecessor {@code StreamTransformation}s of this {@code StreamTransformation}. This
-	 * is, for example, used when determining whether a feedback edge of an iteration
-	 * actually has the iteration head as a predecessor.
-	 *
-	 * @return The list of transitive predecessors.
-	 */
-	public abstract Collection<StreamTransformation<?>> getTransitivePredecessors();
-
-	@Override
-	public String toString() {
-		return getClass().getSimpleName() + "{" +
-				"id=" + id +
-				", name='" + name + '\'' +
-				", outputType=" + outputType +
-				", parallelism=" + parallelism +
-				'}';
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (!(o instanceof StreamTransformation)) {
-			return false;
-		}
-
-		StreamTransformation<?> that = (StreamTransformation<?>) o;
-
-		if (bufferTimeout != that.bufferTimeout) {
-			return false;
-		}
-		if (id != that.id) {
-			return false;
-		}
-		if (parallelism != that.parallelism) {
-			return false;
-		}
-		if (!name.equals(that.name)) {
-			return false;
-		}
-		if (outputType != null ? !outputType.equals(that.outputType) : that.outputType != null) {
-			return false;
-		}
-		return resourceStrategy == that.resourceStrategy;
-	}
-
-	@Override
-	public int hashCode() {
-		int result = id;
-		result = 31 * result + name.hashCode();
-		result = 31 * result + (outputType != null ? outputType.hashCode() : 0);
-		result = 31 * result + parallelism;
-		result = 31 * result + (int) (bufferTimeout ^ (bufferTimeout >>> 32));
-		result = 31 * result + resourceStrategy.hashCode();
-		return result;
-	}
-}
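
A hedged example of the setOutputType/typeUsed contract described above (not part of the diff):
a type hint such as returns(...) has to directly follow the transformation call, before anything
reads the output type. MyPojo, its fromString factory and the input stream are placeholders, and
the returns(Class) hint from the Java 8 lambda support is assumed:

    DataStream<MyPojo> parsed = lines
        .map(line -> MyPojo.fromString(line))  // lambda: return type is erased, so only a MissingTypeInfo is known
        .returns(MyPojo.class);                // fills in the type via setOutputType(...) before it is used

    // Reading the output type first (e.g. via keyBy) would either fail for the missing type
    // or mark the type as used, so a later returns(...) hint would be rejected.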

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java
deleted file mode 100644
index 30f0733..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.transformations;
-
-import com.google.common.collect.Lists;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-
-import java.util.Collection;
-import java.util.List;
-
-/**
- * This Transformation represents the application of a
- * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator} to two input
- * {@code StreamTransformations}. The result is a single output stream.
- *
- * @param <IN1> The type of the elements in the first input {@code StreamTransformation}
- * @param <IN2> The type of the elements in the second input {@code StreamTransformation}
- * @param <OUT> The type of the elements that result from this {@code TwoInputTransformation}
- */
-public class TwoInputTransformation<IN1, IN2, OUT> extends StreamTransformation<OUT> {
-
-	private final StreamTransformation<IN1> input1;
-	private final StreamTransformation<IN2> input2;
-
-	private final TwoInputStreamOperator<IN1, IN2, OUT> operator;
-
-	/**
-	 * Creates a new {@code TwoInputTransformation} from the given inputs and operator.
-	 *
-	 * @param input1 The first input {@code StreamTransformation}
-	 * @param input2 The second input {@code StreamTransformation}
-	 * @param name The name of the {@code StreamTransformation}, this will be shown in Visualizations and the Log
-	 * @param operator The {@code TwoInputStreamOperator}
-	 * @param outputType The type of the elements produced by this Transformation
-	 * @param parallelism The parallelism of this Transformation
-	 */
-	public TwoInputTransformation(
-			StreamTransformation<IN1> input1,
-			StreamTransformation<IN2> input2,
-			String name,
-			TwoInputStreamOperator<IN1, IN2, OUT> operator,
-			TypeInformation<OUT> outputType,
-			int parallelism) {
-		super(name, outputType, parallelism);
-		this.input1 = input1;
-		this.input2 = input2;
-		this.operator = operator;
-	}
-
-	/**
-	 * Returns the first input {@code StreamTransformation} of this {@code TwoInputTransformation}.
-	 */
-	public StreamTransformation<IN1> getInput1() {
-		return input1;
-	}
-
-	/**
-	 * Returns the second input {@code StreamTransformation} of this {@code TwoInputTransformation}.
-	 */
-	public StreamTransformation<IN2> getInput2() {
-		return input2;
-	}
-
-	/**
-	 * Returns the {@code TypeInformation} for the elements from the first input.
-	 */
-	public TypeInformation<IN1> getInputType1() {
-		return input1.getOutputType();
-	}
-
-	/**
-	 * Returns the {@code TypeInformation} for the elements from the second input.
-	 */
-	public TypeInformation<IN2> getInputType2() {
-		return input2.getOutputType();
-	}
-
-	/**
-	 * Returns the {@code TwoInputStreamOperator} of this Transformation.
-	 */
-	public TwoInputStreamOperator<IN1, IN2, OUT> getOperator() {
-		return operator;
-	}
-
-	@Override
-	public Collection<StreamTransformation<?>> getTransitivePredecessors() {
-		List<StreamTransformation<?>> result = Lists.newArrayList();
-		result.add(this);
-		result.addAll(input1.getTransitivePredecessors());
-		result.addAll(input2.getTransitivePredecessors());
-		return result;
-	}
-
-	@Override
-	public final void setChainingStrategy(ChainingStrategy strategy) {
-		operator.setChainingStrategy(strategy);
-	}
-
-}
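
For orientation, a minimal sketch of the API call that produces this transformation (not part of
the diff; letters and numbers stand for arbitrary DataStream<String> and DataStream<Integer>
inputs): connect(...) followed by a CoMapFunction applies one TwoInputStreamOperator to both
inputs and yields a single output stream:

    DataStream<String> merged = letters
        .connect(numbers)
        .map(new CoMapFunction<String, Integer, String>() {
            @Override
            public String map1(String value) {      // applied to elements of the first input
                return value;
            }

            @Override
            public String map2(Integer value) {     // applied to elements of the second input
                return String.valueOf(value);
            }
        });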