You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/06 09:56:19 UTC

[2/2] flink git commit: [FLINK-5400] [core] Add accessor to folding states in RuntimeContext

[FLINK-5400] [core] Add accessor to folding states in RuntimeContext

This closes #3053


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d63f831a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d63f831a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d63f831a

Branch: refs/heads/master
Commit: d63f831a4b11bb927a8cc216b4901d9262e44053
Parents: d156f8d
Author: xiaogang.sxg <xi...@alibaba-inc.com>
Authored: Fri Dec 30 11:25:05 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jan 5 20:46:52 2017 +0100

----------------------------------------------------------------------
 .../api/common/functions/RuntimeContext.java    | 52 ++++++++++++++++++--
 .../util/AbstractRuntimeUDFContext.java         |  9 ++++
 .../flink/api/common/state/KeyedStateStore.java | 50 +++++++++++++++++--
 .../runtime/state/DefaultKeyedStateStore.java   | 13 +++++
 .../api/functions/async/RichAsyncFunction.java  |  8 +++
 .../api/operators/StreamingRuntimeContext.java  |  9 ++++
 .../functions/async/RichAsyncFunctionTest.java  | 13 +++++
 .../operators/StreamingRuntimeContextTest.java  | 31 ++++++++++++
 8 files changed, 175 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d63f831a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index ce513cb..405e390 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -27,6 +27,8 @@ import org.apache.flink.api.common.accumulators.Histogram;
 import org.apache.flink.api.common.accumulators.IntCounter;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
@@ -307,7 +309,7 @@ public interface RuntimeContext {
 	<T> ListState<T> getListState(ListStateDescriptor<T> stateProperties);
 
 	/**
-	 * Gets a handle to the system's key/value list state. This state is similar to the state
+	 * Gets a handle to the system's key/value reducing state. This state is similar to the state
 	 * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that
 	 * aggregates values.
 	 *
@@ -319,16 +321,16 @@ public interface RuntimeContext {
 	 *
 	 * keyedStream.map(new RichMapFunction<MyType, List<MyType>>() {
 	 *
-	 *     private ReducingState<Long> sum;
+	 *     private ReducingState<Long> state;
 	 *
 	 *     public void open(Configuration cfg) {
 	 *         state = getRuntimeContext().getReducingState(
-	 *                 new ReducingStateDescriptor<>("sum", MyType.class, 0L, (a, b) -> a + b));
+	 *                 new ReducingStateDescriptor<>("sum", (a, b) -> a + b, Long.class));
 	 *     }
 	 *
 	 *     public Tuple2<MyType, Long> map(MyType value) {
-	 *         sum.add(value.count());
-	 *         return new Tuple2<>(value, sum.get());
+	 *         state.add(value.count());
+	 *         return new Tuple2<>(value, state.get());
 	 *     }
 	 * });
 	 * 
@@ -345,4 +347,44 @@ public interface RuntimeContext {
 	 */
 	@PublicEvolving
 	<T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties);
+
+	/**
+	 * Gets a handle to the system's key/value folding state. This state is similar to the state
+	 * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that
+	 * aggregates values with different types.
+	 *
+	 * <p>This state is only accessible if the function is executed on a KeyedStream.
+	 *
+	 * <pre>{@code
+	 * DataStream<MyType> stream = ...;
+	 * KeyedStream<MyType> keyedStream = stream.keyBy("id");
+	 *
+	 * keyedStream.map(new RichMapFunction<MyType, List<MyType>>() {
+	 *
+	 *     private FoldingState<MyType, Long> state;
+	 *
+	 *     public void open(Configuration cfg) {
+	 *         state = getRuntimeContext().getReducingState(
+	 *                 new FoldingStateDescriptor<>("sum", 0L, (a, b) -> a.count() + b, Long.class));
+	 *     }
+	 *
+	 *     public Tuple2<MyType, Long> map(MyType value) {
+	 *         state.add(value);
+	 *         return new Tuple2<>(value, state.get());
+	 *     }
+	 * });
+	 *
+	 * }</pre>
+	 *
+	 * @param stateProperties The descriptor defining the properties of the stats.
+	 *
+	 * @param <T> The type of value stored in the state.
+	 *
+	 * @return The partitioned state object.
+	 *
+	 * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
+	 *                                       function (function is not part of a KeyedStream).
+	 */
+	@PublicEvolving
+	<T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d63f831a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index 4f559bf..0eafeaa 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -29,6 +29,8 @@ import org.apache.flink.api.common.accumulators.IntCounter;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
@@ -205,4 +207,11 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
 		throw new UnsupportedOperationException(
 				"This state is only accessible by functions executed on a KeyedStream");
 	}
