You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/11/23 15:13:53 UTC

flink git commit: [FLINK-5000] Rename Methods in ManagedInitializationContext

Repository: flink
Updated Branches:
  refs/heads/master 3e3a90d89 -> 4656350fc


[FLINK-5000] Rename Methods in ManagedInitializationContext

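This removes "managed" from the names of the OperatorStateStore and
KeyedStateStore access methods: getManagedOperatorStateStore() becomes
getOperatorStateStore() and getManagedKeyedStateStore() becomes
getKeyedStateStore(). There is no "un-managed" state, so the qualifier
was redundant and could leave users wondering what "managed" means here.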


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4656350f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4656350f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4656350f

Branch: refs/heads/master
Commit: 4656350fc33d42ff96ad6d5e836e62172b4b0de6
Parents: 3e3a90d
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Nov 23 14:58:33 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Nov 23 16:13:29 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/state/ManagedInitializationContext.java    | 4 ++--
 .../flink/runtime/state/StateInitializationContextImpl.java  | 6 +++---
 .../streaming/connectors/fs/bucketing/BucketingSink.java     | 2 +-
 .../streaming/connectors/kafka/FlinkKafkaConsumerBase.java   | 2 +-
 .../streaming/connectors/kafka/FlinkKafkaProducerBase.java   | 2 +-
 .../connectors/kafka/FlinkKafkaConsumerBaseTest.java         | 8 ++++----
 .../api/functions/source/ContinuousFileReaderOperator.java   | 2 +-
 .../streaming/api/operators/AbstractUdfStreamOperator.java   | 2 +-
 .../api/operators/StreamOperatorSnapshotRestoreTest.java     | 8 ++------
 .../org/apache/flink/test/checkpointing/RescalingITCase.java | 6 +++---
 10 files changed, 19 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java
index abc528b..5255c43 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java
@@ -43,11 +43,11 @@ public interface ManagedInitializationContext {
 	/**
 	 * Returns an interface that allows for registering operator state with the backend.
 	 */
-	OperatorStateStore getManagedOperatorStateStore();
+	OperatorStateStore getOperatorStateStore();
 
 	/**
 	 * Returns an interface that allows for registering keyed state with the backend.
 	 */
-	KeyedStateStore getManagedKeyedStateStore();
+	KeyedStateStore getKeyedStateStore();
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
index b131d14..c86ff6c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
@@ -122,12 +122,12 @@ public class StateInitializationContextImpl implements StateInitializationContex
 	}
 
 	@Override
-	public OperatorStateStore getManagedOperatorStateStore() {
+	public OperatorStateStore getOperatorStateStore() {
 		return operatorStateStore;
 	}
 
 	@Override
-	public KeyedStateStore getManagedKeyedStateStore() {
+	public KeyedStateStore getKeyedStateStore() {
 		return keyedStateStore;
 	}
 
@@ -268,4 +268,4 @@ public class StateInitializationContextImpl implements StateInitializationContex
 			throw new UnsupportedOperationException("Read only Iterator");
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index 1da56b4..cf2c373 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -350,7 +350,7 @@ public class BucketingSink<T>
 			this.refTruncate = reflectTruncate(fs);
 		}
 
-		OperatorStateStore stateStore = context.getManagedOperatorStateStore();
+		OperatorStateStore stateStore = context.getOperatorStateStore();
 		restoredBucketStates = stateStore.getSerializableListState("bucket-states");
 
 		int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();

http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 5161b35..aef7116 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -314,7 +314,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	@Override
 	public void initializeState(FunctionInitializationContext context) throws Exception {
 
-		OperatorStateStore stateStore = context.getManagedOperatorStateStore();
+		OperatorStateStore stateStore = context.getOperatorStateStore();
 		offsetsStateForCheckpoint = stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
 
 		if (context.isRestored()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index 33289f8..d413f1c 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -343,7 +343,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 
 	@Override
 	public void initializeState(FunctionInitializationContext context) throws Exception {
-		this.stateStore = context.getManagedOperatorStateStore();
+		this.stateStore = context.getOperatorStateStore();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 9b7eabf..b96ba30 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -132,7 +132,7 @@ public class FlinkKafkaConsumerBaseTest {
 
 		StateInitializationContext initializationContext = mock(StateInitializationContext.class);
 
-		when(initializationContext.getManagedOperatorStateStore()).thenReturn(operatorStateStore);
+		when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
 		when(initializationContext.isRestored()).thenReturn(true);
 
 		consumer.initializeState(initializationContext);
@@ -172,7 +172,7 @@ public class FlinkKafkaConsumerBaseTest {
 
 		StateInitializationContext initializationContext = mock(StateInitializationContext.class);
 
-		when(initializationContext.getManagedOperatorStateStore()).thenReturn(operatorStateStore);
+		when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
 		when(initializationContext.isRestored()).thenReturn(false);
 
 		consumer.initializeState(initializationContext);
@@ -199,7 +199,7 @@ public class FlinkKafkaConsumerBaseTest {
 
 		StateInitializationContext initializationContext = mock(StateInitializationContext.class);
 
-		when(initializationContext.getManagedOperatorStateStore()).thenReturn(operatorStateStore);
+		when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
 
 		// make the context signal that there is no restored state, then validate that
 		when(initializationContext.isRestored()).thenReturn(false);
@@ -245,7 +245,7 @@ public class FlinkKafkaConsumerBaseTest {
 
 		StateInitializationContext initializationContext = mock(StateInitializationContext.class);
 
-		when(initializationContext.getManagedOperatorStateStore()).thenReturn(backend);
+		when(initializationContext.getOperatorStateStore()).thenReturn(backend);
 		when(initializationContext.isRestored()).thenReturn(false, true, true, true);
 
 		consumer.initializeState(initializationContext);

http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 74c58f9..bbe1ea5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -92,7 +92,7 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
 		checkState(this.checkpointedState == null && this.restoredReaderState == null,
 			"The reader state has already been initialized.");
 
-		checkpointedState = context.getManagedOperatorStateStore().getSerializableListState("splits");
+		checkpointedState = context.getOperatorStateStore().getSerializableListState("splits");
 
 		int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
 		if (context.isRestored()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index 72ed5dc..c1f783f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -130,7 +130,7 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
 			@SuppressWarnings("unchecked")
 			ListCheckpointed<Serializable> listCheckpointedFun = (ListCheckpointed<Serializable>) userFunction;
 
-			ListState<Serializable> listState = context.getManagedOperatorStateStore().
+			ListState<Serializable> listState = context.getOperatorStateStore().
 					getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
 
 			List<Serializable> list = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
index c02a7c3..08fbcbe 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
@@ -28,10 +28,8 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
 import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream;
-import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StatePartitionStreamProvider;
 import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -39,13 +37,11 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
-import org.apache.flink.util.FutureUtil;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.InputStream;
 import java.util.BitSet;
-import java.util.Collections;
 
 public class StreamOperatorSnapshotRestoreTest {
 
@@ -173,8 +169,8 @@ public class StreamOperatorSnapshotRestoreTest {
 
 			Assert.assertEquals(verifyRestore, context.isRestored());
 
-			keyedState = context.getManagedKeyedStateStore().getState(new ValueStateDescriptor<>("managed-keyed", Integer.class, 0));
-			opState = context.getManagedOperatorStateStore().getSerializableListState("managed-op-state");
+			keyedState = context.getKeyedStateStore().getState(new ValueStateDescriptor<>("managed-keyed", Integer.class, 0));
+			opState = context.getOperatorStateStore().getSerializableListState("managed-op-state");
 
 			if (context.isRestored()) {
 				// check restored raw keyed state

http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 09de67f..bc65abf 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -792,8 +792,8 @@ public class RescalingITCase extends TestLogger {
 
 		@Override
 		public void initializeState(FunctionInitializationContext context) throws Exception {
-			counter = context.getManagedKeyedStateStore().getState(new ValueStateDescriptor<>("counter", Integer.class, 0));
-			sum = context.getManagedKeyedStateStore().getState(new ValueStateDescriptor<>("sum", Integer.class, 0));
+			counter = context.getKeyedStateStore().getState(new ValueStateDescriptor<>("counter", Integer.class, 0));
+			sum = context.getKeyedStateStore().getState(new ValueStateDescriptor<>("sum", Integer.class, 0));
 		}
 	}
 
@@ -937,7 +937,7 @@ public class RescalingITCase extends TestLogger {
 		@Override
 		public void initializeState(FunctionInitializationContext context) throws Exception {
 			this.counterPartitions =
-					context.getManagedOperatorStateStore().getSerializableListState("counter_partitions");
+					context.getOperatorStateStore().getSerializableListState("counter_partitions");
 			if (context.isRestored()) {
 				for (int v : counterPartitions.get()) {
 					counter += v;