You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text export.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/10/06 17:12:26 UTC

[4/5] flink git commit: [FLINK-2283] [streaming] grouped reduce and fold operators checkpoint state

[FLINK-2283] [streaming] grouped reduce and fold operators checkpoint state


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/225704bc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/225704bc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/225704bc

Branch: refs/heads/master
Commit: 225704bc9912536042027fca1a9880beba3bc2bb
Parents: 906bd6d
Author: mbalassi <mb...@apache.org>
Authored: Sun Sep 13 08:19:07 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Tue Oct 6 14:46:06 2015 +0200

----------------------------------------------------------------------
 .../streaming/api/operators/StreamFold.java     | 99 --------------------
 .../api/operators/StreamGroupedFold.java        | 72 ++++++++++++--
 .../api/operators/StreamGroupedReduce.java      | 21 +++--
 3 files changed, 77 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/225704bc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
deleted file mode 100644
index 81115f0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
-import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-public class StreamFold<IN, OUT>
-		extends AbstractUdfStreamOperator<OUT, FoldFunction<IN, OUT>>
-		implements OneInputStreamOperator<IN, OUT>, OutputTypeConfigurable<OUT> {
-
-	private static final long serialVersionUID = 1L;
-
-	protected transient OUT accumulator;
-	private byte[] serializedInitialValue;
-
-	protected TypeSerializer<OUT> outTypeSerializer;
-
-	public StreamFold(FoldFunction<IN, OUT> folder, OUT initialValue) {
-		super(folder);
-		this.accumulator = initialValue;
-		this.chainingStrategy = ChainingStrategy.FORCE_ALWAYS;
-	}
-
-	@Override
-	public void processElement(StreamRecord<IN> element) throws Exception {
-		accumulator = userFunction.fold(outTypeSerializer.copy(accumulator), element.getValue());
-		output.collect(element.replace(accumulator));
-	}
-
-	@Override
-	public void open(Configuration config) throws Exception {
-		super.open(config);
-
-		if (serializedInitialValue == null) {
-			throw new RuntimeException("No initial value was serialized for the fold " +
-					"operator. Probably the setOutputType method was not called.");
-		}
-
-		ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue);
-		InputViewDataInputStreamWrapper in = new InputViewDataInputStreamWrapper(
-			new DataInputStream(bais)
-		);
-
-		accumulator = outTypeSerializer.deserialize(in);
-	}
-
-	@Override
-	public void processWatermark(Watermark mark) throws Exception {
-		output.emitWatermark(mark);
-	}
-
-	@Override
-	public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
-		outTypeSerializer = outTypeInfo.createSerializer(executionConfig);
-
-		ByteArrayOutputStream baos = new ByteArrayOutputStream();
-		OutputViewDataOutputStreamWrapper out = new OutputViewDataOutputStreamWrapper(
-			new DataOutputStream(baos)
-		);
-
-		try {
-			outTypeSerializer.serialize(accumulator, out);
-		} catch (IOException ioe) {
-			throw new RuntimeException("Unable to serialize initial value of type " +
-					accumulator.getClass().getSimpleName() + " of fold operator.", ioe);
-		}
-
-		serializedInitialValue = baos.toByteArray();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/225704bc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
index f4e44c6..f8f167a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
@@ -17,50 +17,106 @@
 
 package org.apache.flink.streaming.api.operators;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
