You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/09/20 15:10:56 UTC

[13/18] git commit: [streaming] Added counter aggregation

[streaming] Added counter aggregation


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/03a28cbb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/03a28cbb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/03a28cbb

Branch: refs/heads/master
Commit: 03a28cbb755c49c2b97205989434c0c1664e8a3e
Parents: b6ffdba
Author: ghermann <re...@gmail.com>
Authored: Mon Sep 15 17:25:22 2014 +0200
Committer: mbalassi <ba...@gmail.com>
Committed: Sat Sep 20 13:44:12 2014 +0200

----------------------------------------------------------------------
 .../flink/streaming/api/JobGraphBuilder.java    | 26 +---------
 .../api/datastream/BatchedDataStream.java       |  3 +-
 .../streaming/api/datastream/DataStream.java    | 37 +++++++++------
 .../api/datastream/GroupedDataStream.java       |  3 +-
 .../invokable/operator/CounterInvokable.java    | 50 ++++++++++++++++++++
 .../operator/CounterInvokableTest.java          | 39 +++++++++++++++
 6 files changed, 115 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/03a28cbb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index 4bb022a..e6c5042 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -312,10 +312,11 @@ public class JobGraphBuilder {
 			int parallelism, long waitTime) {
 
 		addComponent(componentName, StreamIterationSink.class, null, null, null, parallelism);
+
 		iterationIds.put(componentName, iterationID);
 		iterationIDtoSinkName.put(iterationID, componentName);
+
 		setBytesFrom(iterationTail, componentName);
-		// setInTypeWrappersFrom(iterationTail, componentName);
 		iterationWaitTime.put(iterationIDtoSinkName.get(iterationID), waitTime);
 
 		if (LOG.isDebugEnabled()) {
@@ -576,44 +577,21 @@ public class JobGraphBuilder {
 	 *            to
 	 */
 	public void setBytesFrom(String from, String to) {
-
 		operatorNames.put(to, operatorNames.get(from));
 		serializedFunctions.put(to, serializedFunctions.get(from));
 
-		setTypeWrappersFrom(from, to);
-	}
-
-	public void setTypeWrappersFrom(String from, String to) {
-		setInToOutTypeWrappersFrom(from, to);
-		setOutToOutTypeWrappersFrom(from, to);
-	}
-
-	public void setInToOutTypeWrappersFrom(String from, String to) {
-		// TODO rename function
 		typeWrapperIn1.put(to, typeWrapperOut1.get(from));
 		typeWrapperIn2.put(to, typeWrapperOut2.get(from));
-	}
-
-	public void setOutToOutTypeWrappersFrom(String from, String to) {
-		// TODO rename function
 		typeWrapperOut1.put(to, typeWrapperOut1.get(from));
 		typeWrapperOut2.put(to, typeWrapperOut2.get(from));
 	}
 
-	public void setInToInTypeWrappersFrom(String from, String to) {
-		// TODO rename function
-		typeWrapperIn1.put(to, typeWrapperIn1.get(from));
-		typeWrapperIn2.put(to, typeWrapperIn2.get(from));
-	}
-
 	public TypeInformation<?> getInTypeInfo(String id) {
-		// TODO
 		System.out.println("DEBUG TypeInfo " + typeWrapperIn1.get(id));
 		return typeWrapperIn1.get(id).getTypeInfo();
 	}
 
 	public TypeInformation<?> getOutTypeInfo(String id) {
-		// TODO
 		return typeWrapperOut1.get(id).getTypeInfo();
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/03a28cbb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
index 0aa5de6..bcedac9 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
@@ -202,9 +202,8 @@ public class BatchedDataStream<OUT> {
 		BatchReduceInvokable<OUT> invokable = getReduceInvokable(aggregate);
 
 		SingleOutputStreamOperator<OUT, ?> returnStream = dataStream.addFunction("batchReduce",
-				aggregate, null, null, invokable);
+				aggregate, dataStream.outTypeWrapper, dataStream.outTypeWrapper, invokable);
 
-		dataStream.jobGraphBuilder.setTypeWrappersFrom(dataStream.getId(), returnStream.getId());
 		return returnStream;
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/03a28cbb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index bebda91..0e1ae57 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -47,6 +47,7 @@ import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByBatches;
 import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByMillis;
 import org.apache.flink.streaming.api.invokable.SinkInvokable;
 import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
+import org.apache.flink.streaming.api.invokable.operator.CounterInvokable;
 import org.apache.flink.streaming.api.invokable.operator.FilterInvokable;
 import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable;
 import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
@@ -60,6 +61,7 @@ import org.apache.flink.streaming.partitioner.ForwardPartitioner;
 import org.apache.flink.streaming.partitioner.ShufflePartitioner;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
+import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper;
 import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
 import org.apache.flink.types.TypeInformation;
 
@@ -86,7 +88,7 @@ public class DataStream<OUT> {
 	protected List<String> userDefinedNames;
 	protected boolean selectAll;
 	protected StreamPartitioner<OUT> partitioner;
-	protected TypeSerializerWrapper<OUT> outTypeWrapper;
+	protected final TypeSerializerWrapper<OUT> outTypeWrapper;
 	protected List<DataStream<OUT>> mergedStreams;
 
 	protected final JobGraphBuilder jobGraphBuilder;
@@ -556,15 +558,26 @@ public class DataStream<OUT> {
 	public SingleOutputStreamOperator<OUT, ?> max() {
 		return max(0);
 	}
+	
+	/**
+	 * Applies an aggregation that gives the running count of the data points.
+	 * 
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<Long, ?> count() {
+		TypeSerializerWrapper<OUT> inTypeWrapper = outTypeWrapper;
+		TypeSerializerWrapper<Long> outTypeWrapper = new ObjectTypeWrapper<Long>(new Long(0));
+
+		return addFunction("counter", null, inTypeWrapper, outTypeWrapper, new CounterInvokable<OUT>());
+	}
 
 	protected SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) {
 
 		StreamReduceInvokable<OUT> invokable = new StreamReduceInvokable<OUT>(aggregate);
 
-		SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("reduce", aggregate, null,
-				null, invokable);
+		SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("reduce", aggregate, outTypeWrapper,
+				outTypeWrapper, invokable);
 
-		this.jobGraphBuilder.setTypeWrappersFrom(getId(), returnStream.getId());
 		return returnStream;
 	}
 
@@ -599,9 +612,7 @@ public class DataStream<OUT> {
 	public DataStreamSink<OUT> print() {
 		DataStream<OUT> inputStream = this.copy();
 		PrintSinkFunction<OUT> printFunction = new PrintSinkFunction<OUT>();
-		DataStreamSink<OUT> returnStream = addSink(inputStream, printFunction, null);
-
-		jobGraphBuilder.setInToOutTypeWrappersFrom(inputStream.getId(), returnStream.getId());
+		DataStreamSink<OUT> returnStream = addSink(inputStream, printFunction, outTypeWrapper);
 
 		return returnStream;
 	}
@@ -721,8 +732,7 @@ public class DataStream<OUT> {
 	private DataStreamSink<OUT> writeAsText(DataStream<OUT> inputStream, String path,
 			WriteFormatAsText<OUT> format, long millis, OUT endTuple) {
 		DataStreamSink<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<OUT>(
-				path, format, millis, endTuple), null);
-		jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+				path, format, millis, endTuple), inputStream.outTypeWrapper);
 		jobGraphBuilder.setMutability(returnStream.getId(), false);
 		return returnStream;
 	}
@@ -749,8 +759,7 @@ public class DataStream<OUT> {
 	private DataStreamSink<OUT> writeAsText(DataStream<OUT> inputStream, String path,
 			WriteFormatAsText<OUT> format, int batchSize, OUT endTuple) {
 		DataStreamSink<OUT> returnStream = addSink(inputStream,
-				new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple), null);
-		jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+				new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple), inputStream.outTypeWrapper);
 		jobGraphBuilder.setMutability(returnStream.getId(), false);
 		return returnStream;
 	}
@@ -873,8 +882,7 @@ public class DataStream<OUT> {
 	private DataStreamSink<OUT> writeAsCsv(DataStream<OUT> inputStream, String path,
 			WriteFormatAsCsv<OUT> format, long millis, OUT endTuple) {
 		DataStreamSink<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<OUT>(
-				path, format, millis, endTuple));
-		jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+				path, format, millis, endTuple), inputStream.outTypeWrapper);
 		jobGraphBuilder.setMutability(returnStream.getId(), false);
 		return returnStream;
 	}
@@ -901,8 +909,7 @@ public class DataStream<OUT> {
 	private DataStreamSink<OUT> writeAsCsv(DataStream<OUT> inputStream, String path,
 			WriteFormatAsCsv<OUT> format, int batchSize, OUT endTuple) {
 		DataStreamSink<OUT> returnStream = addSink(inputStream,
-				new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple), null);
-		jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+				new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple), inputStream.outTypeWrapper);
 		jobGraphBuilder.setMutability(returnStream.getId(), false);
 		return returnStream;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/03a28cbb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index 30826d3..e30d316 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -243,9 +243,8 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
 		GroupedReduceInvokable<OUT> invokable = new GroupedReduceInvokable<OUT>(aggregate, keyPosition);
 
 		SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("groupReduce", aggregate,
-				null, null, invokable);
+				outTypeWrapper, outTypeWrapper, invokable);
 
-		this.jobGraphBuilder.setTypeWrappersFrom(getId(), returnStream.getId());
 		return returnStream;
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/03a28cbb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
new file mode 100644
index 0000000..29903b1
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
+
+public class CounterInvokable<IN> extends StreamOperatorInvokable<IN, Long> {
+	private static final long serialVersionUID = 1L;
+	
+	Long count = 0L;
+	
+	public CounterInvokable() {
+		super(null);
+	}
+	
+	@Override
+	protected void immutableInvoke() throws Exception {
+		while ((reuse = recordIterator.next(reuse)) != null) {
+			callUserFunctionAndLogException();
+			resetReuse();
+		}
+	}
+
+	@Override
+	protected void mutableInvoke() throws Exception {
+		while ((reuse = recordIterator.next(reuse)) != null) {
+			callUserFunctionAndLogException();
+		}
+	}
+
+	@Override
+	protected void callUserFunction() throws Exception {
+		collector.collect(++count);
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/03a28cbb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java
new file mode 100644
index 0000000..2124eb7
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.streaming.util.MockInvokable;
+import org.junit.Test;
+
+public class CounterInvokableTest {
+
+	@Test
+	public void counterTest() {
+		CounterInvokable<String> invokable = new CounterInvokable<String>();
+
+		List<Long> expected = Arrays.asList(1L, 2L, 3L);
+		List<Long> actual = MockInvokable.createAndExecute(invokable, Arrays.asList("one", "two", "three"));
+		
+		assertEquals(expected, actual);
+	}
+}