You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/01/14 03:22:57 UTC
[2/4] flink git commit: Hide broadcast state / remove from public API
Hide broadcast state / remove from public API
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9b9212f5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9b9212f5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9b9212f5
Branch: refs/heads/release-1.2
Commit: 9b9212f55e902cd2b2bc4a8dc0bfdea86ab39869
Parents: 29fbc49
Author: Stefan Richter <s....@data-artisans.com>
Authored: Fri Jan 13 16:38:33 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Jan 13 21:38:24 2017 +0100
----------------------------------------------------------------------
.../api/common/state/OperatorStateStore.java | 27 --------------------
.../state/DefaultOperatorStateBackend.java | 2 --
.../runtime/state/OperatorStateBackendTest.java | 12 +++++----
.../test/checkpointing/RescalingITCase.java | 5 +++-
4 files changed, 11 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9b9212f5/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
index 87a7759..c1cdfe4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
@@ -57,33 +57,6 @@ public interface OperatorStateStore {
<T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception;
/**
- * Creates (or restores) a list state. Each state is registered under a unique name.
- * The provided serializer is used to de/serialize the state in case of checkpointing (snapshot/restore).
- *
- * On restore, all items in the list are broadcasted to all parallel operator instances.
- *
- * @param stateDescriptor The descriptor for this state, providing a name and serializer.
- * @param <S> The generic type of the state
- *
- * @return A list for all state partitions.
- * @throws Exception
- */
- <S> ListState<S> getBroadcastOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception;
-
- /**
- * Creates a state of the given name that uses Java serialization to persist the state. On restore, all items
- * in the list are broadcasted to all parallel operator instances.
- *
- * <p>This is a simple convenience method. For more flexibility on how state serialization
- * should happen, use the {@link #getBroadcastOperatorState(ListStateDescriptor)} method.
- *
- * @param stateName The name of state to create
- * @return A list state using Java serialization to serialize state objects.
- * @throws Exception
- */
- <T extends Serializable> ListState<T> getBroadcastSerializableListState(String stateName) throws Exception;
-
- /**
* Returns a set with the names of all currently registered states.
* @return set of names for all registered states.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/9b9212f5/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index 6c65088..1cd1da7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -91,12 +91,10 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
}
@SuppressWarnings("unchecked")
- @Override
public <T extends Serializable> ListState<T> getBroadcastSerializableListState(String stateName) throws Exception {
return (ListState<T>) getBroadcastOperatorState(new ListStateDescriptor<>(stateName, javaSerializer));
}
- @Override
public <S> ListState<S> getBroadcastOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception {
return getOperatorState(stateDescriptor, OperatorStateHandle.Mode.BROADCAST);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9b9212f5/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index cd0391f..5bd085f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -45,8 +45,9 @@ public class OperatorStateBackendTest {
return env;
}
- private OperatorStateBackend createNewOperatorStateBackend() throws Exception {
- return abstractStateBackend.createOperatorStateBackend(
+ private DefaultOperatorStateBackend createNewOperatorStateBackend() throws Exception {
+ //TODO this is a temporary cast so that we can already test functionality that is not yet exposed through the public API
+ return (DefaultOperatorStateBackend) abstractStateBackend.createOperatorStateBackend(
createMockEnvironment(),
"test-operator");
}
@@ -60,7 +61,7 @@ public class OperatorStateBackendTest {
@Test
public void testRegisterStates() throws Exception {
- OperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
+ DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>());
ListStateDescriptor<Serializable> stateDescriptor2 = new ListStateDescriptor<>("test2", new JavaSerializer<>());
ListStateDescriptor<Serializable> stateDescriptor3 = new ListStateDescriptor<>("test3", new JavaSerializer<>());
@@ -143,7 +144,7 @@ public class OperatorStateBackendTest {
@Test
public void testSnapshotRestore() throws Exception {
- OperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
+ DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>());
ListStateDescriptor<Serializable> stateDescriptor2 = new ListStateDescriptor<>("test2", new JavaSerializer<>());
ListStateDescriptor<Serializable> stateDescriptor3 = new ListStateDescriptor<>("test3", new JavaSerializer<>());
@@ -171,7 +172,8 @@ public class OperatorStateBackendTest {
operatorStateBackend.close();
operatorStateBackend.dispose();
- operatorStateBackend = abstractStateBackend.createOperatorStateBackend(
+ //TODO this is a temporary cast so that we can already test functionality that is not yet exposed through the public API
+ operatorStateBackend = (DefaultOperatorStateBackend) abstractStateBackend.createOperatorStateBackend(
createMockEnvironment(),
"testOperator");
http://git-wip-us.apache.org/repos/asf/flink/blob/9b9212f5/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 45fcc25..bd1678e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -969,8 +970,10 @@ public class RescalingITCase extends TestLogger {
public void initializeState(FunctionInitializationContext context) throws Exception {
if (broadcast) {
+ //TODO this is a temporary cast so that we can already test functionality that is not yet exposed through the public API
+ DefaultOperatorStateBackend operatorStateStore = (DefaultOperatorStateBackend) context.getOperatorStateStore();
this.counterPartitions =
- context.getOperatorStateStore().getBroadcastSerializableListState("counter_partitions");
+ operatorStateStore.getBroadcastSerializableListState("counter_partitions");
} else {
this.counterPartitions =
context.getOperatorStateStore().getSerializableListState("counter_partitions");