You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/09/20 15:10:46 UTC
[03/18] git commit: [streaming] Updated directed emit to not use
output names when all outputs are selected
[streaming] Updated directed emit to not use output names when all outputs are selected
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/fbfcc9eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/fbfcc9eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/fbfcc9eb
Branch: refs/heads/master
Commit: fbfcc9eb4d835df1a646b6f963527e86ef380ec9
Parents: 47dca69
Author: ghermann <re...@gmail.com>
Authored: Tue Sep 2 14:47:48 2014 +0200
Committer: mbalassi <ba...@gmail.com>
Committed: Sat Sep 20 13:42:04 2014 +0200
----------------------------------------------------------------------
.../flink/streaming/api/JobGraphBuilder.java | 9 +-
.../flink/streaming/api/StreamConfig.java | 56 ++--
.../api/collector/DirectedStreamCollector.java | 224 ++++++++-------
.../api/collector/StreamCollector.java | 285 ++++++++++---------
.../streaming/api/datastream/DataStream.java | 7 +-
.../api/datastream/IterativeDataStream.java | 4 +-
.../datastream/SingleOutputStreamOperator.java | 18 +-
.../api/datastream/SplitDataStream.java | 20 +-
.../api/streamcomponent/OutputHandler.java | 5 +-
.../api/collector/DirectedOutputTest.java | 120 ++++----
.../api/collector/StreamCollectorTest.java | 2 +-
11 files changed, 378 insertions(+), 372 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fbfcc9eb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index aaa7161..7973324 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -64,6 +64,7 @@ public class JobGraphBuilder {
private Map<String, List<String>> outEdgeList;
private Map<String, List<Integer>> outEdgeType;
private Map<String, List<List<String>>> outEdgeNames;
+ private Map<String, List<Boolean>> outEdgeSelectAll;
private Map<String, Boolean> mutability;
private Map<String, List<String>> inEdgeList;
private Map<String, List<StreamPartitioner<?>>> connectionTypes;
@@ -106,6 +107,7 @@ public class JobGraphBuilder {
outEdgeList = new HashMap<String, List<String>>();
outEdgeType = new HashMap<String, List<Integer>>();
outEdgeNames = new HashMap<String, List<List<String>>>();
+ outEdgeSelectAll = new HashMap<String, List<Boolean>>();
mutability = new HashMap<String, Boolean>();
inEdgeList = new HashMap<String, List<String>>();
connectionTypes = new HashMap<String, List<StreamPartitioner<?>>>();
@@ -203,7 +205,7 @@ public class JobGraphBuilder {
setEdge(componentName, iterationHead,
connectionTypes.get(inEdgeList.get(iterationHead).get(0))
- .get(0), 0, new ArrayList<String>());
+ .get(0), 0, new ArrayList<String>(), false);
iterationWaitTime.put(iterationIDtoSourceName.get(iterationID),
waitTime);
@@ -360,6 +362,7 @@ public class JobGraphBuilder {
outEdgeList.put(componentName, new ArrayList<String>());
outEdgeType.put(componentName, new ArrayList<Integer>());
outEdgeNames.put(componentName, new ArrayList<List<String>>());
+ outEdgeSelectAll.put(componentName, new ArrayList<Boolean>());
inEdgeList.put(componentName, new ArrayList<String>());
connectionTypes.put(componentName,
new ArrayList<StreamPartitioner<?>>());
@@ -484,12 +487,13 @@ public class JobGraphBuilder {
public void setEdge(String upStreamComponentName,
String downStreamComponentName,
StreamPartitioner<?> partitionerObject, int typeNumber,
- List<String> outputNames) {
+ List<String> outputNames, boolean selectAll) {
outEdgeList.get(upStreamComponentName).add(downStreamComponentName);
outEdgeType.get(upStreamComponentName).add(typeNumber);
inEdgeList.get(downStreamComponentName).add(upStreamComponentName);
connectionTypes.get(upStreamComponentName).add(partitionerObject);
outEdgeNames.get(upStreamComponentName).add(outputNames);
+ outEdgeSelectAll.get(upStreamComponentName).add(selectAll);
}
/**
@@ -541,6 +545,7 @@ public class JobGraphBuilder {
config.setOutputName(outputIndex,
outEdgeNames.get(upStreamComponentName).get(outputIndex));
+ config.setSelectAll(outputIndex, outEdgeSelectAll.get(upStreamComponentName).get(outputIndex));
config.setPartitioner(outputIndex, partitionerObject);
config.setNumberOfOutputChannels(outputIndex,
componentParallelism.get(downStreamComponentName));
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fbfcc9eb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index 445020a..c2a4c21 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -38,6 +38,7 @@ public class StreamConfig {
private static final String NUMBER_OF_OUTPUTS = "numberOfOutputs";
private static final String NUMBER_OF_INPUTS = "numberOfInputs";
private static final String OUTPUT_NAME = "outputName_";
+ private static final String OUTPUT_SELECT_ALL = "outputSelectAll_";
private static final String PARTITIONER_OBJECT = "partitionerObject_";
private static final String NUMBER_OF_OUTPUT_CHANNELS = "numOfOutputs_";
private static final String ITERATION_ID = "iteration-id";
@@ -149,8 +150,7 @@ public class StreamConfig {
config.setClass(USER_FUNCTION, invokableObject.getClass());
try {
- config.setBytes(SERIALIZEDUDF,
- SerializationUtils.serialize(invokableObject));
+ config.setBytes(SERIALIZEDUDF, SerializationUtils.serialize(invokableObject));
} catch (SerializationException e) {
throw new RuntimeException("Cannot serialize invokable object "
+ invokableObject.getClass(), e);
@@ -162,11 +162,10 @@ public class StreamConfig {
try {
return deserializeObject(config.getBytes(SERIALIZEDUDF, null));
} catch (Exception e) {
- throw new StreamComponentException(
- "Cannot instantiate user function", e);
+ throw new StreamComponentException("Cannot instantiate user function", e);
}
}
-
+
public void setComponentName(String componentName) {
config.setString(COMPONENT_NAME, componentName);
}
@@ -184,8 +183,7 @@ public class StreamConfig {
public Object getFunction() {
try {
- return SerializationUtils.deserialize(config.getBytes(FUNCTION,
- null));
+ return SerializationUtils.deserialize(config.getBytes(FUNCTION, null));
} catch (SerializationException e) {
throw new RuntimeException("Cannot deserialize invokable object", e);
}
@@ -214,8 +212,8 @@ public class StreamConfig {
try {
return deserializeObject(config.getBytes(OUTPUT_SELECTOR, null));
} catch (Exception e) {
- throw new StreamComponentException(
- "Cannot deserialize and instantiate OutputSelector", e);
+ throw new StreamComponentException("Cannot deserialize and instantiate OutputSelector",
+ e);
}
}
@@ -235,30 +233,36 @@ public class StreamConfig {
return config.getLong(ITERATON_WAIT, 0);
}
- public void setNumberOfOutputChannels(int outputIndex,
- Integer numberOfOutputChannels) {
- config.setInteger(NUMBER_OF_OUTPUT_CHANNELS + outputIndex,
- numberOfOutputChannels);
+ public void setNumberOfOutputChannels(int outputIndex, Integer numberOfOutputChannels) {
+ config.setInteger(NUMBER_OF_OUTPUT_CHANNELS + outputIndex, numberOfOutputChannels);
}
public int getNumberOfOutputChannels(int outputIndex) {
return config.getInteger(NUMBER_OF_OUTPUT_CHANNELS + outputIndex, 0);
}
- public <T> void setPartitioner(int outputIndex,
- StreamPartitioner<T> partitionerObject) {
+ public <T> void setPartitioner(int outputIndex, StreamPartitioner<T> partitionerObject) {
config.setBytes(PARTITIONER_OBJECT + outputIndex,
SerializationUtils.serialize(partitionerObject));
}
- public <T> StreamPartitioner<T> getPartitioner(int outputIndex)
- throws ClassNotFoundException, IOException {
- return deserializeObject(config.getBytes(PARTITIONER_OBJECT
- + outputIndex,
+ public <T> StreamPartitioner<T> getPartitioner(int outputIndex) throws ClassNotFoundException,
+ IOException {
+ return deserializeObject(config.getBytes(PARTITIONER_OBJECT + outputIndex,
SerializationUtils.serialize(new ShufflePartitioner<T>())));
}
+	/**
+	 * Stores the select-all flag of the output with the given index. A null
+	 * flag is ignored, so nothing is written to the configuration for it.
+	 *
+	 * @param outputIndex
+	 *            Index of the output
+	 * @param selectAll
+	 *            Select-all flag to store, may be null
+	 */
+	public void setSelectAll(int outputIndex, Boolean selectAll) {
+		if (selectAll == null) {
+			return;
+		}
+		config.setBoolean(OUTPUT_SELECT_ALL + outputIndex, selectAll);
+	}
+
+	/**
+	 * Reads the select-all flag of the output with the given index.
+	 *
+	 * @param outputIndex
+	 *            Index of the output
+	 * @return the flag read from the configuration, with false passed as the
+	 *         default value
+	 */
+	public boolean getSelectAll(int outputIndex) {
+		String key = OUTPUT_SELECT_ALL + outputIndex;
+		return config.getBoolean(key, false);
+	}
+
public void setOutputName(int outputIndex, List<String> outputName) {
if (outputName != null) {
config.setBytes(OUTPUT_NAME + outputIndex,
@@ -268,8 +272,8 @@ public class StreamConfig {
@SuppressWarnings("unchecked")
public List<String> getOutputName(int outputIndex) {
- return (List<String>) SerializationUtils.deserialize(config.getBytes(
- OUTPUT_NAME + outputIndex, null));
+ return (List<String>) SerializationUtils.deserialize(config.getBytes(OUTPUT_NAME
+ + outputIndex, null));
}
public void setNumberOfInputs(int numberOfInputs) {
@@ -296,20 +300,18 @@ public class StreamConfig {
return config.getInteger(INPUT_TYPE + inputNumber, 0);
}
- public void setFunctionClass(
- Class<? extends AbstractRichFunction> functionClass) {
+ public void setFunctionClass(Class<? extends AbstractRichFunction> functionClass) {
config.setClass("functionClass", functionClass);
}
@SuppressWarnings("unchecked")
public Class<? extends AbstractRichFunction> getFunctionClass() {
- return (Class<? extends AbstractRichFunction>) config.getClass(
- "functionClass", null);
+ return (Class<? extends AbstractRichFunction>) config.getClass("functionClass", null);
}
@SuppressWarnings("unchecked")
- protected static <T> T deserializeObject(byte[] serializedObject)
- throws IOException, ClassNotFoundException {
+ protected static <T> T deserializeObject(byte[] serializedObject) throws IOException,
+ ClassNotFoundException {
return (T) SerializationUtils.deserialize(serializedObject);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fbfcc9eb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
index 54b1a98..ab6caea 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
@@ -1,105 +1,119 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A StreamCollector that uses user defined output names and a user defined
- * output selector to make directed emits.
- *
- * @param <OUT>
- * Type of the Tuple collected.
- */
-public class DirectedStreamCollector<OUT> extends StreamCollector<OUT> {
-
- OutputSelector<OUT> outputSelector;
- private static final Logger LOG = LoggerFactory.getLogger(DirectedStreamCollector.class);
- private Set<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> emitted;
-
- /**
- * Creates a new DirectedStreamCollector
- *
- * @param channelID
- * Channel ID of the Task
- * @param serializationDelegate
- * Serialization delegate used for serialization
- * @param outputSelector
- * User defined {@link OutputSelector}
- */
- public DirectedStreamCollector(int channelID,
- SerializationDelegate<StreamRecord<OUT>> serializationDelegate,
- OutputSelector<OUT> outputSelector) {
- super(channelID, serializationDelegate);
- this.outputSelector = outputSelector;
- this.emitted = new HashSet<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
-
- }
-
- /**
- * Collects and emits a tuple to the outputs by reusing a StreamRecord
- * object.
- *
- * @param outputObject
- * Object to be collected and emitted.
- */
- @Override
- public void collect(OUT outputObject) {
- streamRecord.setObject(outputObject);
- emit(streamRecord);
- }
-
- /**
- * Emits a StreamRecord to the outputs selected by the user defined
- * OutputSelector
- *
- * @param streamRecord
- * Record to emit.
- */
- private void emit(StreamRecord<OUT> streamRecord) {
- Collection<String> outputNames = outputSelector.getOutputs(streamRecord.getObject());
- streamRecord.newId(channelID);
- serializationDelegate.setInstance(streamRecord);
- emitted.clear();
- for (String outputName : outputNames) {
- try {
- for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputMap
- .get(outputName)) {
- if (!emitted.contains(output)) {
- output.emit(serializationDelegate);
- emitted.add(output);
- }
- }
- } catch (Exception e) {
- if (LOG.isErrorEnabled()) {
- LOG.error(String.format("Emit to %s failed due to: %s", outputName,
- StringUtils.stringifyException(e)));
- }
- }
- }
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.collector;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.flink.runtime.io.network.api.RecordWriter;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A StreamCollector that uses user defined output names and a user defined
+ * output selector to make directed emits.
+ *
+ * @param <OUT>
+ *            Type of the Tuple collected.
+ */
+public class DirectedStreamCollector<OUT> extends StreamCollector<OUT> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(DirectedStreamCollector.class);
+
+	OutputSelector<OUT> outputSelector;
+	private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> selectAllOutputs;
+	private Set<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> emitted;
+
+	/**
+	 * Creates a new DirectedStreamCollector
+	 *
+	 * @param channelID
+	 *            Channel ID of the Task
+	 * @param serializationDelegate
+	 *            Serialization delegate used for serialization
+	 * @param outputSelector
+	 *            User defined {@link OutputSelector}
+	 */
+	public DirectedStreamCollector(int channelID,
+			SerializationDelegate<StreamRecord<OUT>> serializationDelegate,
+			OutputSelector<OUT> outputSelector) {
+		super(channelID, serializationDelegate);
+		this.outputSelector = outputSelector;
+		this.emitted = new HashSet<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
+		this.selectAllOutputs = new ArrayList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
+	}
+
+	/**
+	 * Adds an output: select-all outputs go to their own list, others are added by name.
+	 */
+	@Override
+	public void addOutput(RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output,
+			List<String> outputNames, boolean isSelectAllOutput) {
+		if (isSelectAllOutput) {
+			selectAllOutputs.add(output);
+		} else {
+			addOneOutput(output, outputNames, isSelectAllOutput);
+		}
+	}
+
+	/**
+	 * Emits the record to the select-all outputs and to the outputs of each name
+	 * chosen by the OutputSelector, each output receiving it at most once. A name
+	 * without outputs is logged and skipped; no chosen name means nothing is emitted.
+	 */
+	@Override
+	protected void emitToOutputs() {
+		Collection<String> outputNames = outputSelector.getOutputs(streamRecord.getObject());
+		emitted.clear();
+		for (String outputName : outputNames) {
+			List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputList = outputMap.get(outputName);
+			if (outputList == null && LOG.isErrorEnabled()) {
+				LOG.error(String.format(
+						"Cannot emit because no output is selected with the name: %s", outputName));
+			}
+			try {
+				for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : selectAllOutputs) {
+					if (!emitted.contains(output)) {
+						output.emit(serializationDelegate);
+						emitted.add(output);
+					}
+				}
+				// An unknown name has no output list; skipping it avoids a NullPointerException.
+				if (outputList != null) {
+					for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputList) {
+						if (!emitted.contains(output)) {
+							output.emit(serializationDelegate);
+							emitted.add(output);
+						}
+					}
+				}
+			} catch (Exception e) {
+				if (LOG.isErrorEnabled()) {
+					LOG.error(String.format("Emit to %s failed due to: %s", outputName,
+							StringUtils.stringifyException(e)));
+				}
+			}
+		}
+	}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fbfcc9eb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
index c6ba1ef..ce4069e 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
@@ -1,137 +1,148 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Collector for tuples in Apache Flink stream processing. The collected values
- * will be wrapped with ID in a {@link StreamRecord} and then emitted to the
- * outputs.
- *
- * @param <OUT>
- * Type of the Tuples/Objects collected.
- */
-public class StreamCollector<OUT> implements Collector<OUT> {
-
- private static final Logger LOG = LoggerFactory.getLogger(StreamCollector.class);
-
- protected StreamRecord<OUT> streamRecord;
- protected int channelID;
- private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
- protected Map<String, List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>> outputMap;
- protected SerializationDelegate<StreamRecord<OUT>> serializationDelegate;
-
- /**
- * Creates a new StreamCollector
- *
- * @param channelID
- * Channel ID of the Task
- * @param serializationDelegate
- * Serialization delegate used for serialization
- */
- public StreamCollector(int channelID,
- SerializationDelegate<StreamRecord<OUT>> serializationDelegate) {
- this.serializationDelegate = serializationDelegate;
- if (serializationDelegate != null) {
- this.streamRecord = serializationDelegate.getInstance();
- } else {
- this.streamRecord = new StreamRecord<OUT>();
- }
- this.channelID = channelID;
- this.outputs = new ArrayList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
- this.outputMap = new HashMap<String, List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>>();
- }
-
- /**
- * Adds an output with the given user defined name
- *
- * @param output
- * The RecordWriter object representing the output.
- * @param outputNames
- * User defined names of the output.
- */
- public void addOutput(RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output,
- List<String> outputNames) {
- outputs.add(output);
- for (String outputName : outputNames) {
- if (outputName != null) {
- if (!outputMap.containsKey(outputName)) {
- outputMap
- .put(outputName,
- new ArrayList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>());
- outputMap.get(outputName).add(output);
- } else {
- if (!outputMap.get(outputName).contains(output)) {
- outputMap.get(outputName).add(output);
- }
- }
-
- }
- }
- }
-
- /**
- * Collects and emits a tuple/object to the outputs by reusing a
- * StreamRecord object.
- *
- * @param outputObject
- * Object to be collected and emitted.
- */
- @Override
- public void collect(OUT outputObject) {
- streamRecord.setObject(outputObject);
- emit(streamRecord);
- }
-
- /**
- * Emits a StreamRecord to all the outputs.
- *
- * @param streamRecord
- * StreamRecord to emit.
- */
- private void emit(StreamRecord<OUT> streamRecord) {
- streamRecord.newId(channelID);
- serializationDelegate.setInstance(streamRecord);
- for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
- try {
- output.emit(serializationDelegate);
- } catch (Exception e) {
- if (LOG.isErrorEnabled()) {
- LOG.error(String.format("Emit failed due to: %s",
- StringUtils.stringifyException(e)));
- }
- }
- }
- }
-
- @Override
- public void close() {
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.collector;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.runtime.io.network.api.RecordWriter;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Collector used in Apache Flink stream processing: each collected value is
+ * wrapped with an ID in a {@link StreamRecord} and emitted to the outputs.
+ *
+ * @param <OUT>
+ *            Type of the Tuples/Objects collected.
+ */
+public class StreamCollector<OUT> implements Collector<OUT> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(StreamCollector.class);
+
+	protected StreamRecord<OUT> streamRecord;
+	protected int channelID;
+	protected List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
+	protected Map<String, List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>> outputMap;
+	protected SerializationDelegate<StreamRecord<OUT>> serializationDelegate;
+
+	/**
+	 * Creates a new StreamCollector
+	 *
+	 * @param channelID
+	 *            Channel ID of the Task
+	 * @param serializationDelegate
+	 *            Serialization delegate used for serialization; when null, a
+	 *            new StreamRecord is created instead of taking its instance
+	 */
+	public StreamCollector(int channelID,
+			SerializationDelegate<StreamRecord<OUT>> serializationDelegate) {
+		this.channelID = channelID;
+		this.serializationDelegate = serializationDelegate;
+		this.streamRecord = serializationDelegate == null ? new StreamRecord<OUT>()
+				: serializationDelegate.getInstance();
+		this.outputs = new ArrayList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
+		this.outputMap = new HashMap<String, List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>>();
+	}
+
+	/**
+	 * Registers an output under the given user defined names
+	 *
+	 * @param output
+	 *            The RecordWriter object representing the output.
+	 * @param outputNames
+	 *            User defined names of the output.
+	 * @param isSelectAllOutput
+	 *            Marks whether all the outputs are selected.
+	 */
+	public void addOutput(RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output,
+			List<String> outputNames, boolean isSelectAllOutput) {
+		addOneOutput(output, outputNames, isSelectAllOutput);
+	}
+
+	/**
+	 * Adds the output to the list of outputs and to the list of every non-null
+	 * name, at most once per name; isSelectAllOutput is not read here.
+	 */
+	protected void addOneOutput(RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output,
+			List<String> outputNames, boolean isSelectAllOutput) {
+		outputs.add(output);
+		for (String name : outputNames) {
+			if (name != null) {
+				List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> named = outputMap.get(name);
+				if (named == null) {
+					named = new ArrayList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
+					outputMap.put(name, named);
+				}
+				if (!named.contains(output)) {
+					named.add(output);
+				}
+			}
+		}
+	}
+
+	/**
+	 * Stores the object in the reused StreamRecord and emits that record.
+	 *
+	 * @param outputObject
+	 *            Object to be collected and emitted.
+	 */
+	@Override
+	public void collect(OUT outputObject) {
+		streamRecord.setObject(outputObject);
+		emit(streamRecord);
+	}
+
+	/**
+	 * Calls newId(channelID) on the record, sets it on the delegate and emits it.
+	 *
+	 * @param streamRecord
+	 *            StreamRecord to emit.
+	 */
+	private void emit(StreamRecord<OUT> streamRecord) {
+		streamRecord.newId(channelID);
+		serializationDelegate.setInstance(streamRecord);
+		emitToOutputs();
+	}
+
+	/**
+	 * Emits to every output; a failed emit is logged and the rest still run.
+	 */
+	protected void emitToOutputs() {
+		for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> writer : outputs) {
+			try {
+				writer.emit(serializationDelegate);
+			} catch (Exception e) {
+				if (LOG.isErrorEnabled()) {
+					LOG.error(String.format("Emit failed due to: %s",
+							StringUtils.stringifyException(e)));
+				}
+			}
+		}
+	}
+
+	@Override
+	public void close() {
+	}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fbfcc9eb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index a2994dc..64a07b5 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -82,6 +82,7 @@ public abstract class DataStream<OUT> {
protected final String id;
protected int degreeOfParallelism;
protected List<String> userDefinedNames;
+ protected boolean selectAll;
protected StreamPartitioner<OUT> partitioner;
protected final JobGraphBuilder jobGraphBuilder;
@@ -106,6 +107,7 @@ public abstract class DataStream<OUT> {
this.degreeOfParallelism = environment.getDegreeOfParallelism();
this.jobGraphBuilder = environment.getJobGraphBuilder();
this.userDefinedNames = new ArrayList<String>();
+ this.selectAll = false;
this.partitioner = new ForwardPartitioner<OUT>();
}
@@ -121,6 +123,7 @@ public abstract class DataStream<OUT> {
this.id = dataStream.id;
this.degreeOfParallelism = dataStream.degreeOfParallelism;
this.userDefinedNames = new ArrayList<String>(dataStream.userDefinedNames);
+ this.selectAll = dataStream.selectAll;
this.partitioner = dataStream.partitioner;
this.jobGraphBuilder = dataStream.jobGraphBuilder;
@@ -911,11 +914,11 @@ public abstract class DataStream<OUT> {
if (inputStream instanceof MergedDataStream) {
for (DataStream<X> stream : ((MergedDataStream<X>) inputStream).mergedStreams) {
jobGraphBuilder.setEdge(stream.getId(), outputID, stream.partitioner, typeNumber,
- inputStream.userDefinedNames);
+ inputStream.userDefinedNames, inputStream.selectAll);
}
} else {
jobGraphBuilder.setEdge(inputStream.getId(), outputID, inputStream.partitioner,
- typeNumber, inputStream.userDefinedNames);
+ typeNumber, inputStream.userDefinedNames, inputStream.selectAll);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fbfcc9eb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
index 46be328..16362ba 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
@@ -93,12 +93,12 @@ public class IterativeDataStream<IN> extends
for (DataStream<IN> stream : ((MergedDataStream<IN>) iterationTail).mergedStreams) {
String inputID = stream.getId();
jobGraphBuilder.setEdge(inputID, returnStream.getId(),
- new ForwardPartitioner<IN>(), 0, name);
+ new ForwardPartitioner<IN>(), 0, name, false);
}
} else {
jobGraphBuilder.setEdge(iterationTail.getId(), returnStream.getId(),
- new ForwardPartitioner<IN>(), 0, name);
+ new ForwardPartitioner<IN>(), 0, name, false);
}
return iterationTail;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fbfcc9eb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index b1b0939..1f01feb 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -104,22 +104,6 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
* @return The {@link SplitDataStream}
*/
public SplitDataStream<OUT> split(OutputSelector<OUT> outputSelector) {
- return split(outputSelector, null);
- }
-
- /**
- * Operator used for directing tuples to specific named outputs using an
- * {@link OutputSelector}. Calling this method on an operator creates a new
- * {@link SplitDataStream}.
- *
- * @param outputSelector
- * The user defined {@link OutputSelector} for directing the
- * tuples.
- * @param outputNames
- * An array of all the output names to be used for selectAll
- * @return The {@link SplitDataStream}
- */
- public SplitDataStream<OUT> split(OutputSelector<OUT> outputSelector, String[] outputNames) {
try {
jobGraphBuilder.setOutputSelector(id, SerializationUtils.serialize(outputSelector));
@@ -127,7 +111,7 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
throw new RuntimeException("Cannot serialize OutputSelector");
}
- return new SplitDataStream<OUT>(this, outputNames);
+ return new SplitDataStream<OUT>(this);
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fbfcc9eb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
index a6cf4b1..0ddb4f0 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
@@ -32,11 +32,9 @@ import org.apache.flink.streaming.api.collector.OutputSelector;
public class SplitDataStream<OUT> {
DataStream<OUT> dataStream;
- String[] allNames;
- protected SplitDataStream(DataStream<OUT> dataStream, String[] outputNames) {
+ protected SplitDataStream(DataStream<OUT> dataStream) {
this.dataStream = dataStream.copy();
- this.allNames = outputNames;
}
/**
@@ -52,22 +50,18 @@ public class SplitDataStream<OUT> {
}
/**
- * Selects all output names from a split data stream. Output names must
- * predefined to use selectAll.
+ * Selects all output names from a split data stream.
*
* @return Returns the selected DataStream
*/
- public DataStream<OUT> selectAll() {
- if (allNames != null) {
- return selectOutput(allNames);
- } else {
- throw new RuntimeException(
- "Output names must be predefined in order to use select all.");
- }
+ public DataStream<OUT> selectAll() {
+ DataStream<OUT> returnStream = dataStream.copy();
+ returnStream.selectAll = true;
+ return returnStream;
}
private DataStream<OUT> selectOutput(String[] outputName) {
- DataStream<OUT> returnStream = dataStream.copy();
+ DataStream<OUT> returnStream = dataStream.copy();
returnStream.userDefinedNames = Arrays.asList(outputName);
return returnStream;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fbfcc9eb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java
index e19eeaa..76277dc 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java
@@ -127,9 +127,10 @@ public class OutputHandler<OUT> {
outputs.add(output);
List<String> outputName = configuration.getOutputName(outputNumber);
-
+ boolean isSelectAllOutput = configuration.getSelectAll(outputNumber);
+
if (collector != null) {
- collector.addOutput(output, outputName);
+ collector.addOutput(output, outputName, isSelectAllOutput);
}
if (LOG.isTraceEnabled()) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fbfcc9eb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index 1608b7b..fdf9db3 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -1,30 +1,17 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
package org.apache.flink.streaming.api.collector;
import static org.junit.Assert.assertEquals;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.HashSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
-import org.apache.flink.api.java.functions.RichMapFunction;
-import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.collector.OutputSelector;
import org.apache.flink.streaming.api.datastream.SplitDataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -33,74 +20,79 @@ import org.junit.Test;
public class DirectedOutputTest {
- static HashSet<Long> evenSet = new HashSet<Long>();
- static HashSet<Long> oddSet = new HashSet<Long>();
-
- private static class PlusTwo extends RichMapFunction<Long, Long> {
+ private static final String TEN = "ten";
+ private static final String ODD = "odd";
+ private static final String ALL = "all";
+ private static final String EVEN_AND_ODD = "evenAndOdd";
+ private static final String ODD_AND_TEN = "oddAndTen";
+ private static final String EVEN = "even";
+ static final class MyMap implements MapFunction<Long, Long> {
private static final long serialVersionUID = 1L;
@Override
- public Long map(Long arg0) throws Exception {
- arg0 += 2;
- return arg0;
+ public Long map(Long value) throws Exception {
+ return value;
}
}
- private static class EvenSink implements SinkFunction<Long> {
-
+ static final class MyOutputSelector extends OutputSelector<Long> {
private static final long serialVersionUID = 1L;
@Override
- public void invoke(Long tuple) {
- evenSet.add(tuple);
+ public void select(Long value, Collection<String> outputs) {
+ if (value % 2 == 0) {
+ outputs.add(EVEN);
+ } else {
+ outputs.add(ODD);
+ }
+
+ if (value == 10L) {
+ outputs.add(TEN);
+ }
}
}
+
+ static final class ListSink implements SinkFunction<Long> {
+ private static final long serialVersionUID = 1L;
- private static class OddSink implements SinkFunction<Long> {
+ private String name;
+ private transient List<Long> list;
- private static final long serialVersionUID = 1L;
+ public ListSink(String name) {
+ this.name = name;
+ }
@Override
- public void invoke(Long tuple) {
- oddSet.add(tuple);
+ public void invoke(Long value) {
+ list.add(value);
}
- }
-
- private static class MySelector extends OutputSelector<Long> {
-
- private static final long serialVersionUID = 1L;
- @Override
- public void select(Long tuple, Collection<String> outputs) {
- int mod = (int) (tuple % 2);
- switch (mod) {
- case 0:
- outputs.add("ds1");
- break;
- case 1:
- outputs.add("ds2");
- break;
- }
+ private void readObject(java.io.ObjectInputStream in) throws IOException,
+ ClassNotFoundException {
+ in.defaultReadObject();
+ outputs.put(name, new ArrayList<Long>());
+ this.list = outputs.get(name);
}
}
- @SuppressWarnings("unused")
+ private static Map<String, List<Long>> outputs = new HashMap<String, List<Long>>();
+
@Test
- public void directOutputTest() throws Exception {
+ public void outputSelectorTest() throws Exception {
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
- SplitDataStream<Long> s = env.generateSequence(1, 6).split(new MySelector(),
- new String[] { "ds1", "ds2" });
- DataStream<Long> ds1 = s.select("ds1").shuffle().map(new PlusTwo()).addSink(new EvenSink());
- DataStream<Long> ds2 = s.select("ds2").map(new PlusTwo()).addSink(new OddSink());
-
- env.executeTest(32);
-
- HashSet<Long> expectedEven = new HashSet<Long>(Arrays.asList(4L, 6L, 8L));
- HashSet<Long> expectedOdd = new HashSet<Long>(Arrays.asList(3L, 5L, 7L));
- assertEquals(expectedEven, evenSet);
- assertEquals(expectedOdd, oddSet);
+ SplitDataStream<Long> source = env.generateSequence(1, 10).split(new MyOutputSelector());
+ source.select(EVEN).addSink(new ListSink(EVEN));
+ source.select(ODD, TEN).addSink(new ListSink(ODD_AND_TEN));
+ source.select(EVEN, ODD).addSink(new ListSink(EVEN_AND_ODD));
+ source.selectAll().addSink(new ListSink(ALL));
+
+ env.executeTest(128);
+ assertEquals(Arrays.asList(2L, 4L, 6L, 8L, 10L), outputs.get(EVEN));
+ assertEquals(Arrays.asList(1L, 3L, 5L, 7L, 9L, 10L), outputs.get(ODD_AND_TEN));
+ assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L), outputs.get(EVEN_AND_ODD));
+ assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L), outputs.get(ALL));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fbfcc9eb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
index 05d7494..66234d4 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
@@ -38,7 +38,7 @@ public class StreamCollectorTest {
sd.setInstance(new StreamRecord<Tuple1<Integer>>().setObject(new Tuple1<Integer>()));
StreamCollector<Tuple1<Integer>> collector = new StreamCollector<Tuple1<Integer>>(2, sd);
- collector.addOutput(recWriter, new ArrayList<String>());
+ collector.addOutput(recWriter, new ArrayList<String>(), false);
collector.collect(new Tuple1<Integer>(3));
collector.collect(new Tuple1<Integer>(4));
collector.collect(new Tuple1<Integer>(5));