You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/06 09:56:19 UTC
[2/2] flink git commit: [FLINK-5400] [core] Add accessor to folding
states in RuntimeContext
[FLINK-5400] [core] Add accessor to folding states in RuntimeContext
This closes #3053
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d63f831a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d63f831a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d63f831a
Branch: refs/heads/master
Commit: d63f831a4b11bb927a8cc216b4901d9262e44053
Parents: d156f8d
Author: xiaogang.sxg <xi...@alibaba-inc.com>
Authored: Fri Dec 30 11:25:05 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jan 5 20:46:52 2017 +0100
----------------------------------------------------------------------
.../api/common/functions/RuntimeContext.java | 52 ++++++++++++++++++--
.../util/AbstractRuntimeUDFContext.java | 9 ++++
.../flink/api/common/state/KeyedStateStore.java | 50 +++++++++++++++++--
.../runtime/state/DefaultKeyedStateStore.java | 13 +++++
.../api/functions/async/RichAsyncFunction.java | 8 +++
.../api/operators/StreamingRuntimeContext.java | 9 ++++
.../functions/async/RichAsyncFunctionTest.java | 13 +++++
.../operators/StreamingRuntimeContextTest.java | 31 ++++++++++++
8 files changed, 175 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d63f831a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index ce513cb..405e390 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -27,6 +27,8 @@ import org.apache.flink.api.common.accumulators.Histogram;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
@@ -307,7 +309,7 @@ public interface RuntimeContext {
<T> ListState<T> getListState(ListStateDescriptor<T> stateProperties);
/**
- * Gets a handle to the system's key/value list state. This state is similar to the state
+ * Gets a handle to the system's key/value reducing state. This state is similar to the state
* accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that
* aggregates values.
*
@@ -319,16 +321,16 @@ public interface RuntimeContext {
*
* keyedStream.map(new RichMapFunction<MyType, List<MyType>>() {
*
- * private ReducingState<Long> sum;
+ * private ReducingState<Long> state;
*
* public void open(Configuration cfg) {
* state = getRuntimeContext().getReducingState(
- * new ReducingStateDescriptor<>("sum", MyType.class, 0L, (a, b) -> a + b));
+ * new ReducingStateDescriptor<>("sum", (a, b) -> a + b, Long.class));
* }
*
* public Tuple2<MyType, Long> map(MyType value) {
- * sum.add(value.count());
- * return new Tuple2<>(value, sum.get());
+ * state.add(value.count());
+ * return new Tuple2<>(value, state.get());
* }
* });
*
@@ -345,4 +347,44 @@ public interface RuntimeContext {
*/
@PublicEvolving
<T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties);
+
+ /**
+ * Gets a handle to the system's key/value folding state. This state is similar to the state
+ * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that
+ * aggregates values with different types.
+ *
+ * <p>This state is only accessible if the function is executed on a KeyedStream.
+ *
+ * <pre>{@code
+ * DataStream<MyType> stream = ...;
+ * KeyedStream<MyType> keyedStream = stream.keyBy("id");
+ *
+ * keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() {
+ *
+ * private FoldingState<MyType, Long> state;
+ *
+ * public void open(Configuration cfg) {
+ * state = getRuntimeContext().getFoldingState(
+ * new FoldingStateDescriptor<>("sum", 0L, (acc, value) -> acc + value.count(), Long.class));
+ * }
+ *
+ * public Tuple2<MyType, Long> map(MyType value) {
+ * state.add(value);
+ * return new Tuple2<>(value, state.get());
+ * }
+ * });
+ *
+ * }</pre>
+ *
+ * @param stateProperties The descriptor defining the properties of the state.
+ * @param <T> The type of the values that are folded into the state.
+ * @param <ACC> The type of the accumulated value stored in the state.
+ *
+ * @return The partitioned state object.
+ *
+ * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
+ * function (function is not part of a KeyedStream).
+ */
+ @PublicEvolving
+ <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d63f831a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index 4f559bf..0eafeaa 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -29,6 +29,8 @@ import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
@@ -205,4 +207,11 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
throw new UnsupportedOperationException(
"This state is only accessible by functions executed on a KeyedStream");
}
+
+ @Override
+ @PublicEvolving
+ public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
+ throw new UnsupportedOperationException(
+ "This state is only accessible by functions executed on a KeyedStream");
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d63f831a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
index 89c1240..bbb4c67 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
@@ -118,7 +118,7 @@ public interface KeyedStateStore {
<T> ListState<T> getListState(ListStateDescriptor<T> stateProperties);
/**
- * Gets a handle to the system's key/value list state. This state is similar to the state
+ * Gets a handle to the system's key/value reducing state. This state is similar to the state
* accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that
* aggregates values.
*
@@ -130,16 +130,16 @@ public interface KeyedStateStore {
*
* keyedStream.map(new RichMapFunction<MyType, List<MyType>>() {
*
- * private ReducingState<Long> sum;
+ * private ReducingState<Long> state;
*
* public void open(Configuration cfg) {
* state = getRuntimeContext().getReducingState(
- * new ReducingStateDescriptor<>("sum", MyType.class, 0L, (a, b) -> a + b));
+ * new ReducingStateDescriptor<>("sum", (a, b) -> a + b, Long.class));
* }
*
* public Tuple2<MyType, Long> map(MyType value) {
- * sum.add(value.count());
- * return new Tuple2<>(value, sum.get());
+ * state.add(value.count());
+ * return new Tuple2<>(value, state.get());
* }
* });
*
@@ -156,4 +156,44 @@ public interface KeyedStateStore {
*/
@PublicEvolving
<T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties);
+
+ /**
+ * Gets a handle to the system's key/value folding state. This state is similar to the state
+ * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that
+ * aggregates values with different types.
+ *
+ * <p>This state is only accessible if the function is executed on a KeyedStream.
+ *
+ * <pre>{@code
+ * DataStream<MyType> stream = ...;
+ * KeyedStream<MyType> keyedStream = stream.keyBy("id");
+ *
+ * keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() {
+ *
+ * private FoldingState<MyType, Long> state;
+ *
+ * public void open(Configuration cfg) {
+ * state = getRuntimeContext().getFoldingState(
+ * new FoldingStateDescriptor<>("sum", 0L, (acc, value) -> acc + value.count(), Long.class));
+ * }
+ *
+ * public Tuple2<MyType, Long> map(MyType value) {
+ * state.add(value);
+ * return new Tuple2<>(value, state.get());
+ * }
+ * });
+ *
+ * }</pre>
+ *
+ * @param stateProperties The descriptor defining the properties of the state.
+ * @param <T> The type of the values that are folded into the state.
+ * @param <ACC> The type of the accumulated value stored in the state.
+ *
+ * @return The partitioned state object.
+ *
+ * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
+ * function (function is not part of a KeyedStream).
+ */
+ @PublicEvolving
+ <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/d63f831a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
index 776f4b8..d8b8aa8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.state;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -80,6 +82,17 @@ public class DefaultKeyedStateStore implements KeyedStateStore {
}
}
+ @Override
+ public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
+ requireNonNull(stateProperties, "The state properties must not be null");
+ try {
+ stateProperties.initializeSerializerUnlessSet(executionConfig);
+ return getPartitionedState(stateProperties);
+ } catch (Exception e) {
+ throw new RuntimeException("Error while getting state", e);
+ }
+ }
+
private <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) throws Exception {
return keyedStateBackend.getPartitionedState(
VoidNamespace.INSTANCE,
http://git-wip-us.apache.org/repos/asf/flink/blob/d63f831a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
index 232206c..e6a186a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
@@ -32,6 +32,8 @@ import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
@@ -165,6 +167,12 @@ public abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction im
}
@Override
+ public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
+ throw new UnsupportedOperationException("State is not supported in rich async functions.");
+ }
+
+
+ @Override
public <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> accumulator) {
throw new UnsupportedOperationException("Accumulators are not supported in rich async functions.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d63f831a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
index b450923..b9c9b9b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -22,6 +22,8 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
import org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -128,6 +130,13 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
return keyedStateStore.getReducingState(stateProperties);
}
+ @Override
+ public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
+ KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
+ stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
+ return keyedStateStore.getFoldingState(stateProperties);
+ }
+
private KeyedStateStore checkPreconditionsAndGetKeyedStateStore(StateDescriptor<?, ?> stateDescriptor) {
Preconditions.checkNotNull(stateDescriptor, "The state properties must not be null");
KeyedStateStore keyedStateStore = operator.getKeyedStateStore();
http://git-wip-us.apache.org/repos/asf/flink/blob/d63f831a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
index 12ac693..815f856 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
@@ -21,9 +21,11 @@ package org.apache.flink.streaming.api.functions.async;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
@@ -165,6 +167,17 @@ public class RichAsyncFunctionTest {
}
try {
+ runtimeContext.getFoldingState(new FoldingStateDescriptor<>("foobar", 0, new FoldFunction<Integer, Integer>() {
+ @Override
+ public Integer fold(Integer accumulator, Integer value) throws Exception {
+ return accumulator;
+ }
+ }, Integer.class));
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
+
+ try {
runtimeContext.addAccumulator("foobar", new Accumulator<Integer, Integer>() {
private static final long serialVersionUID = -4673320336846482358L;
http://git-wip-us.apache.org/repos/asf/flink/blob/d63f831a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
index 155a16f..0d9003f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
@@ -22,7 +22,9 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
@@ -116,6 +118,35 @@ public class StreamingRuntimeContextTest {
}
@Test
+ public void testFoldingStateInstantiation() throws Exception {
+
+ final ExecutionConfig config = new ExecutionConfig();
+ config.registerKryoType(Path.class);
+
+ final AtomicReference<Object> descriptorCapture = new AtomicReference<>();
+
+ StreamingRuntimeContext context = new StreamingRuntimeContext(
+ createDescriptorCapturingMockOp(descriptorCapture, config),
+ createMockEnvironment(),
+ Collections.<String, Accumulator<?, ?>>emptyMap());
+
+ @SuppressWarnings("unchecked")
+ FoldFunction<String, TaskInfo> folder = (FoldFunction<String, TaskInfo>) mock(FoldFunction.class);
+
+ FoldingStateDescriptor<String, TaskInfo> descr =
+ new FoldingStateDescriptor<>("name", null, folder, TaskInfo.class);
+
+ context.getFoldingState(descr);
+
+ FoldingStateDescriptor<?, ?> descrIntercepted = (FoldingStateDescriptor<?, ?>) descriptorCapture.get();
+ TypeSerializer<?> serializer = descrIntercepted.getSerializer();
+
+ // check that the Path class is really registered, i.e., the execution config was applied
+ assertTrue(serializer instanceof KryoSerializer);
+ assertTrue(((KryoSerializer<?>) serializer).getKryo().getRegistration(Path.class).getId() > 0);
+ }
+
+ @Test
public void testListStateInstantiation() throws Exception {
final ExecutionConfig config = new ExecutionConfig();