You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/09/20 15:10:56 UTC
[13/18] git commit: [streaming] Added counter aggregation
[streaming] Added counter aggregation
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/03a28cbb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/03a28cbb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/03a28cbb
Branch: refs/heads/master
Commit: 03a28cbb755c49c2b97205989434c0c1664e8a3e
Parents: b6ffdba
Author: ghermann <re...@gmail.com>
Authored: Mon Sep 15 17:25:22 2014 +0200
Committer: mbalassi <ba...@gmail.com>
Committed: Sat Sep 20 13:44:12 2014 +0200
----------------------------------------------------------------------
.../flink/streaming/api/JobGraphBuilder.java | 26 +---------
.../api/datastream/BatchedDataStream.java | 3 +-
.../streaming/api/datastream/DataStream.java | 37 +++++++++------
.../api/datastream/GroupedDataStream.java | 3 +-
.../invokable/operator/CounterInvokable.java | 50 ++++++++++++++++++++
.../operator/CounterInvokableTest.java | 39 +++++++++++++++
6 files changed, 115 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/03a28cbb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index 4bb022a..e6c5042 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -312,10 +312,11 @@ public class JobGraphBuilder {
int parallelism, long waitTime) {
addComponent(componentName, StreamIterationSink.class, null, null, null, parallelism);
+
iterationIds.put(componentName, iterationID);
iterationIDtoSinkName.put(iterationID, componentName);
+
setBytesFrom(iterationTail, componentName);
- // setInTypeWrappersFrom(iterationTail, componentName);
iterationWaitTime.put(iterationIDtoSinkName.get(iterationID), waitTime);
if (LOG.isDebugEnabled()) {
@@ -576,44 +577,21 @@ public class JobGraphBuilder {
* to
*/
public void setBytesFrom(String from, String to) {
-
operatorNames.put(to, operatorNames.get(from));
serializedFunctions.put(to, serializedFunctions.get(from));
- setTypeWrappersFrom(from, to);
- }
-
- public void setTypeWrappersFrom(String from, String to) {
- setInToOutTypeWrappersFrom(from, to);
- setOutToOutTypeWrappersFrom(from, to);
- }
-
- public void setInToOutTypeWrappersFrom(String from, String to) {
- // TODO rename function
typeWrapperIn1.put(to, typeWrapperOut1.get(from));
typeWrapperIn2.put(to, typeWrapperOut2.get(from));
- }
-
- public void setOutToOutTypeWrappersFrom(String from, String to) {
- // TODO rename function
typeWrapperOut1.put(to, typeWrapperOut1.get(from));
typeWrapperOut2.put(to, typeWrapperOut2.get(from));
}
- public void setInToInTypeWrappersFrom(String from, String to) {
- // TODO rename function
- typeWrapperIn1.put(to, typeWrapperIn1.get(from));
- typeWrapperIn2.put(to, typeWrapperIn2.get(from));
- }
-
public TypeInformation<?> getInTypeInfo(String id) {
- // TODO
System.out.println("DEBUG TypeInfo " + typeWrapperIn1.get(id));
return typeWrapperIn1.get(id).getTypeInfo();
}
public TypeInformation<?> getOutTypeInfo(String id) {
- // TODO
return typeWrapperOut1.get(id).getTypeInfo();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/03a28cbb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
index 0aa5de6..bcedac9 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
@@ -202,9 +202,8 @@ public class BatchedDataStream<OUT> {
BatchReduceInvokable<OUT> invokable = getReduceInvokable(aggregate);
SingleOutputStreamOperator<OUT, ?> returnStream = dataStream.addFunction("batchReduce",
- aggregate, null, null, invokable);
+ aggregate, dataStream.outTypeWrapper, dataStream.outTypeWrapper, invokable);
- dataStream.jobGraphBuilder.setTypeWrappersFrom(dataStream.getId(), returnStream.getId());
return returnStream;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/03a28cbb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index bebda91..0e1ae57 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -47,6 +47,7 @@ import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByBatches;
import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByMillis;
import org.apache.flink.streaming.api.invokable.SinkInvokable;
import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
+import org.apache.flink.streaming.api.invokable.operator.CounterInvokable;
import org.apache.flink.streaming.api.invokable.operator.FilterInvokable;
import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable;
import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
@@ -60,6 +61,7 @@ import org.apache.flink.streaming.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
+import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper;
import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
import org.apache.flink.types.TypeInformation;
@@ -86,7 +88,7 @@ public class DataStream<OUT> {
protected List<String> userDefinedNames;
protected boolean selectAll;
protected StreamPartitioner<OUT> partitioner;
- protected TypeSerializerWrapper<OUT> outTypeWrapper;
+ protected final TypeSerializerWrapper<OUT> outTypeWrapper;
protected List<DataStream<OUT>> mergedStreams;
protected final JobGraphBuilder jobGraphBuilder;
@@ -556,15 +558,26 @@ public class DataStream<OUT> {
public SingleOutputStreamOperator<OUT, ?> max() {
return max(0);
}
+
+ /**
+ * Applies an aggregation that gives the running count of the data points.
+ *
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<Long, ?> count() {
+ TypeSerializerWrapper<OUT> inTypeWrapper = outTypeWrapper;
+ TypeSerializerWrapper<Long> outTypeWrapper = new ObjectTypeWrapper<Long>(new Long(0));
+
+ return addFunction("counter", null, inTypeWrapper, outTypeWrapper, new CounterInvokable<OUT>());
+ }
protected SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) {
StreamReduceInvokable<OUT> invokable = new StreamReduceInvokable<OUT>(aggregate);
- SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("reduce", aggregate, null,
- null, invokable);
+ SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("reduce", aggregate, outTypeWrapper,
+ outTypeWrapper, invokable);
- this.jobGraphBuilder.setTypeWrappersFrom(getId(), returnStream.getId());
return returnStream;
}
@@ -599,9 +612,7 @@ public class DataStream<OUT> {
public DataStreamSink<OUT> print() {
DataStream<OUT> inputStream = this.copy();
PrintSinkFunction<OUT> printFunction = new PrintSinkFunction<OUT>();
- DataStreamSink<OUT> returnStream = addSink(inputStream, printFunction, null);
-
- jobGraphBuilder.setInToOutTypeWrappersFrom(inputStream.getId(), returnStream.getId());
+ DataStreamSink<OUT> returnStream = addSink(inputStream, printFunction, outTypeWrapper);
return returnStream;
}
@@ -721,8 +732,7 @@ public class DataStream<OUT> {
private DataStreamSink<OUT> writeAsText(DataStream<OUT> inputStream, String path,
WriteFormatAsText<OUT> format, long millis, OUT endTuple) {
DataStreamSink<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<OUT>(
- path, format, millis, endTuple), null);
- jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+ path, format, millis, endTuple), inputStream.outTypeWrapper);
jobGraphBuilder.setMutability(returnStream.getId(), false);
return returnStream;
}
@@ -749,8 +759,7 @@ public class DataStream<OUT> {
private DataStreamSink<OUT> writeAsText(DataStream<OUT> inputStream, String path,
WriteFormatAsText<OUT> format, int batchSize, OUT endTuple) {
DataStreamSink<OUT> returnStream = addSink(inputStream,
- new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple), null);
- jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+ new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple), inputStream.outTypeWrapper);
jobGraphBuilder.setMutability(returnStream.getId(), false);
return returnStream;
}
@@ -873,8 +882,7 @@ public class DataStream<OUT> {
private DataStreamSink<OUT> writeAsCsv(DataStream<OUT> inputStream, String path,
WriteFormatAsCsv<OUT> format, long millis, OUT endTuple) {
DataStreamSink<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<OUT>(
- path, format, millis, endTuple));
- jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+ path, format, millis, endTuple), inputStream.outTypeWrapper);
jobGraphBuilder.setMutability(returnStream.getId(), false);
return returnStream;
}
@@ -901,8 +909,7 @@ public class DataStream<OUT> {
private DataStreamSink<OUT> writeAsCsv(DataStream<OUT> inputStream, String path,
WriteFormatAsCsv<OUT> format, int batchSize, OUT endTuple) {
DataStreamSink<OUT> returnStream = addSink(inputStream,
- new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple), null);
- jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+ new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple), inputStream.outTypeWrapper);
jobGraphBuilder.setMutability(returnStream.getId(), false);
return returnStream;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/03a28cbb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index 30826d3..e30d316 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -243,9 +243,8 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
GroupedReduceInvokable<OUT> invokable = new GroupedReduceInvokable<OUT>(aggregate, keyPosition);
SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("groupReduce", aggregate,
- null, null, invokable);
+ outTypeWrapper, outTypeWrapper, invokable);
- this.jobGraphBuilder.setTypeWrappersFrom(getId(), returnStream.getId());
return returnStream;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/03a28cbb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
new file mode 100644
index 0000000..29903b1
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
+
+public class CounterInvokable<IN> extends StreamOperatorInvokable<IN, Long> { // Operator that takes IN records and emits Long values: one incremented count per record.
+ private static final long serialVersionUID = 1L;
+
+ Long count = 0L; // Number of records counted so far; starts at zero and is package-visible.
+
+ public CounterInvokable() { // Creates a counter that has no user-defined function attached.
+ super(null); // NOTE(review): the null is presumably the user function slot of StreamOperatorInvokable - confirm against the superclass.
+ }
+
+ @Override
+ protected void immutableInvoke() throws Exception { // Drains the input, resetting the reuse object after every record.
+ while ((reuse = recordIterator.next(reuse)) != null) { // A null result from the iterator ends the loop.
+ callUserFunctionAndLogException(); // Inherited helper; presumably dispatches to callUserFunction() and logs failures - verify in superclass.
+ resetReuse(); // Only this immutable variant resets the reuse object between records.
+ }
+ }
+
+ @Override
+ protected void mutableInvoke() throws Exception { // Drains the input while keeping the same reuse object across records.
+ while ((reuse = recordIterator.next(reuse)) != null) { // A null result from the iterator ends the loop.
+ callUserFunctionAndLogException(); // Inherited helper; presumably dispatches to callUserFunction() and logs failures - verify in superclass.
+ }
+ }
+
+ @Override
+ protected void callUserFunction() throws Exception { // Emits the updated count; the content of the current record is never read.
+ collector.collect(++count); // Pre-increment: the first value emitted is 1, not 0.
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/03a28cbb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java
new file mode 100644
index 0000000..2124eb7
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.streaming.util.MockInvokable;
+import org.junit.Test;
+
+public class CounterInvokableTest { // Unit test for CounterInvokable.
+
+ @Test
+ public void counterTest() { // Three input records must yield the counts 1, 2 and 3, in that order.
+ CounterInvokable<String> invokable = new CounterInvokable<String>(); // The input element type is arbitrary; String is used here.
+
+ List<Long> expected = Arrays.asList(1L, 2L, 3L); // One count per input record, starting at 1.
+ List<Long> actual = MockInvokable.createAndExecute(invokable, Arrays.asList("one", "two", "three")); // NOTE(review): presumably runs the invokable over the inputs and returns the collected outputs - confirm in MockInvokable.
+
+ assertEquals(expected, actual); // List equality checks both the values and their order.
+ }
+}