You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/01/14 03:22:57 UTC

[2/4] flink git commit: Hide broadcast state / remove from public API

Hide broadcast state / remove from public API


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9b9212f5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9b9212f5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9b9212f5

Branch: refs/heads/release-1.2
Commit: 9b9212f55e902cd2b2bc4a8dc0bfdea86ab39869
Parents: 29fbc49
Author: Stefan Richter <s....@data-artisans.com>
Authored: Fri Jan 13 16:38:33 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Jan 13 21:38:24 2017 +0100

----------------------------------------------------------------------
 .../api/common/state/OperatorStateStore.java    | 27 --------------------
 .../state/DefaultOperatorStateBackend.java      |  2 --
 .../runtime/state/OperatorStateBackendTest.java | 12 +++++----
 .../test/checkpointing/RescalingITCase.java     |  5 +++-
 4 files changed, 11 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9b9212f5/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
index 87a7759..c1cdfe4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
@@ -57,33 +57,6 @@ public interface OperatorStateStore {
 	<T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception;
 
 	/**
-	 * Creates (or restores) a list state. Each state is registered under a unique name.
-	 * The provided serializer is used to de/serialize the state in case of checkpointing (snapshot/restore).
-	 *
-	 * On restore, all items in the list are broadcasted to all parallel operator instances.
-	 *
-	 * @param stateDescriptor The descriptor for this state, providing a name and serializer.
-	 * @param <S> The generic type of the state
-	 *
-	 * @return A list for all state partitions.
-	 * @throws Exception
-	 */
-	<S> ListState<S> getBroadcastOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception;
-
-	/**
-	 * Creates a state of the given name that uses Java serialization to persist the state. On restore, all items
-	 * in the list are broadcasted to all parallel operator instances.
-	 *
-	 * <p>This is a simple convenience method. For more flexibility on how state serialization
-	 * should happen, use the {@link #getBroadcastOperatorState(ListStateDescriptor)} method.
-	 *
-	 * @param stateName The name of state to create
-	 * @return A list state using Java serialization to serialize state objects.
-	 * @throws Exception
-	 */
-	<T extends Serializable> ListState<T> getBroadcastSerializableListState(String stateName) throws Exception;
-
-	/**
 	 * Returns a set with the names of all currently registered states.
 	 * @return set of names for all registered states.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/9b9212f5/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index 6c65088..1cd1da7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -91,12 +91,10 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 	}
 
 	@SuppressWarnings("unchecked")
-	@Override
 	public <T extends Serializable> ListState<T> getBroadcastSerializableListState(String stateName) throws Exception {
 		return (ListState<T>) getBroadcastOperatorState(new ListStateDescriptor<>(stateName, javaSerializer));
 	}
 
-	@Override
 	public <S> ListState<S> getBroadcastOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception {
 		return getOperatorState(stateDescriptor, OperatorStateHandle.Mode.BROADCAST);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9b9212f5/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index cd0391f..5bd085f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -45,8 +45,9 @@ public class OperatorStateBackendTest {
 		return env;
 	}
 
-	private OperatorStateBackend createNewOperatorStateBackend() throws Exception {
-		return abstractStateBackend.createOperatorStateBackend(
+	private DefaultOperatorStateBackend createNewOperatorStateBackend() throws Exception {
+		//TODO this is temporarily casted to test already functionality that we do not yet expose through public API
+		return (DefaultOperatorStateBackend) abstractStateBackend.createOperatorStateBackend(
 				createMockEnvironment(),
 				"test-operator");
 	}
@@ -60,7 +61,7 @@ public class OperatorStateBackendTest {
 
 	@Test
 	public void testRegisterStates() throws Exception {
-		OperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
+		DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
 		ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>());
 		ListStateDescriptor<Serializable> stateDescriptor2 = new ListStateDescriptor<>("test2", new JavaSerializer<>());
 		ListStateDescriptor<Serializable> stateDescriptor3 = new ListStateDescriptor<>("test3", new JavaSerializer<>());
@@ -143,7 +144,7 @@ public class OperatorStateBackendTest {
 
 	@Test
 	public void testSnapshotRestore() throws Exception {
-		OperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
+		DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
 		ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>());
 		ListStateDescriptor<Serializable> stateDescriptor2 = new ListStateDescriptor<>("test2", new JavaSerializer<>());
 		ListStateDescriptor<Serializable> stateDescriptor3 = new ListStateDescriptor<>("test3", new JavaSerializer<>());
@@ -171,7 +172,8 @@ public class OperatorStateBackendTest {
 			operatorStateBackend.close();
 			operatorStateBackend.dispose();
 
-			operatorStateBackend = abstractStateBackend.createOperatorStateBackend(
+			//TODO this is temporarily casted to test already functionality that we do not yet expose through public API
+			operatorStateBackend = (DefaultOperatorStateBackend) abstractStateBackend.createOperatorStateBackend(
 					createMockEnvironment(),
 					"testOperator");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9b9212f5/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 45fcc25..bd1678e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -969,8 +970,10 @@ public class RescalingITCase extends TestLogger {
 		public void initializeState(FunctionInitializationContext context) throws Exception {
 
 			if (broadcast) {
+				//TODO this is temporarily casted to test already functionality that we do not yet expose through public API
+				DefaultOperatorStateBackend operatorStateStore = (DefaultOperatorStateBackend) context.getOperatorStateStore();
 				this.counterPartitions =
-						context.getOperatorStateStore().getBroadcastSerializableListState("counter_partitions");
+						operatorStateStore.getBroadcastSerializableListState("counter_partitions");
 			} else {
 				this.counterPartitions =
 						context.getOperatorStateStore().getSerializableListState("counter_partitions");