You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/04/15 11:38:53 UTC

[12/19] flink git commit: [streaming] Major internal renaming and restructure

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelMerge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelMerge.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelMerge.java
deleted file mode 100644
index 8ffca91..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelMerge.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.function.co.RichCoFlatMapFunction;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.util.Collector;
-
-/**
- * Class that encapsulates the functionality necessary to merge windows created
- * in parallel. This CoFlatMap uses the information received on the number of
- * parts for each window to merge the different parts. It waits until it
- * receives an indication on the number of parts from all the discretizers
- * before producing any output.
- */
-public class ParallelMerge<OUT> extends
-		RichCoFlatMapFunction<StreamWindow<OUT>, Tuple2<Integer, Integer>, StreamWindow<OUT>> {
-
-	private static final long serialVersionUID = 1L;
-
-	protected Integer numberOfDiscretizers;
-	private ReduceFunction<OUT> reducer;
-
-	private Map<Integer, Integer> availableNumberOfParts = new HashMap<Integer, Integer>();
-	private Map<Integer, Tuple2<StreamWindow<OUT>, Integer>> receivedWindows = new HashMap<Integer, Tuple2<StreamWindow<OUT>, Integer>>();
-	private Map<Integer, Tuple2<Integer, Integer>> receivedNumberOfParts = new HashMap<Integer, Tuple2<Integer, Integer>>();
-
-	public ParallelMerge(ReduceFunction<OUT> reducer) {
-		this.reducer = reducer;
-	}
-
-	@Override
-	public void flatMap1(StreamWindow<OUT> nextWindow, Collector<StreamWindow<OUT>> out)
-			throws Exception {
-
-		Integer id = nextWindow.windowID;
-
-		Tuple2<StreamWindow<OUT>, Integer> current = receivedWindows.get(id);
-
-		if (current == null) {
-			current = new Tuple2<StreamWindow<OUT>, Integer>(nextWindow, 1);
-		} else {
-			updateCurrent(current.f0, nextWindow);
-			current.f1++;
-		}
-
-		Integer count = current.f1;
-
-		if (availableNumberOfParts.containsKey(id) && availableNumberOfParts.get(id) <= count) {
-			out.collect(current.f0);
-			receivedWindows.remove(id);
-			availableNumberOfParts.remove(id);
-
-			checkOld(id);
-
-		} else {
-			receivedWindows.put(id, (Tuple2<StreamWindow<OUT>, Integer>) current);
-		}
-	}
-
-	private void checkOld(Integer id) {
-		// A partial window with the preceding id (id - 1) that is still buffered
-		// indicates an error in processing, so fail with an exception
-		if (receivedWindows.containsKey(id - 1)) {
-			throw new RuntimeException("Error in processing logic, window with id " + id
-					+ " should have already been processed");
-		}
-
-	}
-
-	@Override
-	public void flatMap2(Tuple2<Integer, Integer> partInfo, Collector<StreamWindow<OUT>> out)
-			throws Exception {
-
-		Integer id = partInfo.f0;
-		Integer numOfParts = partInfo.f1;
-
-		Tuple2<Integer, Integer> currentPartInfo = receivedNumberOfParts.get(id);
-		if (currentPartInfo != null) {
-			currentPartInfo.f0 += numOfParts;
-			currentPartInfo.f1++;
-		} else {
-			currentPartInfo = new Tuple2<Integer, Integer>(numOfParts, 1);
-			receivedNumberOfParts.put(id, currentPartInfo);
-		}
-
-		if (currentPartInfo.f1 >= numberOfDiscretizers) {
-			receivedNumberOfParts.remove(id);
-
-			Tuple2<StreamWindow<OUT>, Integer> current = receivedWindows.get(id);
-
-			Integer count = current != null ? current.f1 : -1;
-
-			if (count >= currentPartInfo.f0) {
-				out.collect(current.f0);
-				receivedWindows.remove(id);
-				checkOld(id);
-			} else if (currentPartInfo.f0 > 0) {
-				availableNumberOfParts.put(id, currentPartInfo.f1);
-			}
-		}
-
-	}
-
-	protected void updateCurrent(StreamWindow<OUT> current, StreamWindow<OUT> nextWindow)
-			throws Exception {
-		if (current.size() != 1 || nextWindow.size() != 1) {
-			throw new RuntimeException(
-					"Error in parallel merge logic. Current window should contain only one element.");
-		}
-		OUT currentReduced = current.remove(0);
-		currentReduced = reducer.reduce(currentReduced, nextWindow.get(0));
-		current.add(currentReduced);
-	}
-
-	@Override
-	public void open(Configuration conf) {
-		this.numberOfDiscretizers = getRuntimeContext().getNumberOfParallelSubtasks();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
deleted file mode 100644
index d4776e8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.WindowEvent;
-import org.apache.flink.streaming.api.windowing.policy.ActiveEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerCallback;
-import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-
-/**
- * This invokable represents the discretization step of a window transformation.
- * The user supplied eviction and trigger policies are applied to create the
- * {@link StreamWindow} that will be further transformed in the next stages.
- */
-public class StreamDiscretizer<IN> extends StreamInvokable<IN, WindowEvent<IN>> {
-
-	/**
-	 * Auto-generated serial version UID
-	 */
-	private static final long serialVersionUID = -8038984294071650730L;
-
-	protected TriggerPolicy<IN> triggerPolicy;
-	protected EvictionPolicy<IN> evictionPolicy;
-	private boolean isActiveTrigger;
-	private boolean isActiveEviction;
-	private Thread activePolicyThread;
-	private int bufferSize = 0;
-
-	protected WindowEvent<IN> windowEvent = new WindowEvent<IN>();
-
-	public StreamDiscretizer(TriggerPolicy<IN> triggerPolicy, EvictionPolicy<IN> evictionPolicy) {
-		super(null);
-
-		this.triggerPolicy = triggerPolicy;
-		this.evictionPolicy = evictionPolicy;
-
-		this.isActiveTrigger = triggerPolicy instanceof ActiveTriggerPolicy;
-		this.isActiveEviction = evictionPolicy instanceof ActiveEvictionPolicy;
-	}
-
-	public TriggerPolicy<IN> getTrigger() {
-		return triggerPolicy;
-	}
-
-	public EvictionPolicy<IN> getEviction() {
-		return evictionPolicy;
-	}
-
-	@Override
-	public void invoke() throws Exception {
-
-		// Continuously run
-		while (isRunning && readNext() != null) {
-			processRealElement(nextObject);
-		}
-
-		if (activePolicyThread != null) {
-			activePolicyThread.interrupt();
-		}
-
-		emitWindow();
-
-	}
-
-	/**
-	 * This method processes an arrived real element. The method is synchronized
-	 * to ensure that it cannot interleave with
-	 * {@link StreamDiscretizer#triggerOnFakeElement(Object)}
-	 * 
-	 * @param input
-	 *            a real input element
-	 * @throws Exception
-	 */
-	protected synchronized void processRealElement(IN input) throws Exception {
-
-		// Setting the input element in order to avoid NullFieldException when triggering on fake element
-		windowEvent.setElement(input);
-		if (isActiveTrigger) {
-			ActiveTriggerPolicy<IN> trigger = (ActiveTriggerPolicy<IN>) triggerPolicy;
-			Object[] result = trigger.preNotifyTrigger(input);
-			for (Object in : result) {
-				triggerOnFakeElement(in);
-			}
-		}
-
-		boolean isTriggered = false;
-
-		if (triggerPolicy.notifyTrigger(input)) {
-			emitWindow();
-			isTriggered = true;
-		}
-
-		evict(input, isTriggered);
-
-		collector.collect(windowEvent.setElement(input));
-		bufferSize++;
-
-	}
-
-	/**
-	 * This method triggers on an arrived fake element. The method is
-	 * synchronized to ensure that it cannot interleave with
-	 * {@link StreamDiscretizer#processRealElement(Object)}
-	 * 
-	 * @param input
-	 *            a fake input element
-	 */
-	@SuppressWarnings("unchecked")
-	protected synchronized void triggerOnFakeElement(Object input) {
-		if (isActiveEviction) {
-			activeEvict(input);
-			emitWindow();
-		} else {
-			emitWindow();
-			evict((IN) input, true);
-		}
-	}
-
-	/**
-	 * This method emits a trigger {@link WindowEvent} to signal that the current
-	 * buffer content should be emitted as a new {@link StreamWindow}
-	 */
-	protected void emitWindow() {
-		collector.collect(windowEvent.setTrigger());
-	}
-
-	private void activeEvict(Object input) {
-		int numToEvict = 0;
-
-		if (isActiveEviction) {
-			ActiveEvictionPolicy<IN> ep = (ActiveEvictionPolicy<IN>) evictionPolicy;
-			numToEvict = ep.notifyEvictionWithFakeElement(input, bufferSize);
-		}
-
-		if (numToEvict > 0) {
-			collector.collect(windowEvent.setEviction(numToEvict));
-			bufferSize -= numToEvict;
-			bufferSize = bufferSize >= 0 ? bufferSize : 0;
-		}
-	}
-
-	private void evict(IN input, boolean isTriggered) {
-		int numToEvict = evictionPolicy.notifyEviction(input, isTriggered, bufferSize);
-
-		if (numToEvict > 0) {
-			collector.collect(windowEvent.setEviction(numToEvict));
-			bufferSize -= numToEvict;
-			bufferSize = bufferSize >= 0 ? bufferSize : 0;
-		}
-	}
-
-	@Override
-	public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
-		super.open(parameters);
-
-		if (isActiveTrigger) {
-			ActiveTriggerPolicy<IN> tp = (ActiveTriggerPolicy<IN>) triggerPolicy;
-
-			Runnable runnable = tp.createActiveTriggerRunnable(new WindowingCallback());
-			if (runnable != null) {
-				activePolicyThread = new Thread(runnable);
-				activePolicyThread.start();
-			}
-		}
-	}
-
-	/**
-	 * This class allows the active trigger thread to call back and push fake
-	 * elements at any time.
-	 */
-	private class WindowingCallback implements ActiveTriggerCallback {
-
-		@Override
-		public void sendFakeElement(Object datapoint) {
-			triggerOnFakeElement(datapoint);
-		}
-
-	}
-
-	@Override
-	public boolean equals(Object other) {
-		if (other == null || !(other instanceof StreamDiscretizer)
-				|| (other instanceof GroupedStreamDiscretizer)) {
-			return false;
-		} else {
-			try {
-				@SuppressWarnings("unchecked")
-				StreamDiscretizer<IN> otherDiscretizer = (StreamDiscretizer<IN>) other;
-
-				return triggerPolicy.equals(otherDiscretizer.triggerPolicy)
-						&& evictionPolicy.equals(otherDiscretizer.evictionPolicy);
-
-			} catch (ClassCastException e) {
-				return false;
-			}
-		}
-	}
-
-	@Override
-	public String toString() {
-		return "Discretizer(Trigger: " + triggerPolicy.toString() + ", Eviction: "
-				+ evictionPolicy.toString() + ")";
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
deleted file mode 100644
index fbd8258..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import org.apache.flink.streaming.api.invokable.ChainableInvokable;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.WindowEvent;
-import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
-
-/**
- * This invokable manages the window buffers attached to the discretizers.
- */
-public class WindowBufferInvokable<T> extends ChainableInvokable<WindowEvent<T>, StreamWindow<T>> {
-
-	protected WindowBuffer<T> buffer;
-
-	public WindowBufferInvokable(WindowBuffer<T> buffer) {
-		super(null);
-		this.buffer = buffer;
-		withoutInputCopy();
-	}
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public void invoke() throws Exception {
-		while (isRunning && readNext() != null) {
-			callUserFunctionAndLogException();
-		}
-	}
-
-	@Override
-	protected void callUserFunction() throws Exception {
-		handleWindowEvent(nextObject);
-	}
-
-	protected void handleWindowEvent(WindowEvent<T> windowEvent, WindowBuffer<T> buffer)
-			throws Exception {
-		if (windowEvent.isElement()) {
-			buffer.store(windowEvent.getElement());
-		} else if (windowEvent.isEviction()) {
-			buffer.evict(windowEvent.getEviction());
-		} else if (windowEvent.isTrigger()) {
-			buffer.emitWindow(collector);
-		}
-	}
-
-	private void handleWindowEvent(WindowEvent<T> windowEvent) throws Exception {
-		handleWindowEvent(windowEvent, buffer);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java
deleted file mode 100644
index 4aff6c1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import org.apache.flink.streaming.api.invokable.ChainableInvokable;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-
-/**
- * This invokable flattens the results of the window transformations by
- * outputting the elements of the {@link StreamWindow} one-by-one
- */
-public class WindowFlattener<T> extends ChainableInvokable<StreamWindow<T>, T> {
-
-	public WindowFlattener() {
-		super(null);
-		withoutInputCopy();
-	}
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public void invoke() throws Exception {
-		while (isRunning && readNext() != null) {
-			callUserFunctionAndLogException();
-		}
-	}
-
-	@Override
-	protected void callUserFunction() throws Exception {
-		for (T element : nextObject) {
-			collector.collect(element);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolder.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolder.java
deleted file mode 100644
index aa398c5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolder.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.streaming.api.datastream.WindowedDataStream;
-import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-
-/**
- * This invokable is used to apply foldWindow transformations on
- * {@link WindowedDataStream}s.
- */
-public class WindowFolder<IN, OUT> extends MapInvokable<StreamWindow<IN>, StreamWindow<OUT>> {
-
-	private static final long serialVersionUID = 1L;
-	FoldFunction<IN, OUT> folder;
-
-	public WindowFolder(FoldFunction<IN, OUT> folder, OUT initialValue) {
-		super(new WindowFoldFunction<IN, OUT>(folder, initialValue));
-		this.folder = folder;
-		withoutInputCopy();
-	}
-
-	private static class WindowFoldFunction<IN, OUT> implements
-			MapFunction<StreamWindow<IN>, StreamWindow<OUT>> {
-
-		private static final long serialVersionUID = 1L;
-		private OUT initialValue;
-		FoldFunction<IN, OUT> folder;
-
-		public WindowFoldFunction(FoldFunction<IN, OUT> folder, OUT initialValue) {
-			this.folder = folder;
-			this.initialValue = initialValue;
-		}
-
-		@Override
-		public StreamWindow<OUT> map(StreamWindow<IN> window) throws Exception {
-			StreamWindow<OUT> outputWindow = new StreamWindow<OUT>(window.windowID);
-			outputWindow.numberOfParts = window.numberOfParts;
-
-			if (!window.isEmpty()) {
-				OUT accumulator = initialValue;
-				for (int i = 0; i < window.size(); i++) {
-					accumulator = folder.fold(accumulator, window.get(i));
-				}
-				outputWindow.add(accumulator);
-			}
-			return outputWindow;
-		}
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java
deleted file mode 100644
index a065f4e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.api.datastream.WindowedDataStream;
-import org.apache.flink.streaming.api.function.WindowMapFunction;
-import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-
-/**
- * This invokable is used to apply mapWindow transformations on
- * {@link WindowedDataStream}s.
- */
-public class WindowMapper<IN, OUT> extends MapInvokable<StreamWindow<IN>, StreamWindow<OUT>> {
-
-	private static final long serialVersionUID = 1L;
-
-	WindowMapFunction<IN, OUT> mapper;
-
-	public WindowMapper(WindowMapFunction<IN, OUT> mapper) {
-		super(new WindowMap<IN, OUT>(mapper));
-		this.mapper = mapper;
-		withoutInputCopy();
-	}
-
-	private static class WindowMap<T, R> implements MapFunction<StreamWindow<T>, StreamWindow<R>> {
-
-		private static final long serialVersionUID = 1L;
-		WindowMapFunction<T, R> mapper;
-
-		public WindowMap(WindowMapFunction<T, R> mapper) {
-			this.mapper = mapper;
-		}
-
-		@Override
-		public StreamWindow<R> map(StreamWindow<T> window) throws Exception {
-			StreamWindow<R> outputWindow = new StreamWindow<R>(window.windowID);
-
-			outputWindow.numberOfParts = window.numberOfParts;
-
-			mapper.mapWindow(window, outputWindow);
-
-			return outputWindow;
-		}
-
-	}
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java
deleted file mode 100644
index 4c112d2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.streaming.api.invokable.ChainableInvokable;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-
-/**
- * This invokable merges together the different partitions of a
- * {@link StreamWindow}. It is used to merge the results of parallel
- * transformations that belong to the same window.
- */
-public class WindowMerger<T> extends ChainableInvokable<StreamWindow<T>, StreamWindow<T>> {
-
-	private Map<Integer, StreamWindow<T>> windows;
-
-	public WindowMerger() {
-		super(null);
-		this.windows = new HashMap<Integer, StreamWindow<T>>();
-		withoutInputCopy();
-	}
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public void invoke() throws Exception {
-		while (isRunning && readNext() != null) {
-			callUserFunctionAndLogException();
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	protected void callUserFunction() throws Exception {
-		StreamWindow<T> nextWindow = nextObject;
-
-		StreamWindow<T> current = windows.get(nextWindow.windowID);
-
-		if (current == null) {
-			current = nextWindow;
-		} else {
-			current = StreamWindow.merge(current, nextWindow);
-		}
-
-		if (current.numberOfParts == 1) {
-			collector.collect(current);
-			windows.remove(nextWindow.windowID);
-		} else {
-			windows.put(nextWindow.windowID, current);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartExtractor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartExtractor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartExtractor.java
deleted file mode 100644
index 416b915..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartExtractor.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.util.Collector;
-
-/**
- * This FlatMapFunction is used to send the number of parts for each window ID
- * (for each parallel discretizer) to the parallel merger that will use it to
- * merge parallel discretized windows
- */
-public class WindowPartExtractor<OUT> implements FlatMapFunction<StreamWindow<OUT>, Tuple2<Integer, Integer>> {
-
-	private static final long serialVersionUID = 1L;
-
-	Integer lastIndex = -1;
-
-	@Override
-	public void flatMap(StreamWindow<OUT> value, Collector<Tuple2<Integer, Integer>> out)
-			throws Exception {
-
-		// We don't emit new values for the same index, this avoids sending the
-		// same information for the same partitioned window multiple times
-		if (value.windowID != lastIndex) {
-
-			// For empty windows we send 0 since these windows will be filtered
-			// out
-			if (value.isEmpty()) {
-				out.collect(new Tuple2<Integer, Integer>(value.windowID, 0));
-			} else {
-				out.collect(new Tuple2<Integer, Integer>(value.windowID, value.numberOfParts));
-			}
-			lastIndex = value.windowID;
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java
deleted file mode 100644
index 9672b0f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.invokable.ChainableInvokable;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-
-/**
- * This invokable applies either split or key partitioning depending on the
- * transformation.
- */
-public class WindowPartitioner<T> extends ChainableInvokable<StreamWindow<T>, StreamWindow<T>> {
-
-	private KeySelector<T, ?> keySelector;
-	private int numberOfSplits;
-
-	public WindowPartitioner(KeySelector<T, ?> keySelector) {
-		super(null);
-		this.keySelector = keySelector;
-		withoutInputCopy();
-	}
-
-	public WindowPartitioner(int numberOfSplits) {
-		super(null);
-		this.numberOfSplits = numberOfSplits;
-		withoutInputCopy();
-	}
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public void invoke() throws Exception {
-		while (isRunning && readNext() != null) {
-			callUserFunctionAndLogException();
-		}
-	}
-
-	@Override
-	protected void callUserFunction() throws Exception {
-		StreamWindow<T> currentWindow = nextObject;
-		if (keySelector == null) {
-			if (numberOfSplits <= 1) {
-				collector.collect(currentWindow);
-			} else {
-				for (StreamWindow<T> window : StreamWindow.split(currentWindow, numberOfSplits)) {
-					collector.collect(window);
-				}
-			}
-		} else {
-
-			for (StreamWindow<T> window : StreamWindow
-					.partitionBy(currentWindow, keySelector, true)) {
-				collector.collect(window);
-			}
-
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowReducer.java
deleted file mode 100644
index 67d42b5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowReducer.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.api.datastream.WindowedDataStream;
-import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-
-/**
- * This invokable is used to apply reduceWindow transformations on
- * {@link WindowedDataStream}s.
- */
-public class WindowReducer<IN> extends MapInvokable<StreamWindow<IN>, StreamWindow<IN>> {
-
-	private static final long serialVersionUID = 1L;
-
-	ReduceFunction<IN> reducer;
-
-	public WindowReducer(ReduceFunction<IN> reducer) {
-		super(new WindowReduceFunction<IN>(reducer));
-		this.reducer = reducer;
-		withoutInputCopy();
-	}
-
-	private static class WindowReduceFunction<T> implements
-			MapFunction<StreamWindow<T>, StreamWindow<T>> {
-
-		private static final long serialVersionUID = 1L;
-		ReduceFunction<T> reducer;
-
-		public WindowReduceFunction(ReduceFunction<T> reducer) {
-			this.reducer = reducer;
-		}
-
-		@Override
-		public StreamWindow<T> map(StreamWindow<T> window) throws Exception {
-			StreamWindow<T> outputWindow = new StreamWindow<T>(window.windowID);
-			outputWindow.numberOfParts = window.numberOfParts;
-
-			if (!window.isEmpty()) {
-				T reduced = window.get(0);
-				for (int i = 1; i < window.size(); i++) {
-					reduced = reducer.reduce(reduced, window.get(i));
-				}
-				outputWindow.add(reduced);
-			}
-			return outputWindow;
-		}
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/ChainableStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/ChainableStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/ChainableStreamOperator.java
new file mode 100644
index 0000000..cc0790c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/ChainableStreamOperator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.util.Collector;
+
+public abstract class ChainableStreamOperator<IN, OUT> extends StreamOperator<IN, OUT> implements
+		Collector<IN> {
+
+	private static final long serialVersionUID = 1L;
+	private boolean copyInput = true;
+
+	public ChainableStreamOperator(Function userFunction) {
+		super(userFunction);
+		setChainingStrategy(ChainingStrategy.ALWAYS);
+	}
+
+	public void setup(Collector<OUT> collector, StreamRecordSerializer<IN> inSerializer) {
+		this.collector = collector;
+		this.inSerializer = inSerializer;
+		this.objectSerializer = inSerializer.getObjectSerializer();
+	}
+
+	public ChainableStreamOperator<IN, OUT> withoutInputCopy() {
+		copyInput = false;
+		return this;
+	}
+
+	protected IN copyInput(IN input) {
+		return copyInput ? copy(input) : input;
+	}
+
+	@Override
+	public void collect(IN record) {
+		if (isRunning) {
+			nextObject = copyInput(record);
+			callUserFunctionAndLogException();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java
new file mode 100644
index 0000000..4c997d5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+
+public class StreamCounter<IN> extends ChainableStreamOperator<IN, Long> {
+	private static final long serialVersionUID = 1L;
+
+	Long count = 0L;
+
+	public StreamCounter() {
+		super(null);
+	}
+
+	@Override
+	public void run() throws Exception {
+		while (isRunning && readNext() != null) {
+			collector.collect(++count);
+		}
+	}
+
+	@Override
+	public void collect(IN record) {
+		if (isRunning) {
+			collector.collect(++count);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java
new file mode 100644
index 0000000..d2cddf6
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+
+public class StreamFilter<IN> extends ChainableStreamOperator<IN, IN> {
+
+	private static final long serialVersionUID = 1L;
+
+	FilterFunction<IN> filterFunction;
+	private boolean collect;
+
+	public StreamFilter(FilterFunction<IN> filterFunction) {
+		super(filterFunction);
+		this.filterFunction = filterFunction;
+	}
+
+	@Override
+	public void run() throws Exception {
+		while (isRunning && readNext() != null) {
+			callUserFunctionAndLogException();
+		}
+	}
+
+	@Override
+	protected void callUserFunction() throws Exception {
+		collect = filterFunction.filter(nextObject);
+		if (collect) {
+			collector.collect(nextObject);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
new file mode 100644
index 0000000..a17b162
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+
+public class StreamFlatMap<IN, OUT> extends ChainableStreamOperator<IN, OUT> {
+	private static final long serialVersionUID = 1L;
+
+	private FlatMapFunction<IN, OUT> flatMapper;
+
+	public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {
+		super(flatMapper);
+		this.flatMapper = flatMapper;
+	}
+
+	@Override
+	public void run() throws Exception {
+		while (isRunning && readNext() != null) {
+			callUserFunctionAndLogException();
+		}
+	}
+
+	@Override
+	protected void callUserFunction() throws Exception {
+		flatMapper.flatMap(nextObject, collector);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
new file mode 100644
index 0000000..fc5f187
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+public class StreamFold<IN, OUT> extends ChainableStreamOperator<IN, OUT> {
+	private static final long serialVersionUID = 1L;
+
+	protected FoldFunction<IN, OUT> folder;
+	private OUT accumulator;
+	protected TypeSerializer<OUT> outTypeSerializer;
+
+	public StreamFold(FoldFunction<IN, OUT> folder, OUT initialValue,
+			TypeInformation<OUT> outTypeInformation) {
+		super(folder);
+		this.folder = folder;
+		this.accumulator = initialValue;
+		this.outTypeSerializer = outTypeInformation.createSerializer(executionConfig);
+	}
+
+	@Override
+	public void run() throws Exception {
+		while (isRunning && readNext() != null) {
+			callUserFunctionAndLogException();
+		}
+	}
+
+	@Override
+	protected void callUserFunction() throws Exception {
+
+		accumulator = folder.fold(outTypeSerializer.copy(accumulator), nextObject);
+		collector.collect(accumulator);
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
new file mode 100644
index 0000000..303f1b3
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+
+public class StreamGroupedFold<IN, OUT> extends StreamFold<IN, OUT> {
+	private static final long serialVersionUID = 1L;
+
+	private KeySelector<IN, ?> keySelector;
+	private Map<Object, OUT> values;
+	private OUT initialValue;
+
+	public StreamGroupedFold(FoldFunction<IN, OUT> folder, KeySelector<IN, ?> keySelector,
+			OUT initialValue, TypeInformation<OUT> outTypeInformation) {
+		super(folder, initialValue, outTypeInformation);
+		this.keySelector = keySelector;
+		this.initialValue = initialValue;
+		values = new HashMap<Object, OUT>();
+	}
+
+	@Override
+	protected void callUserFunction() throws Exception {
+		// Key off nextObject, as StreamGroupedReduce does: the chained collect()
+		// path only refreshes nextObject, so nextRecord cannot be relied on here.
+		Object key = keySelector.getKey(nextObject);
+		OUT stored = values.get(key);
+
+		// The first element of a key is folded into the initial value; fold
+		// always receives a copy, never the stored instance itself.
+		OUT base = stored != null ? stored : initialValue;
+		OUT folded = folder.fold(outTypeSerializer.copy(base), nextObject);
+		values.put(key, folded);
+		collector.collect(folded);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
new file mode 100644
index 0000000..f5c8f21
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+
+public class StreamGroupedReduce<IN> extends StreamReduce<IN> {
+	private static final long serialVersionUID = 1L;
+
+	private KeySelector<IN, ?> keySelector;
+	private Map<Object, IN> values;
+
+	public StreamGroupedReduce(ReduceFunction<IN> reducer, KeySelector<IN, ?> keySelector) {
+		super(reducer);
+		this.keySelector = keySelector;
+		values = new HashMap<Object, IN>();
+	}
+
+	@Override
+	protected void callUserFunction() throws Exception {
+		Object groupKey = keySelector.getKey(nextObject);
+		IN stored = values.get(groupKey);
+
+		// The first element seen for a key passes through as-is; later elements
+		// are reduced against a copy of the value stored for that key.
+		IN result = (stored == null)
+				? nextObject
+				: reducer.reduce(copy(stored), nextObject);
+		values.put(groupKey, result);
+		collector.collect(result);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java
new file mode 100644
index 0000000..9f1db1e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.functions.MapFunction;
+
+public class StreamMap<IN, OUT> extends ChainableStreamOperator<IN, OUT> {
+	private static final long serialVersionUID = 1L;
+
+	private MapFunction<IN, OUT> mapper;
+
+	public StreamMap(MapFunction<IN, OUT> mapper) {
+		super(mapper);
+		this.mapper = mapper;
+	}
+
+	@Override
+	public void run() throws Exception {
+		while (isRunning && readNext() != null) {
+			callUserFunctionAndLogException();
+		}
+	}
+
+	@Override
+	protected void callUserFunction() throws Exception {
+		collector.collect(mapper.map(nextObject));
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
new file mode 100644
index 0000000..7ec0b0b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.runtime.io.IndexedReaderIterator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskContext;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The StreamOperator represents the base class for all operators in the
+ * streaming topology.
+ * 
+ * @param <IN> The input type of the operator
+ * @param <OUT> The output type of the operator
+ */
+public abstract class StreamOperator<IN, OUT> implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+	private static final Logger LOG = LoggerFactory.getLogger(StreamOperator.class);
+
+	protected StreamTaskContext<OUT> taskContext;
+
+	protected ExecutionConfig executionConfig = null;
+
+	protected IndexedReaderIterator<StreamRecord<IN>> recordIterator;
+	protected StreamRecordSerializer<IN> inSerializer;
+	protected TypeSerializer<IN> objectSerializer;
+	protected StreamRecord<IN> nextRecord;
+	protected IN nextObject;
+	protected boolean isMutable;
+
+	public Collector<OUT> collector;
+	protected Function userFunction;
+	protected volatile boolean isRunning;
+
+	private ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;
+
+	public StreamOperator(Function userFunction) {
+		this.userFunction = userFunction;
+	}
+
+	/**
+	 * Initializes the {@link StreamOperator} for input and output handling
+	 * 
+	 * @param taskContext
+	 *            StreamTaskContext representing the vertex
+	 */
+	public void setup(StreamTaskContext<OUT> taskContext) {
+		this.collector = taskContext.getOutputCollector();
+		this.recordIterator = taskContext.getIndexedInput(0);
+		this.inSerializer = taskContext.getInputSerializer(0);
+		if (this.inSerializer != null) {
+			this.nextRecord = inSerializer.createInstance();
+			this.objectSerializer = inSerializer.getObjectSerializer();
+		}
+		this.taskContext = taskContext;
+		this.executionConfig = taskContext.getExecutionConfig();
+	}
+
+	/**
+	 * Method that will be called when the operator starts, should encode the
+	 * processing logic
+	 */
+	public abstract void run() throws Exception;
+
+	/**
+	 * Reads the next record from the reader iterator into nextRecord and its
+	 * payload into nextObject.
+	 *
+	 * @return the record read, or null at end of stream / after cancellation
+	 */
+	protected StreamRecord<IN> readNext() throws IOException {
+		this.nextRecord = inSerializer.createInstance();
+		try {
+			nextRecord = recordIterator.next(nextRecord);
+			// A null record marks the end of the stream; nextObject is left
+			// untouched in that case.
+			if (nextRecord != null) {
+				nextObject = nextRecord.getObject();
+			}
+			return nextRecord;
+		} catch (IOException e) {
+			if (isRunning) {
+				throw new RuntimeException("Could not read next record due to: "
+						+ StringUtils.stringifyException(e), e);
+			}
+			// Task already cancelled, do nothing
+			return null;
+		} catch (IllegalStateException e) {
+			if (isRunning) {
+				throw new RuntimeException("Could not read next record due to: "
+						+ StringUtils.stringifyException(e), e);
+			}
+			// Task already cancelled, do nothing
+			return null;
+		}
+	}
+
+	/**
+	 * The call of the user implemented function should be implemented here
+	 */
+	protected void callUserFunction() throws Exception {
+	}
+
+	/**
+	 * Method for logging exceptions thrown during the user function call
+	 */
+	protected void callUserFunctionAndLogException() {
+		try {
+			callUserFunction();
+		} catch (Exception e) {
+			if (LOG.isErrorEnabled()) {
+				LOG.error("Calling user function failed due to: {}",
+						StringUtils.stringifyException(e));
+			}
+			throw new RuntimeException(e);
+		}
+	}
+
+	/**
+	 * Open method to be used if the user defined function extends the
+	 * RichFunction class
+	 * 
+	 * @param parameters
+	 *            The configuration parameters for the operator
+	 */
+	public void open(Configuration parameters) throws Exception {
+		isRunning = true;
+		FunctionUtils.openFunction(userFunction, parameters);
+	}
+
+	/**
+	 * Stops the operator, closes the output collector and then closes the user
+	 * function. A failure while closing the function is rethrown as a
+	 * RuntimeException that keeps the original exception as its cause.
+	 */
+	public void close() {
+		isRunning = false;
+		collector.close();
+		try {
+			FunctionUtils.closeFunction(userFunction);
+		} catch (Exception e) {
+			throw new RuntimeException("Error when closing the function: " + e.getMessage(), e);
+		}
+	}
+
+	public void cancel() {
+		isRunning = false;
+	}
+
+	public void setRuntimeContext(RuntimeContext t) {
+		FunctionUtils.setFunctionRuntimeContext(userFunction, t);
+	}
+
+	protected IN copy(IN record) {
+		return objectSerializer.copy(record);
+	}
+
+	/** Sets the chaining strategy; ALWAYS is only accepted for ChainableStreamOperator subclasses. */
+	public void setChainingStrategy(ChainingStrategy strategy) {
+		boolean chainable = this instanceof ChainableStreamOperator;
+		if (strategy == ChainingStrategy.ALWAYS && !chainable) {
+			throw new RuntimeException(
+					"Operator needs to extend ChainableStreamOperator to be chained");
+		}
+		this.chainingStrategy = strategy;
+	}
+
+	public ChainingStrategy getChainingStrategy() {
+		return chainingStrategy;
+	}
+
+	public static enum ChainingStrategy {
+		ALWAYS, NEVER, HEAD;
+	}
+
+	public Function getUserFunction() {
+		return userFunction;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java
new file mode 100644
index 0000000..7f8b10d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Operator that projects selected fields of each incoming tuple into an
+ * output tuple. A single output tuple instance is created in
+ * {@link #open(Configuration)} and re-used for every emitted record.
+ * 
+ * @param <IN>
+ *            type of the input records (cast to {@link Tuple} when projected)
+ * @param <OUT>
+ *            tuple type of the projected output
+ */
+public class StreamProject<IN, OUT extends Tuple> extends ChainableStreamOperator<IN, OUT> {
+	private static final long serialVersionUID = 1L;
+
+	// Re-used output tuple, created in open().
+	transient OUT outTuple;
+	TypeSerializer<OUT> outTypeSerializer;
+	TypeInformation<OUT> outTypeInformation;
+	// Positions of the input fields to copy, in output order.
+	int[] fields;
+	int numFields;
+
+	/**
+	 * @param fields
+	 *            indices of the input fields to project, in output order
+	 * @param outTypeInformation
+	 *            type information used to create the output tuple serializer
+	 */
+	public StreamProject(int[] fields, TypeInformation<OUT> outTypeInformation) {
+		super(null);
+		this.fields = fields;
+		this.numFields = fields.length;
+		this.outTypeInformation = outTypeInformation;
+	}
+
+	/**
+	 * Opens the operator, then creates the output serializer and the output
+	 * tuple instance that is re-used for all records.
+	 */
+	@Override
+	public void open(Configuration config) throws Exception {
+		super.open(config);
+		outTypeSerializer = outTypeInformation.createSerializer(executionConfig);
+		outTuple = outTypeSerializer.createInstance();
+	}
+
+	/**
+	 * Reads records until the operator is stopped or no further record is
+	 * returned, projecting each one.
+	 */
+	@Override
+	public void run() throws Exception {
+		while (isRunning) {
+			if (readNext() == null) {
+				break;
+			}
+			callUserFunctionAndLogException();
+		}
+	}
+
+	/**
+	 * Copies the configured fields of the current input into the output
+	 * tuple and emits it.
+	 */
+	@Override
+	protected void callUserFunction() throws Exception {
+		int position = 0;
+		while (position < numFields) {
+			Object value = ((Tuple) nextObject).getField(fields[position]);
+			outTuple.setField(value, position);
+			position++;
+		}
+		collector.collect(outTuple);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
new file mode 100644
index 0000000..179d690
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+
+/**
+ * Operator that keeps a running reduced value over all records it receives
+ * and emits the updated value after every record.
+ * 
+ * @param <IN>
+ *            type of the input records and of the reduced value
+ */
+public class StreamReduce<IN> extends ChainableStreamOperator<IN, IN> {
+	private static final long serialVersionUID = 1L;
+
+	protected ReduceFunction<IN> reducer;
+	// Result of the reduction so far; null until the first record arrives.
+	private IN currentValue;
+
+	/**
+	 * @param reducer
+	 *            the reduce function applied to the running value and each
+	 *            new record
+	 */
+	public StreamReduce(ReduceFunction<IN> reducer) {
+		super(reducer);
+		this.currentValue = null;
+		this.reducer = reducer;
+	}
+
+	/**
+	 * Reads records until the operator is stopped or no further record is
+	 * returned, reducing each one.
+	 */
+	@Override
+	public void run() throws Exception {
+		while (isRunning) {
+			if (readNext() == null) {
+				break;
+			}
+			callUserFunctionAndLogException();
+		}
+	}
+
+	/**
+	 * Folds the current record into the running value and emits the result.
+	 * The first record becomes the running value as-is; afterwards a copy of
+	 * the running value is passed to the reducer together with the record.
+	 */
+	@Override
+	protected void callUserFunction() throws Exception {
+		if (currentValue == null) {
+			currentValue = nextObject;
+		} else {
+			currentValue = reducer.reduce(copy(currentValue), nextObject);
+		}
+		collector.collect(currentValue);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
new file mode 100644
index 0000000..d1f93d1
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+/**
+ * Operator that passes every record it reads to a {@link SinkFunction}.
+ * 
+ * @param <IN>
+ *            type of the records consumed by the sink
+ */
+public class StreamSink<IN> extends ChainableStreamOperator<IN, IN> {
+	private static final long serialVersionUID = 1L;
+
+	private SinkFunction<IN> sinkFunction;
+
+	/**
+	 * @param sinkFunction
+	 *            the sink function invoked for every record
+	 */
+	public StreamSink(SinkFunction<IN> sinkFunction) {
+		super(sinkFunction);
+		this.sinkFunction = sinkFunction;
+	}
+
+	/**
+	 * Reads records until the operator is stopped or no further record is
+	 * returned, invoking the sink for each one.
+	 */
+	@Override
+	public void run() throws Exception {
+		while (isRunning) {
+			if (readNext() == null) {
+				break;
+			}
+			callUserFunctionAndLogException();
+		}
+	}
+
+	/**
+	 * Invokes the sink function with the current record.
+	 */
+	@Override
+	protected void callUserFunction() throws Exception {
+		this.sinkFunction.invoke(nextObject);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
new file mode 100644
index 0000000..8c834f5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import java.io.Serializable;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+/**
+ * Operator that drives a {@link SourceFunction}: the source function is run
+ * once with the operator's output collector and is cancelled together with
+ * the operator.
+ * 
+ * @param <OUT>
+ *            type of the records produced by the source
+ */
+public class StreamSource<OUT> extends StreamOperator<OUT, OUT> implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private SourceFunction<OUT> sourceFunction;
+
+	/**
+	 * @param sourceFunction
+	 *            the source function executed by this operator
+	 */
+	public StreamSource(SourceFunction<OUT> sourceFunction) {
+		super(sourceFunction);
+		this.sourceFunction = sourceFunction;
+	}
+
+	/**
+	 * Runs the source function a single time; there is no read loop because
+	 * a source has no input to read from.
+	 */
+	@Override
+	public void run() {
+		callUserFunctionAndLogException();
+	}
+
+	/**
+	 * Hands the output collector to the source function and lets it run.
+	 */
+	@Override
+	protected void callUserFunction() throws Exception {
+		this.sourceFunction.run(collector);
+	}
+
+	/**
+	 * Cancels the operator first and then the source function.
+	 */
+	@Override
+	public void cancel() {
+		super.cancel();
+		this.sourceFunction.cancel();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
new file mode 100644
index 0000000..004a17a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
+
+/**
+ * Two-input operator that applies a {@link CoFlatMapFunction}: records of the
+ * first input go to {@code flatMap1}, records of the second input go to
+ * {@code flatMap2}, both writing to the shared output collector.
+ * 
+ * @param <IN1>
+ *            type of the first input
+ * @param <IN2>
+ *            type of the second input
+ * @param <OUT>
+ *            type of the output
+ */
+public class CoStreamFlatMap<IN1, IN2, OUT> extends CoStreamOperator<IN1, IN2, OUT> {
+	private static final long serialVersionUID = 1L;
+
+	private CoFlatMapFunction<IN1, IN2, OUT> flatMapper;
+
+	/**
+	 * @param flatMapper
+	 *            the co-flat-map function applied to both inputs
+	 */
+	public CoStreamFlatMap(CoFlatMapFunction<IN1, IN2, OUT> flatMapper) {
+		super(flatMapper);
+		this.flatMapper = flatMapper;
+	}
+
+	/**
+	 * Processes the current record of the first input.
+	 */
+	@Override
+	public void handleStream1() throws Exception {
+		callUserFunctionAndLogException1();
+	}
+
+	/**
+	 * Processes the current record of the second input.
+	 */
+	@Override
+	public void handleStream2() throws Exception {
+		callUserFunctionAndLogException2();
+	}
+
+	/**
+	 * Applies {@code flatMap1} to the object held by the first re-use record.
+	 */
+	@Override
+	protected void callUserFunction1() throws Exception {
+		final IN1 input = reuse1.getObject();
+		flatMapper.flatMap1(input, collector);
+	}
+
+	/**
+	 * Applies {@code flatMap2} to the object held by the second re-use record.
+	 */
+	@Override
+	protected void callUserFunction2() throws Exception {
+		final IN2 input = reuse2.getObject();
+		flatMapper.flatMap2(input, collector);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java
new file mode 100644
index 0000000..2ed3b2e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.functions.co.CoReduceFunction;
+
+/**
+ * Two-input reduce operator that keeps one running value per key and per
+ * input. For every record the running value of its key is updated (or
+ * initialised with the record itself) and the mapped result is emitted.
+ * 
+ * @param <IN1>
+ *            type of the first input
+ * @param <IN2>
+ *            type of the second input
+ * @param <OUT>
+ *            type of the output
+ */
+public class CoStreamGroupedReduce<IN1, IN2, OUT> extends CoStreamReduce<IN1, IN2, OUT> {
+	private static final long serialVersionUID = 1L;
+
+	protected KeySelector<IN1, ?> keySelector1;
+	protected KeySelector<IN2, ?> keySelector2;
+	// Running reduced value per key, one map for each input.
+	private Map<Object, IN1> values1;
+	private Map<Object, IN2> values2;
+	// Results of the most recent reduce1 / reduce2 call.
+	IN1 reduced1;
+	IN2 reduced2;
+
+	/**
+	 * @param coReducer
+	 *            the co-reduce function providing reduce and map for both
+	 *            inputs
+	 * @param keySelector1
+	 *            extracts the grouping key of first-input records
+	 * @param keySelector2
+	 *            extracts the grouping key of second-input records
+	 */
+	public CoStreamGroupedReduce(CoReduceFunction<IN1, IN2, OUT> coReducer,
+			KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2) {
+		super(coReducer);
+		this.coReducer = coReducer;
+		this.keySelector1 = keySelector1;
+		this.keySelector2 = keySelector2;
+		this.values1 = new HashMap<Object, IN1>();
+		this.values2 = new HashMap<Object, IN2>();
+	}
+
+	/**
+	 * Updates the running value for the key of the current first-input
+	 * record and emits its mapped form.
+	 */
+	@Override
+	public void handleStream1() throws Exception {
+		final Object key = reuse1.getKey(keySelector1);
+		currentValue1 = values1.get(key);
+		nextValue1 = reuse1.getObject();
+
+		final IN1 updated;
+		if (currentValue1 == null) {
+			// First record for this key: it becomes the running value.
+			updated = nextValue1;
+		} else {
+			callUserFunctionAndLogException1();
+			updated = reduced1;
+		}
+		values1.put(key, updated);
+		collector.collect(coReducer.map1(updated));
+	}
+
+	/**
+	 * Updates the running value for the key of the current second-input
+	 * record and emits its mapped form.
+	 */
+	@Override
+	public void handleStream2() throws Exception {
+		final Object key = reuse2.getKey(keySelector2);
+		currentValue2 = values2.get(key);
+		nextValue2 = reuse2.getObject();
+
+		final IN2 updated;
+		if (currentValue2 == null) {
+			// First record for this key: it becomes the running value.
+			updated = nextValue2;
+		} else {
+			callUserFunctionAndLogException2();
+			updated = reduced2;
+		}
+		values2.put(key, updated);
+		collector.collect(coReducer.map2(updated));
+	}
+
+	/**
+	 * Reduces the stored value and the new record of the first input.
+	 */
+	@Override
+	protected void callUserFunction1() throws Exception {
+		this.reduced1 = coReducer.reduce1(currentValue1, nextValue1);
+	}
+
+	/**
+	 * Reduces the stored value and the new record of the second input.
+	 */
+	@Override
+	protected void callUserFunction2() throws Exception {
+		this.reduced2 = coReducer.reduce2(currentValue2, nextValue2);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java
new file mode 100644
index 0000000..932438b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+
+/**
+ * Two-input operator that applies a {@link CoMapFunction}: records of the
+ * first input are mapped with {@code map1}, records of the second input with
+ * {@code map2}, and each result is emitted.
+ * 
+ * @param <IN1>
+ *            type of the first input
+ * @param <IN2>
+ *            type of the second input
+ * @param <OUT>
+ *            type of the output
+ */
+public class CoStreamMap<IN1, IN2, OUT> extends CoStreamOperator<IN1, IN2, OUT> {
+	private static final long serialVersionUID = 1L;
+
+	private CoMapFunction<IN1, IN2, OUT> mapper;
+
+	/**
+	 * @param mapper
+	 *            the co-map function applied to both inputs
+	 */
+	public CoStreamMap(CoMapFunction<IN1, IN2, OUT> mapper) {
+		super(mapper);
+		this.mapper = mapper;
+	}
+
+	/**
+	 * Processes the current record of the first input.
+	 */
+	@Override
+	public void handleStream1() throws Exception {
+		callUserFunctionAndLogException1();
+	}
+
+	/**
+	 * Processes the current record of the second input.
+	 */
+	@Override
+	public void handleStream2() throws Exception {
+		callUserFunctionAndLogException2();
+	}
+
+	/**
+	 * Maps the object held by the first re-use record and emits the result.
+	 */
+	@Override
+	protected void callUserFunction1() throws Exception {
+		final OUT mapped = mapper.map1(reuse1.getObject());
+		collector.collect(mapped);
+	}
+
+	/**
+	 * Maps the object held by the second re-use record and emits the result.
+	 */
+	@Override
+	protected void callUserFunction2() throws Exception {
+		final OUT mapped = mapper.map2(reuse2.getObject());
+		collector.collect(mapped);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamOperator.java
new file mode 100644
index 0000000..214cb17
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamOperator.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.io.CoReaderIterator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskContext;
+import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for operators with two inputs. It reads from a
+ * {@link CoReaderIterator} and dispatches every record to the handler of the
+ * input it came from.
+ * 
+ * @param <IN1>
+ *            type of the first input
+ * @param <IN2>
+ *            type of the second input
+ * @param <OUT>
+ *            type of the output
+ */
+public abstract class CoStreamOperator<IN1, IN2, OUT> extends StreamOperator<IN1, OUT> {
+
+	private static final long serialVersionUID = 1L;
+	private static final Logger LOG = LoggerFactory.getLogger(CoStreamOperator.class);
+
+	protected CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> recordIterator;
+	// Records handed to the reader to be filled; replaced after each use.
+	protected StreamRecord<IN1> reuse1;
+	protected StreamRecord<IN2> reuse2;
+	protected StreamRecordSerializer<IN1> srSerializer1;
+	protected StreamRecordSerializer<IN2> srSerializer2;
+	protected TypeSerializer<IN1> serializer1;
+	protected TypeSerializer<IN2> serializer2;
+
+	/**
+	 * @param userFunction
+	 *            the user function wrapped by this operator
+	 */
+	public CoStreamOperator(Function userFunction) {
+		super(userFunction);
+	}
+
+	/**
+	 * Takes the collector, the co-reader and both input serializers from the
+	 * task context and creates the initial re-use records.
+	 */
+	@Override
+	public void setup(StreamTaskContext<OUT> taskContext) {
+		this.collector = taskContext.getOutputCollector();
+		this.recordIterator = taskContext.getCoReader();
+
+		// Input 0 is the first stream, input 1 the second one.
+		this.srSerializer1 = taskContext.getInputSerializer(0);
+		this.srSerializer2 = taskContext.getInputSerializer(1);
+
+		this.reuse1 = this.srSerializer1.createInstance();
+		this.reuse2 = this.srSerializer2.createInstance();
+
+		this.serializer1 = this.srSerializer1.getObjectSerializer();
+		this.serializer2 = this.srSerializer2.getObjectSerializer();
+	}
+
+	/**
+	 * Replaces both re-use records with fresh instances.
+	 */
+	protected void resetReuseAll() {
+		this.reuse1 = srSerializer1.createInstance();
+		this.reuse2 = srSerializer2.createInstance();
+	}
+
+	/**
+	 * Replaces the re-use record of the first input with a fresh instance.
+	 */
+	protected void resetReuse1() {
+		this.reuse1 = srSerializer1.createInstance();
+	}
+
+	/**
+	 * Replaces the re-use record of the second input with a fresh instance.
+	 */
+	protected void resetReuse2() {
+		this.reuse2 = srSerializer2.createInstance();
+	}
+
+	/**
+	 * Reads records while the operator is running. The reader's return value
+	 * selects the action: 0 ends the loop, 1 handles the record as first
+	 * input, any other value handles it as second input. A read failure is
+	 * rethrown while the operator is still running and silently ends the
+	 * loop once it has been cancelled.
+	 */
+	@Override
+	public void run() throws Exception {
+		while (isRunning) {
+			// Stays 0 (= stop) if the read fails.
+			int next = 0;
+			Exception readFailure = null;
+			try {
+				next = recordIterator.next(reuse1, reuse2);
+			} catch (IOException e) {
+				readFailure = e;
+			} catch (IllegalStateException e) {
+				readFailure = e;
+			}
+
+			// A failure only matters if the task has not been cancelled.
+			if (readFailure != null && isRunning) {
+				throw new RuntimeException("Could not read next record.", readFailure);
+			}
+
+			if (next == 0) {
+				break;
+			}
+
+			if (next == 1) {
+				initialize1();
+				handleStream1();
+				resetReuse1();
+			} else {
+				initialize2();
+				handleStream2();
+				resetReuse2();
+			}
+		}
+	}
+
+	/** Handles the record currently held for the first input. */
+	protected abstract void handleStream1() throws Exception;
+
+	/** Handles the record currently held for the second input. */
+	protected abstract void handleStream2() throws Exception;
+
+	/** Invokes the user function for the first input. */
+	protected abstract void callUserFunction1() throws Exception;
+
+	/** Invokes the user function for the second input. */
+	protected abstract void callUserFunction2() throws Exception;
+
+	/**
+	 * Hook called before a first-input record is handled; does nothing by
+	 * default.
+	 */
+	protected void initialize1() {
+
+	}
+
+	/**
+	 * Hook called before a second-input record is handled; does nothing by
+	 * default.
+	 */
+	protected void initialize2() {
+
+	}
+
+	/**
+	 * Calls {@link #callUserFunction1()}; any exception is logged and
+	 * rethrown wrapped in a {@link RuntimeException}.
+	 */
+	protected void callUserFunctionAndLogException1() {
+		try {
+			callUserFunction1();
+		} catch (Exception e) {
+			logUserFunctionFailure(e);
+			throw new RuntimeException(e);
+		}
+	}
+
+	/**
+	 * Calls {@link #callUserFunction2()}; any exception is logged and
+	 * rethrown wrapped in a {@link RuntimeException}.
+	 */
+	protected void callUserFunctionAndLogException2() {
+		try {
+			callUserFunction2();
+		} catch (Exception e) {
+			logUserFunctionFailure(e);
+			throw new RuntimeException(e);
+		}
+	}
+
+	/**
+	 * Writes the stringified exception to the error log, if enabled.
+	 */
+	private static void logUserFunctionFailure(Exception e) {
+		if (LOG.isErrorEnabled()) {
+			LOG.error("Calling user function failed due to: {}",
+					StringUtils.stringifyException(e));
+		}
+	}
+
+}