+import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
-public class StreamGroupedFold<IN, OUT> extends StreamFold<IN, OUT> {
+public class StreamGroupedFold<IN, OUT> extends AbstractUdfStreamOperator<OUT, FoldFunction<IN, OUT>>
+		implements OneInputStreamOperator<IN, OUT>, OutputTypeConfigurable<OUT> {
 
 	private static final long serialVersionUID = 1L;
 
+	// Grouped values
 	private KeySelector<IN, ?> keySelector;
-	private transient Map<Object, OUT> values;
+	private transient OperatorState<HashMap<Object, OUT>> values;
+
+	// Initial value serialization
+	private byte[] serializedInitialValue;
+	private TypeSerializer<OUT> outTypeSerializer;
+	private transient OUT initialValue;
 
 	public StreamGroupedFold(
 			FoldFunction<IN, OUT> folder,
 			KeySelector<IN, ?> keySelector,
 			OUT initialValue) {
-		super(folder, initialValue);
+		super(folder);
 		this.keySelector = keySelector;
+		this.initialValue = initialValue;
 	}
 
 	@Override
 	public void open(Configuration configuration) throws Exception {
 		super.open(configuration);
 
-		values = new HashMap<Object, OUT>();
+		if (serializedInitialValue == null) {
+			throw new RuntimeException("No initial value was serialized for the fold " +
+					"operator. Probably the setOutputType method was not called.");
+		}
+
+		ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue);
+		InputViewDataInputStreamWrapper in = new InputViewDataInputStreamWrapper(
+				new DataInputStream(bais)
+		);
+		initialValue = outTypeSerializer.deserialize(in);
+
+		values = runtimeContext.getOperatorState("flink_internal_fold_values",
+				new HashMap<Object, OUT>(), false);
 	}
 
 	@Override
 	public void processElement(StreamRecord<IN> element) throws Exception {
 		Object key = keySelector.getKey(element.getValue());
-		OUT value = values.get(key);
+		OUT value = values.value().get(key);
 
 		if (value != null) {
 			OUT folded = userFunction.fold(outTypeSerializer.copy(value), element.getValue());
-			values.put(key, folded);
+			values.value().put(key, folded);
 			output.collect(element.replace(folded));
 		} else {
-			OUT first = userFunction.fold(outTypeSerializer.copy(accumulator), element.getValue());
-			values.put(key, first);
+			OUT first = userFunction.fold(outTypeSerializer.copy(initialValue), element.getValue());
+			values.value().put(key, first);
 			output.collect(element.replace(first));
 		}
 	}
 
+	@Override
+	public void processWatermark(Watermark mark) throws Exception {
+		output.emitWatermark(mark);
+	}
+
+	@Override
+	public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
+		outTypeSerializer = outTypeInfo.createSerializer(executionConfig);
+
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		OutputViewDataOutputStreamWrapper out = new OutputViewDataOutputStreamWrapper(
+				new DataOutputStream(baos)
+		);
+
+		try {
+			outTypeSerializer.serialize(initialValue, out);
+		} catch (IOException ioe) {
+			throw new RuntimeException("Unable to serialize initial value of type " +
+					initialValue.getClass().getSimpleName() + " of fold operator.", ioe);
+		}
+
+		serializedInitialValue = baos.toByteArray();
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/225704bc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
index 8805138..e1f9f06 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
@@ -21,7 +21,9 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
@@ -31,7 +33,7 @@ public class StreamGroupedReduce<IN> extends AbstractUdfStreamOperator<IN, Reduc
 	private static final long serialVersionUID = 1L;
 
 	private KeySelector<IN, ?> keySelector;
-	private transient Map<Object, IN> values;
+	private transient OperatorState<HashMap<Object, IN>> values;
 
 	public StreamGroupedReduce(ReduceFunction<IN> reducer, KeySelector<IN, ?> keySelector) {
 		super(reducer);
@@ -39,21 +41,24 @@ public class StreamGroupedReduce<IN> extends AbstractUdfStreamOperator<IN, Reduc
 	}
 
 	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		values = runtimeContext.getOperatorState("flink_internal_reduce_values",
+				new HashMap<Object, IN>(), false);
+	}
+
+	@Override
 	public void processElement(StreamRecord<IN> element) throws Exception {
 		Object key = keySelector.getKey(element.getValue());
 
-		if (values == null) {
-			values = new HashMap<>();
-		}
-
-		IN currentValue = values.get(key);
+		IN currentValue = values.value().get(key);
 		if (currentValue != null) {
 			// TODO: find a way to let operators copy elements (maybe)
 			IN reduced = userFunction.reduce(currentValue, element.getValue());
-			values.put(key, reduced);
+			values.value().put(key, reduced);
 			output.collect(element.replace(reduced));
 		} else {
-			values.put(key, element.getValue());
+			values.value().put(key, element.getValue());
 			output.collect(element.replace(element.getValue()));
 		}
 	}