You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/06/18 12:42:26 UTC
[2/2] flink git commit: [FLINK-9571] Repace StateBinder with internal
backend-specific state factories
[FLINK-9571] Repace StateBinder with internal backend-specific state factories
This closes #6173.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0bdde837
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0bdde837
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0bdde837
Branch: refs/heads/master
Commit: 0bdde8377c254195fe94709d639bf03f9bd77606
Parents: 0e9b066
Author: Andrey Zagrebin <az...@gmail.com>
Authored: Wed Jun 13 10:24:10 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Mon Jun 18 14:41:50 2018 +0200
----------------------------------------------------------------------
.../state/AggregatingStateDescriptor.java | 7 -
.../common/state/FoldingStateDescriptor.java | 7 -
.../api/common/state/ListStateDescriptor.java | 7 -
.../api/common/state/MapStateDescriptor.java | 5 -
.../common/state/ReducingStateDescriptor.java | 7 -
.../flink/api/common/state/StateBinder.java | 85 ----------
.../flink/api/common/state/StateDescriptor.java | 10 +-
.../api/common/state/ValueStateDescriptor.java | 7 -
.../api/common/state/StateDescriptorTest.java | 10 --
.../client/QueryableStateClient.java | 57 ++++++-
.../client/state/ImmutableAggregatingState.java | 22 +--
.../client/state/ImmutableFoldingState.java | 18 +-
.../client/state/ImmutableListState.java | 28 ++--
.../client/state/ImmutableMapState.java | 31 ++--
.../client/state/ImmutableReducingState.java | 18 +-
.../client/state/ImmutableStateBinder.java | 80 ---------
.../client/state/ImmutableValueState.java | 18 +-
.../state/ImmutableAggregatingStateTest.java | 7 +-
.../client/state/ImmutableFoldingStateTest.java | 7 +-
.../client/state/ImmutableListStateTest.java | 9 +-
.../client/state/ImmutableMapStateTest.java | 19 ++-
.../state/ImmutableReducingStateTest.java | 7 +-
.../client/state/ImmutableValueStateTest.java | 8 +-
.../KVStateRequestSerializerRocksDBTest.java | 50 +-----
.../network/KvStateRequestSerializerTest.java | 6 +-
.../state/AbstractKeyedStateBackend.java | 164 +++----------------
.../state/heap/HeapAggregatingState.java | 24 ++-
.../runtime/state/heap/HeapFoldingState.java | 20 ++-
.../state/heap/HeapKeyedStateBackend.java | 125 ++++----------
.../flink/runtime/state/heap/HeapListState.java | 18 +-
.../flink/runtime/state/heap/HeapMapState.java | 21 ++-
.../runtime/state/heap/HeapReducingState.java | 31 +++-
.../runtime/state/heap/HeapValueState.java | 17 +-
.../runtime/state/StateBackendTestBase.java | 8 +-
.../state/StateSnapshotCompressionTest.java | 6 +-
...pKeyedStateBackendSnapshotMigrationTest.java | 6 +-
.../state/RocksDBAggregatingState.java | 27 ++-
.../streaming/state/RocksDBFoldingState.java | 27 ++-
.../state/RocksDBKeyedStateBackend.java | 132 ++++-----------
.../streaming/state/RocksDBListState.java | 35 +++-
.../streaming/state/RocksDBMapState.java | 75 ++++-----
.../streaming/state/RocksDBReducingState.java | 27 ++-
.../streaming/state/RocksDBValueState.java | 27 ++-
.../RocksDBWriteBatchPerformanceTest.java | 4 +-
44 files changed, 510 insertions(+), 814 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
index 8c7fed6..1197ed2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
@@ -93,13 +93,6 @@ public class AggregatingStateDescriptor<IN, ACC, OUT> extends StateDescriptor<Ag
this.aggFunction = checkNotNull(aggFunction);
}
- // ------------------------------------------------------------------------
-
- @Override
- public AggregatingState<IN, OUT> bind(StateBinder stateBinder) throws Exception {
- return stateBinder.createAggregatingState(this);
- }
-
/**
* Returns the aggregate function to be used for the state.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
index c14e4bf..392c04c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
@@ -97,13 +97,6 @@ public class FoldingStateDescriptor<T, ACC> extends StateDescriptor<FoldingState
}
}
- // ------------------------------------------------------------------------
-
- @Override
- public FoldingState<T, ACC> bind(StateBinder stateBinder) throws Exception {
- return stateBinder.createFoldingState(this);
- }
-
/**
* Returns the fold function to be used for the folding state.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
index aa5e64b..0016c22 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
@@ -76,13 +76,6 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T
super(name, new ListSerializer<>(typeSerializer), null);
}
- // ------------------------------------------------------------------------
-
- @Override
- public ListState<T> bind(StateBinder stateBinder) throws Exception {
- return stateBinder.createListState(this);
- }
-
/**
* Gets the serializer for the elements contained in the list.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
index 42b016a..6eb8ddc 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
@@ -81,11 +81,6 @@ public class MapStateDescriptor<UK, UV> extends StateDescriptor<MapState<UK, UV>
}
@Override
- public MapState<UK, UV> bind(StateBinder stateBinder) throws Exception {
- return stateBinder.createMapState(this);
- }
-
- @Override
public Type getType() {
return Type.MAP;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
index 0df1c2c..07b22c9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
@@ -83,13 +83,6 @@ public class ReducingStateDescriptor<T> extends StateDescriptor<ReducingState<T>
this.reduceFunction = checkNotNull(reduceFunction);
}
- // ------------------------------------------------------------------------
-
- @Override
- public ReducingState<T> bind(StateBinder stateBinder) throws Exception {
- return stateBinder.createReducingState(this);
- }
-
/**
* Returns the reduce function to be used for the reducing state.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
deleted file mode 100644
index 871b4a8..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.state;
-
-import org.apache.flink.annotation.Internal;
-
-/**
- * The {@code StateBinder} is used by {@link StateDescriptor} instances to create actual
- * {@link State} objects.
- */
-@Internal
-public interface StateBinder {
-
- /**
- * Creates and returns a new {@link ValueState}.
- * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
- *
- * @param <T> The type of the value that the {@code ValueState} can store.
- */
- <T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) throws Exception;
-
- /**
- * Creates and returns a new {@link ListState}.
- * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
- *
- * @param <T> The type of the values that the {@code ListState} can store.
- */
- <T> ListState<T> createListState(ListStateDescriptor<T> stateDesc) throws Exception;
-
- /**
- * Creates and returns a new {@link ReducingState}.
- * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
- *
- * @param <T> The type of the values that the {@code ReducingState} can store.
- */
- <T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception;
-
- /**
- * Creates and returns a new {@link AggregatingState}.
- * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
- *
- * @param <IN> The type of the values that go into the aggregating state
- * @param <ACC> The type of the values that are stored in the aggregating state
- * @param <OUT> The type of the values that come out of the aggregating state
- */
- <IN, ACC, OUT> AggregatingState<IN, OUT> createAggregatingState(
- AggregatingStateDescriptor<IN, ACC, OUT> stateDesc) throws Exception;
-
- /**
- * Creates and returns a new {@link FoldingState}.
- * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
- *
- * @param <T> Type of the values folded into the state
- * @param <ACC> Type of the value in the state
- *
- * @deprecated will be removed in a future version in favor of {@link AggregatingState}
- */
- @Deprecated
- <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception;
-
- /**
- * Creates and returns a new {@link MapState}.
- * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
- *
- * @param <MK> Type of the keys in the state
- * @param <MV> Type of the values in the state
- */
- <MK, MV> MapState<MK, MV> createMapState(MapStateDescriptor<MK, MV> stateDesc) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index 9b6b51d..6c54e71 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -41,8 +41,7 @@ import static org.apache.flink.util.Preconditions.checkState;
/**
* Base class for state descriptors. A {@code StateDescriptor} is used for creating partitioned
- * {@link State} in stateful operations. This contains the name and can create an actual state
- * object given a {@link StateBinder} using {@link #bind(StateBinder)}.
+ * {@link State} in stateful operations.
*
* <p>Subclasses must correctly implement {@link #equals(Object)} and {@link #hashCode()}.
*
@@ -231,13 +230,6 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
return queryableStateName != null;
}
- /**
- * Creates a new {@link State} on the given {@link StateBinder}.
- *
- * @param stateBinder The {@code StateBackend} on which to create the {@link State}.
- */
- public abstract S bind(StateBinder stateBinder) throws Exception;
-
// ------------------------------------------------------------------------
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
index 4d69d81..d2719aa 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
@@ -122,13 +122,6 @@ public class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>, T> {
super(name, typeSerializer, null);
}
- // ------------------------------------------------------------------------
-
- @Override
- public ValueState<T> bind(StateBinder stateBinder) throws Exception {
- return stateBinder.createValueState(this);
- }
-
@Override
public Type getType() {
return Type.VALUE;
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
index 3958baa..4346163 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
@@ -221,11 +221,6 @@ public class StateDescriptorTest {
}
@Override
- public State bind(StateBinder stateBinder) throws Exception {
- throw new UnsupportedOperationException();
- }
-
- @Override
public Type getType() {
return Type.VALUE;
}
@@ -248,11 +243,6 @@ public class StateDescriptorTest {
}
@Override
- public State bind(StateBinder stateBinder) throws Exception {
- throw new UnsupportedOperationException();
- }
-
- @Override
public Type getType() {
return Type.VALUE;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
index 2a6baf0..470c7ac 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
@@ -21,13 +21,25 @@ package org.apache.flink.queryablestate.client;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.queryablestate.FutureUtils;
-import org.apache.flink.queryablestate.client.state.ImmutableStateBinder;
+import org.apache.flink.queryablestate.client.state.ImmutableAggregatingState;
+import org.apache.flink.queryablestate.client.state.ImmutableFoldingState;
+import org.apache.flink.queryablestate.client.state.ImmutableListState;
+import org.apache.flink.queryablestate.client.state.ImmutableMapState;
+import org.apache.flink.queryablestate.client.state.ImmutableReducingState;
+import org.apache.flink.queryablestate.client.state.ImmutableValueState;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.queryablestate.messages.KvStateRequest;
import org.apache.flink.queryablestate.messages.KvStateResponse;
@@ -44,7 +56,10 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* Client for querying Flink's managed state.
@@ -67,6 +82,20 @@ public class QueryableStateClient {
private static final Logger LOG = LoggerFactory.getLogger(QueryableStateClient.class);
+ private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
+ Stream.of(
+ Tuple2.of(ValueStateDescriptor.class, (StateFactory) ImmutableValueState::createState),
+ Tuple2.of(ListStateDescriptor.class, (StateFactory) ImmutableListState::createState),
+ Tuple2.of(MapStateDescriptor.class, (StateFactory) ImmutableMapState::createState),
+ Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) ImmutableAggregatingState::createState),
+ Tuple2.of(ReducingStateDescriptor.class, (StateFactory) ImmutableReducingState::createState),
+ Tuple2.of(FoldingStateDescriptor.class, (StateFactory) ImmutableFoldingState::createState)
+ ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+
+ private interface StateFactory {
+ <T, S extends State> S createState(StateDescriptor<S, T> stateDesc, byte[] serializedState) throws Exception;
+ }
+
/** The client that forwards the requests to the proxy. */
private final Client<KvStateRequest, KvStateResponse> client;
@@ -241,14 +270,24 @@ public class QueryableStateClient {
return FutureUtils.getFailedFuture(e);
}
- return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace).thenApply(
- stateResponse -> {
- try {
- return stateDescriptor.bind(new ImmutableStateBinder(stateResponse.getContent()));
- } catch (Exception e) {
- throw new FlinkRuntimeException(e);
- }
- });
+ return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace)
+ .thenApply(stateResponse -> createState(stateResponse, stateDescriptor));
+ }
+
+ private <T, S extends State> S createState(
+ KvStateResponse stateResponse,
+ StateDescriptor<S, T> stateDescriptor) {
+ StateFactory stateFactory = STATE_FACTORIES.get(stateDescriptor.getClass());
+ if (stateFactory == null) {
+ String message = String.format("State %s is not supported by %s",
+ stateDescriptor.getClass(), this.getClass());
+ throw new FlinkRuntimeException(message);
+ }
+ try {
+ return stateFactory.createState(stateDescriptor, stateResponse.getContent());
+ } catch (Exception e) {
+ throw new FlinkRuntimeException(e);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java
index 8964fbf..a83da54 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java
@@ -18,9 +18,10 @@
package org.apache.flink.queryablestate.client.state;
-import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.util.Preconditions;
@@ -33,7 +34,6 @@ import java.io.IOException;
* {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and
* providing an {@link AggregatingStateDescriptor}.
*/
-@PublicEvolving
public final class ImmutableAggregatingState<IN, OUT> extends ImmutableState implements AggregatingState<IN, OUT> {
private final OUT value;
@@ -57,15 +57,15 @@ public final class ImmutableAggregatingState<IN, OUT> extends ImmutableState imp
throw MODIFICATION_ATTEMPT_ERROR;
}
- public static <IN, ACC, OUT> ImmutableAggregatingState<IN, OUT> createState(
- final AggregatingStateDescriptor<IN, ACC, OUT> stateDescriptor,
- final byte[] serializedValue) throws IOException {
-
+ @SuppressWarnings("unchecked")
+ public static <OUT, ACC, S extends State> S createState(
+ StateDescriptor<S, ACC> stateDescriptor,
+ byte[] serializedState) throws IOException {
final ACC accumulator = KvStateSerializer.deserializeValue(
- serializedValue,
- stateDescriptor.getSerializer());
-
- final OUT state = stateDescriptor.getAggregateFunction().getResult(accumulator);
- return new ImmutableAggregatingState<>(state);
+ serializedState,
+ stateDescriptor.getSerializer());
+ final OUT state = ((AggregatingStateDescriptor<?, ACC, OUT>) stateDescriptor).
+ getAggregateFunction().getResult(accumulator);
+ return (S) new ImmutableAggregatingState<>(state);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java
index 25f3118..16a94c6 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java
@@ -18,9 +18,10 @@
package org.apache.flink.queryablestate.client.state;
-import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.util.Preconditions;
@@ -33,7 +34,6 @@ import java.io.IOException;
* {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and
* providing an {@link FoldingStateDescriptor}.
*/
-@PublicEvolving
@Deprecated
public final class ImmutableFoldingState<IN, ACC> extends ImmutableState implements FoldingState<IN, ACC> {
@@ -58,13 +58,13 @@ public final class ImmutableFoldingState<IN, ACC> extends ImmutableState impleme
throw MODIFICATION_ATTEMPT_ERROR;
}
- public static <IN, ACC> ImmutableFoldingState<IN, ACC> createState(
- final FoldingStateDescriptor<IN, ACC> stateDescriptor,
- final byte[] serializedState) throws IOException {
-
+ @SuppressWarnings("unchecked")
+ public static <ACC, S extends State> S createState(
+ StateDescriptor<S, ACC> stateDescriptor,
+ byte[] serializedState) throws IOException {
final ACC state = KvStateSerializer.deserializeValue(
- serializedState,
- stateDescriptor.getSerializer());
- return new ImmutableFoldingState<>(state);
+ serializedState,
+ stateDescriptor.getSerializer());
+ return (S) new ImmutableFoldingState<>(state);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
index 9f1465e..0c86ecf 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
@@ -18,9 +18,10 @@
package org.apache.flink.queryablestate.client.state;
-import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.util.Preconditions;
@@ -34,7 +35,6 @@ import java.util.List;
* {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and
* providing an {@link ListStateDescriptor}.
*/
-@PublicEvolving
public final class ImmutableListState<V> extends ImmutableState implements ListState<V> {
private final List<V> listState;
@@ -58,23 +58,23 @@ public final class ImmutableListState<V> extends ImmutableState implements ListS
throw MODIFICATION_ATTEMPT_ERROR;
}
- public static <V> ImmutableListState<V> createState(
- final ListStateDescriptor<V> stateDescriptor,
- final byte[] serializedState) throws IOException {
-
- final List<V> state = KvStateSerializer.deserializeList(
- serializedState,
- stateDescriptor.getElementSerializer());
- return new ImmutableListState<>(state);
- }
-
@Override
- public void update(List<V> values) throws Exception {
+ public void update(List<V> values) {
throw MODIFICATION_ATTEMPT_ERROR;
}
@Override
- public void addAll(List<V> values) throws Exception {
+ public void addAll(List<V> values) {
throw MODIFICATION_ATTEMPT_ERROR;
}
+
+ @SuppressWarnings("unchecked")
+ public static <V, T, S extends State> S createState(
+ StateDescriptor<S, T> stateDescriptor,
+ byte[] serializedState) throws IOException {
+ final List<V> state = KvStateSerializer.deserializeList(
+ serializedState,
+ ((ListStateDescriptor<V>) stateDescriptor).getElementSerializer());
+ return (S) new ImmutableListState<>(state);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
index bb08cf0..4d51b7d 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
@@ -18,9 +18,10 @@
package org.apache.flink.queryablestate.client.state;
-import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.util.Preconditions;
@@ -38,7 +39,6 @@ import java.util.Set;
* {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and
* providing an {@link MapStateDescriptor}.
*/
-@PublicEvolving
public final class ImmutableMapState<K, V> extends ImmutableState implements MapState<K, V> {
private final Map<K, V> state;
@@ -76,8 +76,6 @@ public final class ImmutableMapState<K, V> extends ImmutableState implements Map
* Returns all the mappings in the state in a {@link Collections#unmodifiableSet(Set)}.
*
* @return A read-only iterable view of all the key-value pairs in the state.
- *
- * @throws Exception Thrown if the system cannot access the state.
*/
@Override
public Iterable<Map.Entry<K, V>> entries() {
@@ -88,8 +86,6 @@ public final class ImmutableMapState<K, V> extends ImmutableState implements Map
* Returns all the keys in the state in a {@link Collections#unmodifiableSet(Set)}.
*
* @return A read-only iterable view of all the keys in the state.
- *
- * @throws Exception Thrown if the system cannot access the state.
*/
@Override
public Iterable<K> keys() {
@@ -100,8 +96,6 @@ public final class ImmutableMapState<K, V> extends ImmutableState implements Map
* Returns all the values in the state in a {@link Collections#unmodifiableCollection(Collection)}.
*
* @return A read-only iterable view of all the values in the state.
- *
- * @throws Exception Thrown if the system cannot access the state.
*/
@Override
public Iterable<V> values() {
@@ -112,9 +106,7 @@ public final class ImmutableMapState<K, V> extends ImmutableState implements Map
* Iterates over all the mappings in the state. The iterator cannot
* remove elements.
*
- * @return A read-only iterator over all the mappings in the state
- *
- * @throws Exception Thrown if the system cannot access the state.
+ * @return A read-only iterator over all the mappings in the state.
*/
@Override
public Iterator<Map.Entry<K, V>> iterator() {
@@ -126,14 +118,15 @@ public final class ImmutableMapState<K, V> extends ImmutableState implements Map
throw MODIFICATION_ATTEMPT_ERROR;
}
- public static <K, V> ImmutableMapState<K, V> createState(
- final MapStateDescriptor<K, V> stateDescriptor,
- final byte[] serializedState) throws IOException {
-
+ @SuppressWarnings("unchecked")
+ public static <K, V, T, S extends State> S createState(
+ StateDescriptor<S, T> stateDescriptor,
+ byte[] serializedState) throws IOException {
+ MapStateDescriptor<K, V> mapStateDescriptor = (MapStateDescriptor<K, V>) stateDescriptor;
final Map<K, V> state = KvStateSerializer.deserializeMap(
- serializedState,
- stateDescriptor.getKeySerializer(),
- stateDescriptor.getValueSerializer());
- return new ImmutableMapState<>(state);
+ serializedState,
+ mapStateDescriptor.getKeySerializer(),
+ mapStateDescriptor.getValueSerializer());
+ return (S) new ImmutableMapState<>(state);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java
index 46b477f..a9990b0 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java
@@ -18,9 +18,10 @@
package org.apache.flink.queryablestate.client.state;
-import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.util.Preconditions;
@@ -33,7 +34,6 @@ import java.io.IOException;
* {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and
* providing an {@link ReducingStateDescriptor}.
*/
-@PublicEvolving
public final class ImmutableReducingState<V> extends ImmutableState implements ReducingState<V> {
private final V value;
@@ -57,13 +57,13 @@ public final class ImmutableReducingState<V> extends ImmutableState implements R
throw MODIFICATION_ATTEMPT_ERROR;
}
- public static <V> ImmutableReducingState<V> createState(
- final ReducingStateDescriptor<V> stateDescriptor,
- final byte[] serializedState) throws IOException {
-
+ @SuppressWarnings("unchecked")
+ public static <V, S extends State> S createState(
+ StateDescriptor<S, V> stateDescriptor,
+ byte[] serializedState) throws IOException {
final V state = KvStateSerializer.deserializeValue(
- serializedState,
- stateDescriptor.getSerializer());
- return new ImmutableReducingState<>(state);
+ serializedState,
+ stateDescriptor.getSerializer());
+ return (S) new ImmutableReducingState<>(state);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java
deleted file mode 100644
index 6ce2787..0000000
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.client.state;
-
-import org.apache.flink.api.common.state.AggregatingState;
-import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.state.StateBinder;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.util.Preconditions;
-
-/**
- * A {@link StateBinder} used to deserialize the results returned by the
- * {@link org.apache.flink.queryablestate.client.QueryableStateClient}.
- *
- * <p>The result is an immutable {@link org.apache.flink.api.common.state.State State}
- * object containing the requested result.
- */
-public class ImmutableStateBinder implements StateBinder {
-
- private final byte[] serializedState;
-
- public ImmutableStateBinder(final byte[] content) {
- serializedState = Preconditions.checkNotNull(content);
- }
-
- @Override
- public <T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) throws Exception {
- return ImmutableValueState.createState(stateDesc, serializedState);
- }
-
- @Override
- public <T> ListState<T> createListState(ListStateDescriptor<T> stateDesc) throws Exception {
- return ImmutableListState.createState(stateDesc, serializedState);
- }
-
- @Override
- public <T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception {
- return ImmutableReducingState.createState(stateDesc, serializedState);
- }
-
- @Override
- public <IN, ACC, OUT> AggregatingState<IN, OUT> createAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateDesc) throws Exception {
- return ImmutableAggregatingState.createState(stateDesc, serializedState);
- }
-
- @Override
- public <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
- return ImmutableFoldingState.createState(stateDesc, serializedState);
- }
-
- @Override
- public <MK, MV> MapState<MK, MV> createMapState(MapStateDescriptor<MK, MV> stateDesc) throws Exception {
- return ImmutableMapState.createState(stateDesc, serializedState);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java
index f3ddd2b..3a5cab4 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java
@@ -18,7 +18,8 @@
package org.apache.flink.queryablestate.client.state;
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
@@ -33,7 +34,6 @@ import java.io.IOException;
* {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and
* providing an {@link ValueStateDescriptor}.
*/
-@PublicEvolving
public final class ImmutableValueState<V> extends ImmutableState implements ValueState<V> {
private final V value;
@@ -57,13 +57,13 @@ public final class ImmutableValueState<V> extends ImmutableState implements Valu
throw MODIFICATION_ATTEMPT_ERROR;
}
- public static <V> ImmutableValueState<V> createState(
- final ValueStateDescriptor<V> stateDescriptor,
- final byte[] serializedState) throws IOException {
-
+ @SuppressWarnings("unchecked")
+ public static <V, S extends State> S createState(
+ StateDescriptor<S, V> stateDescriptor,
+ byte[] serializedState) throws IOException {
final V state = KvStateSerializer.deserializeValue(
- serializedState,
- stateDescriptor.getSerializer());
- return new ImmutableValueState<>(state);
+ serializedState,
+ stateDescriptor.getSerializer());
+ return (S) new ImmutableValueState<>(state);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingStateTest.java
index ebbc896..955bf38 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingStateTest.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingStateTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.queryablestate.client.state;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
@@ -41,7 +42,7 @@ public class ImmutableAggregatingStateTest {
new SumAggr(),
String.class);
- private ImmutableAggregatingState<Long, String> aggrState;
+ private AggregatingState<Long, String> aggrState;
@Before
public void setUp() throws Exception {
@@ -61,7 +62,7 @@ public class ImmutableAggregatingStateTest {
}
@Test(expected = UnsupportedOperationException.class)
- public void testUpdate() {
+ public void testUpdate() throws Exception {
String value = aggrState.get();
assertEquals("42", value);
@@ -69,7 +70,7 @@ public class ImmutableAggregatingStateTest {
}
@Test(expected = UnsupportedOperationException.class)
- public void testClear() {
+ public void testClear() throws Exception {
String value = aggrState.get();
assertEquals("42", value);
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingStateTest.java
index 9e8dfc9..2803d23 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingStateTest.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingStateTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.queryablestate.client.state;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
@@ -43,7 +44,7 @@ public class ImmutableFoldingStateTest {
new SumFold(),
StringSerializer.INSTANCE);
- private ImmutableFoldingState<Long, String> foldingState;
+ private FoldingState<Long, String> foldingState;
@Before
public void setUp() throws Exception {
@@ -61,7 +62,7 @@ public class ImmutableFoldingStateTest {
}
@Test(expected = UnsupportedOperationException.class)
- public void testUpdate() {
+ public void testUpdate() throws Exception {
String value = foldingState.get();
assertEquals("42", value);
@@ -69,7 +70,7 @@ public class ImmutableFoldingStateTest {
}
@Test(expected = UnsupportedOperationException.class)
- public void testClear() {
+ public void testClear() throws Exception {
String value = foldingState.get();
assertEquals("42", value);
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableListStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableListStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableListStateTest.java
index a78ed1f..19b8514 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableListStateTest.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableListStateTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.queryablestate.client.state;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -42,7 +43,7 @@ public class ImmutableListStateTest {
private final ListStateDescriptor<Long> listStateDesc =
new ListStateDescriptor<>("test", BasicTypeInfo.LONG_TYPE_INFO);
- private ImmutableListState<Long> listState;
+ private ListState<Long> listState;
@Before
public void setUp() throws Exception {
@@ -58,7 +59,7 @@ public class ImmutableListStateTest {
}
@Test(expected = UnsupportedOperationException.class)
- public void testUpdate() {
+ public void testUpdate() throws Exception {
List<Long> list = getStateContents();
assertEquals(1L, list.size());
@@ -69,7 +70,7 @@ public class ImmutableListStateTest {
}
@Test(expected = UnsupportedOperationException.class)
- public void testClear() {
+ public void testClear() throws Exception {
List<Long> list = getStateContents();
assertEquals(1L, list.size());
@@ -100,7 +101,7 @@ public class ImmutableListStateTest {
return baos.toByteArray();
}
- private List<Long> getStateContents() {
+ private List<Long> getStateContents() throws Exception {
List<Long> list = new ArrayList<>();
for (Long elem: listState.get()) {
list.add(elem);
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java
index ffeabae..6465257 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.queryablestate.client.state;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
@@ -44,7 +45,7 @@ public class ImmutableMapStateTest {
BasicTypeInfo.LONG_TYPE_INFO,
BasicTypeInfo.LONG_TYPE_INFO);
- private ImmutableMapState<Long, Long> mapState;
+ private MapState<Long, Long> mapState;
@Before
public void setUp() throws Exception {
@@ -65,7 +66,7 @@ public class ImmutableMapStateTest {
}
@Test(expected = UnsupportedOperationException.class)
- public void testPut() {
+ public void testPut() throws Exception {
assertTrue(mapState.contains(1L));
long value = mapState.get(1L);
assertEquals(5L, value);
@@ -78,7 +79,7 @@ public class ImmutableMapStateTest {
}
@Test(expected = UnsupportedOperationException.class)
- public void testPutAll() {
+ public void testPutAll() throws Exception {
assertTrue(mapState.contains(1L));
long value = mapState.get(1L);
assertEquals(5L, value);
@@ -95,7 +96,7 @@ public class ImmutableMapStateTest {
}
@Test(expected = UnsupportedOperationException.class)
- public void testUpdate() {
+ public void testUpdate() throws Exception {
assertTrue(mapState.contains(1L));
long value = mapState.get(1L);
assertEquals(5L, value);
@@ -108,7 +109,7 @@ public class ImmutableMapStateTest {
}
@Test(expected = UnsupportedOperationException.class)
- public void testIterator() {
+ public void testIterator() throws Exception {
assertTrue(mapState.contains(1L));
long value = mapState.get(1L);
assertEquals(5L, value);
@@ -124,7 +125,7 @@ public class ImmutableMapStateTest {
}
@Test(expected = UnsupportedOperationException.class)
- public void testIterable() {
+ public void testIterable() throws Exception {
assertTrue(mapState.contains(1L));
long value = mapState.get(1L);
assertEquals(5L, value);
@@ -142,7 +143,7 @@ public class ImmutableMapStateTest {
}
@Test(expected = UnsupportedOperationException.class)
- public void testKeys() {
+ public void testKeys() throws Exception {
assertTrue(mapState.contains(1L));
long value = mapState.get(1L);
assertEquals(5L, value);
@@ -158,7 +159,7 @@ public class ImmutableMapStateTest {
}
@Test(expected = UnsupportedOperationException.class)
- public void testValues() {
+ public void testValues() throws Exception {
assertTrue(mapState.contains(1L));
long value = mapState.get(1L);
assertEquals(5L, value);
@@ -174,7 +175,7 @@ public class ImmutableMapStateTest {
}
@Test(expected = UnsupportedOperationException.class)
- public void testClear() {
+ public void testClear() throws Exception {
assertTrue(mapState.contains(1L));
long value = mapState.get(1L);
assertEquals(5L, value);
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableReducingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableReducingStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableReducingStateTest.java
index 9694f55..543f714 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableReducingStateTest.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableReducingStateTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.queryablestate.client.state;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -38,7 +39,7 @@ public class ImmutableReducingStateTest {
private final ReducingStateDescriptor<Long> reducingStateDesc =
new ReducingStateDescriptor<>("test", new SumReduce(), BasicTypeInfo.LONG_TYPE_INFO);
- private ImmutableReducingState<Long> reduceState;
+ private ReducingState<Long> reduceState;
@Before
public void setUp() throws Exception {
@@ -53,7 +54,7 @@ public class ImmutableReducingStateTest {
}
@Test(expected = UnsupportedOperationException.class)
- public void testUpdate() {
+ public void testUpdate() throws Exception {
long value = reduceState.get();
assertEquals(42L, value);
@@ -61,7 +62,7 @@ public class ImmutableReducingStateTest {
}
@Test(expected = UnsupportedOperationException.class)
- public void testClear() {
+ public void testClear() throws Exception {
long value = reduceState.get();
assertEquals(42L, value);
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableValueStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableValueStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableValueStateTest.java
index a0da43d..75f8f03 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableValueStateTest.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableValueStateTest.java
@@ -19,12 +19,14 @@
package org.apache.flink.queryablestate.client.state;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.junit.Before;
import org.junit.Test;
+import java.io.IOException;
import java.nio.ByteBuffer;
import static org.junit.Assert.assertEquals;
@@ -37,7 +39,7 @@ public class ImmutableValueStateTest {
private final ValueStateDescriptor<Long> valueStateDesc =
new ValueStateDescriptor<>("test", BasicTypeInfo.LONG_TYPE_INFO);
- private ImmutableValueState<Long> valueState;
+ private ValueState<Long> valueState;
@Before
public void setUp() throws Exception {
@@ -52,7 +54,7 @@ public class ImmutableValueStateTest {
}
@Test(expected = UnsupportedOperationException.class)
- public void testUpdate() {
+ public void testUpdate() throws IOException {
long value = valueState.value();
assertEquals(42L, value);
@@ -60,7 +62,7 @@ public class ImmutableValueStateTest {
}
@Test(expected = UnsupportedOperationException.class)
- public void testClear() {
+ public void testClear() throws IOException {
long value = valueState.value();
assertEquals(42L, value);
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
index 6ee7631..a49fdd2 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.queryablestate.network;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
@@ -40,8 +39,6 @@ import org.junit.rules.TemporaryFolder;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
-import java.io.File;
-
import static org.mockito.Mockito.mock;
/**
@@ -54,43 +51,6 @@ public final class KVStateRequestSerializerRocksDBTest {
public TemporaryFolder temporaryFolder = new TemporaryFolder();
/**
- * Extension of {@link RocksDBKeyedStateBackend} to make {@link
- * #createListState(TypeSerializer, ListStateDescriptor)} public for use in
- * the tests.
- *
- * @param <K> key type
- */
- static final class RocksDBKeyedStateBackend2<K> extends RocksDBKeyedStateBackend<K> {
-
- RocksDBKeyedStateBackend2(
- final String operatorIdentifier,
- final ClassLoader userCodeClassLoader,
- final File instanceBasePath,
- final DBOptions dbOptions,
- final ColumnFamilyOptions columnFamilyOptions,
- final TaskKvStateRegistry kvStateRegistry,
- final TypeSerializer<K> keySerializer,
- final int numberOfKeyGroups,
- final KeyGroupRange keyGroupRange,
- final ExecutionConfig executionConfig) throws Exception {
-
- super(operatorIdentifier, userCodeClassLoader,
- instanceBasePath,
- dbOptions, columnFamilyOptions, kvStateRegistry, keySerializer,
- numberOfKeyGroups, keyGroupRange, executionConfig, false,
- TestLocalRecoveryConfig.disabled());
- }
-
- @Override
- public <N, T> InternalListState<K, N, T> createListState(
- final TypeSerializer<N> namespaceSerializer,
- final ListStateDescriptor<T> stateDesc) throws Exception {
-
- return super.createListState(namespaceSerializer, stateDesc);
- }
- }
-
- /**
* Tests list serialization and deserialization match.
*
* @see KvStateRequestSerializerTest#testListSerialization()
@@ -105,8 +65,8 @@ public final class KVStateRequestSerializerRocksDBTest {
DBOptions dbOptions = PredefinedOptions.DEFAULT.createDBOptions();
dbOptions.setCreateIfMissing(true);
ColumnFamilyOptions columnFamilyOptions = PredefinedOptions.DEFAULT.createColumnOptions();
- final RocksDBKeyedStateBackend2<Long> longHeapKeyedStateBackend =
- new RocksDBKeyedStateBackend2<>(
+ final RocksDBKeyedStateBackend<Long> longHeapKeyedStateBackend =
+ new RocksDBKeyedStateBackend<>(
"no-op",
ClassLoader.getSystemClassLoader(),
temporaryFolder.getRoot(),
@@ -115,13 +75,13 @@ public final class KVStateRequestSerializerRocksDBTest {
mock(TaskKvStateRegistry.class),
LongSerializer.INSTANCE,
1, new KeyGroupRange(0, 0),
- new ExecutionConfig()
+ new ExecutionConfig(), false,
+ TestLocalRecoveryConfig.disabled()
);
longHeapKeyedStateBackend.restore(null);
longHeapKeyedStateBackend.setCurrentKey(key);
- final InternalListState<Long, VoidNamespace, Long> listState = longHeapKeyedStateBackend
- .createListState(VoidNamespaceSerializer.INSTANCE,
+ final InternalListState<Long, VoidNamespace, Long> listState = longHeapKeyedStateBackend.createState(VoidNamespaceSerializer.INSTANCE,
new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
KvStateRequestSerializerTest.testListSerialization(key, listState);
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
index 1dc7186..2ba7507 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
@@ -200,9 +200,9 @@ public class KvStateRequestSerializerTest {
);
longHeapKeyedStateBackend.setCurrentKey(key);
- final InternalListState<Long, VoidNamespace, Long> listState = longHeapKeyedStateBackend.createListState(
- VoidNamespaceSerializer.INSTANCE,
- new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
+ final InternalListState<Long, VoidNamespace, Long> listState = longHeapKeyedStateBackend.createState(
+ VoidNamespaceSerializer.INSTANCE,
+ new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
testListSerialization(key, listState);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index f873655..1690240 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -20,32 +20,13 @@ package org.apache.flink.runtime.state;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.AggregatingState;
-import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
-import org.apache.flink.api.common.state.StateBinder;
import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.internal.InternalAggregatingState;
-import org.apache.flink.runtime.state.internal.InternalFoldingState;
import org.apache.flink.runtime.state.internal.InternalKvState;
-import org.apache.flink.runtime.state.internal.InternalListState;
-import org.apache.flink.runtime.state.internal.InternalMapState;
-import org.apache.flink.runtime.state.internal.InternalReducingState;
-import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
@@ -73,30 +54,30 @@ public abstract class AbstractKeyedStateBackend<K> implements
protected final TypeSerializer<K> keySerializer;
/** The currently active key. */
- protected K currentKey;
+ private K currentKey;
- /** The key group of the currently active key */
+ /** The key group of the currently active key. */
private int currentKeyGroup;
/** So that we can give out state when the user uses the same key. */
- protected final HashMap<String, InternalKvState<K, ?, ?>> keyValueStatesByName;
+ private final HashMap<String, InternalKvState<K, ?, ?>> keyValueStatesByName;
- /** For caching the last accessed partitioned state */
+ /** For caching the last accessed partitioned state. */
private String lastName;
@SuppressWarnings("rawtypes")
private InternalKvState lastState;
- /** The number of key-groups aka max parallelism */
+ /** The number of key-groups aka max parallelism. */
protected final int numberOfKeyGroups;
- /** Range of key-groups for which this backend is responsible */
+ /** Range of key-groups for which this backend is responsible. */
protected final KeyGroupRange keyGroupRange;
- /** KvStateRegistry helper for this task */
+ /** KvStateRegistry helper for this task. */
protected final TaskKvStateRegistry kvStateRegistry;
- /** Registry for all opened streams, so they can be closed if the task using this backend is closed */
+ /** Registry for all opened streams, so they can be closed if the task using this backend is closed. */
protected CloseableRegistry cancelStreamRegistry;
protected final ClassLoader userCodeClassLoader;
@@ -153,87 +134,19 @@ public abstract class AbstractKeyedStateBackend<K> implements
}
/**
- * Creates and returns a new {@link ValueState}.
+ * Creates and returns a new {@link State}.
*
* @param namespaceSerializer TypeSerializer for the state namespace.
* @param stateDesc The {@code StateDescriptor} that contains the name of the state.
*
* @param <N> The type of the namespace.
- * @param <T> The type of the value that the {@code ValueState} can store.
+ * @param <SV> The type of the stored state value.
+ * @param <S> The type of the public API state.
+ * @param <IS> The type of internal state.
*/
- protected abstract <N, T> InternalValueState<K, N, T> createValueState(
- TypeSerializer<N> namespaceSerializer,
- ValueStateDescriptor<T> stateDesc) throws Exception;
-
- /**
- * Creates and returns a new {@link ListState}.
- *
- * @param namespaceSerializer TypeSerializer for the state namespace.
- * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
- *
- * @param <N> The type of the namespace.
- * @param <T> The type of the values that the {@code ListState} can store.
- */
- protected abstract <N, T> InternalListState<K, N, T> createListState(
- TypeSerializer<N> namespaceSerializer,
- ListStateDescriptor<T> stateDesc) throws Exception;
-
- /**
- * Creates and returns a new {@link ReducingState}.
- *
- * @param namespaceSerializer TypeSerializer for the state namespace.
- * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
- *
- * @param <N> The type of the namespace.
- * @param <T> The type of the values that the {@code ListState} can store.
- */
- protected abstract <N, T> InternalReducingState<K, N, T> createReducingState(
- TypeSerializer<N> namespaceSerializer,
- ReducingStateDescriptor<T> stateDesc) throws Exception;
-
- /**
- * Creates and returns a new {@link AggregatingState}.
- *
- * @param namespaceSerializer TypeSerializer for the state namespace.
- * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
- *
- * @param <N> The type of the namespace.
- * @param <T> The type of the values that the {@code ListState} can store.
- */
- protected abstract <N, T, ACC, R> InternalAggregatingState<K, N, T, ACC, R> createAggregatingState(
- TypeSerializer<N> namespaceSerializer,
- AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception;
-
- /**
- * Creates and returns a new {@link FoldingState}.
- *
- * @param namespaceSerializer TypeSerializer for the state namespace.
- * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
- *
- * @param <N> The type of the namespace.
- * @param <T> Type of the values folded into the state
- * @param <ACC> Type of the value in the state
- *
- * @deprecated will be removed in a future version
- */
- @Deprecated
- protected abstract <N, T, ACC> InternalFoldingState<K, N, T, ACC> createFoldingState(
- TypeSerializer<N> namespaceSerializer,
- FoldingStateDescriptor<T, ACC> stateDesc) throws Exception;
-
- /**
- * Creates and returns a new {@link MapState}.
- *
- * @param namespaceSerializer TypeSerializer for the state namespace.
- * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
- *
- * @param <N> The type of the namespace.
- * @param <UK> Type of the keys in the state
- * @param <UV> Type of the values in the state *
- */
- protected abstract <N, UK, UV> InternalMapState<K, N, UK, UV> createMapState(
- TypeSerializer<N> namespaceSerializer,
- MapStateDescriptor<UK, UV> stateDesc) throws Exception;
+ public abstract <N, SV, S extends State, IS extends S> IS createState(
+ TypeSerializer<N> namespaceSerializer,
+ StateDescriptor<S, SV> stateDesc) throws Exception;
/**
* @see KeyedStateBackend
@@ -311,8 +224,6 @@ public abstract class AbstractKeyedStateBackend<K> implements
throw new RuntimeException(e);
}
});
- } catch (RuntimeException e) {
- throw e;
}
}
@@ -320,6 +231,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
* @see KeyedStateBackend
*/
@Override
+ @SuppressWarnings("unchecked")
public <N, S extends State, V> S getOrCreateKeyedState(
final TypeSerializer<N> namespaceSerializer,
StateDescriptor<S, V> stateDescriptor) throws Exception {
@@ -343,43 +255,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
return typedState;
}
- // create a new blank key/value state
- S state = stateDescriptor.bind(new StateBinder() {
- @Override
- public <T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) throws Exception {
- return AbstractKeyedStateBackend.this.createValueState(namespaceSerializer, stateDesc);
- }
-
- @Override
- public <T> ListState<T> createListState(ListStateDescriptor<T> stateDesc) throws Exception {
- return AbstractKeyedStateBackend.this.createListState(namespaceSerializer, stateDesc);
- }
-
- @Override
- public <T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception {
- return AbstractKeyedStateBackend.this.createReducingState(namespaceSerializer, stateDesc);
- }
-
- @Override
- public <T, ACC, R> AggregatingState<T, R> createAggregatingState(
- AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
- return AbstractKeyedStateBackend.this.createAggregatingState(namespaceSerializer, stateDesc);
- }
-
- @Override
- public <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
- return AbstractKeyedStateBackend.this.createFoldingState(namespaceSerializer, stateDesc);
- }
-
- @Override
- public <UK, UV> MapState<UK, UV> createMapState(MapStateDescriptor<UK, UV> stateDesc) throws Exception {
- return AbstractKeyedStateBackend.this.createMapState(namespaceSerializer, stateDesc);
- }
-
- });
-
- @SuppressWarnings("unchecked")
- InternalKvState<K, N, ?> kvState = (InternalKvState<K, N, ?>) state;
+ InternalKvState<K, N, ?> kvState = createState(namespaceSerializer, stateDescriptor);
keyValueStatesByName.put(stateDescriptor.getName(), kvState);
// Publish queryable state
@@ -392,14 +268,14 @@ public abstract class AbstractKeyedStateBackend<K> implements
kvStateRegistry.registerKvState(keyGroupRange, name, kvState);
}
- return state;
+ return (S) kvState;
}
/**
* TODO: NOTE: This method does a lot of work caching / retrieving states just to update the namespace.
* This method should be removed for the sake of namespaces being lazily fetched from the keyed
* state backend, or being set on the state directly.
- *
+ *
* @see KeyedStateBackend
*/
@SuppressWarnings("unchecked")
@@ -445,7 +321,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
}
@VisibleForTesting
- public StreamCompressionDecorator getKeyGroupCompressionDecorator() {
+ StreamCompressionDecorator getKeyGroupCompressionDecorator() {
return keyGroupCompressionDecorator;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
index 97d1ad0..62aa30a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
@@ -20,6 +20,9 @@ package org.apache.flink.runtime.state.heap;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.StateTransformationFunction;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
@@ -40,7 +43,6 @@ import java.io.IOException;
class HeapAggregatingState<K, N, IN, ACC, OUT>
extends AbstractHeapMergingState<K, N, IN, ACC, OUT>
implements InternalAggregatingState<K, N, IN, ACC, OUT> {
-
private final AggregateTransformation<IN, ACC, OUT> aggregateTransformation;
/**
@@ -53,7 +55,7 @@ class HeapAggregatingState<K, N, IN, ACC, OUT>
* @param defaultValue The default value for the state.
* @param aggregateFunction The aggregating function used for aggregating state.
*/
- HeapAggregatingState(
+ private HeapAggregatingState(
StateTable<K, N, ACC> stateTable,
TypeSerializer<K> keySerializer,
TypeSerializer<ACC> valueSerializer,
@@ -111,7 +113,7 @@ class HeapAggregatingState<K, N, IN, ACC, OUT>
// ------------------------------------------------------------------------
@Override
- protected ACC mergeState(ACC a, ACC b) throws Exception {
+ protected ACC mergeState(ACC a, ACC b) {
return aggregateTransformation.aggFunction.merge(a, b);
}
@@ -124,11 +126,25 @@ class HeapAggregatingState<K, N, IN, ACC, OUT>
}
@Override
- public ACC apply(ACC accumulator, IN value) throws Exception {
+ public ACC apply(ACC accumulator, IN value) {
if (accumulator == null) {
accumulator = aggFunction.createAccumulator();
}
return aggFunction.add(value, accumulator);
}
}
+
+ @SuppressWarnings("unchecked")
+ static <T, K, N, SV, S extends State, IS extends S> IS create(
+ StateDescriptor<S, SV> stateDesc,
+ StateTable<K, N, SV> stateTable,
+ TypeSerializer<K> keySerializer) {
+ return (IS) new HeapAggregatingState<>(
+ stateTable,
+ keySerializer,
+ stateTable.getStateSerializer(),
+ stateTable.getNamespaceSerializer(),
+ stateDesc.getDefaultValue(),
+ ((AggregatingStateDescriptor<T, SV, ?>) stateDesc).getAggregateFunction());
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
index d61c4c5..1bca719 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
@@ -20,6 +20,9 @@ package org.apache.flink.runtime.state.heap;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.StateTransformationFunction;
import org.apache.flink.runtime.state.internal.InternalFoldingState;
@@ -41,7 +44,6 @@ import java.io.IOException;
class HeapFoldingState<K, N, T, ACC>
extends AbstractHeapAppendingState<K, N, T, ACC, ACC>
implements InternalFoldingState<K, N, T, ACC> {
-
/** The function used to fold the state. */
private final FoldTransformation foldTransformation;
@@ -55,7 +57,7 @@ class HeapFoldingState<K, N, T, ACC>
* @param defaultValue The default value for the state.
* @param foldFunction The fold function used for folding state.
*/
- HeapFoldingState(
+ private HeapFoldingState(
StateTable<K, N, ACC> stateTable,
TypeSerializer<K> keySerializer,
TypeSerializer<ACC> valueSerializer,
@@ -118,4 +120,18 @@ class HeapFoldingState<K, N, T, ACC>
return foldFunction.fold((previousState != null) ? previousState : getDefaultValue(), value);
}
}
+
+ @SuppressWarnings("unchecked")
+ static <K, N, SV, S extends State, IS extends S> IS create(
+ StateDescriptor<S, SV> stateDesc,
+ StateTable<K, N, SV> stateTable,
+ TypeSerializer<K> keySerializer) {
+ return (IS) new HeapFoldingState<>(
+ stateTable,
+ keySerializer,
+ stateTable.getStateSerializer(),
+ stateTable.getNamespaceSerializer(),
+ stateDesc.getDefaultValue(),
+ ((FoldingStateDescriptor<SV, SV>) stateDesc).getFoldFunction());
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bdde837/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 76479b0..82ce584 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -59,12 +59,7 @@ import org.apache.flink.runtime.state.StateSnapshot;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
-import org.apache.flink.runtime.state.internal.InternalAggregatingState;
-import org.apache.flink.runtime.state.internal.InternalFoldingState;
-import org.apache.flink.runtime.state.internal.InternalListState;
-import org.apache.flink.runtime.state.internal.InternalMapState;
-import org.apache.flink.runtime.state.internal.InternalReducingState;
-import org.apache.flink.runtime.state.internal.InternalValueState;
+import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
import org.apache.flink.util.function.SupplierWithException;
@@ -97,6 +92,23 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private static final Logger LOG = LoggerFactory.getLogger(HeapKeyedStateBackend.class);
+ private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
+ Stream.of(
+ Tuple2.of(ValueStateDescriptor.class, (StateFactory) HeapValueState::create),
+ Tuple2.of(ListStateDescriptor.class, (StateFactory) HeapListState::create),
+ Tuple2.of(MapStateDescriptor.class, (StateFactory) HeapMapState::create),
+ Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) HeapAggregatingState::create),
+ Tuple2.of(ReducingStateDescriptor.class, (StateFactory) HeapReducingState::create),
+ Tuple2.of(FoldingStateDescriptor.class, (StateFactory) HeapFoldingState::create)
+ ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+
+ private interface StateFactory {
+ <K, N, SV, S extends State, IS extends S> IS createState(
+ StateDescriptor<S, SV> stateDesc,
+ StateTable<K, N, SV> stateTable,
+ TypeSerializer<K> keySerializer) throws Exception;
+ }
+
/**
* Map of state tables that stores all state of key/value states. We store it centrally so
* that we can easily checkpoint/restore it.
@@ -110,8 +122,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
/**
* Map of state names to their corresponding restored state meta info.
*
- * <p>
- * TODO this map can be removed when eager-state registration is in place.
+ * <p>TODO this map can be removed when eager-state registration is in place.
* TODO we currently need this cached to check state migration strategies when new serializers are registered.
*/
private final Map<String, RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredKvStateMetaInfos;
@@ -203,91 +214,17 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
@Override
- public <N, V> InternalValueState<K, N, V> createValueState(
- TypeSerializer<N> namespaceSerializer,
- ValueStateDescriptor<V> stateDesc) throws Exception {
-
- StateTable<K, N, V> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
- return new HeapValueState<>(
- stateTable,
- keySerializer,
- stateTable.getStateSerializer(),
- stateTable.getNamespaceSerializer(),
- stateDesc.getDefaultValue());
- }
-
- @Override
- public <N, T> InternalListState<K, N, T> createListState(
- TypeSerializer<N> namespaceSerializer,
- ListStateDescriptor<T> stateDesc) throws Exception {
-
- StateTable<K, N, List<T>> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
- return new HeapListState<>(
- stateTable,
- keySerializer,
- stateTable.getStateSerializer(),
- stateTable.getNamespaceSerializer(),
- stateDesc.getDefaultValue());
- }
-
- @Override
- public <N, T> InternalReducingState<K, N, T> createReducingState(
- TypeSerializer<N> namespaceSerializer,
- ReducingStateDescriptor<T> stateDesc) throws Exception {
-
- StateTable<K, N, T> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
- return new HeapReducingState<>(
- stateTable,
- keySerializer,
- stateTable.getStateSerializer(),
- stateTable.getNamespaceSerializer(),
- stateDesc.getDefaultValue(),
- stateDesc.getReduceFunction());
- }
-
- @Override
- public <N, T, ACC, R> InternalAggregatingState<K, N, T, ACC, R> createAggregatingState(
- TypeSerializer<N> namespaceSerializer,
- AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
-
- StateTable<K, N, ACC> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
- return new HeapAggregatingState<>(
- stateTable,
- keySerializer,
- stateTable.getStateSerializer(),
- stateTable.getNamespaceSerializer(),
- stateDesc.getDefaultValue(),
- stateDesc.getAggregateFunction());
- }
-
- @Override
- public <N, T, ACC> InternalFoldingState<K, N, T, ACC> createFoldingState(
- TypeSerializer<N> namespaceSerializer,
- FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
-
- StateTable<K, N, ACC> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
- return new HeapFoldingState<>(
- stateTable,
- keySerializer,
- stateTable.getStateSerializer(),
- stateTable.getNamespaceSerializer(),
- stateDesc.getDefaultValue(),
- stateDesc.getFoldFunction());
- }
-
- @Override
- protected <N, UK, UV> InternalMapState<K, N, UK, UV> createMapState(
- TypeSerializer<N> namespaceSerializer,
- MapStateDescriptor<UK, UV> stateDesc) throws Exception {
-
- StateTable<K, N, Map<UK, UV>> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
-
- return new HeapMapState<>(
- stateTable,
- keySerializer,
- stateTable.getStateSerializer(),
- stateTable.getNamespaceSerializer(),
- stateDesc.getDefaultValue());
+ public <N, SV, S extends State, IS extends S> IS createState(
+ TypeSerializer<N> namespaceSerializer,
+ StateDescriptor<S, SV> stateDesc) throws Exception {
+ StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
+ if (stateFactory == null) {
+ String message = String.format("State %s is not supported by %s",
+ stateDesc.getClass(), this.getClass());
+ throw new FlinkRuntimeException(message);
+ }
+ StateTable<K, N, SV> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
+ return stateFactory.createState(stateDesc, stateTable, keySerializer);
}
@Override
@@ -411,7 +348,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
int writtenKeyGroupIndex = inView.readInt();
try (InputStream kgCompressionInStream =
- streamCompressionDecorator.decorateWithCompression(fsDataInputStream)) {
+ streamCompressionDecorator.decorateWithCompression(fsDataInputStream)) {
DataInputViewStreamWrapper kgCompressionInView =
new DataInputViewStreamWrapper(kgCompressionInStream);