You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/03/20 13:41:37 UTC

[05/10] flink git commit: [FLINK-1594] [streaming] Added OutputSelector wrapping

[FLINK-1594] [streaming] Added OutputSelector wrapping


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d832400b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d832400b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d832400b

Branch: refs/heads/master
Commit: d832400b3fee6c73553492e5954587958b4f7137
Parents: 3158d1d
Author: Gábor Hermann <re...@gmail.com>
Authored: Mon Mar 2 10:20:14 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Fri Mar 20 11:25:03 2015 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/StreamConfig.java       |  32 ++---
 .../apache/flink/streaming/api/StreamGraph.java |   8 +-
 .../api/StreamingJobGraphGenerator.java         |   2 +-
 .../api/collector/CollectorWrapper.java         |  17 ++-
 .../api/collector/DirectedCollectorWrapper.java | 131 -------------------
 .../streaming/api/collector/OutputSelector.java |  44 -------
 .../BroadcastOutputSelectorWrapper.java         |  43 ++++++
 .../selector/DirectedOutputSelectorWrapper.java |  95 ++++++++++++++
 .../api/collector/selector/OutputSelector.java  |  44 +++++++
 .../selector/OutputSelectorWrapper.java         |  31 +++++
 .../selector/OutputSelectorWrapperFactory.java  |  32 +++++
 .../streaming/api/datastream/DataStream.java    |   6 +-
 .../api/datastream/SplitDataStream.java         |   2 +-
 .../api/streamvertex/OutputHandler.java         |  39 +++---
 .../flink/streaming/api/OutputSplitterTest.java |   2 +-
 .../api/collector/DirectedOutputTest.java       |   1 +
 .../api/collector/OutputSelectorTest.java       |   1 +
 .../examples/iteration/IterateExample.java      |   2 +-
 .../flink/streaming/api/scala/DataStream.scala  |   2 +-
 19 files changed, 293 insertions(+), 241 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index 5b6de85..f58f0ad 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -27,7 +27,7 @@ import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.api.streamvertex.StreamVertexException;
