You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/03/20 13:41:37 UTC
[05/10] flink git commit: [FLINK-1594] [streaming] Added OutputSelector wrapping
[FLINK-1594] [streaming] Added OutputSelector wrapping
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d832400b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d832400b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d832400b
Branch: refs/heads/master
Commit: d832400b3fee6c73553492e5954587958b4f7137
Parents: 3158d1d
Author: Gábor Hermann <re...@gmail.com>
Authored: Mon Mar 2 10:20:14 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Fri Mar 20 11:25:03 2015 +0100
----------------------------------------------------------------------
.../flink/streaming/api/StreamConfig.java | 32 ++---
.../apache/flink/streaming/api/StreamGraph.java | 8 +-
.../api/StreamingJobGraphGenerator.java | 2 +-
.../api/collector/CollectorWrapper.java | 17 ++-
.../api/collector/DirectedCollectorWrapper.java | 131 -------------------
.../streaming/api/collector/OutputSelector.java | 44 -------
.../BroadcastOutputSelectorWrapper.java | 43 ++++++
.../selector/DirectedOutputSelectorWrapper.java | 95 ++++++++++++++
.../api/collector/selector/OutputSelector.java | 44 +++++++
.../selector/OutputSelectorWrapper.java | 31 +++++
.../selector/OutputSelectorWrapperFactory.java | 32 +++++
.../streaming/api/datastream/DataStream.java | 6 +-
.../api/datastream/SplitDataStream.java | 2 +-
.../api/streamvertex/OutputHandler.java | 39 +++---
.../flink/streaming/api/OutputSplitterTest.java | 2 +-
.../api/collector/DirectedOutputTest.java | 1 +
.../api/collector/OutputSelectorTest.java | 1 +
.../examples/iteration/IterateExample.java | 2 +-
.../flink/streaming/api/scala/DataStream.scala | 2 +-
19 files changed, 293 insertions(+), 241 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index 5b6de85..f58f0ad 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -27,7 +27,7 @@ import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.api.streamvertex.StreamVertexException;
@@ -49,7 +49,8 @@ public class StreamConfig implements Serializable {
private static final String VERTEX_NAME = "vertexID";
private static final String OPERATOR_NAME = "operatorName";
private static final String ITERATION_ID = "iteration-id";
- private static final String OUTPUT_SELECTOR = "outputSelector";
+// private static final String OUTPUT_SELECTOR = "outputSelector";
+ private static final String OUTPUT_SELECTOR_WRAPPER = "outputSelectorWrapper";
private static final String DIRECTED_EMIT = "directedEmit";
private static final String SERIALIZEDUDF = "serializedudf";
private static final String USER_FUNCTION = "userfunction";
@@ -67,7 +68,6 @@ public class StreamConfig implements Serializable {
private static final String IN_STREAM_EDGES = "out stream edges";
// DEFAULT VALUES
-
private static final long DEFAULT_TIMEOUT = 100;
public static final String STATE_MONITORING = "STATE_MONITORING";
@@ -189,33 +189,21 @@ public class StreamConfig implements Serializable {
}
}
- public void setDirectedEmit(boolean directedEmit) {
- config.setBoolean(DIRECTED_EMIT, directedEmit);
- }
-
- public boolean isDirectedEmit() {
- return config.getBoolean(DIRECTED_EMIT, false);
- }
-
- public void setOutputSelectors(List<OutputSelector<?>> outputSelector) {
+ public void setOutputSelectorWrapper(OutputSelectorWrapper<?> outputSelectorWrapper) {
try {
- if (outputSelector != null && !outputSelector.isEmpty()) {
- setDirectedEmit(true);
- config.setBytes(OUTPUT_SELECTOR,
- SerializationUtils.serialize((Serializable) outputSelector));
- }
+ config.setBytes(OUTPUT_SELECTOR_WRAPPER, SerializationUtils.serialize(outputSelectorWrapper));
} catch (SerializationException e) {
- throw new RuntimeException("Cannot serialize OutputSelector");
+ throw new RuntimeException("Cannot serialize OutputSelectorWrapper", e);
}
}
@SuppressWarnings("unchecked")
- public <T> List<OutputSelector<T>> getOutputSelectors(ClassLoader cl) {
+ public <T> OutputSelectorWrapper<T> getOutputSelectorWrapper(ClassLoader cl) {
try {
- return (List<OutputSelector<T>>) InstantiationUtil.readObjectFromConfig(this.config,
- OUTPUT_SELECTOR, cl);
+ return (OutputSelectorWrapper<T>) InstantiationUtil.readObjectFromConfig(this.config,
+ OUTPUT_SELECTOR_WRAPPER, cl);
} catch (Exception e) {
- throw new StreamVertexException("Cannot deserialize and instantiate OutputSelector", e);
+ throw new StreamVertexException("Cannot deserialize and instantiate OutputSelectorWrapper", e);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
index dfe66a5..948ea5e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
@@ -37,7 +37,9 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.optimizer.plan.StreamingPlan;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapperFactory;
+import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
@@ -537,8 +539,8 @@ public class StreamGraph extends StreamingPlan {
return inputFormatLists.get(vertexID);
}
- public List<OutputSelector<?>> getOutputSelector(Integer vertexID) {
- return outputSelectors.get(vertexID);
+ public OutputSelectorWrapper<?> getOutputSelectorWrapper(Integer vertexID) {
+ return OutputSelectorWrapperFactory.create(outputSelectors.get(vertexID));
}
public Integer getIterationID(Integer vertexID) {
http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
index 3ff64ca..79d43c0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
@@ -213,7 +213,7 @@ public class StreamingJobGraphGenerator {
config.setTypeSerializerOut2(streamGraph.getOutSerializer2(vertexID));
config.setUserInvokable(streamGraph.getInvokable(vertexID));
- config.setOutputSelectors(streamGraph.getOutputSelector(vertexID));
+ config.setOutputSelectorWrapper(streamGraph.getOutputSelectorWrapper(vertexID));
config.setNumberOfOutputs(nonChainableOutputs.size());
config.setNonChainedOutputs(nonChainableOutputs);
http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
index 1281bf0..bb0268a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
@@ -17,27 +17,26 @@
package org.apache.flink.streaming.api.collector;
-import java.util.LinkedList;
-import java.util.List;
-
+import org.apache.flink.streaming.api.StreamEdge;
+import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
import org.apache.flink.util.Collector;
public class CollectorWrapper<OUT> implements Collector<OUT> {
- private List<Collector<OUT>> outputs;
+ private OutputSelectorWrapper<OUT> outputSelectorWrapper;
- public CollectorWrapper() {
- this.outputs = new LinkedList<Collector<OUT>>();
+ public CollectorWrapper(OutputSelectorWrapper<OUT> outputSelectorWrapper) {
+ this.outputSelectorWrapper = outputSelectorWrapper;
}
@SuppressWarnings("unchecked")
- public void addCollector(Collector<?> output) {
- outputs.add((Collector<OUT>) output);
+ public void addCollector(Collector<?> output, StreamEdge edge) {
+ outputSelectorWrapper.addCollector(output, edge);
}
@Override
public void collect(OUT record) {
- for(Collector<OUT> output: outputs){
+ for (Collector<OUT> output : outputSelectorWrapper.getSelectedOutputs(record)) {
output.collect(record);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java
deleted file mode 100755
index 4681cd3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A StreamCollector that uses user defined output names and a user defined
- * output selector to make directed emits.
- *
- * @param <OUT>
- * Type of the Tuple collected.
- */
-public class DirectedCollectorWrapper<OUT> extends CollectorWrapper<OUT> {
-
- private static final Logger LOG = LoggerFactory.getLogger(DirectedCollectorWrapper.class);
-
- List<OutputSelector<OUT>> outputSelectors;
-
- protected Map<String, List<Collector<OUT>>> outputMap;
-
- private List<Collector<OUT>> selectAllOutputs;
- private Set<Collector<OUT>> emitted;
-
- /**
- * Creates a new DirectedStreamCollector
- *
- * @param outputSelector
- * User defined {@link OutputSelector}
- */
- public DirectedCollectorWrapper(List<OutputSelector<OUT>> outputSelectors) {
- this.outputSelectors = outputSelectors;
- this.emitted = new HashSet<Collector<OUT>>();
- this.selectAllOutputs = new LinkedList<Collector<OUT>>();
- this.outputMap = new HashMap<String, List<Collector<OUT>>>();
-
- }
-
- @Override
- public void addCollector(Collector<?> output) {
- addCollector(output, new ArrayList<String>());
- }
-
- @SuppressWarnings("unchecked")
- public void addCollector(Collector<?> output, List<String> selectedNames) {
-
- if (selectedNames.isEmpty()) {
- selectAllOutputs.add((Collector<OUT>) output);
- } else {
- for (String selectedName : selectedNames) {
-
- if (!outputMap.containsKey(selectedName)) {
- outputMap.put(selectedName, new LinkedList<Collector<OUT>>());
- outputMap.get(selectedName).add((Collector<OUT>) output);
- } else {
- if (!outputMap.get(selectedName).contains(output)) {
- outputMap.get(selectedName).add((Collector<OUT>) output);
- }
- }
-
- }
- }
- }
-
- @Override
- public void collect(OUT record) {
- emitted.clear();
-
- for (Collector<OUT> output : selectAllOutputs) {
- output.collect(record);
- emitted.add(output);
- }
-
- for (OutputSelector<OUT> outputSelector : outputSelectors) {
- Iterable<String> outputNames = outputSelector.select(record);
-
- for (String outputName : outputNames) {
- List<Collector<OUT>> outputList = outputMap.get(outputName);
- if (outputList == null) {
- if (LOG.isErrorEnabled()) {
- String format = String.format(
- "Cannot emit because no output is selected with the name: %s",
- outputName);
- LOG.error(format);
-
- }
- } else {
- for (Collector<OUT> output : outputList) {
- if (!emitted.contains(output)) {
- output.collect(record);
- emitted.add(output);
- }
- }
-
- }
-
- }
- }
-
- }
-
- @Override
- public void close() {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
deleted file mode 100644
index 6dbcff4..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector;
-
-import java.io.Serializable;
-
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.datastream.SplitDataStream;
-
-/**
- * Interface for defining an OutputSelector for a {@link SplitDataStream} using
- * the {@link SingleOutputStreamOperator#split} call. Every output object of a
- * {@link SplitDataStream} will run through this operator to select outputs.
- *
- * @param <OUT>
- * Type parameter of the split values.
- */
-public interface OutputSelector<OUT> extends Serializable {
- /**
- * Method for selecting output names for the emitted objects when using the
- * {@link SingleOutputStreamOperator#split} method. The values will be
- * emitted only to output names which are contained in the returned
- * iterable.
- *
- * @param value
- * Output object for which the output selection should be made.
- */
- public Iterable<String> select(OUT value);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
new file mode 100644
index 0000000..44371f0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.collector.selector;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.streaming.api.StreamEdge;
+import org.apache.flink.util.Collector;
+
+/** {@link OutputSelectorWrapper} that selects every registered output for every record. */
+public class BroadcastOutputSelectorWrapper<OUT> implements OutputSelectorWrapper<OUT> {
+
+	private final List<Collector<OUT>> outputs = new ArrayList<Collector<OUT>>();
+
+	/** Registers the output; the edge is not consulted since nothing is filtered. */
+	@Override
+	@SuppressWarnings("unchecked")
+	public void addCollector(Collector<?> output, StreamEdge edge) {
+		outputs.add((Collector<OUT>) output);
+	}
+
+	/** Returns all registered outputs, independent of the record. */
+	@Override
+	public Iterable<Collector<OUT>> getSelectedOutputs(OUT record) {
+		return outputs;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
new file mode 100644
index 0000000..624fac1
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.collector.selector;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.flink.streaming.api.StreamEdge;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link OutputSelectorWrapper} that routes each record to the outputs named by
+ * the user defined {@link OutputSelector}s and to every select-all output.
+ */
+public class DirectedOutputSelectorWrapper<OUT> implements OutputSelectorWrapper<OUT> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(DirectedOutputSelectorWrapper.class);
+
+	private final List<OutputSelector<OUT>> outputSelectors;
+	// Outputs registered under each selected name.
+	private final Map<String, List<Collector<OUT>>> outputMap;
+	// Outputs whose edge has no selected names; they receive every record.
+	private final Set<Collector<OUT>> selectAllOutputs;
+
+	public DirectedOutputSelectorWrapper(List<OutputSelector<OUT>> outputSelectors) {
+		this.outputSelectors = outputSelectors;
+		this.selectAllOutputs = new HashSet<Collector<OUT>>();
+		this.outputMap = new HashMap<String, List<Collector<OUT>>>();
+	}
+
+	/** Registers the output under each name the edge selects, or as select-all if none. */
+	@Override
+	@SuppressWarnings("unchecked")
+	public void addCollector(Collector<?> output, StreamEdge edge) {
+		Collector<OUT> typedOutput = (Collector<OUT>) output;
+		List<String> selectedNames = edge.getSelectedNames();
+
+		if (selectedNames.isEmpty()) {
+			selectAllOutputs.add(typedOutput);
+			return;
+		}
+
+		for (String selectedName : selectedNames) {
+			List<Collector<OUT>> namedOutputs = outputMap.get(selectedName);
+			if (namedOutputs == null) {
+				namedOutputs = new LinkedList<Collector<OUT>>();
+				outputMap.put(selectedName, namedOutputs);
+			}
+			// Register each output at most once per name.
+			if (!namedOutputs.contains(typedOutput)) {
+				namedOutputs.add(typedOutput);
+			}
+		}
+	}
+
+	/** Returns the select-all outputs plus those registered under the selected names. */
+	@Override
+	public Iterable<Collector<OUT>> getSelectedOutputs(OUT record) {
+		Set<Collector<OUT>> selectedOutputs = new HashSet<Collector<OUT>>(selectAllOutputs);
+
+		for (OutputSelector<OUT> outputSelector : outputSelectors) {
+			for (String outputName : outputSelector.select(record)) {
+				List<Collector<OUT>> outputList = outputMap.get(outputName);
+				if (outputList != null) {
+					selectedOutputs.addAll(outputList);
+				} else {
+					LOG.error("Cannot emit because no output is selected with the name: {}", outputName);
+				}
+			}
+		}
+
+		return selectedOutputs;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelector.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelector.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelector.java
new file mode 100644
index 0000000..b886fa6
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelector.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.collector.selector;
+
+import java.io.Serializable;
+
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.datastream.SplitDataStream;
+
+/**
+ * Interface for defining an OutputSelector for a {@link SplitDataStream} using
+ * the {@link SingleOutputStreamOperator#split} call. Every output object of a
+ * {@link SplitDataStream} will run through this operator to select outputs.
+ *
+ * @param <OUT>
+ * Type parameter of the split values.
+ */
+public interface OutputSelector<OUT> extends Serializable {
+ /**
+ * Selects the output names for an emitted object when using the
+ * {@link SingleOutputStreamOperator#split} method. The value is emitted
+ * only to the output names contained in the returned iterable.
+ *
+ * @param value
+ * Output object for which the output selection should be made.
+ * @return the names of the outputs the value should be emitted to
+ */
+ public Iterable<String> select(OUT value);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
new file mode 100644
index 0000000..850a1d9
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.collector.selector;
+
+import java.io.Serializable;
+
+import org.apache.flink.streaming.api.StreamEdge;
+import org.apache.flink.util.Collector;
+
+public interface OutputSelectorWrapper<OUT> extends Serializable {
+ /** Registers an output collector together with the stream edge it is attached to. */
+ public void addCollector(Collector<?> output, StreamEdge edge);
+ /** Returns the registered collectors that the given record should be emitted to. */
+ public Iterable<Collector<OUT>> getSelectedOutputs(OUT record);
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapperFactory.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapperFactory.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapperFactory.java
new file mode 100644
index 0000000..c0f22c7
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapperFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.collector.selector;
+
+import java.util.List;
+
+/** Creates the {@link OutputSelectorWrapper} matching a vertex's output selectors. */
+public class OutputSelectorWrapperFactory {
+	/** Broadcast wrapper if there are no selectors (null or empty), directed otherwise. */
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	public static OutputSelectorWrapper<?> create(List<OutputSelector<?>> outputSelectors) {
+		if (outputSelectors == null || outputSelectors.isEmpty()) {
+			return new BroadcastOutputSelectorWrapper<Object>();
+		}
+		return new DirectedOutputSelectorWrapper(outputSelectors);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 879a98c..5f6f981 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -47,7 +47,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.StreamGraph;
-import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.temporaloperator.StreamCrossOperator;
import org.apache.flink.streaming.api.datastream.temporaloperator.StreamJoinOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -238,12 +238,12 @@ public class DataStream<OUT> {
/**
* Operator used for directing tuples to specific named outputs using an
- * {@link org.apache.flink.streaming.api.collector.OutputSelector}. Calling
+ * {@link org.apache.flink.streaming.api.collector.selector.OutputSelector}. Calling
* this method on an operator creates a new {@link SplitDataStream}.
*
* @param outputSelector
* The user defined
- * {@link org.apache.flink.streaming.api.collector.OutputSelector}
+ * {@link org.apache.flink.streaming.api.collector.selector.OutputSelector}
* for directing the tuples.
* @return The {@link SplitDataStream}
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
index 97458a8..69e059e 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.datastream;
import java.util.Arrays;
-import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
/**
* The SplitDataStream represents an operator that has been split using an
http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
index ca6b34d..18ddc79 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
@@ -30,8 +30,8 @@ import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.StreamConfig;
import org.apache.flink.streaming.api.StreamEdge;
import org.apache.flink.streaming.api.collector.CollectorWrapper;
-import org.apache.flink.streaming.api.collector.DirectedCollectorWrapper;
import org.apache.flink.streaming.api.collector.StreamOutput;
+import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
import org.apache.flink.streaming.api.invokable.ChainableInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
@@ -100,22 +100,22 @@ public class OutputHandler<OUT> {
* This method builds up a nested collector which encapsulates all the
* chained operators and their network output. The result of this recursive
* call will be passed as collector to the first invokable in the chain.
- *
+ *
* @param chainedTaskConfig
- * The configuration of the starting operator of the chain, we
- * use this paramater to recursively build the whole chain
+ * The configuration of the starting operator of the chain, we
+ * use this paramater to recursively build the whole chain
* @return Returns the collector for the chain starting from the given
- * config
+ * config
*/
- @SuppressWarnings({ "unchecked", "rawtypes" })
+ @SuppressWarnings({"unchecked", "rawtypes"})
private Collector<OUT> createChainedCollector(StreamConfig chainedTaskConfig) {
- boolean isDirectEmit = chainedTaskConfig.isDirectedEmit();
// We create a wrapper that will encapsulate the chained operators and
// network outputs
- CollectorWrapper<OUT> wrapper = isDirectEmit ? new DirectedCollectorWrapper(
- chainedTaskConfig.getOutputSelectors(cl)) : new CollectorWrapper<OUT>();
+
+ OutputSelectorWrapper<OUT> outputSelectorWrapper = chainedTaskConfig.getOutputSelectorWrapper(cl);
+ CollectorWrapper<OUT> wrapper = new CollectorWrapper<OUT>(outputSelectorWrapper);
// Create collectors for the network outputs
for (StreamEdge outputEdge : chainedTaskConfig.getNonChainedOutputs(cl)) {
@@ -123,12 +123,7 @@ public class OutputHandler<OUT> {
Collector<?> outCollector = outputMap.get(output);
- if (isDirectEmit) {
- ((DirectedCollectorWrapper<OUT>) wrapper).addCollector(outCollector,
- chainedTaskConfig.getSelectedNames(output));
- } else {
- wrapper.addCollector(outCollector);
- }
+ wrapper.addCollector(outCollector, outputEdge);
}
// Create collectors for the chained outputs
@@ -136,12 +131,8 @@ public class OutputHandler<OUT> {
Integer output = outputEdge.getTargetVertex();
Collector<?> outCollector = createChainedCollector(chainedConfigs.get(output));
- if (isDirectEmit) {
- ((DirectedCollectorWrapper<OUT>) wrapper).addCollector(outCollector,
- chainedTaskConfig.getSelectedNames(output));
- } else {
- wrapper.addCollector(outCollector);
- }
+
+ wrapper.addCollector(outCollector, outputEdge);
}
if (chainedTaskConfig.isChainStart()) {
@@ -169,11 +160,11 @@ public class OutputHandler<OUT> {
/**
* We create the StreamOutput for the specific output given by the id, and
* the configuration of its source task
- *
+ *
* @param outputVertex
- * Name of the output to which the streamoutput will be set up
+ * Name of the output to which the streamoutput will be set up
* @param configuration
- * The config of upStream task
+ * The config of upStream task
* @return
*/
private <T> StreamOutput<T> createStreamOutput(Integer outputVertex,
http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
index cf6bb3c..14f0fa0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
@@ -23,7 +23,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.TestListResultSink;
http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index ffc7c74..13bf457 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.SplitDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java
index 1615a45..a3d89f2 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.junit.Test;
public class OutputSelectorTest {
http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index 998e818..bbd5433 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeDataStream;
import org.apache.flink.streaming.api.datastream.SplitDataStream;
http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 59b1906..3dc54d6 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.scala
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.streaming.api.collector.selector.OutputSelector
import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream,
SingleOutputStreamOperator, GroupedDataStream}
import scala.collection.JavaConverters._
@@ -38,7 +39,6 @@ import org.apache.flink.streaming.api.function.sink.SinkFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
import org.apache.flink.streaming.api.windowing.policy.{ EvictionPolicy, TriggerPolicy }
-import org.apache.flink.streaming.api.collector.OutputSelector
import scala.collection.JavaConversions._
import java.util.HashMap
import org.apache.flink.streaming.api.function.aggregation.SumFunction