+
+	@Override
+	@PublicEvolving
+	public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
+		throw new UnsupportedOperationException(
+				"This state is only accessible by functions executed on a KeyedStream");
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d63f831a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
index 89c1240..bbb4c67 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
@@ -118,7 +118,7 @@ public interface KeyedStateStore {
 	<T> ListState<T> getListState(ListStateDescriptor<T> stateProperties);
 
 	/**
-	 * Gets a handle to the system's key/value list state. This state is similar to the state
+	 * Gets a handle to the system's key/value reducing state. This state is similar to the state
 	 * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that
 	 * aggregates values.
 	 *
@@ -130,16 +130,16 @@ public interface KeyedStateStore {
 	 *
 	 * keyedStream.map(new RichMapFunction<MyType, List<MyType>>() {
 	 *
-	 *     private ReducingState<Long> sum;
+	 *     private ReducingState<Long> state;
 	 *
 	 *     public void open(Configuration cfg) {
 	 *         state = getRuntimeContext().getReducingState(
-	 *                 new ReducingStateDescriptor<>("sum", MyType.class, 0L, (a, b) -> a + b));
+	 *                 new ReducingStateDescriptor<>("sum", (a, b) -> a + b, Long.class));
 	 *     }
 	 *
 	 *     public Tuple2<MyType, Long> map(MyType value) {
-	 *         sum.add(value.count());
-	 *         return new Tuple2<>(value, sum.get());
+	 *         state.add(value.count());
+	 *         return new Tuple2<>(value, state.get());
 	 *     }
 	 * });
 	 *
@@ -156,4 +156,44 @@ public interface KeyedStateStore {
 	 */
 	@PublicEvolving
 	<T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties);
+
+	/**
+	 * Gets a handle to the system's key/value folding state. This state is similar to the state
+	 * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that
+	 * aggregates values with different types.
+	 *
+	 * <p>This state is only accessible if the function is executed on a KeyedStream.
+	 *
+	 * <pre>{@code
+	 * DataStream<MyType> stream = ...;
+	 * KeyedStream<MyType> keyedStream = stream.keyBy("id");
+	 *
+	 * keyedStream.map(new RichMapFunction<MyType, List<MyType>>() {
+	 *
+	 *     private FoldingState<MyType, Long> state;
+	 *
+	 *     public void open(Configuration cfg) {
+	 *         state = getRuntimeContext().getReducingState(
+	 *                 new FoldingStateDescriptor<>("sum", 0L, (a, b) -> a.count() + b, Long.class));
+	 *     }
+	 *
+	 *     public Tuple2<MyType, Long> map(MyType value) {
+	 *         state.add(value);
+	 *         return new Tuple2<>(value, state.get());
+	 *     }
+	 * });
+	 *
+	 * }</pre>
+	 *
+	 * @param stateProperties The descriptor defining the properties of the stats.
+	 *
+	 * @param <T> The type of value stored in the state.
+	 *
+	 * @return The partitioned state object.
+	 *
+	 * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
+	 *                                       function (function is not part of a KeyedStream).
+	 */
+	@PublicEvolving
+	<T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/d63f831a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
index 776f4b8..d8b8aa8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -80,6 +82,17 @@ public class DefaultKeyedStateStore implements KeyedStateStore {
 		}
 	}
 
