You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/09/20 15:10:46 UTC

[03/18] git commit: [streaming] Updated directed emit to not use output names when all outputs are selected

[streaming] Updated directed emit to not use output names when all outputs are selected


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/fbfcc9eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/fbfcc9eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/fbfcc9eb

Branch: refs/heads/master
Commit: fbfcc9eb4d835df1a646b6f963527e86ef380ec9
Parents: 47dca69
Author: ghermann <re...@gmail.com>
Authored: Tue Sep 2 14:47:48 2014 +0200
Committer: mbalassi <ba...@gmail.com>
Committed: Sat Sep 20 13:42:04 2014 +0200

----------------------------------------------------------------------
 .../flink/streaming/api/JobGraphBuilder.java    |   9 +-
 .../flink/streaming/api/StreamConfig.java       |  56 ++--
 .../api/collector/DirectedStreamCollector.java  | 224 ++++++++-------
 .../api/collector/StreamCollector.java          | 285 ++++++++++---------
 .../streaming/api/datastream/DataStream.java    |   7 +-
 .../api/datastream/IterativeDataStream.java     |   4 +-
 .../datastream/SingleOutputStreamOperator.java  |  18 +-
 .../api/datastream/SplitDataStream.java         |  20 +-
 .../api/streamcomponent/OutputHandler.java      |   5 +-
 .../api/collector/DirectedOutputTest.java       | 120 ++++----
 .../api/collector/StreamCollectorTest.java      |   2 +-
 11 files changed, 378 insertions(+), 372 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fbfcc9eb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index aaa7161..7973324 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -64,6 +64,7 @@ public class JobGraphBuilder {
 	private Map<String, List<String>> outEdgeList;
 	private Map<String, List<Integer>> outEdgeType;
 	private Map<String, List<List<String>>> outEdgeNames;
+	private Map<String, List<Boolean>> outEdgeSelectAll;
 	private Map<String, Boolean> mutability;
 	private Map<String, List<String>> inEdgeList;
 	private Map<String, List<StreamPartitioner<?>>> connectionTypes;
@@ -106,6 +107,7 @@ public class JobGraphBuilder {
 		outEdgeList = new HashMap<String, List<String>>();
 		outEdgeType = new HashMap<String, List<Integer>>();
 		outEdgeNames = new HashMap<String, List<List<String>>>();
+		outEdgeSelectAll = new HashMap<String, List<Boolean>>();
 		mutability = new HashMap<String, Boolean>();
 		inEdgeList = new HashMap<String, List<String>>();
 		connectionTypes = new HashMap<String, List<StreamPartitioner<?>>>();
@@ -203,7 +205,7 @@ public class JobGraphBuilder {
 
 		setEdge(componentName, iterationHead,
 				connectionTypes.get(inEdgeList.get(iterationHead).get(0))
-						.get(0), 0, new ArrayList<String>());
+						.get(0), 0, new ArrayList<String>(), false);
 
 		iterationWaitTime.put(iterationIDtoSourceName.get(iterationID),
 				waitTime);
@@ -360,6 +362,7 @@ public class JobGraphBuilder {
 		outEdgeList.put(componentName, new ArrayList<String>());
 		outEdgeType.put(componentName, new ArrayList<Integer>());
 		outEdgeNames.put(componentName, new ArrayList<List<String>>());
+		outEdgeSelectAll.put(componentName, new ArrayList<Boolean>());
 		inEdgeList.put(componentName, new ArrayList<String>());
 		connectionTypes.put(componentName,
 				new ArrayList<StreamPartitioner<?>>());
@@ -484,12 +487,13 @@ public class JobGraphBuilder {
 	public void setEdge(String upStreamComponentName,
 			String downStreamComponentName,
 			StreamPartitioner<?> partitionerObject, int typeNumber,
-			List<String> outputNames) {
+			List<String> outputNames, boolean selectAll) {
 		outEdgeList.get(upStreamComponentName).add(downStreamComponentName);
 		outEdgeType.get(upStreamComponentName).add(typeNumber);
 		inEdgeList.get(downStreamComponentName).add(upStreamComponentName);
 		connectionTypes.get(upStreamComponentName).add(partitionerObject);
 		outEdgeNames.get(upStreamComponentName).add(outputNames);
+		outEdgeSelectAll.get(upStreamComponentName).add(selectAll);
 	}
 
 	/**
@@ -541,6 +545,7 @@ public class JobGraphBuilder {
 
 		config.setOutputName(outputIndex,
 				outEdgeNames.get(upStreamComponentName).get(outputIndex));
+		config.setSelectAll(outputIndex, outEdgeSelectAll.get(upStreamComponentName).get(outputIndex));
 		config.setPartitioner(outputIndex, partitionerObject);
 		config.setNumberOfOutputChannels(outputIndex,
 				componentParallelism.get(downStreamComponentName));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fbfcc9eb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index 445020a..c2a4c21 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -38,6 +38,7 @@ public class StreamConfig {
 	private static final String NUMBER_OF_OUTPUTS = "numberOfOutputs";
 	private static final String NUMBER_OF_INPUTS = "numberOfInputs";
 	private static final String OUTPUT_NAME = "outputName_";
+	private static final String OUTPUT_SELECT_ALL = "outputSelectAll_";
 	private static final String PARTITIONER_OBJECT = "partitionerObject_";
 	private static final String NUMBER_OF_OUTPUT_CHANNELS = "numOfOutputs_";
 	private static final String ITERATION_ID = "iteration-id";
@@ -149,8 +150,7 @@ public class StreamConfig {
 			config.setClass(USER_FUNCTION, invokableObject.getClass());
 
 			try {
-				config.setBytes(SERIALIZEDUDF,
-						SerializationUtils.serialize(invokableObject));
+				config.setBytes(SERIALIZEDUDF, SerializationUtils.serialize(invokableObject));
 			} catch (SerializationException e) {
 				throw new RuntimeException("Cannot serialize invokable object "
 						+ invokableObject.getClass(), e);
@@ -162,11 +162,10 @@ public class StreamConfig {
 		try {
 			return deserializeObject(config.getBytes(SERIALIZEDUDF, null));
 		} catch (Exception e) {
-			throw new StreamComponentException(
-					"Cannot instantiate user function", e);
+			throw new StreamComponentException("Cannot instantiate user function", e);
 		}
 	}
-	
+
 	public void setComponentName(String componentName) {
 		config.setString(COMPONENT_NAME, componentName);
 	}
@@ -184,8 +183,7 @@ public class StreamConfig {
 
 	public Object getFunction() {
 		try {
-			return SerializationUtils.deserialize(config.getBytes(FUNCTION,
-					null));
+			return SerializationUtils.deserialize(config.getBytes(FUNCTION, null));
 		} catch (SerializationException e) {
 			throw new RuntimeException("Cannot deserialize invokable object", e);
 		}
@@ -214,8 +212,8 @@ public class StreamConfig {
 		try {
 			return deserializeObject(config.getBytes(OUTPUT_SELECTOR, null));
 		} catch (Exception e) {
-			throw new StreamComponentException(
-					"Cannot deserialize and instantiate OutputSelector", e);
+			throw new StreamComponentException("Cannot deserialize and instantiate OutputSelector",
+					e);
 		}
 	}
 
@@ -235,30 +233,36 @@ public class StreamConfig {
 		return config.getLong(ITERATON_WAIT, 0);
 	}
 
-	public void setNumberOfOutputChannels(int outputIndex,
-			Integer numberOfOutputChannels) {
-		config.setInteger(NUMBER_OF_OUTPUT_CHANNELS + outputIndex,
-				numberOfOutputChannels);
+	public void setNumberOfOutputChannels(int outputIndex, Integer numberOfOutputChannels) {
+		config.setInteger(NUMBER_OF_OUTPUT_CHANNELS + outputIndex, numberOfOutputChannels);
 	}
 
 	public int getNumberOfOutputChannels(int outputIndex) {
 		return config.getInteger(NUMBER_OF_OUTPUT_CHANNELS + outputIndex, 0);
 	}
 
-	public <T> void setPartitioner(int outputIndex,
-			StreamPartitioner<T> partitionerObject) {
+	public <T> void setPartitioner(int outputIndex, StreamPartitioner<T> partitionerObject) {
 
 		config.setBytes(PARTITIONER_OBJECT + outputIndex,
 				SerializationUtils.serialize(partitionerObject));
 	}
 
-	public <T> StreamPartitioner<T> getPartitioner(int outputIndex)
-			throws ClassNotFoundException, IOException {
-		return deserializeObject(config.getBytes(PARTITIONER_OBJECT
-				+ outputIndex,
+	public <T> StreamPartitioner<T> getPartitioner(int outputIndex) throws ClassNotFoundException,
+			IOException {
+		return deserializeObject(config.getBytes(PARTITIONER_OBJECT + outputIndex,
 				SerializationUtils.serialize(new ShufflePartitioner<T>())));
 	}
 
+	public void setSelectAll(int outputIndex, Boolean selectAll) {
+		if (selectAll != null) {
+			config.setBoolean(OUTPUT_SELECT_ALL + outputIndex, selectAll);
+		}
+	}
+
+	public boolean getSelectAll(int outputIndex) {
+		return config.getBoolean(OUTPUT_SELECT_ALL + outputIndex, false);
+	}
+
 	public void setOutputName(int outputIndex, List<String> outputName) {
 		if (outputName != null) {
 			config.setBytes(OUTPUT_NAME + outputIndex,
@@ -268,8 +272,8 @@ public class StreamConfig {
 
 	@SuppressWarnings("unchecked")
 	public List<String> getOutputName(int outputIndex) {
-		return (List<String>) SerializationUtils.deserialize(config.getBytes(
-				OUTPUT_NAME + outputIndex, null));
+		return (List<String>) SerializationUtils.deserialize(config.getBytes(OUTPUT_NAME
+				+ outputIndex, null));
 	}
 
 	public void setNumberOfInputs(int numberOfInputs) {
@@ -296,20 +300,18 @@ public class StreamConfig {
 		return config.getInteger(INPUT_TYPE + inputNumber, 0);
 	}
 
-	public void setFunctionClass(
-			Class<? extends AbstractRichFunction> functionClass) {
+	public void setFunctionClass(Class<? extends AbstractRichFunction> functionClass) {
 		config.setClass("functionClass", functionClass);
 	}
 
 	@SuppressWarnings("unchecked")
 	public Class<? extends AbstractRichFunction> getFunctionClass() {
-		return (Class<? extends AbstractRichFunction>) config.getClass(
-				"functionClass", null);
+		return (Class<? extends AbstractRichFunction>) config.getClass("functionClass", null);
 	}
 
 	@SuppressWarnings("unchecked")
-	protected static <T> T deserializeObject(byte[] serializedObject)
-			throws IOException, ClassNotFoundException {
+	protected static <T> T deserializeObject(byte[] serializedObject) throws IOException,
+			ClassNotFoundException {
 		return (T) SerializationUtils.deserialize(serializedObject);
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fbfcc9eb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
index 54b1a98..ab6caea 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
@@ -1,105 +1,119 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A StreamCollector that uses user defined output names and a user defined
- * output selector to make directed emits.
- * 
- * @param <OUT>
- *            Type of the Tuple collected.
- */
-public class DirectedStreamCollector<OUT> extends StreamCollector<OUT> {
-
-	OutputSelector<OUT> outputSelector;
-	private static final Logger LOG = LoggerFactory.getLogger(DirectedStreamCollector.class);
-	private Set<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> emitted;
-
-	/**
-	 * Creates a new DirectedStreamCollector
-	 * 
-	 * @param channelID
-	 *            Channel ID of the Task
-	 * @param serializationDelegate
-	 *            Serialization delegate used for serialization
-	 * @param outputSelector
-	 *            User defined {@link OutputSelector}
-	 */
-	public DirectedStreamCollector(int channelID,
-			SerializationDelegate<StreamRecord<OUT>> serializationDelegate,
-			OutputSelector<OUT> outputSelector) {
-		super(channelID, serializationDelegate);
-		this.outputSelector = outputSelector;
-		this.emitted = new HashSet<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
-
-	}
-
-	/**
-	 * Collects and emits a tuple to the outputs by reusing a StreamRecord
-	 * object.
-	 * 
-	 * @param outputObject
-	 *            Object to be collected and emitted.
-	 */
-	@Override
-	public void collect(OUT outputObject) {
-		streamRecord.setObject(outputObject);
-		emit(streamRecord);
-	}
-
-	/**
-	 * Emits a StreamRecord to the outputs selected by the user defined
-	 * OutputSelector
-	 * 
-	 * @param streamRecord
-	 *            Record to emit.
-	 */
-	private void emit(StreamRecord<OUT> streamRecord) {
-		Collection<String> outputNames = outputSelector.getOutputs(streamRecord.getObject());
-		streamRecord.newId(channelID);
-		serializationDelegate.setInstance(streamRecord);
-		emitted.clear();
-		for (String outputName : outputNames) {
-			try {
-				for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputMap
-						.get(outputName)) {
-					if (!emitted.contains(output)) {
-						output.emit(serializationDelegate);
-						emitted.add(output);
-					}
-				}
-			} catch (Exception e) {
-				if (LOG.isErrorEnabled()) {
-					LOG.error(String.format("Emit to %s failed due to: %s", outputName,
-							StringUtils.stringifyException(e)));
-				}
-			}
-		}
-	}
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.collector;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.flink.runtime.io.network.api.RecordWriter;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A StreamCollector that uses user defined output names and a user defined
+ * output selector to make directed emits.
+ * 
+ * @param <OUT>
+ *            Type of the Tuple collected.
+ */
+public class DirectedStreamCollector<OUT> extends StreamCollector<OUT> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(DirectedStreamCollector.class);
+	
+	OutputSelector<OUT> outputSelector;
+	private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> selectAllOutputs;
+	private Set<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> emitted;
+
+	/**
+	 * Creates a new DirectedStreamCollector
+	 * 
+	 * @param channelID
+	 *            Channel ID of the Task
+	 * @param serializationDelegate
+	 *            Serialization delegate used for serialization
+	 * @param outputSelector
+	 *            User defined {@link OutputSelector}
+	 */
+	public DirectedStreamCollector(int channelID,
+			SerializationDelegate<StreamRecord<OUT>> serializationDelegate,
+			OutputSelector<OUT> outputSelector) {
+		super(channelID, serializationDelegate);
+		this.outputSelector = outputSelector;
+		this.emitted = new HashSet<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
+		this.selectAllOutputs = new ArrayList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
+	}
+
+	@Override
+	public void addOutput(RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output,
+			List<String> outputNames, boolean isSelectAllOutput) {
+
+		if (isSelectAllOutput) {
+			selectAllOutputs.add(output);
+		} else {
+			addOneOutput(output, outputNames, isSelectAllOutput);
+		}
+	}
+
+	/**
+	 * Emits a StreamRecord to the outputs selected by the user defined
+	 * OutputSelector
+	 *
+	 */
+	protected void emitToOutputs() {
+		Collection<String> outputNames = outputSelector.getOutputs(streamRecord.getObject());
+		emitted.clear();
+		for (String outputName : outputNames) {
+			List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputList = outputMap
+					.get(outputName);
+			if (outputList == null) {
+				if (LOG.isErrorEnabled()) {
+					LOG.error(String.format(
+							"Cannot emit because no output is selected with the name: %s",
+							outputName));
+				}
+			}
+
+			try {
+				for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : selectAllOutputs) {
+					if (!emitted.contains(output)) {
+						output.emit(serializationDelegate);
+						emitted.add(output);
+					}
+				}
+
+				for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputList) {
+					if (!emitted.contains(output)) {
+						output.emit(serializationDelegate);
+						emitted.add(output);
+					}
+				}
+			} catch (Exception e) {
+				if (LOG.isErrorEnabled()) {
+					LOG.error(String.format("Emit to %s failed due to: %s", outputName,
+							StringUtils.stringifyException(e)));
+				}
+			}
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fbfcc9eb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
index c6ba1ef..ce4069e 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
@@ -1,137 +1,148 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Collector for tuples in Apache Flink stream processing. The collected values
- * will be wrapped with ID in a {@link StreamRecord} and then emitted to the
- * outputs.
- * 
- * @param <OUT>
- *            Type of the Tuples/Objects collected.
- */
-public class StreamCollector<OUT> implements Collector<OUT> {
-
-	private static final Logger LOG = LoggerFactory.getLogger(StreamCollector.class);
-
-	protected StreamRecord<OUT> streamRecord;
-	protected int channelID;
-	private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
-	protected Map<String, List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>> outputMap;
-	protected SerializationDelegate<StreamRecord<OUT>> serializationDelegate;
-
-	/**
-	 * Creates a new StreamCollector
-	 * 
-	 * @param channelID
-	 *            Channel ID of the Task
-	 * @param serializationDelegate
-	 *            Serialization delegate used for serialization
-	 */
-	public StreamCollector(int channelID,
-			SerializationDelegate<StreamRecord<OUT>> serializationDelegate) {
-		this.serializationDelegate = serializationDelegate;
-		if (serializationDelegate != null) {
-			this.streamRecord = serializationDelegate.getInstance();
-		} else {
-			this.streamRecord = new StreamRecord<OUT>();
-		}
-		this.channelID = channelID;
-		this.outputs = new ArrayList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
-		this.outputMap = new HashMap<String, List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>>();
-	}
-
-	/**
-	 * Adds an output with the given user defined name
-	 * 
-	 * @param output
-	 *            The RecordWriter object representing the output.
-	 * @param outputNames
-	 *            User defined names of the output.
-	 */
-	public void addOutput(RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output,
-			List<String> outputNames) {
-		outputs.add(output);
-		for (String outputName : outputNames) {
-			if (outputName != null) {
-				if (!outputMap.containsKey(outputName)) {
-					outputMap
-							.put(outputName,
-									new ArrayList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>());
-					outputMap.get(outputName).add(output);
-				} else {
-					if (!outputMap.get(outputName).contains(output)) {
-						outputMap.get(outputName).add(output);
-					}
-				}
-
-			}
-		}
-	}
-
-	/**
-	 * Collects and emits a tuple/object to the outputs by reusing a
-	 * StreamRecord object.
-	 * 
-	 * @param outputObject
-	 *            Object to be collected and emitted.
-	 */
-	@Override
-	public void collect(OUT outputObject) {
-		streamRecord.setObject(outputObject);
-		emit(streamRecord);
-	}
-
-	/**
-	 * Emits a StreamRecord to all the outputs.
-	 * 
-	 * @param streamRecord
-	 *            StreamRecord to emit.
-	 */
-	private void emit(StreamRecord<OUT> streamRecord) {
-		streamRecord.newId(channelID);
-		serializationDelegate.setInstance(streamRecord);
-		for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
-			try {
-				output.emit(serializationDelegate);
-			} catch (Exception e) {
-				if (LOG.isErrorEnabled()) {
-					LOG.error(String.format("Emit failed due to: %s",
-							StringUtils.stringifyException(e)));
-				}
-			}
-		}
-	}
-
-	@Override
-	public void close() {
-	}
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.collector;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.runtime.io.network.api.RecordWriter;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Collector for tuples in Apache Flink stream processing. The collected values
+ * will be wrapped with ID in a {@link StreamRecord} and then emitted to the
+ * outputs.
+ * 
+ * @param <OUT>
+ *            Type of the Tuples/Objects collected.
+ */
+public class StreamCollector<OUT> implements Collector<OUT> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(StreamCollector.class);
+
+	protected StreamRecord<OUT> streamRecord;
+	protected int channelID;
+	protected List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
+	protected Map<String, List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>> outputMap;
+	protected SerializationDelegate<StreamRecord<OUT>> serializationDelegate;
+
+	/**
+	 * Creates a new StreamCollector
+	 * 
+	 * @param channelID
+	 *            Channel ID of the Task
+	 * @param serializationDelegate
+	 *            Serialization delegate used for serialization
+	 */
+	public StreamCollector(int channelID,
+			SerializationDelegate<StreamRecord<OUT>> serializationDelegate) {
+		this.serializationDelegate = serializationDelegate;
+		if (serializationDelegate != null) {
+			this.streamRecord = serializationDelegate.getInstance();
+		} else {
+			this.streamRecord = new StreamRecord<OUT>();
+		}
+		this.channelID = channelID;
+		this.outputs = new ArrayList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
+		this.outputMap = new HashMap<String, List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>>();
+	}
+
+	/**
+	 * Adds an output with the given user defined name
+	 * 
+	 * @param output
+	 *            The RecordWriter object representing the output.
+	 * @param outputNames
+	 *            User defined names of the output.
+	 * @param isSelectAllOutput
+	 *            Marks whether all the outputs are selected.
+	 */
+	public void addOutput(RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output,
+			List<String> outputNames, boolean isSelectAllOutput) {
+		addOneOutput(output, outputNames, isSelectAllOutput);
+	}
+
+	protected void addOneOutput(RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output,
+			List<String> outputNames, boolean isSelectAllOutput) {
+		outputs.add(output);
+		for (String outputName : outputNames) {
+			if (outputName != null) {
+				if (!outputMap.containsKey(outputName)) {
+					outputMap
+							.put(outputName,
+									new ArrayList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>());
+					outputMap.get(outputName).add(output);
+				} else {
+					if (!outputMap.get(outputName).contains(output)) {
+						outputMap.get(outputName).add(output);
+					}
+				}
+
+			}
+		}
+	}
+	
+	/**
+	 * Collects and emits a tuple/object to the outputs by reusing a
+	 * StreamRecord object.
+	 * 
+	 * @param outputObject
+	 *            Object to be collected and emitted.
+	 */
+	@Override
+	public void collect(OUT outputObject) {
+		streamRecord.setObject(outputObject);
+		emit(streamRecord);
+	}
+
+	/**
+	 * Emits a StreamRecord to the outputs.
+	 * 
+	 * @param streamRecord
+	 *            StreamRecord to emit.
+	 */
+	private void emit(StreamRecord<OUT> streamRecord) {
+		streamRecord.newId(channelID);
+		serializationDelegate.setInstance(streamRecord);
+		emitToOutputs();
+	}
+	
+	protected void emitToOutputs() {
+		for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
+			try {
+				output.emit(serializationDelegate);
+			} catch (Exception e) {
+				if (LOG.isErrorEnabled()) {
+					LOG.error(String.format("Emit failed due to: %s",
+							StringUtils.stringifyException(e)));
+				}
+			}
+		}
+	}
+
+	@Override
+	public void close() {
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fbfcc9eb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index a2994dc..64a07b5 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -82,6 +82,7 @@ public abstract class DataStream<OUT> {
 	protected final String id;
 	protected int degreeOfParallelism;
 	protected List<String> userDefinedNames;
+	protected boolean selectAll;
 	protected StreamPartitioner<OUT> partitioner;
 
 	protected final JobGraphBuilder jobGraphBuilder;
@@ -106,6 +107,7 @@ public abstract class DataStream<OUT> {
 		this.degreeOfParallelism = environment.getDegreeOfParallelism();
 		this.jobGraphBuilder = environment.getJobGraphBuilder();
 		this.userDefinedNames = new ArrayList<String>();
+		this.selectAll = false;
 		this.partitioner = new ForwardPartitioner<OUT>();
 
 	}
@@ -121,6 +123,7 @@ public abstract class DataStream<OUT> {
 		this.id = dataStream.id;
 		this.degreeOfParallelism = dataStream.degreeOfParallelism;
 		this.userDefinedNames = new ArrayList<String>(dataStream.userDefinedNames);
+		this.selectAll = dataStream.selectAll;
 		this.partitioner = dataStream.partitioner;
 		this.jobGraphBuilder = dataStream.jobGraphBuilder;
 
@@ -911,11 +914,11 @@ public abstract class DataStream<OUT> {
 		if (inputStream instanceof MergedDataStream) {
 			for (DataStream<X> stream : ((MergedDataStream<X>) inputStream).mergedStreams) {
 				jobGraphBuilder.setEdge(stream.getId(), outputID, stream.partitioner, typeNumber,
-						inputStream.userDefinedNames);
+						inputStream.userDefinedNames, inputStream.selectAll);
 			}
 		} else {
 			jobGraphBuilder.setEdge(inputStream.getId(), outputID, inputStream.partitioner,
-					typeNumber, inputStream.userDefinedNames);
+					typeNumber, inputStream.userDefinedNames, inputStream.selectAll);
 		}
 
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fbfcc9eb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
index 46be328..16362ba 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
@@ -93,12 +93,12 @@ public class IterativeDataStream<IN> extends
 			for (DataStream<IN> stream : ((MergedDataStream<IN>) iterationTail).mergedStreams) {
 				String inputID = stream.getId();
 				jobGraphBuilder.setEdge(inputID, returnStream.getId(),
-						new ForwardPartitioner<IN>(), 0, name);
+						new ForwardPartitioner<IN>(), 0, name, false);
 			}
 		} else {
 
 			jobGraphBuilder.setEdge(iterationTail.getId(), returnStream.getId(),
-					new ForwardPartitioner<IN>(), 0, name);
+					new ForwardPartitioner<IN>(), 0, name, false);
 		}
 
 		return iterationTail;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fbfcc9eb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index b1b0939..1f01feb 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -104,22 +104,6 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 	 * @return The {@link SplitDataStream}
 	 */
 	public SplitDataStream<OUT> split(OutputSelector<OUT> outputSelector) {
-		return split(outputSelector, null);
-	}
-
-	/**
-	 * Operator used for directing tuples to specific named outputs using an
-	 * {@link OutputSelector}. Calling this method on an operator creates a new
-	 * {@link SplitDataStream}.
-	 * 
-	 * @param outputSelector
-	 *            The user defined {@link OutputSelector} for directing the
-	 *            tuples.
-	 * @param outputNames
-	 *            An array of all the output names to be used for selectAll
-	 * @return The {@link SplitDataStream}
-	 */
-	public SplitDataStream<OUT> split(OutputSelector<OUT> outputSelector, String[] outputNames) {
 		try {
 			jobGraphBuilder.setOutputSelector(id, SerializationUtils.serialize(outputSelector));
 
@@ -127,7 +111,7 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 			throw new RuntimeException("Cannot serialize OutputSelector");
 		}
 
-		return new SplitDataStream<OUT>(this, outputNames);
+		return new SplitDataStream<OUT>(this);
 	}
 
 	@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fbfcc9eb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
index a6cf4b1..0ddb4f0 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
@@ -32,11 +32,9 @@ import org.apache.flink.streaming.api.collector.OutputSelector;
 public class SplitDataStream<OUT> {
 
 	DataStream<OUT> dataStream;
-	String[] allNames;
 
-	protected SplitDataStream(DataStream<OUT> dataStream, String[] outputNames) {
+	protected SplitDataStream(DataStream<OUT> dataStream) {
 		this.dataStream = dataStream.copy();
-		this.allNames = outputNames;
 	}
 
 	/**
@@ -52,22 +50,18 @@ public class SplitDataStream<OUT> {
 	}
 
 	/**
-	 * Selects all output names from a split data stream. Output names must
-	 * predefined to use selectAll.
+	 * Selects all output names from a split data stream.
 	 * 
 	 * @return Returns the selected DataStream
 	 */
-	public DataStream<OUT> selectAll() {
-		if (allNames != null) {
-			return selectOutput(allNames);
-		} else {
-			throw new RuntimeException(
-					"Output names must be predefined in order to use select all.");
-		}
+	public DataStream<OUT> selectAll() {
+		DataStream<OUT> returnStream = dataStream.copy();
+		returnStream.selectAll = true;
+		return returnStream;
 	}
 
 	private DataStream<OUT> selectOutput(String[] outputName) {
-		DataStream<OUT> returnStream = dataStream.copy();
+		DataStream<OUT> returnStream = dataStream.copy();
 		returnStream.userDefinedNames = Arrays.asList(outputName);
 		return returnStream;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fbfcc9eb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java
index e19eeaa..76277dc 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java
@@ -127,9 +127,10 @@ public class OutputHandler<OUT> {
 
 		outputs.add(output);
 		List<String> outputName = configuration.getOutputName(outputNumber);
-
+		boolean isSelectAllOutput = configuration.getSelectAll(outputNumber);
+		
 		if (collector != null) {
-			collector.addOutput(output, outputName);
+			collector.addOutput(output, outputName, isSelectAllOutput);
 		}
 
 		if (LOG.isTraceEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fbfcc9eb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index 1608b7b..fdf9db3 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -1,30 +1,17 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
 package org.apache.flink.streaming.api.collector;
 
 import static org.junit.Assert.assertEquals;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
-import org.apache.flink.api.java.functions.RichMapFunction;
-import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.SplitDataStream;
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -33,74 +20,79 @@ import org.junit.Test;
 
 public class DirectedOutputTest {
 
-	static HashSet<Long> evenSet = new HashSet<Long>();
-	static HashSet<Long> oddSet = new HashSet<Long>();
-
-	private static class PlusTwo extends RichMapFunction<Long, Long> {
+	private static final String TEN = "ten";
+	private static final String ODD = "odd";
+	private static final String ALL = "all";
+	private static final String EVEN_AND_ODD = "evenAndOdd";
+	private static final String ODD_AND_TEN = "oddAndTen";
+	private static final String EVEN = "even";
 
+	static final class MyMap implements MapFunction<Long, Long> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public Long map(Long arg0) throws Exception {
-			arg0 += 2;
-			return arg0;
+		public Long map(Long value) throws Exception {
+			return value;
 		}
 	}
 
-	private static class EvenSink implements SinkFunction<Long> {
-
+	static final class MyOutputSelector extends OutputSelector<Long> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void invoke(Long tuple) {
-			evenSet.add(tuple);
+		public void select(Long value, Collection<String> outputs) {
+			if (value % 2 == 0) {
+				outputs.add(EVEN);
+			} else {
+				outputs.add(ODD);
+			}
+			
+			if (value == 10L) {
+				outputs.add(TEN);
+			}
 		}
 	}
+	
+	static final class ListSink implements SinkFunction<Long> {
+		private static final long serialVersionUID = 1L;
 
-	private static class OddSink implements SinkFunction<Long> {
+		private String name;
+		private transient List<Long> list;
 
-		private static final long serialVersionUID = 1L;
+		public ListSink(String name) {
+			this.name = name;
+		}
 
 		@Override
-		public void invoke(Long tuple) {
-			oddSet.add(tuple);
+		public void invoke(Long value) {
+			list.add(value);
 		}
-	}
-
-	private static class MySelector extends OutputSelector<Long> {
-
-		private static final long serialVersionUID = 1L;
 
-		@Override
-		public void select(Long tuple, Collection<String> outputs) {
-			int mod = (int) (tuple % 2);
-			switch (mod) {
-			case 0:
-				outputs.add("ds1");
-				break;
-			case 1:
-				outputs.add("ds2");
-				break;
-			}
+		private void readObject(java.io.ObjectInputStream in) throws IOException,
+				ClassNotFoundException {
+			in.defaultReadObject();
+			outputs.put(name, new ArrayList<Long>());
+			this.list = outputs.get(name);
 		}
 	}
 
-	@SuppressWarnings("unused")
+	private static Map<String, List<Long>> outputs = new HashMap<String, List<Long>>();
+	
 	@Test
-	public void directOutputTest() throws Exception {
+	public void outputSelectorTest() throws Exception {
 
 		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-		SplitDataStream<Long> s = env.generateSequence(1, 6).split(new MySelector(),
-				new String[] { "ds1", "ds2" });
-		DataStream<Long> ds1 = s.select("ds1").shuffle().map(new PlusTwo()).addSink(new EvenSink());
-		DataStream<Long> ds2 = s.select("ds2").map(new PlusTwo()).addSink(new OddSink());
-
-		env.executeTest(32);
-
-		HashSet<Long> expectedEven = new HashSet<Long>(Arrays.asList(4L, 6L, 8L));
-		HashSet<Long> expectedOdd = new HashSet<Long>(Arrays.asList(3L, 5L, 7L));
 
-		assertEquals(expectedEven, evenSet);
-		assertEquals(expectedOdd, oddSet);
+		SplitDataStream<Long> source = env.generateSequence(1, 10).split(new MyOutputSelector());
+		source.select(EVEN).addSink(new ListSink(EVEN));
+		source.select(ODD, TEN).addSink(new ListSink(ODD_AND_TEN));
+		source.select(EVEN, ODD).addSink(new ListSink(EVEN_AND_ODD));
+		source.selectAll().addSink(new ListSink(ALL));
+		
+		env.executeTest(128);
+		assertEquals(Arrays.asList(2L, 4L, 6L, 8L, 10L), outputs.get(EVEN));
+		assertEquals(Arrays.asList(1L, 3L, 5L, 7L, 9L, 10L), outputs.get(ODD_AND_TEN));
+		assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L), outputs.get(EVEN_AND_ODD));
+		assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L), outputs.get(ALL));
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fbfcc9eb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
index 05d7494..66234d4 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
@@ -38,7 +38,7 @@ public class StreamCollectorTest {
 		sd.setInstance(new StreamRecord<Tuple1<Integer>>().setObject(new Tuple1<Integer>()));
 
 		StreamCollector<Tuple1<Integer>> collector = new StreamCollector<Tuple1<Integer>>(2, sd);
-		collector.addOutput(recWriter, new ArrayList<String>());
+		collector.addOutput(recWriter, new ArrayList<String>(), false);
 		collector.collect(new Tuple1<Integer>(3));
 		collector.collect(new Tuple1<Integer>(4));
 		collector.collect(new Tuple1<Integer>(5));