You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/10/06 17:12:26 UTC
[4/5] flink git commit: [FLINK-2283] [streaming] grouped reduce and fold operators checkpoint state
[FLINK-2283] [streaming] grouped reduce and fold operators checkpoint state
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/225704bc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/225704bc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/225704bc
Branch: refs/heads/master
Commit: 225704bc9912536042027fca1a9880beba3bc2bb
Parents: 906bd6d
Author: mbalassi <mb...@apache.org>
Authored: Sun Sep 13 08:19:07 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Tue Oct 6 14:46:06 2015 +0200
----------------------------------------------------------------------
.../streaming/api/operators/StreamFold.java | 99 --------------------
.../api/operators/StreamGroupedFold.java | 72 ++++++++++++--
.../api/operators/StreamGroupedReduce.java | 21 +++--
3 files changed, 77 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/225704bc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
deleted file mode 100644
index 81115f0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
-import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-public class StreamFold<IN, OUT>
- extends AbstractUdfStreamOperator<OUT, FoldFunction<IN, OUT>>
- implements OneInputStreamOperator<IN, OUT>, OutputTypeConfigurable<OUT> {
-
- private static final long serialVersionUID = 1L;
-
- protected transient OUT accumulator;
- private byte[] serializedInitialValue;
-
- protected TypeSerializer<OUT> outTypeSerializer;
-
- public StreamFold(FoldFunction<IN, OUT> folder, OUT initialValue) {
- super(folder);
- this.accumulator = initialValue;
- this.chainingStrategy = ChainingStrategy.FORCE_ALWAYS;
- }
-
- @Override
- public void processElement(StreamRecord<IN> element) throws Exception {
- accumulator = userFunction.fold(outTypeSerializer.copy(accumulator), element.getValue());
- output.collect(element.replace(accumulator));
- }
-
- @Override
- public void open(Configuration config) throws Exception {
- super.open(config);
-
- if (serializedInitialValue == null) {
- throw new RuntimeException("No initial value was serialized for the fold " +
- "operator. Probably the setOutputType method was not called.");
- }
-
- ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue);
- InputViewDataInputStreamWrapper in = new InputViewDataInputStreamWrapper(
- new DataInputStream(bais)
- );
-
- accumulator = outTypeSerializer.deserialize(in);
- }
-
- @Override
- public void processWatermark(Watermark mark) throws Exception {
- output.emitWatermark(mark);
- }
-
- @Override
- public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
- outTypeSerializer = outTypeInfo.createSerializer(executionConfig);
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- OutputViewDataOutputStreamWrapper out = new OutputViewDataOutputStreamWrapper(
- new DataOutputStream(baos)
- );
-
- try {
- outTypeSerializer.serialize(accumulator, out);
- } catch (IOException ioe) {
- throw new RuntimeException("Unable to serialize initial value of type " +
- accumulator.getClass().getSimpleName() + " of fold operator.", ioe);
- }
-
- serializedInitialValue = baos.toByteArray();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/225704bc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
index f4e44c6..f8f167a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
@@ -17,50 +17,106 @@
package org.apache.flink.streaming.api.operators;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
+import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-public class StreamGroupedFold<IN, OUT> extends StreamFold<IN, OUT> {
+public class StreamGroupedFold<IN, OUT> extends AbstractUdfStreamOperator<OUT, FoldFunction<IN, OUT>>
+ implements OneInputStreamOperator<IN, OUT>, OutputTypeConfigurable<OUT> {
private static final long serialVersionUID = 1L;
+ // Grouped values
private KeySelector<IN, ?> keySelector;
- private transient Map<Object, OUT> values;
+ private transient OperatorState<HashMap<Object, OUT>> values;
+
+ // Initial value serialization
+ private byte[] serializedInitialValue;
+ private TypeSerializer<OUT> outTypeSerializer;
+ private transient OUT initialValue;
public StreamGroupedFold(
FoldFunction<IN, OUT> folder,
KeySelector<IN, ?> keySelector,
OUT initialValue) {
- super(folder, initialValue);
+ super(folder);
this.keySelector = keySelector;
+ this.initialValue = initialValue;
}
@Override
public void open(Configuration configuration) throws Exception {
super.open(configuration);
- values = new HashMap<Object, OUT>();
+ if (serializedInitialValue == null) {
+ throw new RuntimeException("No initial value was serialized for the fold " +
+ "operator. Probably the setOutputType method was not called.");
+ }
+
+ ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue);
+ InputViewDataInputStreamWrapper in = new InputViewDataInputStreamWrapper(
+ new DataInputStream(bais)
+ );
+ initialValue = outTypeSerializer.deserialize(in);
+
+ values = runtimeContext.getOperatorState("flink_internal_fold_values",
+ new HashMap<Object, OUT>(), false);
}
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
Object key = keySelector.getKey(element.getValue());
- OUT value = values.get(key);
+ OUT value = values.value().get(key);
if (value != null) {
OUT folded = userFunction.fold(outTypeSerializer.copy(value), element.getValue());
- values.put(key, folded);
+ values.value().put(key, folded);
output.collect(element.replace(folded));
} else {
- OUT first = userFunction.fold(outTypeSerializer.copy(accumulator), element.getValue());
- values.put(key, first);
+ OUT first = userFunction.fold(outTypeSerializer.copy(initialValue), element.getValue());
+ values.value().put(key, first);
output.collect(element.replace(first));
}
}
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ output.emitWatermark(mark);
+ }
+
+ @Override
+ public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
+ outTypeSerializer = outTypeInfo.createSerializer(executionConfig);
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ OutputViewDataOutputStreamWrapper out = new OutputViewDataOutputStreamWrapper(
+ new DataOutputStream(baos)
+ );
+
+ try {
+ outTypeSerializer.serialize(initialValue, out);
+ } catch (IOException ioe) {
+ throw new RuntimeException("Unable to serialize initial value of type " +
+ initialValue.getClass().getSimpleName() + " of fold operator.", ioe);
+ }
+
+ serializedInitialValue = baos.toByteArray();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/225704bc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
index 8805138..e1f9f06 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
@@ -21,7 +21,9 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -31,7 +33,7 @@ public class StreamGroupedReduce<IN> extends AbstractUdfStreamOperator<IN, Reduc
private static final long serialVersionUID = 1L;
private KeySelector<IN, ?> keySelector;
- private transient Map<Object, IN> values;
+ private transient OperatorState<HashMap<Object, IN>> values;
public StreamGroupedReduce(ReduceFunction<IN> reducer, KeySelector<IN, ?> keySelector) {
super(reducer);
@@ -39,21 +41,24 @@ public class StreamGroupedReduce<IN> extends AbstractUdfStreamOperator<IN, Reduc
}
@Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ values = runtimeContext.getOperatorState("flink_internal_reduce_values",
+ new HashMap<Object, IN>(), false);
+ }
+
+ @Override
public void processElement(StreamRecord<IN> element) throws Exception {
Object key = keySelector.getKey(element.getValue());
- if (values == null) {
- values = new HashMap<>();
- }
-
- IN currentValue = values.get(key);
+ IN currentValue = values.value().get(key);
if (currentValue != null) {
// TODO: find a way to let operators copy elements (maybe)
IN reduced = userFunction.reduce(currentValue, element.getValue());
- values.put(key, reduced);
+ values.value().put(key, reduced);
output.collect(element.replace(reduced));
} else {
- values.put(key, element.getValue());
+ values.value().put(key, element.getValue());
output.collect(element.replace(element.getValue()));
}
}