You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:52 UTC

[36/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
deleted file mode 100644
index 45cfff1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ /dev/null
@@ -1,444 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.graph;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.ScheduleMode;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
-import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
-import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
-import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
-import org.apache.flink.util.InstantiationUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StreamingJobGraphGenerator {
-
-	private static final Logger LOG = LoggerFactory.getLogger(StreamingJobGraphGenerator.class);
-
-	private StreamGraph streamGraph;
-
-	private Map<Integer, JobVertex> jobVertices;
-	private JobGraph jobGraph;
-	private Collection<Integer> builtVertices;
-
-	private List<StreamEdge> physicalEdgesInOrder;
-
-	private Map<Integer, Map<Integer, StreamConfig>> chainedConfigs;
-
-	private Map<Integer, StreamConfig> vertexConfigs;
-	private Map<Integer, String> chainedNames;
-
-	public StreamingJobGraphGenerator(StreamGraph streamGraph) {
-		this.streamGraph = streamGraph;
-	}
-
-	private void init() {
-		this.jobVertices = new HashMap<Integer, JobVertex>();
-		this.builtVertices = new HashSet<Integer>();
-		this.chainedConfigs = new HashMap<Integer, Map<Integer, StreamConfig>>();
-		this.vertexConfigs = new HashMap<Integer, StreamConfig>();
-		this.chainedNames = new HashMap<Integer, String>();
-		this.physicalEdgesInOrder = new ArrayList<StreamEdge>();
-	}
-
-	public JobGraph createJobGraph(String jobName) {
-		jobGraph = new JobGraph(jobName);
-
-		// make sure that all vertices start immediately
-		jobGraph.setScheduleMode(ScheduleMode.ALL);
-
-		init();
-
-		setChaining();
-
-		setPhysicalEdges();
-
-		setSlotSharing();
-		
-		configureCheckpointing();
-
-		configureExecutionRetries();
-		
-		configureExecutionRetryDelay();
-
-		try {
-			InstantiationUtil.writeObjectToConfig(this.streamGraph.getExecutionConfig(), this.jobGraph.getJobConfiguration(), ExecutionConfig.CONFIG_KEY);
-		} catch (IOException e) {
-			throw new RuntimeException("Config object could not be written to Job Configuration: ", e);
-		}
-		
-		return jobGraph;
-	}
-
-	private void setPhysicalEdges() {
-		Map<Integer, List<StreamEdge>> physicalInEdgesInOrder = new HashMap<Integer, List<StreamEdge>>();
-
-		for (StreamEdge edge : physicalEdgesInOrder) {
-			int target = edge.getTargetId();
-
-			List<StreamEdge> inEdges = physicalInEdgesInOrder.get(target);
-
-			// create if not set
-			if (inEdges == null) {
-				inEdges = new ArrayList<StreamEdge>();
-				physicalInEdgesInOrder.put(target, inEdges);
-			}
-
-			inEdges.add(edge);
-		}
-
-		for (Map.Entry<Integer, List<StreamEdge>> inEdges : physicalInEdgesInOrder.entrySet()) {
-			int vertex = inEdges.getKey();
-			List<StreamEdge> edgeList = inEdges.getValue();
-
-			vertexConfigs.get(vertex).setInPhysicalEdges(edgeList);
-		}
-	}
-
-	private void setChaining() {
-		for (Integer sourceName : streamGraph.getSourceIDs()) {
-			createChain(sourceName, sourceName);
-		}
-	}
-
-	private List<StreamEdge> createChain(Integer startNode, Integer current) {
-
-		if (!builtVertices.contains(startNode)) {
-
-			List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
-
-			List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
-			List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
-
-			for (StreamEdge outEdge : streamGraph.getStreamNode(current).getOutEdges()) {
-				if (isChainable(outEdge)) {
-					chainableOutputs.add(outEdge);
-				} else {
-					nonChainableOutputs.add(outEdge);
-				}
-			}
-
-			for (StreamEdge chainable : chainableOutputs) {
-				transitiveOutEdges.addAll(createChain(startNode, chainable.getTargetId()));
-			}
-
-			for (StreamEdge nonChainable : nonChainableOutputs) {
-				transitiveOutEdges.add(nonChainable);
-				createChain(nonChainable.getTargetId(), nonChainable.getTargetId());
-			}
-
-			chainedNames.put(current, createChainedName(current, chainableOutputs));
-
-			StreamConfig config = current.equals(startNode) ? createProcessingVertex(startNode)
-					: new StreamConfig(new Configuration());
-
-			setVertexConfig(current, config, chainableOutputs, nonChainableOutputs);
-
-			if (current.equals(startNode)) {
-
-				config.setChainStart();
-				config.setOutEdgesInOrder(transitiveOutEdges);
-				config.setOutEdges(streamGraph.getStreamNode(current).getOutEdges());
-
-				for (StreamEdge edge : transitiveOutEdges) {
-					connect(startNode, edge);
-				}
-
-				config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNode));
-
-			} else {
-
-				Map<Integer, StreamConfig> chainedConfs = chainedConfigs.get(startNode);
-
-				if (chainedConfs == null) {
-					chainedConfigs.put(startNode, new HashMap<Integer, StreamConfig>());
-				}
-				chainedConfigs.get(startNode).put(current, config);
-			}
-
-			return transitiveOutEdges;
-
-		} else {
-			return new ArrayList<StreamEdge>();
-		}
-	}
-
-	private String createChainedName(Integer vertexID, List<StreamEdge> chainedOutputs) {
-		String operatorName = streamGraph.getStreamNode(vertexID).getOperatorName();
-		if (chainedOutputs.size() > 1) {
-			List<String> outputChainedNames = new ArrayList<String>();
-			for (StreamEdge chainable : chainedOutputs) {
-				outputChainedNames.add(chainedNames.get(chainable.getTargetId()));
-			}
-			return operatorName + " -> (" + StringUtils.join(outputChainedNames, ", ") + ")";
-		} else if (chainedOutputs.size() == 1) {
-			return operatorName + " -> " + chainedNames.get(chainedOutputs.get(0).getTargetId());
-		} else {
-			return operatorName;
-		}
-
-	}
-
-	private StreamConfig createProcessingVertex(Integer vertexID) {
-
-		JobVertex jobVertex = new JobVertex(chainedNames.get(vertexID));
-		StreamNode vertex = streamGraph.getStreamNode(vertexID);
-
-		jobVertex.setInvokableClass(vertex.getJobVertexClass());
-
-		int parallelism = vertex.getParallelism();
-
-		if (parallelism > 0) {
-			jobVertex.setParallelism(parallelism);
-		}
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Parallelism set: {} for {}", parallelism, vertexID);
-		}
-
-		if (vertex.getInputFormat() != null) {
-			jobVertex.setInputSplitSource(vertex.getInputFormat());
-		}
-
-		jobVertices.put(vertexID, jobVertex);
-		builtVertices.add(vertexID);
-		jobGraph.addVertex(jobVertex);
-
-		return new StreamConfig(jobVertex.getConfiguration());
-	}
-
-	@SuppressWarnings("unchecked")
-	private void setVertexConfig(Integer vertexID, StreamConfig config,
-			List<StreamEdge> chainableOutputs, List<StreamEdge> nonChainableOutputs) {
-
-		StreamNode vertex = streamGraph.getStreamNode(vertexID);
-
-		config.setVertexID(vertexID);
-		config.setBufferTimeout(vertex.getBufferTimeout());
-
-		config.setTypeSerializerIn1(vertex.getTypeSerializerIn1());
-		config.setTypeSerializerIn2(vertex.getTypeSerializerIn2());
-		config.setTypeSerializerOut(vertex.getTypeSerializerOut());
-
-		config.setStreamOperator(vertex.getOperator());
-		config.setOutputSelectorWrapper(vertex.getOutputSelectorWrapper());
-
-		config.setNumberOfOutputs(nonChainableOutputs.size());
-		config.setNonChainedOutputs(nonChainableOutputs);
-		config.setChainedOutputs(chainableOutputs);
-
-		config.setCheckpointingEnabled(streamGraph.isCheckpointingEnabled());
-		if (streamGraph.isCheckpointingEnabled()) {
-			config.setCheckpointMode(streamGraph.getCheckpointingMode());
-			config.setStateBackend(streamGraph.getStateBackend());
-		} else {
-			// the at least once input handler is slightly cheaper (in the absence of checkpoints),
-			// so we use that one if checkpointing is not enabled
-			config.setCheckpointMode(CheckpointingMode.AT_LEAST_ONCE);
-		}
-		config.setStatePartitioner((KeySelector<?, Serializable>) vertex.getStatePartitioner());
-		config.setStateKeySerializer(vertex.getStateKeySerializer());
-
-		
-		Class<? extends AbstractInvokable> vertexClass = vertex.getJobVertexClass();
-
-		if (vertexClass.equals(StreamIterationHead.class)
-				|| vertexClass.equals(StreamIterationTail.class)) {
-			config.setIterationId(streamGraph.getBrokerID(vertexID));
-			config.setIterationWaitTime(streamGraph.getLoopTimeout(vertexID));
-		}
-
-		List<StreamEdge> allOutputs = new ArrayList<StreamEdge>(chainableOutputs);
-		allOutputs.addAll(nonChainableOutputs);
-
-		vertexConfigs.put(vertexID, config);
-	}
-
-	private void connect(Integer headOfChain, StreamEdge edge) {
-
-		physicalEdgesInOrder.add(edge);
-
-		Integer downStreamvertexID = edge.getTargetId();
-
-		JobVertex headVertex = jobVertices.get(headOfChain);
-		JobVertex downStreamVertex = jobVertices.get(downStreamvertexID);
-
-		StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration());
-
-		downStreamConfig.setNumberOfInputs(downStreamConfig.getNumberOfInputs() + 1);
-
-		StreamPartitioner<?> partitioner = edge.getPartitioner();
-		if (partitioner instanceof ForwardPartitioner) {
-			downStreamVertex.connectNewDataSetAsInput(headVertex, DistributionPattern.POINTWISE);
-		} else {
-			downStreamVertex.connectNewDataSetAsInput(headVertex, DistributionPattern.ALL_TO_ALL);
-		}
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("CONNECTED: {} - {} -> {}", partitioner.getClass().getSimpleName(),
-					headOfChain, downStreamvertexID);
-		}
-	}
-
-	private boolean isChainable(StreamEdge edge) {
-		StreamNode upStreamVertex = edge.getSourceVertex();
-		StreamNode downStreamVertex = edge.getTargetVertex();
-
-		StreamOperator<?> headOperator = upStreamVertex.getOperator();
-		StreamOperator<?> outOperator = downStreamVertex.getOperator();
-
-		return downStreamVertex.getInEdges().size() == 1
-				&& outOperator != null
-				&& headOperator != null
-				&& upStreamVertex.getSlotSharingID() == downStreamVertex.getSlotSharingID()
-				&& upStreamVertex.getSlotSharingID() != -1
-				&& (outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS ||
-					outOperator.getChainingStrategy() == ChainingStrategy.FORCE_ALWAYS)
-				&& (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
-					headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS ||
-					headOperator.getChainingStrategy() == ChainingStrategy.FORCE_ALWAYS)
-				&& (edge.getPartitioner() instanceof ForwardPartitioner || downStreamVertex
-						.getParallelism() == 1)
-				&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
-				&& (streamGraph.isChainingEnabled() ||
-					outOperator.getChainingStrategy() == ChainingStrategy.FORCE_ALWAYS);
-	}
-
-	private void setSlotSharing() {
-
-		Map<Integer, SlotSharingGroup> slotSharingGroups = new HashMap<Integer, SlotSharingGroup>();
-
-		for (Entry<Integer, JobVertex> entry : jobVertices.entrySet()) {
-
-			int slotSharingID = streamGraph.getStreamNode(entry.getKey()).getSlotSharingID();
-
-			if (slotSharingID != -1) {
-				SlotSharingGroup group = slotSharingGroups.get(slotSharingID);
-				if (group == null) {
-					group = new SlotSharingGroup();
-					slotSharingGroups.put(slotSharingID, group);
-				}
-				entry.getValue().setSlotSharingGroup(group);
-			}
-		}
-
-		for (Tuple2<StreamNode, StreamNode> pair : streamGraph.getIterationSourceSinkPairs()) {
-
-			CoLocationGroup ccg = new CoLocationGroup();
-
-			JobVertex source = jobVertices.get(pair.f0.getId());
-			JobVertex sink = jobVertices.get(pair.f1.getId());
-
-			ccg.addVertex(source);
-			ccg.addVertex(sink);
-			source.updateCoLocationGroup(ccg);
-			sink.updateCoLocationGroup(ccg);
-		}
-
-	}
-	
-	private void configureCheckpointing() {
-		if (streamGraph.isCheckpointingEnabled()) {
-			long interval = streamGraph.getCheckpointingInterval();
-			if (interval < 1) {
-				throw new IllegalArgumentException("The checkpoint interval must be positive");
-			}
-
-			// collect the vertices that receive "trigger checkpoint" messages.
-			// currently, these are all the sources
-			List<JobVertexID> triggerVertices = new ArrayList<JobVertexID>();
-
-			// collect the vertices that need to acknowledge the checkpoint
-			// currently, these are all vertices
-			List<JobVertexID> ackVertices = new ArrayList<JobVertexID>(jobVertices.size());
-
-			// collect the vertices that receive "commit checkpoint" messages
-			// currently, these are all certices
-			List<JobVertexID> commitVertices = new ArrayList<JobVertexID>();
-			
-			
-			for (JobVertex vertex : jobVertices.values()) {
-				if (vertex.isInputVertex()) {
-					triggerVertices.add(vertex.getID());
-				}
-				// TODO: add check whether the user function implements the checkpointing interface
-				commitVertices.add(vertex.getID());
-				ackVertices.add(vertex.getID());
-			}
-
-			JobSnapshottingSettings settings = new JobSnapshottingSettings(
-					triggerVertices, ackVertices, commitVertices, interval);
-			jobGraph.setSnapshotSettings(settings);
-
-			// if the user enabled checkpointing, the default number of exec retries is infinitive.
-			int executionRetries = streamGraph.getExecutionConfig().getNumberOfExecutionRetries();
-			if(executionRetries == -1) {
-				streamGraph.getExecutionConfig().setNumberOfExecutionRetries(Integer.MAX_VALUE);
-			}
-			long executionRetryDelay = streamGraph.getExecutionConfig().getExecutionRetryDelay();
-			if(executionRetryDelay == -1) {
-				streamGraph.getExecutionConfig().setExecutionRetryDelay(100 * 1000);
-			}
-		}
-	}
-
-	private void configureExecutionRetries() {
-		int executionRetries = streamGraph.getExecutionConfig().getNumberOfExecutionRetries();
-		if (executionRetries != -1) {
-			jobGraph.setNumberOfExecutionRetries(executionRetries);
-		} else {
-			// if the user didn't configure anything, the number of retries is 0.
-			jobGraph.setNumberOfExecutionRetries(0);
-		}
-	}
-	
-	private void configureExecutionRetryDelay() {
-		long executionRetryDelay = streamGraph.getExecutionConfig().getExecutionRetryDelay();
-		if (executionRetryDelay != -1) {
-			jobGraph.setExecutionRetryDelay(executionRetryDelay);
-		} else {
-			jobGraph.setExecutionRetryDelay(100 * 1000);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
deleted file mode 100644
index 078679d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ /dev/null
@@ -1,373 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.runtime.state.KvState;
-import org.apache.flink.runtime.state.KvStateSnapshot;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Base class for all stream operators. Operators that contain a user function should extend the class 
- * {@link AbstractUdfStreamOperator} instead (which is a specialized subclass of this class). 
- * 
- * <p>For concrete implementations, one of the following two interfaces must also be implemented, to
- * mark the operator as unary or binary:
- * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator} or
- * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator}.
- *
- * <p>Methods of {@code StreamOperator} are guaranteed not to be called concurrently. Also, if using
- * the timer service, timer callbacks are also guaranteed not to be called concurrently with
- * methods on {@code StreamOperator}.
- *
- * @param <OUT> The output type of the operator
- */
-public abstract class AbstractStreamOperator<OUT> 
-		implements StreamOperator<OUT>, java.io.Serializable {
-
-	private static final long serialVersionUID = 1L;
-	
-	/** The logger used by the operator class and its subclasses */
-	protected static final Logger LOG = LoggerFactory.getLogger(AbstractStreamOperator.class);
-
-	// ----------- configuration properties -------------
-
-	// A sane default for most operators
-	protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;
-	
-	private boolean inputCopyDisabled = false;
-	
-	// ---------------- runtime fields ------------------
-
-	/** The task that contains this operator (and other operators in the same chain) */
-	private transient StreamTask<?, ?> container;
-	
-	private transient StreamConfig config;
-
-	protected transient Output<StreamRecord<OUT>> output;
-
-	/** The runtime context for UDFs */
-	private transient StreamingRuntimeContext runtimeContext;
-
-	
-	// ---------------- key/value state ------------------
-	
-	/** key selector used to get the key for the state. Non-null only is the operator uses key/value state */
-	private transient KeySelector<?, ?> stateKeySelector;
-	
-	private transient KvState<?, ?, ?>[] keyValueStates;
-	
-	private transient HashMap<String, KvState<?, ?, ?>> keyValueStatesByName;
-	
-	private transient TypeSerializer<?> keySerializer;
-	
-	private transient HashMap<String, KvStateSnapshot<?, ?, ?>> keyValueStateSnapshots;
-	
-	// ------------------------------------------------------------------------
-	//  Life Cycle
-	// ------------------------------------------------------------------------
-
-	@Override
-	public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
-		this.container = containingTask;
-		this.config = config;
-		this.output = output;
-		this.runtimeContext = new StreamingRuntimeContext(this, container.getEnvironment(), container.getAccumulatorMap());
-	}
-
-	/**
-	 * This method is called immediately before any elements are processed, it should contain the
-	 * operator's initialization logic.
-	 *
-	 * <p>The default implementation does nothing.
-	 * 
-	 * @throws Exception An exception in this method causes the operator to fail.
-	 */
-	@Override
-	public void open() throws Exception {}
-
-	/**
-	 * This method is called after all records have been added to the operators via the methods
-	 * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator#processElement(StreamRecord)}, or
-	 * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement1(StreamRecord)} and
-	 * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement2(StreamRecord)}.
-
-	 * <p>The method is expected to flush all remaining buffered data. Exceptions during this flushing
-	 * of buffered should be propagated, in order to cause the operation to be recognized asa failed,
-	 * because the last data items are not processed properly.
-	 *
-	 * @throws Exception An exception in this method causes the operator to fail.
-	 */
-	@Override
-	public void close() throws Exception {}
-	
-	/**
-	 * This method is called at the very end of the operator's life, both in the case of a successful
-	 * completion of the operation, and in the case of a failure and canceling.
-	 *
-	 * This method is expected to make a thorough effort to release all resources
-	 * that the operator has acquired.
-	 */
-	@Override
-	public void dispose() {
-		if (keyValueStates != null) {
-			for (KvState<?, ?, ?> state : keyValueStates) {
-				state.dispose();
-			}
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-	//  Checkpointing
-	// ------------------------------------------------------------------------
-
-	@Override
-	public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
-		// here, we deal with key/value state snapshots
-		
-		StreamTaskState state = new StreamTaskState();
-		if (keyValueStates != null) {
-			HashMap<String, KvStateSnapshot<?, ?, ?>> snapshots = new HashMap<>(keyValueStatesByName.size());
-			
-			for (Map.Entry<String, KvState<?, ?, ?>> entry : keyValueStatesByName.entrySet()) {
-				KvStateSnapshot<?, ?, ?> snapshot = entry.getValue().shapshot(checkpointId, timestamp);
-				snapshots.put(entry.getKey(), snapshot);
-			}
-			
-			state.setKvStates(snapshots);
-		}
-		
-		return state;
-	}
-	
-	@Override
-	public void restoreState(StreamTaskState state) throws Exception {
-		// restore the key/value state. the actual restore happens lazily, when the function requests
-		// the state again, because the restore method needs information provided by the user function
-		keyValueStateSnapshots = state.getKvStates();
-	}
-	
-	@Override
-	public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
-		// by default, nothing needs a notification of checkpoint completion
-	}
-
-	// ------------------------------------------------------------------------
-	//  Properties and Services
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Gets the execution config defined on the execution environment of the job to which this
-	 * operator belongs.
-	 * 
-	 * @return The job's execution config.
-	 */
-	public ExecutionConfig getExecutionConfig() {
-		return container.getExecutionConfig();
-	}
-	
-	public StreamConfig getOperatorConfig() {
-		return config;
-	}
-	
-	public StreamTask<?, ?> getContainingTask() {
-		return container;
-	}
-	
-	public ClassLoader getUserCodeClassloader() {
-		return container.getUserCodeClassLoader();
-	}
-	
-	/**
-	 * Returns a context that allows the operator to query information about the execution and also
-	 * to interact with systems such as broadcast variables and managed state. This also allows
-	 * to register timers.
-	 */
-	public StreamingRuntimeContext getRuntimeContext() {
-		return runtimeContext;
-	}
-
-	public StateBackend<?> getStateBackend() {
-		return container.getStateBackend();
-	}
-
-	/**
-	 * Register a timer callback. At the specified time the {@link Triggerable} will be invoked.
-	 * This call is guaranteed to not happen concurrently with method calls on the operator.
-	 *
-	 * @param time The absolute time in milliseconds.
-	 * @param target The target to be triggered.
-	 */
-	protected void registerTimer(long time, Triggerable target) {
-		container.registerTimer(time, target);
-	}
-
-	/**
-	 * Creates a key/value state handle, using the state backend configured for this task.
-	 *
-	 * @param stateType The type information for the state type, used for managed memory and state snapshots.
-	 * @param defaultValue The default value that the state should return for keys that currently have
-	 *                     no value associated with them 
-	 *
-	 * @param <V> The type of the state value.
-	 *
-	 * @return The key/value state for this operator.
-	 *
-	 * @throws IllegalStateException Thrown, if the key/value state was already initialized.
-	 * @throws Exception Thrown, if the state backend cannot create the key/value state.
-	 */
-	protected <V> OperatorState<V> createKeyValueState(
-			String name, TypeInformation<V> stateType, V defaultValue) throws Exception
-	{
-		return createKeyValueState(name, stateType.createSerializer(getExecutionConfig()), defaultValue);
-	}
-	
-	/**
-	 * Creates a key/value state handle, using the state backend configured for this task.
-	 * 
-	 * @param valueSerializer The type serializer for the state type, used for managed memory and state snapshots.
-	 * @param defaultValue The default value that the state should return for keys that currently have
-	 *                     no value associated with them 
-	 * 
-	 * @param <K> The type of the state key.
-	 * @param <V> The type of the state value.
-	 * @param <Backend> The type of the state backend that creates the key/value state.
-	 * 
-	 * @return The key/value state for this operator.
-	 * 
-	 * @throws IllegalStateException Thrown, if the key/value state was already initialized.
-	 * @throws Exception Thrown, if the state backend cannot create the key/value state.
-	 */
-	@SuppressWarnings({"rawtypes", "unchecked"})
-	protected <K, V, Backend extends StateBackend<Backend>> OperatorState<V> createKeyValueState(
-			String name, TypeSerializer<V> valueSerializer, V defaultValue) throws Exception
-	{
-		if (name == null || name.isEmpty()) {
-			throw new IllegalArgumentException();
-		}
-		if (keyValueStatesByName != null && keyValueStatesByName.containsKey(name)) {
-			throw new IllegalStateException("The key/value state has already been created");
-		}
-
-		TypeSerializer<K> keySerializer;
-		
-		// first time state access, make sure we load the state partitioner
-		if (stateKeySelector == null) {
-			stateKeySelector = config.getStatePartitioner(getUserCodeClassloader());
-			if (stateKeySelector == null) {
-				throw new UnsupportedOperationException("The function or operator is not executed " +
-						"on a KeyedStream and can hence not access the key/value state");
-			}
-
-			keySerializer = config.getStateKeySerializer(getUserCodeClassloader());
-			if (keySerializer == null) {
-				throw new Exception("State key serializer has not been configured in the config.");
-			}
-			this.keySerializer = keySerializer;
-		}
-		else if (this.keySerializer != null) {
-			keySerializer = (TypeSerializer<K>) this.keySerializer;
-		}
-		else {
-			// should never happen, this is merely a safeguard
-			throw new RuntimeException();
-		}
-		
-		@SuppressWarnings("unchecked")
-		Backend stateBackend = (Backend) container.getStateBackend();
-
-		KvState<K, V, Backend> kvstate = null;
-		
-		// check whether we restore the key/value state from a snapshot, or create a new blank one
-		if (keyValueStateSnapshots != null) {
-			@SuppressWarnings("unchecked")
-			KvStateSnapshot<K, V, Backend> snapshot = (KvStateSnapshot<K, V, Backend>) keyValueStateSnapshots.remove(name);
-
-			if (snapshot != null) {
-				kvstate = snapshot.restoreState(
-						stateBackend, keySerializer, valueSerializer, defaultValue, getUserCodeClassloader());
-			}
-		}
-		
-		if (kvstate == null) {
-			// create a new blank key/value state
-			kvstate = stateBackend.createKvState(keySerializer, valueSerializer, defaultValue);
-		}
-
-		if (keyValueStatesByName == null) {
-			keyValueStatesByName = new HashMap<>();
-		}
-		keyValueStatesByName.put(name, kvstate);
-		keyValueStates = keyValueStatesByName.values().toArray(new KvState[keyValueStatesByName.size()]);
-		return kvstate;
-	}
-	
-	@Override
-	@SuppressWarnings({"unchecked", "rawtypes"})
-	public void setKeyContextElement(StreamRecord record) throws Exception {
-		if (stateKeySelector != null && keyValueStates != null) {
-			KeySelector selector = stateKeySelector;
-			for (KvState kv : keyValueStates) {
-				kv.setCurrentKey(selector.getKey(record.getValue()));
-			}
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-	//  Context and chaining properties
-	// ------------------------------------------------------------------------
-	
-	@Override
-	public final void setChainingStrategy(ChainingStrategy strategy) {
-		this.chainingStrategy = strategy;
-	}
-	
-	@Override
-	public final ChainingStrategy getChainingStrategy() {
-		return chainingStrategy;
-	}
-	
-	@Override
-	public boolean isInputCopyingDisabled() {
-		return inputCopyDisabled;
-	}
-
-	/**
-	 * Enable object-reuse for this operator instance. This overrides the setting in
-	 * the {@link org.apache.flink.api.common.ExecutionConfig}
-	 */
-	public void disableInputCopy() {
-		this.inputCopyDisabled = true;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
deleted file mode 100644
index 17bd08d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * This is used as the base class for operators that have a user-defined
- * function. This class handles the opening and closing of the user-defined functions,
- * as part of the operator life cycle.
- * 
- * @param <OUT>
- *            The output type of the operator
- * @param <F>
- *            The type of the user function
- */
-public abstract class AbstractUdfStreamOperator<OUT, F extends Function> extends AbstractStreamOperator<OUT> {
-
-	private static final long serialVersionUID = 1L;
-	
-	
-	/** the user function */
-	protected final F userFunction;
-	
-	/** Flag to prevent duplicate function.close() calls in close() and dispose() */
-	private transient boolean functionsClosed = false;
-	
-	
-	public AbstractUdfStreamOperator(F userFunction) {
-		this.userFunction = requireNonNull(userFunction);
-	}
-
-	/**
-	 * Gets the user function executed in this operator.
-	 * @return The user function of this operator.
-	 */
-	public F getUserFunction() {
-		return userFunction;
-	}
-	
-	// ------------------------------------------------------------------------
-	//  operator life cycle
-	// ------------------------------------------------------------------------
-
-
-	@Override
-	public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
-		super.setup(containingTask, config, output);
-		
-		FunctionUtils.setFunctionRuntimeContext(userFunction, getRuntimeContext());
-	}
-
-	@Override
-	public void open() throws Exception {
-		super.open();
-		
-		FunctionUtils.openFunction(userFunction, new Configuration());
-	}
-
-	@Override
-	public void close() throws Exception {
-		super.close();
-		functionsClosed = true;
-		FunctionUtils.closeFunction(userFunction);
-	}
-
-	@Override
-	public void dispose() {
-		if (!functionsClosed) {
-			functionsClosed = true;
-			try {
-				FunctionUtils.closeFunction(userFunction);
-			}
-			catch (Throwable t) {
-				LOG.error("Exception while closing user function while failing or canceling task", t);
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  checkpointing and recovery
-	// ------------------------------------------------------------------------
-	
-	@Override
-	public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
-		StreamTaskState state = super.snapshotOperatorState(checkpointId, timestamp);
-
-		if (userFunction instanceof Checkpointed) {
-			@SuppressWarnings("unchecked")
-			Checkpointed<Serializable> chkFunction = (Checkpointed<Serializable>) userFunction;
-			
-			Serializable udfState;
-			try {
-				udfState = chkFunction.snapshotState(checkpointId, timestamp);
-			} 
-			catch (Exception e) {
-				throw new Exception("Failed to draw state snapshot from function: " + e.getMessage(), e);
-			}
-			
-			if (udfState != null) {
-				try {
-					StateBackend<?> stateBackend = getStateBackend();
-					StateHandle<Serializable> handle = 
-							stateBackend.checkpointStateSerializable(udfState, checkpointId, timestamp);
-					state.setFunctionState(handle);
-				}
-				catch (Exception e) {
-					throw new Exception("Failed to add the state snapshot of the function to the checkpoint: "
-							+ e.getMessage(), e);
-				}
-			}
-		}
-		
-		return state;
-	}
-
-	@Override
-	public void restoreState(StreamTaskState state) throws Exception {
-		super.restoreState(state);
-		
-		StateHandle<Serializable> stateHandle =  state.getFunctionState();
-		
-		if (userFunction instanceof Checkpointed && stateHandle != null) {
-			@SuppressWarnings("unchecked")
-			Checkpointed<Serializable> chkFunction = (Checkpointed<Serializable>) userFunction;
-			
-			Serializable functionState = stateHandle.getState(getUserCodeClassloader());
-			if (functionState != null) {
-				try {
-					chkFunction.restoreState(functionState);
-				}
-				catch (Exception e) {
-					throw new Exception("Failed to restore state to function: " + e.getMessage(), e);
-				}
-			}
-		}
-	}
-
-	@Override
-	public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
-		super.notifyOfCompletedCheckpoint(checkpointId);
-
-		if (userFunction instanceof CheckpointNotifier) {
-			((CheckpointNotifier) userFunction).notifyCheckpointComplete(checkpointId);
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	/**
-	 * 
-	 * Since the streaming API does not implement any parametrization of functions via a
-	 * configuration, the config returned here is actually empty.
-	 * 
-	 * @return The user function parameters (currently empty)
-	 */
-	public Configuration getUserFunctionParameters() {
-		return new Configuration();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/ChainingStrategy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/ChainingStrategy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/ChainingStrategy.java
deleted file mode 100644
index 3a752b0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/ChainingStrategy.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-/**
- * Defines the chaining scheme for the operator.
- * By default {@link #ALWAYS} is used, which means operators will be eagerly chained whenever possible.
- */
-public enum ChainingStrategy {
-
-	/**
-	 * Chaining will happen even if chaining is disabled on the execution environment.
-	 * This should only be used by system-level operators, not operators implemented by users.
-	 */
-	FORCE_ALWAYS,
-
-	/** 
-	 * Operators will be eagerly chained whenever possible, for
-	 * maximal performance. It is generally a good practice to allow maximal
-	 * chaining and increase operator parallelism
-	 */
-	ALWAYS,
-
-	/**
-	 * The operator will not be chained to the preceding or succeeding operators.
-	 */
-	NEVER,
-	
-	
-	HEAD
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OneInputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OneInputStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OneInputStreamOperator.java
deleted file mode 100644
index 705c1b3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OneInputStreamOperator.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * Interface for stream operators with one input. Use
- * {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator} as a base class if
- * you want to implement a custom operator.
- * 
- * @param <IN> The input type of the operator
- * @param <OUT> The output type of the operator
- */
-public interface OneInputStreamOperator<IN, OUT> extends StreamOperator<OUT> {
-
-	/**
-	 * Processes one element that arrived at this operator.
-	 * This method is guaranteed to not be called concurrently with other methods of the operator.
-	 */
-	void processElement(StreamRecord<IN> element) throws Exception;
-
-	/**
-	 * Processes a {@link Watermark}.
-	 * This method is guaranteed to not be called concurrently with other methods of the operator.
-	 *
-	 * @see org.apache.flink.streaming.api.watermark.Watermark
-	 */
-	void processWatermark(Watermark mark) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/Output.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/Output.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/Output.java
deleted file mode 100644
index 0cbc954..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/Output.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.util.Collector;
-
-/**
- * A {@link org.apache.flink.streaming.api.operators.StreamOperator} is supplied with an object
- * of this interface that can be used to emit elements and other messages, such as barriers
- * and watermarks, from an operator.
- *
- * @param <T> The type of the elements that can be emitted.
- */
-public interface Output<T> extends Collector<T> {
-
-	/**
-	 * Emits a {@link Watermark} from an operator. This watermark is broadcast to all downstream
-	 * operators.
-	 *
-	 * <p>A watermark specifies that no element with a timestamp older or equal to the watermark
-	 * timestamp will be emitted in the future.
-	 */
-	void emitWatermark(Watermark mark);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java
deleted file mode 100644
index 1d05966..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-/**
- * Stream operators can implement this interface if they need access to the output type information
- * at {@link org.apache.flink.streaming.api.graph.StreamGraph} generation. This can be useful for
- * cases where the output type is specified by the returns method and, thus, after the stream
- * operator has been created.
- */
-public interface OutputTypeConfigurable<OUT> {
-
-	/**
-	 * Is called by the {@link org.apache.flink.streaming.api.graph.StreamGraph#addOperator(Integer, StreamOperator, TypeInformation, TypeInformation, String)}
-	 * method when the {@link org.apache.flink.streaming.api.graph.StreamGraph} is generated. The
-	 * method is called with the output {@link TypeInformation} which is also used for the
-	 * {@link org.apache.flink.streaming.runtime.tasks.StreamTask} output serializer.
-	 *
-	 * @param outTypeInfo Output type information of the {@link org.apache.flink.streaming.runtime.tasks.StreamTask}
-	 * @param executionConfig Execution configuration
-	 */
-	void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java
deleted file mode 100644
index efe5d52..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-public class StreamCounter<IN> extends AbstractStreamOperator<Long> implements OneInputStreamOperator<IN, Long> {
-
-	private static final long serialVersionUID = 1L;
-
-	private Long count = 0L;
-
-	public StreamCounter() {
-		chainingStrategy = ChainingStrategy.ALWAYS;
-	}
-
-	@Override
-	public void processElement(StreamRecord<IN> element) {
-		output.collect(element.replace(++count));
-	}
-
-	@Override
-	public void processWatermark(Watermark mark) throws Exception {
-		output.emitWatermark(mark);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java
deleted file mode 100644
index 2ff220e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-public class StreamFilter<IN> extends AbstractUdfStreamOperator<IN, FilterFunction<IN>> implements OneInputStreamOperator<IN, IN> {
-
-	private static final long serialVersionUID = 1L;
-
-	public StreamFilter(FilterFunction<IN> filterFunction) {
-		super(filterFunction);
-		chainingStrategy = ChainingStrategy.ALWAYS;
-	}
-
-	@Override
-	public void processElement(StreamRecord<IN> element) throws Exception {
-		if (userFunction.filter(element.getValue())) {
-			output.collect(element);
-		}
-	}
-
-	@Override
-	public void processWatermark(Watermark mark) throws Exception {
-		output.emitWatermark(mark);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
deleted file mode 100644
index 23b638e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-public class StreamFlatMap<IN, OUT>
-		extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>>
-		implements OneInputStreamOperator<IN, OUT> {
-
-	private static final long serialVersionUID = 1L;
-
-	private transient TimestampedCollector<OUT> collector;
-
-	public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {
-		super(flatMapper);
-		chainingStrategy = ChainingStrategy.ALWAYS;
-	}
-
-	@Override
-	public void open() throws Exception {
-		super.open();
-		collector = new TimestampedCollector<OUT>(output);
-	}
-
-	@Override
-	public void processElement(StreamRecord<IN> element) throws Exception {
-		collector.setTimestamp(element.getTimestamp());
-		userFunction.flatMap(element.getValue(), collector);
-	}
-
-	@Override
-	public void processWatermark(Watermark mark) throws Exception {
-		output.emitWatermark(mark);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
deleted file mode 100644
index cf6b489..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
-import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-public class StreamGroupedFold<IN, OUT, KEY>
-		extends AbstractUdfStreamOperator<OUT, FoldFunction<IN, OUT>>
-		implements OneInputStreamOperator<IN, OUT>, OutputTypeConfigurable<OUT> {
-
-	private static final long serialVersionUID = 1L;
-	
-	private static final String STATE_NAME = "_op_state";
-
-	// Grouped values
-	private transient OperatorState<OUT> values;
-	
-	private transient OUT initialValue;
-	
-	// Initial value serialization
-	private byte[] serializedInitialValue;
-	
-	private TypeSerializer<OUT> outTypeSerializer;
-	
-	public StreamGroupedFold(FoldFunction<IN, OUT> folder, OUT initialValue) {
-		super(folder);
-		this.initialValue = initialValue;
-	}
-
-	@Override
-	public void open() throws Exception {
-		super.open();
-
-		if (serializedInitialValue == null) {
-			throw new RuntimeException("No initial value was serialized for the fold " +
-					"operator. Probably the setOutputType method was not called.");
-		}
-
-		ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue);
-		InputViewDataInputStreamWrapper in = new InputViewDataInputStreamWrapper(
-				new DataInputStream(bais)
-		);
-		initialValue = outTypeSerializer.deserialize(in);
-		values = createKeyValueState(STATE_NAME, outTypeSerializer, null);
-	}
-
-	@Override
-	public void processElement(StreamRecord<IN> element) throws Exception {
-		OUT value = values.value();
-
-		if (value != null) {
-			OUT folded = userFunction.fold(outTypeSerializer.copy(value), element.getValue());
-			values.update(folded);
-			output.collect(element.replace(folded));
-		} else {
-			OUT first = userFunction.fold(outTypeSerializer.copy(initialValue), element.getValue());
-			values.update(first);
-			output.collect(element.replace(first));
-		}
-	}
-
-	@Override
-	public void processWatermark(Watermark mark) throws Exception {
-		output.emitWatermark(mark);
-	}
-
-	@Override
-	public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
-		outTypeSerializer = outTypeInfo.createSerializer(executionConfig);
-
-		ByteArrayOutputStream baos = new ByteArrayOutputStream();
-		OutputViewDataOutputStreamWrapper out = new OutputViewDataOutputStreamWrapper(
-				new DataOutputStream(baos)
-		);
-
-		try {
-			outTypeSerializer.serialize(initialValue, out);
-		} catch (IOException ioe) {
-			throw new RuntimeException("Unable to serialize initial value of type " +
-					initialValue.getClass().getSimpleName() + " of fold operator.", ioe);
-		}
-
-		serializedInitialValue = baos.toByteArray();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
deleted file mode 100644
index ae15e92..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-public class StreamGroupedReduce<IN> extends AbstractUdfStreamOperator<IN, ReduceFunction<IN>>
-		implements OneInputStreamOperator<IN, IN> {
-
-	private static final long serialVersionUID = 1L;
-
-	private static final String STATE_NAME = "_op_state";
-	
-	private transient OperatorState<IN> values;
-	
-	private TypeSerializer<IN> serializer;
-
-	
-	public StreamGroupedReduce(ReduceFunction<IN> reducer, TypeSerializer<IN> serializer) {
-		super(reducer);
-		this.serializer = serializer;
-	}
-
-	@Override
-	public void open() throws Exception {
-		super.open();
-		values = createKeyValueState(STATE_NAME, serializer, null);
-	}
-
-	@Override
-	public void processElement(StreamRecord<IN> element) throws Exception {
-		IN value = element.getValue();
-		IN currentValue = values.value();
-		
-		if (currentValue != null) {
-			IN reduced = userFunction.reduce(currentValue, value);
-			values.update(reduced);
-			output.collect(element.replace(reduced));
-		} else {
-			values.update(value);
-			output.collect(element.replace(value));
-		}
-	}
-
-	@Override
-	public void processWatermark(Watermark mark) throws Exception {
-		output.emitWatermark(mark);
-	}
-
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java
deleted file mode 100644
index 7d5c7cc..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-public class StreamMap<IN, OUT>
-		extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
-		implements OneInputStreamOperator<IN, OUT> {
-
-	private static final long serialVersionUID = 1L;
-
-	public StreamMap(MapFunction<IN, OUT> mapper) {
-		super(mapper);
-		chainingStrategy = ChainingStrategy.ALWAYS;
-	}
-
-	@Override
-	public void processElement(StreamRecord<IN> element) throws Exception {
-		output.collect(element.replace(userFunction.map(element.getValue())));
-	}
-
-	@Override
-	public void processWatermark(Watermark mark) throws Exception {
-		output.emitWatermark(mark);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
deleted file mode 100644
index fac26f1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import java.io.Serializable;
-
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
-
-/**
- * Basic interface for stream operators. Implementers would implement one of
- * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator} or
- * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator} to create operators
- * that process elements.
- * 
- * <p> The class {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator}
- * offers default implementation for the lifecycle and properties methods.
- *
- * <p> Methods of {@code StreamOperator} are guaranteed not to be called concurrently. Also, if using
- * the timer service, timer callbacks are also guaranteed not to be called concurrently with
- * methods on {@code StreamOperator}.
- * 
- * @param <OUT> The output type of the operator
- */
-public interface StreamOperator<OUT> extends Serializable {
-	
-	// ------------------------------------------------------------------------
-	//  life cycle
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * Initializes the operator. Sets access to the context and the output.
-	 */
-	void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output);
-
-	/**
-	 * This method is called immediately before any elements are processed, it should contain the
-	 * operator's initialization logic.
-	 * 
-	 * @throws java.lang.Exception An exception in this method causes the operator to fail.
-	 */
-	void open() throws Exception;
-
-	/**
-	 * This method is called after all records have been added to the operators via the methods
-	 * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator#processElement(StreamRecord)}, or
-	 * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement1(StreamRecord)} and
-	 * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement2(StreamRecord)}.
-
-	 * <p>
-	 * The method is expected to flush all remaining buffered data. Exceptions during this flushing
-	 * of buffered should be propagated, in order to cause the operation to be recognized asa failed,
-	 * because the last data items are not processed properly.
-	 * 
-	 * @throws java.lang.Exception An exception in this method causes the operator to fail.
-	 */
-	void close() throws Exception;
-
-	/**
-	 * This method is called at the very end of the operator's life, both in the case of a successful
-	 * completion of the operation, and in the case of a failure and canceling.
-	 * 
-	 * This method is expected to make a thorough effort to release all resources
-	 * that the operator has acquired.
-	 */
-	void dispose();
-
-	// ------------------------------------------------------------------------
-	//  state snapshots
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Called to draw a state snapshot from the operator. This method snapshots the operator state
-	 * (if the operator is stateful) and the key/value state (if it is being used and has been
-	 * initialized).
-	 *
-	 * @param checkpointId The ID of the checkpoint.
-	 * @param timestamp The timestamp of the checkpoint.
-	 *
-	 * @return The StreamTaskState object, possibly containing the snapshots for the
-	 *         operator and key/value state.
-	 *
-	 * @throws Exception Forwards exceptions that occur while drawing snapshots from the operator
-	 *                   and the key/value state.
-	 */
-	StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception;
-	
-	/**
-	 * Restores the operator state, if this operator's execution is recovering from a checkpoint.
-	 * This method restores the operator state (if the operator is stateful) and the key/value state
-	 * (if it had been used and was initialized when the snapshot ocurred).
-	 *
-	 * <p>This method is called after {@link #setup(StreamTask, StreamConfig, Output)}
-	 * and before {@link #open()}.
-	 *
-	 * @param state The state of operator that was snapshotted as part of checkpoint
-	 *              from which the execution is restored.
-	 *
-	 * @throws Exception Exceptions during state restore should be forwarded, so that the system can
-	 *                   properly react to failed state restore and fail the execution attempt.
-	 */
-	void restoreState(StreamTaskState state) throws Exception;
-
-	/**
-	 * Called when the checkpoint with the given ID is completed and acknowledged on the JobManager.
-	 *
-	 * @param checkpointId The ID of the checkpoint that has been completed.
-	 *
-	 * @throws Exception Exceptions during checkpoint acknowledgement may be forwarded and will cause
-	 *                   the program to fail and enter recovery.
-	 */
-	void notifyOfCompletedCheckpoint(long checkpointId) throws Exception;
-
-	// ------------------------------------------------------------------------
-	//  miscellaneous
-	// ------------------------------------------------------------------------
-	
-	void setKeyContextElement(StreamRecord<?> record) throws Exception;
-	
-	/**
-	 * An operator can return true here to disable copying of its input elements. This overrides
-	 * the object-reuse setting on the {@link org.apache.flink.api.common.ExecutionConfig}
-	 */
-	boolean isInputCopyingDisabled();
-	
-	ChainingStrategy getChainingStrategy();
-
-	void setChainingStrategy(ChainingStrategy strategy);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java
deleted file mode 100644
index 1ce4ff6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-public class StreamProject<IN, OUT extends Tuple>
-		extends AbstractStreamOperator<OUT>
-		implements OneInputStreamOperator<IN, OUT> {
-
-	private static final long serialVersionUID = 1L;
-
-	private TypeSerializer<OUT> outSerializer;
-	private int[] fields;
-	private int numFields;
-
-	private transient OUT outTuple;
-
-	public StreamProject(int[] fields, TypeSerializer<OUT> outSerializer) {
-		this.fields = fields;
-		this.numFields = this.fields.length;
-		this.outSerializer = outSerializer;
-
-		chainingStrategy = ChainingStrategy.ALWAYS;
-	}
-
-
-	@Override
-	public void processElement(StreamRecord<IN> element) throws Exception {
-		for (int i = 0; i < this.numFields; i++) {
-			outTuple.setField(((Tuple) element.getValue()).getField(fields[i]), i);
-		}
-		output.collect(element.replace(outTuple));
-	}
-
-	@Override
-	public void open() throws Exception {
-		super.open();
-		outTuple = outSerializer.createInstance();
-	}
-
-	@Override
-	public void processWatermark(Watermark mark) throws Exception {
-		output.emitWatermark(mark);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
deleted file mode 100644
index 6961a4d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-public class StreamSink<IN> extends AbstractUdfStreamOperator<Object, SinkFunction<IN>>
-		implements OneInputStreamOperator<IN, Object> {
-
-	private static final long serialVersionUID = 1L;
-
-	public StreamSink(SinkFunction<IN> sinkFunction) {
-		super(sinkFunction);
-
-		chainingStrategy = ChainingStrategy.ALWAYS;
-	}
-
-	@Override
-	public void processElement(StreamRecord<IN> element) throws Exception {
-		userFunction.invoke(element.getValue());
-	}
-
-	@Override
-	public void processWatermark(Watermark mark) throws Exception {
-		// ignore it for now, we are a sink, after all
-	}
-}