@@ -49,7 +49,8 @@ public class StreamConfig implements Serializable {
 	private static final String VERTEX_NAME = "vertexID";
 	private static final String OPERATOR_NAME = "operatorName";
 	private static final String ITERATION_ID = "iteration-id";
-	private static final String OUTPUT_SELECTOR = "outputSelector";
+//	private static final String OUTPUT_SELECTOR = "outputSelector";
+	private static final String OUTPUT_SELECTOR_WRAPPER = "outputSelectorWrapper";
 	private static final String DIRECTED_EMIT = "directedEmit";
 	private static final String SERIALIZEDUDF = "serializedudf";
 	private static final String USER_FUNCTION = "userfunction";
@@ -67,7 +68,6 @@ public class StreamConfig implements Serializable {
 	private static final String IN_STREAM_EDGES = "out stream edges";
 
 	// DEFAULT VALUES
-
 	private static final long DEFAULT_TIMEOUT = 100;
 	public static final String STATE_MONITORING = "STATE_MONITORING";
 
@@ -189,33 +189,21 @@ public class StreamConfig implements Serializable {
 		}
 	}
 
-	public void setDirectedEmit(boolean directedEmit) {
-		config.setBoolean(DIRECTED_EMIT, directedEmit);
-	}
-
-	public boolean isDirectedEmit() {
-		return config.getBoolean(DIRECTED_EMIT, false);
-	}
-
-	public void setOutputSelectors(List<OutputSelector<?>> outputSelector) {
+	public void setOutputSelectorWrapper(OutputSelectorWrapper<?> outputSelectorWrapper) {
 		try {
-			if (outputSelector != null && !outputSelector.isEmpty()) {
-				setDirectedEmit(true);
-				config.setBytes(OUTPUT_SELECTOR,
-						SerializationUtils.serialize((Serializable) outputSelector));
-			}
+			config.setBytes(OUTPUT_SELECTOR_WRAPPER, SerializationUtils.serialize(outputSelectorWrapper));
 		} catch (SerializationException e) {
-			throw new RuntimeException("Cannot serialize OutputSelector");
+			throw new RuntimeException("Cannot serialize OutputSelectorWrapper");
 		}
 	}
 
 	@SuppressWarnings("unchecked")
-	public <T> List<OutputSelector<T>> getOutputSelectors(ClassLoader cl) {
+	public <T> OutputSelectorWrapper<T> getOutputSelectorWrapper(ClassLoader cl) {
 		try {
-			return (List<OutputSelector<T>>) InstantiationUtil.readObjectFromConfig(this.config,
-					OUTPUT_SELECTOR, cl);
+			return (OutputSelectorWrapper<T>) InstantiationUtil.readObjectFromConfig(this.config,
+					OUTPUT_SELECTOR_WRAPPER, cl);
 		} catch (Exception e) {
-			throw new StreamVertexException("Cannot deserialize and instantiate OutputSelector", e);
+			throw new StreamVertexException("Cannot deserialize and instantiate OutputSelectorWrapper", e);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
index dfe66a5..948ea5e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
@@ -37,7 +37,9 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.optimizer.plan.StreamingPlan;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapperFactory;
+import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
@@ -537,8 +539,8 @@ public class StreamGraph extends StreamingPlan {
 		return inputFormatLists.get(vertexID);
 	}
 
-	public List<OutputSelector<?>> getOutputSelector(Integer vertexID) {
-		return outputSelectors.get(vertexID);
+	public OutputSelectorWrapper<?> getOutputSelectorWrapper(Integer vertexID) {
+		return OutputSelectorWrapperFactory.create(outputSelectors.get(vertexID));
 	}
 
 	public Integer getIterationID(Integer vertexID) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
index 3ff64ca..79d43c0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
@@ -213,7 +213,7 @@ public class StreamingJobGraphGenerator {
 		config.setTypeSerializerOut2(streamGraph.getOutSerializer2(vertexID));
 
 		config.setUserInvokable(streamGraph.getInvokable(vertexID));
-		config.setOutputSelectors(streamGraph.getOutputSelector(vertexID));
+		config.setOutputSelectorWrapper(streamGraph.getOutputSelectorWrapper(vertexID));
 
 		config.setNumberOfOutputs(nonChainableOutputs.size());
 		config.setNonChainedOutputs(nonChainableOutputs);

http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
index 1281bf0..bb0268a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
@@ -17,27 +17,26 @@
 
 package org.apache.flink.streaming.api.collector;
 
-import java.util.LinkedList;
-import java.util.List;
-
+import org.apache.flink.streaming.api.StreamEdge;
+import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
 import org.apache.flink.util.Collector;
 
 public class CollectorWrapper<OUT> implements Collector<OUT> {
 
-	private List<Collector<OUT>> outputs;
+	private OutputSelectorWrapper<OUT> outputSelectorWrapper;
 
-	public CollectorWrapper() {
-		this.outputs = new LinkedList<Collector<OUT>>();
+	public CollectorWrapper(OutputSelectorWrapper<OUT> outputSelectorWrapper) {
+		this.outputSelectorWrapper = outputSelectorWrapper;
 	}
 
 	@SuppressWarnings("unchecked")
-	public void addCollector(Collector<?> output) {
-		outputs.add((Collector<OUT>) output);
+	public void addCollector(Collector<?> output, StreamEdge edge) {
+		outputSelectorWrapper.addCollector(output, edge);
 	}
 
 	@Override
 	public void collect(OUT record) {
-		for(Collector<OUT> output: outputs){
+		for (Collector<OUT> output : outputSelectorWrapper.getSelectedOutputs(record)) {
 			output.collect(record);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java
deleted file mode 100755
index 4681cd3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A StreamCollector that uses user defined output names and a user defined
- * output selector to make directed emits.
- * 
- * @param <OUT>
- *            Type of the Tuple collected.
- */
-public class DirectedCollectorWrapper<OUT> extends CollectorWrapper<OUT> {
-
-	private static final Logger LOG = LoggerFactory.getLogger(DirectedCollectorWrapper.class);
-
-	List<OutputSelector<OUT>> outputSelectors;
-
-	protected Map<String, List<Collector<OUT>>> outputMap;
-
-	private List<Collector<OUT>> selectAllOutputs;
-	private Set<Collector<OUT>> emitted;
-
-	/**
-	 * Creates a new DirectedStreamCollector
-	 * 
-	 * @param outputSelector
-	 *            User defined {@link OutputSelector}
-	 */
-	public DirectedCollectorWrapper(List<OutputSelector<OUT>> outputSelectors) {
-		this.outputSelectors = outputSelectors;
-		this.emitted = new HashSet<Collector<OUT>>();
-		this.selectAllOutputs = new LinkedList<Collector<OUT>>();
-		this.outputMap = new HashMap<String, List<Collector<OUT>>>();
-
-	}
-
-	@Override
-	public void addCollector(Collector<?> output) {
-		addCollector(output, new ArrayList<String>());
-	}
-
-	@SuppressWarnings("unchecked")
-	public void addCollector(Collector<?> output, List<String> selectedNames) {
-
-		if (selectedNames.isEmpty()) {
-			selectAllOutputs.add((Collector<OUT>) output);
-		} else {
-			for (String selectedName : selectedNames) {
-
-				if (!outputMap.containsKey(selectedName)) {
-					outputMap.put(selectedName, new LinkedList<Collector<OUT>>());
-					outputMap.get(selectedName).add((Collector<OUT>) output);
-				} else {
-					if (!outputMap.get(selectedName).contains(output)) {
-						outputMap.get(selectedName).add((Collector<OUT>) output);
-					}
-				}
-
-			}
-		}
-	}
-
-	@Override
-	public void collect(OUT record) {
-		emitted.clear();
-
-		for (Collector<OUT> output : selectAllOutputs) {
-			output.collect(record);
-			emitted.add(output);
-		}
-
-		for (OutputSelector<OUT> outputSelector : outputSelectors) {
-			Iterable<String> outputNames = outputSelector.select(record);
-
-			for (String outputName : outputNames) {
-				List<Collector<OUT>> outputList = outputMap.get(outputName);
-				if (outputList == null) {
-					if (LOG.isErrorEnabled()) {
-						String format = String.format(
-								"Cannot emit because no output is selected with the name: %s",
-								outputName);
-						LOG.error(format);
-
-					}
-				} else {
-					for (Collector<OUT> output : outputList) {
-						if (!emitted.contains(output)) {
-							output.collect(record);
-							emitted.add(output);
-						}
-					}
-
-				}
-
-			}
-		}
-
-	}
-
-	@Override
-	public void close() {
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
deleted file mode 100644
index 6dbcff4..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector;
-
-import java.io.Serializable;
-
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.datastream.SplitDataStream;
-
-/**
- * Interface for defining an OutputSelector for a {@link SplitDataStream} using
- * the {@link SingleOutputStreamOperator#split} call. Every output object of a
- * {@link SplitDataStream} will run through this operator to select outputs.
- * 
- * @param <OUT>
- *            Type parameter of the split values.
- */
-public interface OutputSelector<OUT> extends Serializable {
-	/**
-	 * Method for selecting output names for the emitted objects when using the
-	 * {@link SingleOutputStreamOperator#split} method. The values will be
-	 * emitted only to output names which are contained in the returned
-	 * iterable.
-	 * 
-	 * @param value
-	 *            Output object for which the output selection should be made.
-	 */
-	public Iterable<String> select(OUT value);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
new file mode 100644
index 0000000..44371f0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.collector.selector;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.streaming.api.StreamEdge;
+import org.apache.flink.util.Collector;
+
+public class BroadcastOutputSelectorWrapper<OUT> implements OutputSelectorWrapper<OUT> {
+
+	private List<Collector<OUT>> outputs;
+
+	public BroadcastOutputSelectorWrapper() {
+		outputs = new ArrayList<Collector<OUT>>();
+	}
+
+	@Override
+	public void addCollector(Collector<?> output, StreamEdge edge) {
+		outputs.add((Collector<OUT>) output);
+	}
+
+	@Override
+	public Iterable<Collector<OUT>> getSelectedOutputs(OUT record) {
+		return outputs;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
new file mode 100644
index 0000000..624fac1
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.collector.selector;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.flink.streaming.api.StreamEdge;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DirectedOutputSelectorWrapper<OUT> implements OutputSelectorWrapper<OUT> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(DirectedOutputSelectorWrapper.class);
+
+	private List<OutputSelector<OUT>> outputSelectors;
+
+	private Map<String, List<Collector<OUT>>> outputMap;
+	private Set<Collector<OUT>> selectAllOutputs;
+//	private Set<Collector<OUT>> emitted;
+
+	public DirectedOutputSelectorWrapper(List<OutputSelector<OUT>> outputSelectors) {
+		this.outputSelectors = outputSelectors;
+//		this.emitted = new HashSet<Collector<OUT>>();
+		this.selectAllOutputs = new HashSet<Collector<OUT>>(); //new LinkedList<Collector<OUT>>();
+		this.outputMap = new HashMap<String, List<Collector<OUT>>>();
+	}
+
+	@Override
+	public void addCollector(Collector<?> output, StreamEdge edge) {
+		List<String> selectedNames = edge.getSelectedNames();
+
+		if (selectedNames.isEmpty()) {
+			selectAllOutputs.add((Collector<OUT>) output);
+		} else {
+			for (String selectedName : selectedNames) {
+
+				if (!outputMap.containsKey(selectedName)) {
+					outputMap.put(selectedName, new LinkedList<Collector<OUT>>());
+					outputMap.get(selectedName).add((Collector<OUT>) output);
+				} else {
+					if (!outputMap.get(selectedName).contains(output)) {
+						outputMap.get(selectedName).add((Collector<OUT>) output);
+					}
+				}
+			}
+		}
+	}
+
+	@Override
+	public Iterable<Collector<OUT>> getSelectedOutputs(OUT record) {
+		Set<Collector<OUT>> selectedOutputs = new HashSet<Collector<OUT>>(selectAllOutputs);
+
+		for (OutputSelector<OUT> outputSelector : outputSelectors) {
+			Iterable<String> outputNames = outputSelector.select(record);
+
+			for (String outputName : outputNames) {
+				List<Collector<OUT>> outputList = outputMap.get(outputName);
+
+				try {
+					selectedOutputs.addAll(outputList);
+				} catch (NullPointerException e) {
+					if (LOG.isErrorEnabled()) {
+						String format = String.format(
+								"Cannot emit because no output is selected with the name: %s",
+								outputName);
+						LOG.error(format);
+					}
+				}
+			}
+		}
+
+		return selectedOutputs;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelector.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelector.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelector.java
new file mode 100644
index 0000000..b886fa6
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelector.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.collector.selector;
+
+import java.io.Serializable;
+
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.datastream.SplitDataStream;
+
+/**
+ * Interface for defining an OutputSelector for a {@link SplitDataStream} using
+ * the {@link SingleOutputStreamOperator#split} call. Every output object of a
+ * {@link SplitDataStream} will run through this operator to select outputs.
+ * 
+ * @param <OUT>
+ *            Type parameter of the split values.
+ */
+public interface OutputSelector<OUT> extends Serializable {
+	/**
+	 * Method for selecting output names for the emitted objects when using the
+	 * {@link SingleOutputStreamOperator#split} method. The values will be
+	 * emitted only to output names which are contained in the returned
+	 * iterable.
+	 * 
+	 * @param value
+	 *            Output object for which the output selection should be made.
+	 */
+	public Iterable<String> select(OUT value);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
new file mode 100644
index 0000000..850a1d9
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.collector.selector;
+
+import java.io.Serializable;
+
+import org.apache.flink.streaming.api.StreamEdge;
+import org.apache.flink.util.Collector;
+
+public interface OutputSelectorWrapper<OUT> extends Serializable {
+
+	public void addCollector(Collector<?> output, StreamEdge edge);
+
+	public Iterable<Collector<OUT>> getSelectedOutputs(OUT record);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapperFactory.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapperFactory.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapperFactory.java
new file mode 100644
index 0000000..c0f22c7
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapperFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.collector.selector;
+
+import java.util.List;
+
+public class OutputSelectorWrapperFactory {
+
+	public static OutputSelectorWrapper<?> create(List<OutputSelector<?>> outputSelectors) {
+		if (outputSelectors.size() == 0) {
+			return new BroadcastOutputSelectorWrapper();
+		} else {
+			return new DirectedOutputSelectorWrapper(outputSelectors);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 879a98c..5f6f981 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -47,7 +47,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.StreamGraph;
-import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.temporaloperator.StreamCrossOperator;
 import org.apache.flink.streaming.api.datastream.temporaloperator.StreamJoinOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -238,12 +238,12 @@ public class DataStream<OUT> {
 
 	/**
 	 * Operator used for directing tuples to specific named outputs using an
-	 * {@link org.apache.flink.streaming.api.collector.OutputSelector}. Calling
+	 * {@link org.apache.flink.streaming.api.collector.selector.OutputSelector}. Calling
 	 * this method on an operator creates a new {@link SplitDataStream}.
 	 * 
 	 * @param outputSelector
 	 *            The user defined
-	 *            {@link org.apache.flink.streaming.api.collector.OutputSelector}
+	 *            {@link org.apache.flink.streaming.api.collector.selector.OutputSelector}
 	 *            for directing the tuples.
 	 * @return The {@link SplitDataStream}
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
index 97458a8..69e059e 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.datastream;
 
 import java.util.Arrays;
 
-import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 
 /**
  * The SplitDataStream represents an operator that has been split using an

http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
index ca6b34d..18ddc79 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
@@ -30,8 +30,8 @@ import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.StreamConfig;
 import org.apache.flink.streaming.api.StreamEdge;
 import org.apache.flink.streaming.api.collector.CollectorWrapper;
-import org.apache.flink.streaming.api.collector.DirectedCollectorWrapper;
 import org.apache.flink.streaming.api.collector.StreamOutput;
+import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
 import org.apache.flink.streaming.api.invokable.ChainableInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
@@ -100,22 +100,22 @@ public class OutputHandler<OUT> {
 	 * This method builds up a nested collector which encapsulates all the
 	 * chained operators and their network output. The result of this recursive
 	 * call will be passed as collector to the first invokable in the chain.
-	 * 
+	 *
 	 * @param chainedTaskConfig
-	 *            The configuration of the starting operator of the chain, we
-	 *            use this paramater to recursively build the whole chain
+	 * 		The configuration of the starting operator of the chain, we
+	 * 		use this parameter to recursively build the whole chain
 	 * @return Returns the collector for the chain starting from the given
-	 *         config
+	 * config
 	 */
-	@SuppressWarnings({ "unchecked", "rawtypes" })
+	@SuppressWarnings({"unchecked", "rawtypes"})
 	private Collector<OUT> createChainedCollector(StreamConfig chainedTaskConfig) {
 
-		boolean isDirectEmit = chainedTaskConfig.isDirectedEmit();
 
 		// We create a wrapper that will encapsulate the chained operators and
 		// network outputs
-		CollectorWrapper<OUT> wrapper = isDirectEmit ? new DirectedCollectorWrapper(
-				chainedTaskConfig.getOutputSelectors(cl)) : new CollectorWrapper<OUT>();
+
+		OutputSelectorWrapper<OUT> outputSelectorWrapper = chainedTaskConfig.getOutputSelectorWrapper(cl);
+		CollectorWrapper<OUT> wrapper = new CollectorWrapper<OUT>(outputSelectorWrapper);
 
 		// Create collectors for the network outputs
 		for (StreamEdge outputEdge : chainedTaskConfig.getNonChainedOutputs(cl)) {
@@ -123,12 +123,7 @@ public class OutputHandler<OUT> {
 
 			Collector<?> outCollector = outputMap.get(output);
 
-			if (isDirectEmit) {
-				((DirectedCollectorWrapper<OUT>) wrapper).addCollector(outCollector,
-						chainedTaskConfig.getSelectedNames(output));
-			} else {
-				wrapper.addCollector(outCollector);
-			}
+			wrapper.addCollector(outCollector, outputEdge);
 		}
 
 		// Create collectors for the chained outputs
@@ -136,12 +131,8 @@ public class OutputHandler<OUT> {
 			Integer output = outputEdge.getTargetVertex();
 
 			Collector<?> outCollector = createChainedCollector(chainedConfigs.get(output));
-			if (isDirectEmit) {
-				((DirectedCollectorWrapper<OUT>) wrapper).addCollector(outCollector,
-						chainedTaskConfig.getSelectedNames(output));
-			} else {
-				wrapper.addCollector(outCollector);
-			}
+
+			wrapper.addCollector(outCollector, outputEdge);
 		}
 
 		if (chainedTaskConfig.isChainStart()) {
@@ -169,11 +160,11 @@ public class OutputHandler<OUT> {
 	/**
 	 * We create the StreamOutput for the specific output given by the id, and
 	 * the configuration of its source task
-	 * 
+	 *
 	 * @param outputVertex
-	 *            Name of the output to which the streamoutput will be set up
+	 * 		Name of the output to which the streamoutput will be set up
 	 * @param configuration
-	 *            The config of upStream task
+	 * 		The config of upStream task
 	 * @return
 	 */
 	private <T> StreamOutput<T> createStreamOutput(Integer outputVertex,

http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
index cf6bb3c..14f0fa0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
@@ -23,7 +23,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.TestListResultSink;

http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index ffc7c74..13bf457 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.SplitDataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;

http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java
index 1615a45..a3d89f2 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.junit.Test;
 
 public class OutputSelectorTest {

http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index 998e818..bbd5433 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.IterativeDataStream;
 import org.apache.flink.streaming.api.datastream.SplitDataStream;

http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 59b1906..3dc54d6 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.scala
 
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.streaming.api.collector.selector.OutputSelector
 import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream,
   SingleOutputStreamOperator, GroupedDataStream}
 import scala.collection.JavaConverters._
@@ -38,7 +39,6 @@ import org.apache.flink.streaming.api.function.sink.SinkFunction
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
 import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
 import org.apache.flink.streaming.api.windowing.policy.{ EvictionPolicy, TriggerPolicy }
-import org.apache.flink.streaming.api.collector.OutputSelector
 import scala.collection.JavaConversions._
 import java.util.HashMap
 import org.apache.flink.streaming.api.function.aggregation.SumFunction