You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:26:26 UTC
[49/51] [abbrv] git commit: [streaming] GroupReduce operator added +
StreamCollector bugfix
[streaming] GroupReduce operator added + StreamCollector bugfix
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/309727ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/309727ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/309727ef
Branch: refs/heads/master
Commit: 309727ef94cb0bdca7e82ea31f020abf2c7075e4
Parents: 74d3742
Author: gyfora <gy...@gmail.com>
Authored: Wed Aug 6 22:36:53 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:23:43 2014 +0200
----------------------------------------------------------------------
.../connectors/twitter/TwitterLocal.java | 34 +++++++---
.../api/collector/StreamCollector.java | 12 ++--
.../streaming/api/datastream/DataStream.java | 46 ++++++++++---
.../datastream/SingleOutputStreamOperator.java | 4 +-
.../streaming/api/invokable/SinkInvokable.java | 16 +----
.../api/invokable/SourceInvokable.java | 18 +----
.../api/invokable/StreamComponentInvokable.java | 15 ++++-
.../api/invokable/StreamRecordInvokable.java | 5 ++
.../api/invokable/UserTaskInvokable.java | 6 ++
.../operator/BatchReduceInvokable.java | 1 +
.../api/invokable/operator/FilterInvokable.java | 16 +----
.../invokable/operator/FlatMapInvokable.java | 16 +----
.../operator/GroupReduceInvokable.java | 71 ++++++++++++++++++++
.../api/invokable/operator/MapInvokable.java | 17 +----
.../operator/StreamReduceInvokable.java | 18 ++---
.../operator/WindowReduceInvokable.java | 1 +
.../api/invokable/operator/co/CoInvokable.java | 5 ++
.../invokable/operator/co/CoMapInvokable.java | 1 +
.../AbstractStreamComponent.java | 5 +-
.../api/collector/StreamCollectorTest.java | 6 +-
.../examples/wordcount/WordCountCounter.java | 32 ++-------
.../examples/wordcount/WordCountLocal.java | 5 +-
.../examples/wordcount/WordCountSplitter.java | 7 +-
23 files changed, 203 insertions(+), 154 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
index 878a7ad..668647d 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
@@ -19,6 +19,9 @@
package org.apache.flink.streaming.connectors.twitter;
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -28,19 +31,19 @@ import org.apache.flink.streaming.examples.wordcount.WordCountCounter;
import org.apache.flink.util.Collector;
/**
- * This program demonstrate the use of TwitterSource.
- * Its aim is to count the frequency of the languages of tweets
+ * This program demonstrate the use of TwitterSource. Its aim is to count the
+ * frequency of the languages of tweets
*/
-public class TwitterLocal {
+public class TwitterLocal implements Serializable {
+ private static final long serialVersionUID = 1L;
private static final int PARALLELISM = 1;
private static final int SOURCE_PARALLELISM = 1;
/**
- * FlatMapFunction to determine the language of tweets if possible
+ * FlatMapFunction to determine the language of tweets if possible
*/
- public static class SelectLanguageFlatMap extends
- JSONParseFlatMap<String, String> {
+ public static class SelectLanguageFlatMap extends JSONParseFlatMap<String, String> {
private static final long serialVersionUID = 1L;
@@ -54,7 +57,9 @@ public class TwitterLocal {
}
/**
- * Change the null String to space character. Useful when null is undesirable.
+ * Change the null String to space character. Useful when null is
+ * undesirable.
+ *
* @param in
* @return
*/
@@ -83,11 +88,18 @@ public class TwitterLocal {
DataStream<String> streamSource = env.addSource(new TwitterSource(path, 100),
SOURCE_PARALLELISM);
-
DataStream<Tuple2<String, Integer>> dataStream = streamSource
- .flatMap(new SelectLanguageFlatMap())
- .partitionBy(0)
- .map(new WordCountCounter());
+ .flatMap(new SelectLanguageFlatMap()).partitionBy(0)
+ .map(new MapFunction<String, Tuple2<String, Integer>>() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<String, Integer> map(String value) throws Exception {
+
+ return new Tuple2<String, Integer>(value, 1);
+ }
+ }).groupReduce(new WordCountCounter(), 0);
dataStream.addSink(new SinkFunction<Tuple2<String, Integer>>() {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
index fe21c29..e76cf94 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
@@ -60,9 +60,12 @@ public class StreamCollector<OUT> implements Collector<OUT> {
*/
public StreamCollector(int channelID,
SerializationDelegate<StreamRecord<OUT>> serializationDelegate) {
-
this.serializationDelegate = serializationDelegate;
- this.streamRecord = new StreamRecord<OUT>();
+ if (serializationDelegate != null) {
+ this.streamRecord = serializationDelegate.getInstance();
+ } else {
+ this.streamRecord = new StreamRecord<OUT>();
+ }
this.channelID = channelID;
this.outputs = new ArrayList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
this.outputMap = new HashMap<String, List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>>();
@@ -82,8 +85,9 @@ public class StreamCollector<OUT> implements Collector<OUT> {
for (String outputName : outputNames) {
if (outputName != null) {
if (!outputMap.containsKey(outputName)) {
- outputMap.put(outputName,
- new ArrayList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>());
+ outputMap
+ .put(outputName,
+ new ArrayList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>());
outputMap.get(outputName).add(output);
} else {
if (!outputMap.get(outputName).contains(output)) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 86b3322..bec55e0 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -30,10 +30,12 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.RichFilterFunction;
import org.apache.flink.api.java.functions.RichFlatMapFunction;
import org.apache.flink.api.java.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.java.functions.RichReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.JobGraphBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -48,6 +50,7 @@ import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
import org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.FilterInvokable;
import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable;
+import org.apache.flink.streaming.api.invokable.operator.GroupReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
import org.apache.flink.streaming.api.invokable.operator.WindowReduceInvokable;
import org.apache.flink.streaming.partitioner.BroadcastPartitioner;
@@ -185,16 +188,16 @@ public abstract class DataStream<OUT> {
* Sets the partitioning of the {@link DataStream} so that the output tuples
* are partitioned by their hashcode and are sent to only one component.
*
- * @param keyposition
+ * @param keyPosition
* The field used to compute the hashcode.
* @return The DataStream with field partitioning set.
*/
- public DataStream<OUT> partitionBy(int keyposition) {
- if (keyposition < 0) {
+ public DataStream<OUT> partitionBy(int keyPosition) {
+ if (keyPosition < 0) {
throw new IllegalArgumentException("The position of the field must be non-negative");
}
- return setConnectionType(new FieldsPartitioner<OUT>(keyposition));
+ return setConnectionType(new FieldsPartitioner<OUT>(keyPosition));
}
/**
@@ -280,6 +283,29 @@ public abstract class DataStream<OUT> {
}
/**
+ * Applies a group and a reduce transformation on the DataStream grouped on
+ * the given key position. The {@link ReduceFunction} will receive input
+ * values based on the key value. Only input values with the same key will
+ * go to the same reducer.The user can also extend
+ * {@link RichReduceFunction} to gain access to other features provided
+ * by the {@link RichFuntion} interface.
+ *
+ * @param reducer
+ * The {@link ReduceFunction} that will be called for every
+ * element of the input values with the same key.
+ * @param keyPosition
+ * The key position in the input values on which the grouping is
+ * made.
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<OUT, ?> groupReduce(ReduceFunction<OUT> reducer,
+ int keyPosition) {
+ return addFunction("groupReduce", reducer,
+ new FunctionTypeWrapper<OUT, Tuple, OUT>(reducer, ReduceFunction.class, 0, -1, 0),
+ new GroupReduceInvokable<OUT>(reducer, keyPosition)).partitionBy(keyPosition);
+ }
+
+ /**
* Applies a reduce transformation on preset chunks of the DataStream. The
* transformation calls a {@link GroupReduceFunction} for each tuple batch
* of the predefined size. Each GroupReduceFunction call can return any
@@ -505,8 +531,8 @@ public abstract class DataStream<OUT> {
*/
private DataStreamSink<OUT> writeAsText(DataStream<OUT> inputStream, String path,
WriteFormatAsText<OUT> format, int batchSize, OUT endTuple) {
- DataStreamSink<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByBatches<OUT>(
- path, format, batchSize, endTuple), null);
+ DataStreamSink<OUT> returnStream = addSink(inputStream,
+ new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple), null);
jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
jobGraphBuilder.setMutability(returnStream.getId(), false);
return returnStream;
@@ -657,8 +683,8 @@ public abstract class DataStream<OUT> {
*/
private DataStreamSink<OUT> writeAsCsv(DataStream<OUT> inputStream, String path,
WriteFormatAsCsv<OUT> format, int batchSize, OUT endTuple) {
- DataStreamSink<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByBatches<OUT>(
- path, format, batchSize, endTuple), null);
+ DataStreamSink<OUT> returnStream = addSink(inputStream,
+ new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple), null);
jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
jobGraphBuilder.setMutability(returnStream.getId(), false);
return returnStream;
@@ -801,8 +827,8 @@ public abstract class DataStream<OUT> {
sinkFunction, SinkFunction.class, 0, -1, 0));
}
- private DataStreamSink<OUT> addSink(DataStream<OUT> inputStream, SinkFunction<OUT> sinkFunction,
- TypeSerializerWrapper<OUT, Tuple, OUT> typeWrapper) {
+ private DataStreamSink<OUT> addSink(DataStream<OUT> inputStream,
+ SinkFunction<OUT> sinkFunction, TypeSerializerWrapper<OUT, Tuple, OUT> typeWrapper) {
DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink");
try {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 6d660b1..b86fac0 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -77,7 +77,7 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
* The mutability of the operator.
* @return The operator with mutability set.
*/
- public DataStream<OUT> setMutability(boolean isMutable) {
+ public SingleOutputStreamOperator<OUT, O> setMutability(boolean isMutable) {
jobGraphBuilder.setMutability(id, isMutable);
return this;
}
@@ -90,7 +90,7 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
* The maximum time between two output flushes.
* @return The operator with buffer timeout set.
*/
- public DataStream<OUT> setBufferTimeout(long timeoutMillis) {
+ public SingleOutputStreamOperator<OUT, O> setBufferTimeout(long timeoutMillis) {
jobGraphBuilder.setBufferTimeout(id, timeoutMillis);
return this;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
index 50bdd42..887df8b 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
@@ -19,8 +19,6 @@
package org.apache.flink.streaming.api.invokable;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
public class SinkInvokable<IN> extends StreamRecordInvokable<IN, IN> {
@@ -29,6 +27,7 @@ public class SinkInvokable<IN> extends StreamRecordInvokable<IN, IN> {
private SinkFunction<IN> sinkFunction;
public SinkInvokable(SinkFunction<IN> sinkFunction) {
+ super(sinkFunction);
this.sinkFunction = sinkFunction;
}
@@ -47,17 +46,4 @@ public class SinkInvokable<IN> extends StreamRecordInvokable<IN, IN> {
}
}
- @Override
- public void open(Configuration parameters) throws Exception {
- if (sinkFunction instanceof RichFunction) {
- ((RichFunction) sinkFunction).open(parameters);
- }
- }
-
- @Override
- public void close() throws Exception {
- if (sinkFunction instanceof RichFunction) {
- ((RichFunction) sinkFunction).close();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
index d7710ae..a4be1e8 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
@@ -21,8 +21,6 @@ package org.apache.flink.streaming.api.invokable;
import java.io.Serializable;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.function.source.SourceFunction;
public class SourceInvokable<OUT> extends StreamComponentInvokable<OUT> implements Serializable {
@@ -31,10 +29,9 @@ public class SourceInvokable<OUT> extends StreamComponentInvokable<OUT> implemen
private SourceFunction<OUT> sourceFunction;
- public SourceInvokable() {
- }
public SourceInvokable(SourceFunction<OUT> sourceFunction) {
+ super(sourceFunction);
this.sourceFunction = sourceFunction;
}
@@ -42,17 +39,4 @@ public class SourceInvokable<OUT> extends StreamComponentInvokable<OUT> implemen
sourceFunction.invoke(collector);
}
- @Override
- public void open(Configuration parameters) throws Exception {
- if (sourceFunction instanceof RichFunction) {
- ((RichFunction) sourceFunction).open(parameters);
- }
- }
-
- @Override
- public void close() throws Exception {
- if (sourceFunction instanceof RichFunction) {
- ((RichFunction) sourceFunction).close();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
index 02ee5fd..ed718f1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
@@ -22,6 +22,8 @@ package org.apache.flink.streaming.api.invokable;
import java.io.Serializable;
import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
@@ -35,6 +37,11 @@ public abstract class StreamComponentInvokable<OUT> extends AbstractRichFunction
@SuppressWarnings("unused")
private int channelID;
protected Collector<OUT> collector;
+ protected Function userFunction;
+
+ public StreamComponentInvokable(Function userFunction) {
+ this.userFunction = userFunction;
+ }
public void setCollector(Collector<OUT> collector) {
this.collector = collector;
@@ -47,12 +54,16 @@ public abstract class StreamComponentInvokable<OUT> extends AbstractRichFunction
@Override
public void open(Configuration parameters) throws Exception {
- System.out.println("Open not implemented");
+ if (userFunction instanceof RichFunction) {
+ ((RichFunction) userFunction).open(parameters);
+ }
}
@Override
public void close() throws Exception {
- System.out.println("Close not implemented");
+ if (userFunction instanceof RichFunction) {
+ ((RichFunction) userFunction).close();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
index b1cdde1..27dc05a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.invokable;
import java.io.IOException;
+import org.apache.flink.api.common.functions.Function;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.util.Collector;
@@ -29,6 +30,10 @@ import org.apache.flink.util.MutableObjectIterator;
public abstract class StreamRecordInvokable<IN, OUT> extends
StreamComponentInvokable<OUT> {
+ public StreamRecordInvokable(Function userFunction) {
+ super(userFunction);
+ }
+
private static final long serialVersionUID = 1L;
protected MutableObjectIterator<StreamRecord<IN>> recordIterator;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/UserTaskInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/UserTaskInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/UserTaskInvokable.java
index 0a88efd..8d1da6b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/UserTaskInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/UserTaskInvokable.java
@@ -21,8 +21,14 @@ package org.apache.flink.streaming.api.invokable;
import java.io.Serializable;
+import org.apache.flink.api.common.functions.Function;
+
public abstract class UserTaskInvokable<IN, OUT> extends
StreamRecordInvokable<IN, OUT> implements Serializable {
+ public UserTaskInvokable(Function userFunction) {
+ super(userFunction);
+ }
+
private static final long serialVersionUID = 1L;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
index c3c861b..4e0a7a5 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
@@ -29,6 +29,7 @@ public class BatchReduceInvokable<IN, OUT> extends StreamReduceInvokable<IN, OUT
private int batchSize;
public BatchReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, int batchSize) {
+ super(reduceFunction);
this.reducer = reduceFunction;
this.batchSize = batchSize;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
index b64f08b..388920c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
@@ -20,8 +20,6 @@
package org.apache.flink.streaming.api.invokable.operator;
import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
public class FilterInvokable<IN> extends UserTaskInvokable<IN, IN> {
@@ -31,6 +29,7 @@ public class FilterInvokable<IN> extends UserTaskInvokable<IN, IN> {
FilterFunction<IN> filterFunction;
public FilterInvokable(FilterFunction<IN> filterFunction) {
+ super(filterFunction);
this.filterFunction = filterFunction;
}
@@ -53,17 +52,4 @@ public class FilterInvokable<IN> extends UserTaskInvokable<IN, IN> {
}
}
- @Override
- public void open(Configuration parameters) throws Exception {
- if (filterFunction instanceof RichFunction) {
- ((RichFunction) filterFunction).open(parameters);
- }
- }
-
- @Override
- public void close() throws Exception {
- if (filterFunction instanceof RichFunction) {
- ((RichFunction) filterFunction).close();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
index cc9fcc1..4cb4712 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
@@ -20,8 +20,6 @@
package org.apache.flink.streaming.api.invokable.operator;
import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
public class FlatMapInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
@@ -30,6 +28,7 @@ public class FlatMapInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
private FlatMapFunction<IN, OUT> flatMapper;
public FlatMapInvokable(FlatMapFunction<IN, OUT> flatMapper) {
+ super(flatMapper);
this.flatMapper = flatMapper;
}
@@ -48,17 +47,4 @@ public class FlatMapInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
}
}
- @Override
- public void open(Configuration parameters) throws Exception {
- if (flatMapper instanceof RichFunction) {
- ((RichFunction) flatMapper).open(parameters);
- }
- }
-
- @Override
- public void close() throws Exception {
- if (flatMapper instanceof RichFunction) {
- ((RichFunction) flatMapper).close();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupReduceInvokable.java
new file mode 100755
index 0000000..67af978
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupReduceInvokable.java
@@ -0,0 +1,71 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
+
+public class GroupReduceInvokable<IN> extends UserTaskInvokable<IN, IN> {
+ private static final long serialVersionUID = 1L;
+
+ private ReduceFunction<IN> reducer;
+ private int keyPosition;
+ private Map<Object, IN> values;
+
+ public GroupReduceInvokable(ReduceFunction<IN> reducer, int keyPosition) {
+ super(reducer);
+ this.reducer = reducer;
+ this.keyPosition = keyPosition;
+ values = new HashMap<Object, IN>();
+ }
+
+ @Override
+ protected void immutableInvoke() throws Exception {
+ while ((reuse = recordIterator.next(reuse)) != null) {
+ reduce();
+ resetReuse();
+ }
+ }
+
+ @Override
+ protected void mutableInvoke() throws Exception {
+ while ((reuse = recordIterator.next(reuse)) != null) {
+ reduce();
+ }
+ }
+
+ private void reduce() throws Exception {
+ Object key = reuse.getField(keyPosition);
+ IN currentValue = values.get(key);
+ IN nextValue = reuse.getObject();
+ if (currentValue != null) {
+ IN reduced = reducer.reduce(currentValue, nextValue);
+ values.put(key, reduced);
+ collector.collect(reduced);
+ } else {
+ values.put(key, nextValue);
+ collector.collect(nextValue);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
index 9dbb678..53e85e0 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
@@ -20,8 +20,6 @@
package org.apache.flink.streaming.api.invokable.operator;
import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
public class MapInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
@@ -30,6 +28,7 @@ public class MapInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
private MapFunction<IN, OUT> mapper;
public MapInvokable(MapFunction<IN, OUT> mapper) {
+ super(mapper);
this.mapper = mapper;
}
@@ -47,18 +46,4 @@ public class MapInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
collector.collect(mapper.map(reuse.getObject()));
}
}
-
- @Override
- public void open(Configuration parameters) throws Exception {
- if (mapper instanceof RichFunction) {
- ((RichFunction) mapper).open(parameters);
- }
- }
-
- @Override
- public void close() throws Exception {
- if (mapper instanceof RichFunction) {
- ((RichFunction) mapper).close();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
index 548a298..de0a18a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
@@ -21,12 +21,17 @@ package org.apache.flink.streaming.api.invokable.operator;
import java.util.Iterator;
+import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
public abstract class StreamReduceInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
+
+ public StreamReduceInvokable(Function userFunction) {
+ super(userFunction);
+ }
+
private static final long serialVersionUID = 1L;
protected GroupReduceFunction<IN, OUT> reducer;
protected BatchIterator<IN> userIterator;
@@ -35,16 +40,7 @@ public abstract class StreamReduceInvokable<IN, OUT> extends UserTaskInvokable<I
@Override
public void open(Configuration parameters) throws Exception {
userIterable = new BatchIterable();
- if (reducer instanceof RichFunction) {
- ((RichFunction) reducer).open(parameters);
- }
- }
-
- @Override
- public void close() throws Exception {
- if (reducer instanceof RichFunction) {
- ((RichFunction) reducer).close();
- }
+ super.open(parameters);
}
protected class BatchIterable implements Iterable<IN> {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
index 309656b..3405641 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
@@ -31,6 +31,7 @@ public class WindowReduceInvokable<IN, OUT> extends StreamReduceInvokable<IN, OU
boolean window;
public WindowReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long windowSize) {
+ super(reduceFunction);
this.reducer = reduceFunction;
this.windowSize = windowSize;
this.window = true;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
index d854e89..4ac75a1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.invokable.operator.co;
+import org.apache.flink.api.common.functions.Function;
import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
@@ -28,6 +29,10 @@ import org.apache.flink.util.MutableObjectIterator;
public abstract class CoInvokable<IN1, IN2, OUT> extends
StreamComponentInvokable<OUT> {
+ public CoInvokable(Function userFunction) {
+ super(userFunction);
+ }
+
private static final long serialVersionUID = 1L;
protected Collector<OUT> collector;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
index cd51081..ab10b40 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
@@ -29,6 +29,7 @@ public class CoMapInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
private CoMapFunction<IN1, IN2, OUT> mapper;
public CoMapInvokable(CoMapFunction<IN1, IN2, OUT> mapper) {
+ super(mapper);
this.mapper = mapper;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
index e2e8816..324b5ba 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
@@ -108,6 +108,7 @@ public abstract class AbstractStreamComponent<OUT> extends AbstractInvokable {
outTypeInfo = typeWrapper.getOutputTypeInfo();
outSerializer = new StreamRecordSerializer<OUT>(outTypeInfo);
outSerializationDelegate = new SerializationDelegate<StreamRecord<OUT>>(outSerializer);
+ outSerializationDelegate.setInstance(outSerializer.createInstance());
}
protected void setConfigOutputs(
@@ -116,8 +117,8 @@ public abstract class AbstractStreamComponent<OUT> extends AbstractInvokable {
setCollector();
int numberOfOutputs = configuration.getNumberOfOutputs();
- bufferTimeout= configuration.getBufferTimeout();
-
+ bufferTimeout = configuration.getBufferTimeout();
+
for (int i = 0; i < numberOfOutputs; i++) {
setPartitioner(i, outputs);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
index 72b09c9..99f05ac 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
@@ -35,9 +35,11 @@ public class StreamCollectorTest {
@Test
public void testCollect() {
MockRecordWriter recWriter = MockRecordWriterFactory.create();
+ SerializationDelegate<StreamRecord<Tuple1<Integer>>> sd = new SerializationDelegate<StreamRecord<Tuple1<Integer>>>(
+ null);
+ sd.setInstance(new StreamRecord<Tuple1<Integer>>().setObject(new Tuple1<Integer>()));
- StreamCollector<Tuple1<Integer>> collector = new StreamCollector<Tuple1<Integer>>(2,
- new SerializationDelegate<StreamRecord<Tuple1<Integer>>>(null));
+ StreamCollector<Tuple1<Integer>> collector = new StreamCollector<Tuple1<Integer>>(2, sd);
collector.addOutput(recWriter, new ArrayList<String>());
collector.collect(new Tuple1<Integer>(3));
collector.collect(new Tuple1<Integer>(4));
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountCounter.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountCounter.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountCounter.java
index f106afe..6ee824a 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountCounter.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountCounter.java
@@ -19,38 +19,16 @@
package org.apache.flink.streaming.examples.wordcount;
-import java.util.HashMap;
-import java.util.Map;
-
+import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.common.functions.MapFunction;
-public class WordCountCounter implements MapFunction<String, Tuple2<String, Integer>> {
+public class WordCountCounter implements ReduceFunction<Tuple2<String, Integer>> {
private static final long serialVersionUID = 1L;
- private Map<String, Integer> wordCounts = new HashMap<String, Integer>();
- private String word = "";
- private Integer count = 0;
-
- private Tuple2<String, Integer> outTuple = new Tuple2<String, Integer>();
-
- // Increments the counter of the occurrence of the input word
@Override
- public Tuple2<String, Integer> map(String inTuple) throws Exception {
- word = inTuple;
-
- if (wordCounts.containsKey(word)) {
- count = wordCounts.get(word) + 1;
- wordCounts.put(word, count);
- } else {
- count = 1;
- wordCounts.put(word, 1);
- }
-
- outTuple.f0 = word;
- outTuple.f1 = count;
-
- return outTuple;
+ public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
+ Tuple2<String, Integer> value2) throws Exception {
+ return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
index fc31930..ba5a6e2 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
@@ -19,10 +19,10 @@
package org.apache.flink.streaming.examples.wordcount;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.TestDataUtil;
-import org.apache.flink.api.java.tuple.Tuple2;
public class WordCountLocal {
@@ -35,7 +35,8 @@ public class WordCountLocal {
DataStream<Tuple2<String, Integer>> dataStream = env
.readTextFile("src/test/resources/testdata/hamlet.txt")
- .flatMap(new WordCountSplitter()).partitionBy(0).map(new WordCountCounter());
+ .flatMap(new WordCountSplitter())
+ .groupReduce(new WordCountCounter(), 0);
dataStream.print();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountSplitter.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountSplitter.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountSplitter.java
index eb0dfd3..b962d4d 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountSplitter.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountSplitter.java
@@ -20,17 +20,18 @@
package org.apache.flink.streaming.examples.wordcount;
import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
-public class WordCountSplitter implements FlatMapFunction<String, String> {
+public class WordCountSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
private static final long serialVersionUID = 1L;
// Splits the lines according on spaces
@Override
- public void flatMap(String inTuple, Collector<String> out) throws Exception {
+ public void flatMap(String inTuple, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word : inTuple.split(" ")) {
- out.collect(word);
+ out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
\ No newline at end of file