You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/16 18:08:46 UTC
[19/24] flink git commit: [FLINK-2550] [streaming] Allow multiple key/value states per operator on top of the new state backend
[FLINK-2550] [streaming] Allow multiple key/value states per operator on top of the new state backend
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7c205432
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7c205432
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7c205432
Branch: refs/heads/master
Commit: 7c2054326b259bde0e8dd28d3428e63fd285454c
Parents: bb1f5fd
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Oct 9 23:59:28 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 16 15:26:11 2015 +0200
----------------------------------------------------------------------
.../flink/storm/api/FlinkTopologyBuilder.java | 1 +
.../storm/util/SplitStreamTypeKeySelector.java | 5 +-
.../storm/api/FlinkTopologyBuilderTest.java | 3 -
.../api/common/functions/RuntimeContext.java | 10 +-
.../util/AbstractRuntimeUDFContext.java | 4 +-
.../common/typeinfo/PrimitiveArrayTypeInfo.java | 16 +++
.../kafka/testutils/MockRuntimeContext.java | 4 +-
.../streaming/api/datastream/DataStream.java | 4 +-
.../api/operators/AbstractStreamOperator.java | 102 ++++++++++++-------
.../api/operators/StreamGroupedFold.java | 4 +-
.../api/operators/StreamGroupedReduce.java | 4 +-
.../api/operators/StreamingRuntimeContext.java | 38 ++++---
.../runtime/tasks/StreamTaskState.java | 36 +++++--
.../streaming/util/keys/KeySelectorUtil.java | 94 +++++++++++++----
.../api/ChainedRuntimeContextTest.java | 3 +-
.../util/keys/ArrayKeySelectorTest.java | 20 ++--
.../streaming/api/scala/ConnectedStreams.scala | 1 -
.../flink/streaming/api/scala/KeyedStream.scala | 9 ++
.../api/scala/function/StatefulFunction.scala | 2 +-
.../streaming/api/scala/DataStreamTest.scala | 2 +-
.../streaming/api/scala/StateTestPrograms.scala | 24 +++--
.../PartitionedStateCheckpointingITCase.java | 23 +++--
.../StreamCheckpointingITCase.java | 2 +-
23 files changed, 292 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java
index 99de0e2..9c41d88 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java
@@ -200,6 +200,7 @@ public class FlinkTopologyBuilder {
} else {
inputStream = inputStream
.keyBy(new SplitStreamTypeKeySelector(
+ inputStream.getType(),
prodDeclarer.getGroupingFieldIndexes(
inputStreamId,
grouping.get_fields())));
http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamTypeKeySelector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamTypeKeySelector.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamTypeKeySelector.java
index 44c693c..71e5b86 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamTypeKeySelector.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamTypeKeySelector.java
@@ -17,6 +17,7 @@
*/
package org.apache.flink.storm.util;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.util.keys.KeySelectorUtil;
@@ -34,8 +35,8 @@ public class SplitStreamTypeKeySelector implements KeySelector<SplitStreamType<T
private final ArrayKeySelector<Tuple> selector;
- public SplitStreamTypeKeySelector(int... fields) {
- this.selector = new KeySelectorUtil.ArrayKeySelector<Tuple>(fields);
+ public SplitStreamTypeKeySelector(TypeInformation<Tuple> type, int... fields) {
+ this.selector = KeySelectorUtil.getSelectorForArray(fields, type);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyBuilderTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyBuilderTest.java
index 906d081..fa5c8d8 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyBuilderTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyBuilderTest.java
@@ -21,7 +21,6 @@ import org.apache.flink.storm.util.TestDummyBolt;
import org.apache.flink.storm.util.TestDummySpout;
import org.apache.flink.storm.util.TestSink;
-import org.junit.Ignore;
import org.junit.Test;
import backtype.storm.tuple.Fields;
@@ -54,7 +53,6 @@ public class FlinkTopologyBuilderTest {
}
@Test
- @Ignore
public void testFieldsGroupingOnMultipleSpoutOutputStreams() {
FlinkTopologyBuilder flinkBuilder = new FlinkTopologyBuilder();
@@ -66,7 +64,6 @@ public class FlinkTopologyBuilderTest {
}
@Test
- @Ignore
public void testFieldsGroupingOnMultipleBoltOutputStreams() {
FlinkTopologyBuilder flinkBuilder = new FlinkTopologyBuilder();
http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index cadef36..7f767c3 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -167,6 +167,7 @@ public interface RuntimeContext {
* Gets the key/value state, which is only accessible if the function is executed on
* a KeyedStream. Upon calling {@link OperatorState#value()}, the key/value state will
* return the value bound to the key of the element currently processed by the function.
+ * Each operator may maintain multiple key/value states, addressed with different names.
*
* <p>Because the scope of each value is the key of the currently processed element,
* and the elements are distributed by the Flink runtime, the system can transparently
@@ -200,8 +201,9 @@ public interface RuntimeContext {
* <p>This method attempts to deduce the type information from the given type class. If the
* full type cannot be determined from the class (for example because of generic parameters),
* the TypeInformation object must be manually passed via
- * {@link #getKeyValueState(TypeInformation, Object)}.
+ * {@link #getKeyValueState(String, TypeInformation, Object)}.
*
+ * @param name The name of the key/value state.
* @param stateType The class of the type that is stored in the state. Used to generate
* serializers for managed memory and checkpointing.
* @param defaultState The default state value, returned when the state is accessed and
@@ -213,12 +215,13 @@ public interface RuntimeContext {
* @throws UnsupportedOperationException Thrown, if no key/value state is available for the
* function (function is not part os a KeyedStream).
*/
- <S> OperatorState<S> getKeyValueState(Class<S> stateType, S defaultState);
+ <S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState);
/**
* Gets the key/value state, which is only accessible if the function is executed on
* a KeyedStream. Upon calling {@link OperatorState#value()}, the key/value state will
* return the value bound to the key of the element currently processed by the function.
+ * Each operator may maintain multiple key/value states, addressed with different names.
*
* <p>Because the scope of each value is the key of the currently processed element,
* and the elements are distributed by the Flink runtime, the system can transparently
@@ -249,6 +252,7 @@ public interface RuntimeContext {
*
* }</pre>
*
+ * @param name The name of the key/value state.
* @param stateType The type information for the type that is stored in the state.
* Used to create serializers for managed memory and checkpoints.
* @param defaultState The default state value, returned when the state is accessed and
@@ -260,5 +264,5 @@ public interface RuntimeContext {
* @throws UnsupportedOperationException Thrown, if no key/value state is available for the
* function (function is not part os a KeyedStream).
*/
- <S> OperatorState<S> getKeyValueState(TypeInformation<S> stateType, S defaultState);
+ <S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index 90d23cd..be8ac9d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -165,13 +165,13 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
}
@Override
- public <S> OperatorState<S> getKeyValueState(Class<S> stateType, S defaultState) {
+ public <S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) {
throw new UnsupportedOperationException(
"This state is only accessible by functions executed on a KeyedStream");
}
@Override
- public <S> OperatorState<S> getKeyValueState(TypeInformation<S> stateType, S defaultState) {
+ public <S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) {
throw new UnsupportedOperationException(
"This state is only accessible by functions executed on a KeyedStream");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
index 44339ac..9bb444a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
@@ -125,6 +125,22 @@ public class PrimitiveArrayTypeInfo<T> extends TypeInformation<T> implements Ato
public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
return this.serializer;
}
+
+ /**
+ * Gets the class that represents the component type.
+ * @return The class of the component type.
+ */
+ public Class<?> getComponentClass() {
+ return this.arrayClass.getComponentType();
+ }
+
+ /**
+ * Gets the type information of the component type.
+ * @return The type information of the component type.
+ */
+ public TypeInformation<?> getComponentType() {
+ return BasicTypeInfo.getInfoFor(getComponentClass());
+ }
@Override
public String toString() {
http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
index 9718b72..b9fc3de 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
@@ -121,12 +121,12 @@ public class MockRuntimeContext implements RuntimeContext {
}
@Override
- public <S> OperatorState<S> getKeyValueState(Class<S> stateType, S defaultState) {
+ public <S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) {
throw new UnsupportedOperationException();
}
@Override
- public <S> OperatorState<S> getKeyValueState(TypeInformation<S> stateType, S defaultState) {
+ public <S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) {
throw new UnsupportedOperationException();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index c15ea9b..176a07f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -253,7 +253,7 @@ public class DataStream<T> {
*/
public KeyedStream<T, Tuple> keyBy(int... fields) {
if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
- return keyBy(new KeySelectorUtil.ArrayKeySelector<T>(fields));
+ return keyBy(KeySelectorUtil.getSelectorForArray(fields, getType()));
} else {
return keyBy(new Keys.ExpressionKeys<T>(fields, getType()));
}
@@ -291,7 +291,7 @@ public class DataStream<T> {
*/
public DataStream<T> partitionByHash(int... fields) {
if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
- return partitionByHash(new KeySelectorUtil.ArrayKeySelector<T>(fields));
+ return partitionByHash(KeySelectorUtil.getSelectorForArray(fields, getType()));
} else {
return partitionByHash(new Keys.ExpressionKeys<T>(fields, getType()));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index ca86627..9e60e9a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -35,6 +35,9 @@ import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* Base class for all stream operators. Operators that contain a user function should extend the class
* {@link AbstractUdfStreamOperator} instead (which is a specialized subclass of this class).
@@ -77,14 +80,19 @@ public abstract class AbstractStreamOperator<OUT>
/** The runtime context for UDFs */
private transient StreamingRuntimeContext runtimeContext;
+
// ---------------- key/value state ------------------
/** key selector used to get the key for the state. Non-null only is the operator uses key/value state */
private transient KeySelector<?, ?> stateKeySelector;
- private transient KvState<?, ?, ?> keyValueState;
+ private transient KvState<?, ?, ?>[] keyValueStates;
+
+ private transient HashMap<String, KvState<?, ?, ?>> keyValueStatesByName;
- private transient KvStateSnapshot<?, ?, ?> keyValueStateSnapshot;
+ private transient TypeSerializer<?> keySerializer;
+
+ private transient HashMap<String, KvStateSnapshot<?, ?, ?>> keyValueStateSnapshots;
// ------------------------------------------------------------------------
// Life Cycle
@@ -133,8 +141,10 @@ public abstract class AbstractStreamOperator<OUT>
*/
@Override
public void dispose() {
- if (keyValueState != null) {
- keyValueState.dispose();
+ if (keyValueStates != null) {
+ for (KvState<?, ?, ?> state : keyValueStates) {
+ state.dispose();
+ }
}
}
@@ -147,9 +157,15 @@ public abstract class AbstractStreamOperator<OUT>
// here, we deal with key/value state snapshots
StreamTaskState state = new StreamTaskState();
- if (keyValueState != null) {
- KvStateSnapshot<?, ?, ?> snapshot = keyValueState.shapshot(checkpointId, timestamp);
- state.setKvState(snapshot);
+ if (keyValueStates != null) {
+ HashMap<String, KvStateSnapshot<?, ?, ?>> snapshots = new HashMap<>(keyValueStatesByName.size());
+
+ for (Map.Entry<String, KvState<?, ?, ?>> entry : keyValueStatesByName.entrySet()) {
+ KvStateSnapshot<?, ?, ?> snapshot = entry.getValue().shapshot(checkpointId, timestamp);
+ snapshots.put(entry.getKey(), snapshot);
+ }
+
+ state.setKvStates(snapshots);
}
return state;
@@ -159,7 +175,7 @@ public abstract class AbstractStreamOperator<OUT>
public void restoreState(StreamTaskState state) throws Exception {
// restore the key/value state. the actual restore happens lazily, when the function requests
// the state again, because the restore method needs information provided by the user function
- keyValueStateSnapshot = state.getKvState();
+ keyValueStateSnapshots = state.getKvStates();
}
@Override
@@ -232,9 +248,9 @@ public abstract class AbstractStreamOperator<OUT>
* @throws Exception Thrown, if the state backend cannot create the key/value state.
*/
protected <V> OperatorState<V> createKeyValueState(
- TypeInformation<V> stateType, V defaultValue) throws Exception
+ String name, TypeInformation<V> stateType, V defaultValue) throws Exception
{
- return createKeyValueState(stateType.createSerializer(getExecutionConfig()), defaultValue);
+ return createKeyValueState(name, stateType.createSerializer(getExecutionConfig()), defaultValue);
}
/**
@@ -253,12 +269,18 @@ public abstract class AbstractStreamOperator<OUT>
* @throws IllegalStateException Thrown, if the key/value state was already initialized.
* @throws Exception Thrown, if the state backend cannot create the key/value state.
*/
+ @SuppressWarnings({"rawtypes", "unchecked"})
protected <K, V, Backend extends StateBackend<Backend>> OperatorState<V> createKeyValueState(
- TypeSerializer<V> valueSerializer, V defaultValue) throws Exception
+ String name, TypeSerializer<V> valueSerializer, V defaultValue) throws Exception
{
- if (keyValueState != null) {
+ if (name == null || name.isEmpty()) {
+ throw new IllegalArgumentException();
+ }
+ if (keyValueStatesByName != null && keyValueStatesByName.containsKey(name)) {
throw new IllegalStateException("The key/value state has already been created");
}
+
+ TypeSerializer<K> keySerializer;
// first time state access, make sure we load the state partitioner
if (stateKeySelector == null) {
@@ -267,46 +289,58 @@ public abstract class AbstractStreamOperator<OUT>
throw new UnsupportedOperationException("The function or operator is not executed " +
"on a KeyedStream and can hence not access the key/value state");
}
+
+ keySerializer = config.getStateKeySerializer(getUserCodeClassloader());
+ if (keySerializer == null) {
+ throw new Exception("State key serializer has not been configured in the config.");
+ }
+ this.keySerializer = keySerializer;
}
-
- // create the key and value serializers
- TypeSerializer<K> keySerializer = config.getStateKeySerializer(getUserCodeClassloader());
- if (keySerializer == null) {
- throw new Exception("State key serializer has not been configured in the config.");
+ else if (this.keySerializer != null) {
+ keySerializer = (TypeSerializer<K>) this.keySerializer;
+ }
+ else {
+ // should never happen, this is merely a safeguard
+ throw new RuntimeException();
}
@SuppressWarnings("unchecked")
Backend stateBackend = (Backend) container.getStateBackend();
+
+ KvState<K, V, Backend> kvstate = null;
// check whether we restore the key/value state from a snapshot, or create a new blank one
- if (keyValueStateSnapshot != null) {
+ if (keyValueStateSnapshots != null) {
@SuppressWarnings("unchecked")
- KvStateSnapshot<K, V, Backend> snapshot = (KvStateSnapshot<K, V, Backend>) keyValueStateSnapshot;
+ KvStateSnapshot<K, V, Backend> snapshot = (KvStateSnapshot<K, V, Backend>) keyValueStateSnapshots.remove(name);
- KvState<K, V, Backend> kvstate = snapshot.restoreState(
- stateBackend, keySerializer, valueSerializer, defaultValue, getUserCodeClassloader());
- keyValueState = kvstate;
-
- // make sure we have no redundant copies in memory, let the GC clean up
- keyValueStateSnapshot = null;
-
- return kvstate;
+ if (snapshot != null) {
+ kvstate = snapshot.restoreState(
+ stateBackend, keySerializer, valueSerializer, defaultValue, getUserCodeClassloader());
+ }
}
- else {
+
+ if (kvstate == null) {
// create a new blank key/value state
- KvState<K, V, Backend> kvstate = stateBackend.createKvState(keySerializer, valueSerializer, defaultValue);
- keyValueState = kvstate;
- return kvstate;
+ kvstate = stateBackend.createKvState(keySerializer, valueSerializer, defaultValue);
}
+
+ if (keyValueStatesByName == null) {
+ keyValueStatesByName = new HashMap<>();
+ }
+ keyValueStatesByName.put(name, kvstate);
+ keyValueStates = keyValueStatesByName.values().toArray(new KvState[keyValueStatesByName.size()]);
+ return kvstate;
}
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public void setKeyContextElement(StreamRecord record) throws Exception {
- if (stateKeySelector != null && keyValueState != null) {
- KvState kv = keyValueState;
+ if (stateKeySelector != null && keyValueStates != null) {
KeySelector selector = stateKeySelector;
- kv.setCurrentKey(selector.getKey(record.getValue()));
+ for (KvState kv : keyValueStates) {
+ kv.setCurrentKey(selector.getKey(record.getValue()));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
index 79e319a..cf6b489 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
@@ -38,6 +38,8 @@ public class StreamGroupedFold<IN, OUT, KEY>
implements OneInputStreamOperator<IN, OUT>, OutputTypeConfigurable<OUT> {
private static final long serialVersionUID = 1L;
+
+ private static final String STATE_NAME = "_op_state";
// Grouped values
private transient OperatorState<OUT> values;
@@ -68,7 +70,7 @@ public class StreamGroupedFold<IN, OUT, KEY>
new DataInputStream(bais)
);
initialValue = outTypeSerializer.deserialize(in);
- values = createKeyValueState(outTypeSerializer, null);
+ values = createKeyValueState(STATE_NAME, outTypeSerializer, null);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
index ebc4b09..ae15e92 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
@@ -27,6 +27,8 @@ public class StreamGroupedReduce<IN> extends AbstractUdfStreamOperator<IN, Reduc
implements OneInputStreamOperator<IN, IN> {
private static final long serialVersionUID = 1L;
+
+ private static final String STATE_NAME = "_op_state";
private transient OperatorState<IN> values;
@@ -41,7 +43,7 @@ public class StreamGroupedReduce<IN> extends AbstractUdfStreamOperator<IN, Reduc
@Override
public void open() throws Exception {
super.open();
- values = createKeyValueState(serializer, null);
+ values = createKeyValueState(STATE_NAME, serializer, null);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
index a51bb27..87a9abd 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.streaming.runtime.operators.Triggerable;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -46,10 +47,10 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
private final Environment taskEnvironment;
/** The key/value state, if the user-function requests it */
- private OperatorState<?> keyValueState;
+ private HashMap<String, OperatorState<?>> keyValueStates;
/** Type of the values stored in the state, to make sure repeated requests of the state are consistent */
- private TypeInformation<?> stateTypeInfo;
+ private HashMap<String, TypeInformation<?>> stateTypeInfos;
public StreamingRuntimeContext(AbstractStreamOperator<?> operator,
@@ -107,7 +108,7 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
// ------------------------------------------------------------------------
@Override
- public <S> OperatorState<S> getKeyValueState(Class<S> stateType, S defaultState) {
+ public <S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) {
requireNonNull(stateType, "The state type class must not be null");
TypeInformation<S> typeInfo;
@@ -120,35 +121,48 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
"Please specify the TypeInformation directly.", e);
}
- return getKeyValueState(typeInfo, defaultState);
+ return getKeyValueState(name, typeInfo, defaultState);
}
@Override
- public <S> OperatorState<S> getKeyValueState(TypeInformation<S> stateType, S defaultState) {
+ public <S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) {
+ requireNonNull(name, "The name of the state must not be null");
requireNonNull(stateType, "The state type information must not be null");
+ OperatorState<?> previousState;
+
// check if this is a repeated call to access the state
- if (this.stateTypeInfo != null && this.keyValueState != null) {
+ if (this.stateTypeInfos != null && this.keyValueStates != null &&
+ (previousState = this.keyValueStates.get(name)) != null) {
+
// repeated call
- if (this.stateTypeInfo.equals(stateType)) {
+ TypeInformation<?> previousType;
+ if (stateType.equals((previousType = this.stateTypeInfos.get(name)))) {
// valid case, same type requested again
@SuppressWarnings("unchecked")
- OperatorState<S> previous = (OperatorState<S>) this.keyValueState;
+ OperatorState<S> previous = (OperatorState<S>) previousState;
return previous;
}
else {
// invalid case, different type requested this time
throw new IllegalStateException("Cannot initialize key/value state for type " + stateType +
" ; The key/value state has already been created and initialized for a different type: " +
- this.stateTypeInfo);
+ previousType);
}
}
else {
// first time access to the key/value state
+ if (this.stateTypeInfos == null) {
+ this.stateTypeInfos = new HashMap<>();
+ }
+ if (this.keyValueStates == null) {
+ this.keyValueStates = new HashMap<>();
+ }
+
try {
- OperatorState<S> state = operator.createKeyValueState(stateType, defaultState);
- this.keyValueState = state;
- this.stateTypeInfo = stateType;
+ OperatorState<S> state = operator.createKeyValueState(name, stateType, defaultState);
+ this.keyValueStates.put(name, state);
+ this.stateTypeInfos.put(name, stateType);
return state;
}
catch (RuntimeException e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
index 2fce7af..334fd44 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
@@ -22,6 +22,9 @@ import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.streaming.api.state.KvStateSnapshot;
import java.io.Serializable;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.Iterator;
/**
* The state checkpointed by a {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator}.
@@ -40,7 +43,7 @@ public class StreamTaskState implements Serializable {
private StateHandle<Serializable> functionState;
- private KvStateSnapshot<?, ?, ?> kvState;
+ private HashMap<String, KvStateSnapshot<?, ?, ?>> kvStates;
// ------------------------------------------------------------------------
@@ -60,12 +63,12 @@ public class StreamTaskState implements Serializable {
this.functionState = functionState;
}
- public KvStateSnapshot<?, ?, ?> getKvState() {
- return kvState;
+ public HashMap<String, KvStateSnapshot<?, ?, ?>> getKvStates() {
+ return kvStates;
}
- public void setKvState(KvStateSnapshot<?, ?, ?> kvState) {
- this.kvState = kvState;
+ public void setKvStates(HashMap<String, KvStateSnapshot<?, ?, ?>> kvStates) {
+ this.kvStates = kvStates;
}
// ------------------------------------------------------------------------
@@ -77,7 +80,7 @@ public class StreamTaskState implements Serializable {
* @return True, if all state is null, false if at least one state is not null.
*/
public boolean isEmpty() {
- return operatorState == null & functionState == null & kvState == null;
+ return operatorState == null & functionState == null & kvStates == null;
}
/**
@@ -89,7 +92,7 @@ public class StreamTaskState implements Serializable {
public void discardState() throws Exception {
StateHandle<?> operatorState = this.operatorState;
StateHandle<?> functionState = this.functionState;
- KvStateSnapshot<?, ?, ?> kvState = this.kvState;
+ HashMap<String, KvStateSnapshot<?, ?, ?>> kvStates = this.kvStates;
if (operatorState != null) {
operatorState.discardState();
@@ -97,12 +100,25 @@ public class StreamTaskState implements Serializable {
if (functionState != null) {
functionState.discardState();
}
- if (kvState != null) {
- kvState.discardState();
+ if (kvStates != null) {
+ while (kvStates.size() > 0) {
+ try {
+ Iterator<KvStateSnapshot<?, ?, ?>> values = kvStates.values().iterator();
+ while (values.hasNext()) {
+ KvStateSnapshot<?, ?, ?> s = values.next();
+ s.discardState();
+ values.remove();
+ }
+ }
+ catch (ConcurrentModificationException e) {
+ // fall through the loop
+ }
+ }
}
this.operatorState = null;
this.functionState = null;
- this.kvState = null;
+ this.kvStates = null;
}
}
+
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
index 9c76d95..afbd8ab 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
@@ -18,10 +18,13 @@
package org.apache.flink.streaming.util.keys;
import java.lang.reflect.Array;
+import java.util.Arrays;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -31,6 +34,8 @@ import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import static java.util.Objects.requireNonNull;
+
/**
* Utility class that contains helper methods to manipulating {@link KeySelector} for streaming.
*/
@@ -49,7 +54,7 @@ public final class KeySelectorUtil {
// use ascending order here, the code paths for that are usually a slight bit faster
boolean[] orders = new boolean[numKeyFields];
- TypeInformation[] typeInfos = new TypeInformation[numKeyFields];
+ TypeInformation<?>[] typeInfos = new TypeInformation<?>[numKeyFields];
for (int i = 0; i < numKeyFields; i++) {
orders[i] = true;
typeInfos[i] = compositeType.getTypeAt(logicalKeyPositions[i]);
@@ -59,31 +64,71 @@ public final class KeySelectorUtil {
return new ComparableKeySelector<>(comparator, numKeyFields, new TupleTypeInfo<>(typeInfos));
}
+ public static <X> ArrayKeySelector<X> getSelectorForArray(int[] positions, TypeInformation<X> typeInfo) {
+ if (positions == null || positions.length == 0 || positions.length > Tuple.MAX_ARITY) {
+ throw new IllegalArgumentException("Array keys must have between 1 and " + Tuple.MAX_ARITY + " fields.");
+ }
+
+ TypeInformation<?> componentType;
+
+ if (typeInfo instanceof BasicArrayTypeInfo) {
+ BasicArrayTypeInfo<X, ?> arrayInfo = (BasicArrayTypeInfo<X, ?>) typeInfo;
+ componentType = arrayInfo.getComponentInfo();
+ }
+ else if (typeInfo instanceof PrimitiveArrayTypeInfo) {
+ PrimitiveArrayTypeInfo<X> arrayType = (PrimitiveArrayTypeInfo<X>) typeInfo;
+ componentType = arrayType.getComponentType();
+ }
+ else {
+ throw new IllegalArgumentException("This method only supports arrays of primitives and boxed primitives.");
+ }
+
+ TypeInformation<?>[] primitiveInfos = new TypeInformation<?>[positions.length];
+ Arrays.fill(primitiveInfos, componentType);
+
+ return new ArrayKeySelector<>(positions, new TupleTypeInfo<>(primitiveInfos));
+ }
+
- public static <X, K> KeySelector<X, K> getSelectorForOneKey(Keys<X> keys, Partitioner<K> partitioner, TypeInformation<X> typeInfo,
- ExecutionConfig executionConfig) {
+ public static <X, K> KeySelector<X, K> getSelectorForOneKey(
+ Keys<X> keys, Partitioner<K> partitioner, TypeInformation<X> typeInfo, ExecutionConfig executionConfig)
+ {
+ if (!(typeInfo instanceof CompositeType)) {
+ throw new InvalidTypesException(
+ "This key operation requires a composite type such as Tuples, POJOs, case classes, etc");
+ }
if (partitioner != null) {
keys.validateCustomPartitioner(partitioner, null);
}
+ CompositeType<X> compositeType = (CompositeType<X>) typeInfo;
int[] logicalKeyPositions = keys.computeLogicalKeyPositions();
-
if (logicalKeyPositions.length != 1) {
throw new IllegalArgumentException("There must be exactly 1 key specified");
}
-
- TypeComparator<X> comparator = ((CompositeType<X>) typeInfo).createComparator(
- logicalKeyPositions, new boolean[1], 0, executionConfig);
+
+ TypeComparator<X> comparator = compositeType.createComparator(
+ logicalKeyPositions, new boolean[] { true }, 0, executionConfig);
return new OneKeySelector<>(comparator);
}
+ // ------------------------------------------------------------------------
+
/**
* Private constructor to prevent instantiation.
*/
private KeySelectorUtil() {
throw new RuntimeException();
}
-
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Key extractor that extracts a single field via a generic comparator.
+ *
+ * @param <IN> The type of the elements where the key is extracted from.
+ * @param <K> The type of the key.
+ */
public static final class OneKeySelector<IN, K> implements KeySelector<IN, K> {
private static final long serialVersionUID = 1L;
@@ -94,8 +139,8 @@ public final class KeySelectorUtil {
* are null), it does not have any serialization problems */
@SuppressWarnings("NonSerializableFieldInSerializableClass")
private final Object[] keyArray;
-
- public OneKeySelector(TypeComparator<IN> comparator) {
+
+ OneKeySelector(TypeComparator<IN> comparator) {
this.comparator = comparator;
this.keyArray = new Object[1];
}
@@ -121,18 +166,18 @@ public final class KeySelectorUtil {
private final TypeComparator<IN> comparator;
private final int keyLength;
- private final TupleTypeInfo tupleTypeInfo;
+ private transient TupleTypeInfo<Tuple> tupleTypeInfo;
/** Reusable array to hold the key objects. Since this is initially empty (all positions
* are null), it does not have any serialization problems */
@SuppressWarnings("NonSerializableFieldInSerializableClass")
private final Object[] keyArray;
- public ComparableKeySelector(TypeComparator<IN> comparator, int keyLength, TupleTypeInfo tupleTypeInfo) {
+ ComparableKeySelector(TypeComparator<IN> comparator, int keyLength, TupleTypeInfo<Tuple> tupleTypeInfo) {
this.comparator = comparator;
this.keyLength = keyLength;
this.tupleTypeInfo = tupleTypeInfo;
- keyArray = new Object[keyLength];
+ this.keyArray = new Object[keyLength];
}
@Override
@@ -147,6 +192,9 @@ public final class KeySelectorUtil {
@Override
public TypeInformation<Tuple> getProducedType() {
+ if (tupleTypeInfo == null) {
+ throw new IllegalStateException("The return type information is not available after serialization");
+ }
return tupleTypeInfo;
}
}
@@ -158,23 +206,35 @@ public final class KeySelectorUtil {
*
* @param <IN> The type from which the key is extracted, i.e., the array type.
*/
- public static final class ArrayKeySelector<IN> implements KeySelector<IN, Tuple> {
+ public static final class ArrayKeySelector<IN> implements KeySelector<IN, Tuple>, ResultTypeQueryable<Tuple> {
private static final long serialVersionUID = 1L;
private final int[] fields;
+ private final Class<? extends Tuple> tupleClass;
+ private transient TupleTypeInfo<Tuple> returnType;
- public ArrayKeySelector(int... fields) {
- this.fields = fields;
+ ArrayKeySelector(int[] fields, TupleTypeInfo<Tuple> returnType) {
+ this.fields = requireNonNull(fields);
+ this.returnType = requireNonNull(returnType);
+ this.tupleClass = Tuple.getTupleClass(fields.length);
}
@Override
public Tuple getKey(IN value) throws Exception {
- Tuple key = Tuple.getTupleClass(fields.length).newInstance();
+ Tuple key = tupleClass.newInstance();
for (int i = 0; i < fields.length; i++) {
key.setField(Array.get(value, fields[i]), i);
}
return key;
}
+
+ @Override
+ public TypeInformation<Tuple> getProducedType() {
+ if (returnType == null) {
+ throw new IllegalStateException("The return type information is not available after serialization");
+ }
+ return returnType;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java
index fdf7697..68a047c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java
@@ -25,9 +25,8 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.util.NoOpSink;
-import org.apache.flink.streaming.util.ReceiveCheckNoOpSink;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
+
import org.junit.Test;
@SuppressWarnings("serial")
http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/keys/ArrayKeySelectorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/keys/ArrayKeySelectorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/keys/ArrayKeySelectorTest.java
index 9d807cf..63375a7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/keys/ArrayKeySelectorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/keys/ArrayKeySelectorTest.java
@@ -18,6 +18,8 @@
package org.apache.flink.streaming.util.keys;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.junit.Test;
@@ -29,15 +31,17 @@ public class ArrayKeySelectorTest {
@Test
public void testObjectArrays() {
try {
- Object[] array1 = { "a", "b", "c", "d", "e" };
- Object[] array2 = { "v", "w", "x", "y", "z" };
+ String[] array1 = { "a", "b", "c", "d", "e" };
+ String[] array2 = { "v", "w", "x", "y", "z" };
- KeySelectorUtil.ArrayKeySelector<Object[]> singleFieldSelector = new KeySelectorUtil.ArrayKeySelector<>(1);
+ KeySelectorUtil.ArrayKeySelector<String[]> singleFieldSelector =
+ KeySelectorUtil.getSelectorForArray(new int[] {1}, BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO);
assertEquals(new Tuple1<>("b"), singleFieldSelector.getKey(array1));
assertEquals(new Tuple1<>("w"), singleFieldSelector.getKey(array2));
- KeySelectorUtil.ArrayKeySelector<Object[]> twoFieldsSelector = new KeySelectorUtil.ArrayKeySelector<>(3, 0);
+ KeySelectorUtil.ArrayKeySelector<String[]> twoFieldsSelector =
+ KeySelectorUtil.getSelectorForArray(new int[] {3, 0}, BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO);
assertEquals(new Tuple2<>("d", "a"), twoFieldsSelector.getKey(array1));
assertEquals(new Tuple2<>("y", "v"), twoFieldsSelector.getKey(array2));
@@ -55,13 +59,15 @@ public class ArrayKeySelectorTest {
int[] array1 = { 1, 2, 3, 4, 5 };
int[] array2 = { -5, -4, -3, -2, -1, 0 };
- KeySelectorUtil.ArrayKeySelector<int[]> singleFieldSelector = new KeySelectorUtil.ArrayKeySelector<>(1);
+ KeySelectorUtil.ArrayKeySelector<int[]> singleFieldSelector =
+ KeySelectorUtil.getSelectorForArray(new int[] {1}, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
assertEquals(new Tuple1<>(2), singleFieldSelector.getKey(array1));
assertEquals(new Tuple1<>(-4), singleFieldSelector.getKey(array2));
- KeySelectorUtil.ArrayKeySelector<int[]> twoFieldsSelector = new KeySelectorUtil.ArrayKeySelector<>(3, 0);
-
+ KeySelectorUtil.ArrayKeySelector<int[]> twoFieldsSelector =
+ KeySelectorUtil.getSelectorForArray(new int[] {3, 0}, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+
assertEquals(new Tuple2<>(4, 1), twoFieldsSelector.getKey(array1));
assertEquals(new Tuple2<>(-2, -5), twoFieldsSelector.getKey(array2));
http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
index f7413b7..3ff773f 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
@@ -362,4 +362,3 @@ class KeySelectorWithType[IN, K](
override def getProducedType: TypeInformation[K] = info
}
-
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index 84354a3..9f5c069 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -36,6 +36,15 @@ import scala.reflect.ClassTag
class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T](javaStream) {
// ------------------------------------------------------------------------
+ // Properties
+ // ------------------------------------------------------------------------
+
+ /**
+ * Gets the type of the key by which this stream is keyed.
+ */
+ def getKeyType = javaStream.getKeyType()
+
+ // ------------------------------------------------------------------------
// Windowing
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
index 5a591a8..d66cfdb 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
@@ -43,6 +43,6 @@ trait StatefulFunction[I, O, S] extends RichFunction {
}
override def open(c: Configuration) = {
- state = getRuntimeContext().getKeyValueState[S](stateType, null.asInstanceOf[S])
+ state = getRuntimeContext().getKeyValueState[S]("state", stateType, null.asInstanceOf[S])
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index fe85fd1..988e7ec 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -118,7 +118,7 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
val partition1: DataStream[_] = src1.partitionByHash(0)
val partition2: DataStream[_] = src1.partitionByHash(1, 0)
val partition3: DataStream[_] = src1.partitionByHash("_1")
- val partition4: DataStream[_] = src1.partitionByHash((x : (Long, Long)) => x._1);
+ val partition4: DataStream[_] = src1.partitionByHash((x : (Long, Long)) => x._1)
val pid1 = createDownStreamId(partition1)
val pid2 = createDownStreamId(partition2)
http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StateTestPrograms.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StateTestPrograms.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StateTestPrograms.scala
index 7904bcb..b2e05b3 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StateTestPrograms.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StateTestPrograms.scala
@@ -31,11 +31,12 @@ object StateTestPrograms {
// test stateful map
env.generateSequence(0, 10).setParallelism(1)
- .keyBy(x => x)
+ .map { v => (1, v) }.setParallelism(1)
+ .keyBy(_._1)
.mapWithState((in, count: Option[Long]) =>
count match {
- case Some(c) => (in - c, Some(c + 1))
- case None => (in, Some(1L))
+ case Some(c) => (in._2 - c, Some(c + 1))
+ case None => (in._2, Some(1L))
}).setParallelism(1)
.addSink(new RichSinkFunction[Long]() {
@@ -49,12 +50,12 @@ object StateTestPrograms {
})
// test stateful flatmap
- env.fromElements("Fir st-", "Hello world")
- .keyBy(x => x)
+ env.fromElements((1, "First"), (2, "Second"), (1, "Hello world"))
+ .keyBy(_._1)
.flatMapWithState((w, s: Option[String]) =>
s match {
- case Some(state) => (w.split(" ").toList.map(state + _), Some(w))
- case None => (List(w), Some(w))
+ case Some(state) => (w._2.split(" ").toList.map(state + _), Some(w._2))
+ case None => (List(w._2), Some(w._2))
})
.setParallelism(1)
@@ -62,10 +63,11 @@ object StateTestPrograms {
val received = new util.HashSet[String]()
override def invoke(in: String) = { received.add(in) }
override def close() = {
- assert(received.size() == 3)
- assert(received.contains("Fir st-"))
- assert(received.contains("Fir st-Hello"))
- assert(received.contains("Fir st-world"))
+ assert(received.size() == 4)
+ assert(received.contains("First"))
+ assert(received.contains("Second"))
+ assert(received.contains("FirstHello"))
+ assert(received.contains("Firstworld"))
}
}).setParallelism(1)
http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
index 0fcedda..67c0189 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
@@ -107,7 +107,9 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
@Override
public void open(Configuration parameters) throws IOException {
step = getRuntimeContext().getNumberOfParallelSubtasks();
- index = getRuntimeContext().getIndexOfThisSubtask();
+ if (index == 0) {
+ index = getRuntimeContext().getIndexOfThisSubtask();
+ }
}
@Override
@@ -165,7 +167,7 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
count = 0;
- sum = getRuntimeContext().getKeyValueState(Long.class, 0L);
+ sum = getRuntimeContext().getKeyValueState("my_state", Long.class, 0L);
}
@Override
@@ -187,17 +189,26 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
private static Map<Integer, Long> allCounts = new ConcurrentHashMap<Integer, Long>();
- private OperatorState<NonSerializableLong> counts;
+ private OperatorState<NonSerializableLong> aCounts;
+ private OperatorState<Long> bCounts;
@Override
public void open(Configuration parameters) throws IOException {
- counts = getRuntimeContext().getKeyValueState(NonSerializableLong.class, NonSerializableLong.of(0L));
+ aCounts = getRuntimeContext().getKeyValueState(
+ "a", NonSerializableLong.class, NonSerializableLong.of(0L));
+ bCounts = getRuntimeContext().getKeyValueState("b", Long.class, 0L);
}
@Override
public void invoke(Tuple2<Integer, Long> value) throws Exception {
- long currentCount = counts.value().value + 1;
- counts.update(NonSerializableLong.of(currentCount));
+ long ac = aCounts.value().value;
+ long bc = bCounts.value();
+ assertEquals(ac, bc);
+
+ long currentCount = ac + 1;
+ aCounts.update(NonSerializableLong.of(currentCount));
+ bCounts.update(currentCount);
+
allCounts.put(value.f0, currentCount);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index 992a679..e98696e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -252,7 +252,7 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
count = 0;
- pCount = getRuntimeContext().getKeyValueState(Long.class, 0L);
+ pCount = getRuntimeContext().getKeyValueState("pCount", Long.class, 0L);
}
@Override