+	@Override
+	public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
+		requireNonNull(stateProperties, "The state properties must not be null");
+		try {
+			stateProperties.initializeSerializerUnlessSet(executionConfig);
+			return getPartitionedState(stateProperties);
+		} catch (Exception e) {
+			throw new RuntimeException("Error while getting state", e);
+		}
+	}
+
 	private <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) throws Exception {
 		return keyedStateBackend.getPartitionedState(
 				VoidNamespace.INSTANCE,

http://git-wip-us.apache.org/repos/asf/flink/blob/d63f831a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
index 232206c..e6a186a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
@@ -32,6 +32,8 @@ import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
@@ -165,6 +167,12 @@ public abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction im
 		}
 
 		@Override
+		public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
+			throw new UnsupportedOperationException("State is not supported in rich async functions.");
+		}
+
+
+		@Override
 		public <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> accumulator) {
 			throw new UnsupportedOperationException("Accumulators are not supported in rich async functions.");
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/d63f831a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
index b450923..b9c9b9b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -22,6 +22,8 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -128,6 +130,13 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
 		return keyedStateStore.getReducingState(stateProperties);
 	}
 
+	@Override
+	public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
+		KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
+		stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
+		return keyedStateStore.getFoldingState(stateProperties);
+	}
+
 	private KeyedStateStore checkPreconditionsAndGetKeyedStateStore(StateDescriptor<?, ?> stateDescriptor) {
 		Preconditions.checkNotNull(stateDescriptor, "The state properties must not be null");
 		KeyedStateStore keyedStateStore = operator.getKeyedStateStore();

http://git-wip-us.apache.org/repos/asf/flink/blob/d63f831a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
index 12ac693..815f856 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
@@ -21,9 +21,11 @@ package org.apache.flink.streaming.api.functions.async;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
@@ -165,6 +167,17 @@ public class RichAsyncFunctionTest {
 		}
 
 		try {
+			runtimeContext.getFoldingState(new FoldingStateDescriptor<>("foobar", 0, new FoldFunction<Integer, Integer>() {
+				@Override
+				public Integer fold(Integer accumulator, Integer value) throws Exception {
+					return accumulator;
+				}
+			}, Integer.class));
+		} catch (UnsupportedOperationException e) {
+			// expected
+		}
+
+		try {
 			runtimeContext.addAccumulator("foobar", new Accumulator<Integer, Integer>() {
 				private static final long serialVersionUID = -4673320336846482358L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d63f831a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
index 155a16f..0d9003f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
@@ -22,7 +22,9 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
@@ -116,6 +118,35 @@ public class StreamingRuntimeContextTest {
 	}
 
 	@Test
+	public void testFoldingStateInstantiation() throws Exception {
+
+		final ExecutionConfig config = new ExecutionConfig();
+		config.registerKryoType(Path.class);
+
+		final AtomicReference<Object> descriptorCapture = new AtomicReference<>();
+
+		StreamingRuntimeContext context = new StreamingRuntimeContext(
+				createDescriptorCapturingMockOp(descriptorCapture, config),
+				createMockEnvironment(),
+				Collections.<String, Accumulator<?, ?>>emptyMap());
+
+		@SuppressWarnings("unchecked")
+		FoldFunction<String, TaskInfo> folder = (FoldFunction<String, TaskInfo>) mock(FoldFunction.class);
+
+		FoldingStateDescriptor<String, TaskInfo> descr =
+				new FoldingStateDescriptor<>("name", null, folder, TaskInfo.class);
+
+		context.getFoldingState(descr);
+
+		FoldingStateDescriptor<?, ?> descrIntercepted = (FoldingStateDescriptor<?, ?>) descriptorCapture.get();
+		TypeSerializer<?> serializer = descrIntercepted.getSerializer();
+
+		// check that the Path class is really registered, i.e., the execution config was applied
+		assertTrue(serializer instanceof KryoSerializer);
+		assertTrue(((KryoSerializer<?>) serializer).getKryo().getRegistration(Path.class).getId() > 0);
+	}
+
+	@Test
 	public void testListStateInstantiation() throws Exception {
 
 		final ExecutionConfig config = new ExecutionConfig();