You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/11/23 15:13:53 UTC
flink git commit: [FLINK-5000] Rename Methods in
ManagedInitializationContext
Repository: flink
Updated Branches:
refs/heads/master 3e3a90d89 -> 4656350fc
[FLINK-5000] Rename Methods in ManagedInitializationContext
This removes "managed" from the OperatorStateStore and KeyedStateStore
access methods. There is no "un-managed" state and users might be
wondering what "managed" means here.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4656350f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4656350f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4656350f
Branch: refs/heads/master
Commit: 4656350fc33d42ff96ad6d5e836e62172b4b0de6
Parents: 3e3a90d
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Nov 23 14:58:33 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Nov 23 16:13:29 2016 +0100
----------------------------------------------------------------------
.../flink/runtime/state/ManagedInitializationContext.java | 4 ++--
.../flink/runtime/state/StateInitializationContextImpl.java | 6 +++---
.../streaming/connectors/fs/bucketing/BucketingSink.java | 2 +-
.../streaming/connectors/kafka/FlinkKafkaConsumerBase.java | 2 +-
.../streaming/connectors/kafka/FlinkKafkaProducerBase.java | 2 +-
.../connectors/kafka/FlinkKafkaConsumerBaseTest.java | 8 ++++----
.../api/functions/source/ContinuousFileReaderOperator.java | 2 +-
.../streaming/api/operators/AbstractUdfStreamOperator.java | 2 +-
.../api/operators/StreamOperatorSnapshotRestoreTest.java | 8 ++------
.../org/apache/flink/test/checkpointing/RescalingITCase.java | 6 +++---
10 files changed, 19 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java
index abc528b..5255c43 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java
@@ -43,11 +43,11 @@ public interface ManagedInitializationContext {
/**
* Returns an interface that allows for registering operator state with the backend.
*/
- OperatorStateStore getManagedOperatorStateStore();
+ OperatorStateStore getOperatorStateStore();
/**
* Returns an interface that allows for registering keyed state with the backend.
*/
- KeyedStateStore getManagedKeyedStateStore();
+ KeyedStateStore getKeyedStateStore();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
index b131d14..c86ff6c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
@@ -122,12 +122,12 @@ public class StateInitializationContextImpl implements StateInitializationContex
}
@Override
- public OperatorStateStore getManagedOperatorStateStore() {
+ public OperatorStateStore getOperatorStateStore() {
return operatorStateStore;
}
@Override
- public KeyedStateStore getManagedKeyedStateStore() {
+ public KeyedStateStore getKeyedStateStore() {
return keyedStateStore;
}
@@ -268,4 +268,4 @@ public class StateInitializationContextImpl implements StateInitializationContex
throw new UnsupportedOperationException("Read only Iterator");
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index 1da56b4..cf2c373 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -350,7 +350,7 @@ public class BucketingSink<T>
this.refTruncate = reflectTruncate(fs);
}
- OperatorStateStore stateStore = context.getManagedOperatorStateStore();
+ OperatorStateStore stateStore = context.getOperatorStateStore();
restoredBucketStates = stateStore.getSerializableListState("bucket-states");
int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 5161b35..aef7116 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -314,7 +314,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
- OperatorStateStore stateStore = context.getManagedOperatorStateStore();
+ OperatorStateStore stateStore = context.getOperatorStateStore();
offsetsStateForCheckpoint = stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
if (context.isRestored()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index 33289f8..d413f1c 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -343,7 +343,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
- this.stateStore = context.getManagedOperatorStateStore();
+ this.stateStore = context.getOperatorStateStore();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 9b7eabf..b96ba30 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -132,7 +132,7 @@ public class FlinkKafkaConsumerBaseTest {
StateInitializationContext initializationContext = mock(StateInitializationContext.class);
- when(initializationContext.getManagedOperatorStateStore()).thenReturn(operatorStateStore);
+ when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
when(initializationContext.isRestored()).thenReturn(true);
consumer.initializeState(initializationContext);
@@ -172,7 +172,7 @@ public class FlinkKafkaConsumerBaseTest {
StateInitializationContext initializationContext = mock(StateInitializationContext.class);
- when(initializationContext.getManagedOperatorStateStore()).thenReturn(operatorStateStore);
+ when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
when(initializationContext.isRestored()).thenReturn(false);
consumer.initializeState(initializationContext);
@@ -199,7 +199,7 @@ public class FlinkKafkaConsumerBaseTest {
StateInitializationContext initializationContext = mock(StateInitializationContext.class);
- when(initializationContext.getManagedOperatorStateStore()).thenReturn(operatorStateStore);
+ when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
// make the context signal that there is no restored state, then validate that
when(initializationContext.isRestored()).thenReturn(false);
@@ -245,7 +245,7 @@ public class FlinkKafkaConsumerBaseTest {
StateInitializationContext initializationContext = mock(StateInitializationContext.class);
- when(initializationContext.getManagedOperatorStateStore()).thenReturn(backend);
+ when(initializationContext.getOperatorStateStore()).thenReturn(backend);
when(initializationContext.isRestored()).thenReturn(false, true, true, true);
consumer.initializeState(initializationContext);
http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 74c58f9..bbe1ea5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -92,7 +92,7 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
checkState(this.checkpointedState == null && this.restoredReaderState == null,
"The reader state has already been initialized.");
- checkpointedState = context.getManagedOperatorStateStore().getSerializableListState("splits");
+ checkpointedState = context.getOperatorStateStore().getSerializableListState("splits");
int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
if (context.isRestored()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index 72ed5dc..c1f783f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -130,7 +130,7 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
@SuppressWarnings("unchecked")
ListCheckpointed<Serializable> listCheckpointedFun = (ListCheckpointed<Serializable>) userFunction;
- ListState<Serializable> listState = context.getManagedOperatorStateStore().
+ ListState<Serializable> listState = context.getOperatorStateStore().
getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
List<Serializable> list = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
index c02a7c3..08fbcbe 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
@@ -28,10 +28,8 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream;
-import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StatePartitionStreamProvider;
import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -39,13 +37,11 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
-import org.apache.flink.util.FutureUtil;
import org.junit.Assert;
import org.junit.Test;
import java.io.InputStream;
import java.util.BitSet;
-import java.util.Collections;
public class StreamOperatorSnapshotRestoreTest {
@@ -173,8 +169,8 @@ public class StreamOperatorSnapshotRestoreTest {
Assert.assertEquals(verifyRestore, context.isRestored());
- keyedState = context.getManagedKeyedStateStore().getState(new ValueStateDescriptor<>("managed-keyed", Integer.class, 0));
- opState = context.getManagedOperatorStateStore().getSerializableListState("managed-op-state");
+ keyedState = context.getKeyedStateStore().getState(new ValueStateDescriptor<>("managed-keyed", Integer.class, 0));
+ opState = context.getOperatorStateStore().getSerializableListState("managed-op-state");
if (context.isRestored()) {
// check restored raw keyed state
http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 09de67f..bc65abf 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -792,8 +792,8 @@ public class RescalingITCase extends TestLogger {
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
- counter = context.getManagedKeyedStateStore().getState(new ValueStateDescriptor<>("counter", Integer.class, 0));
- sum = context.getManagedKeyedStateStore().getState(new ValueStateDescriptor<>("sum", Integer.class, 0));
+ counter = context.getKeyedStateStore().getState(new ValueStateDescriptor<>("counter", Integer.class, 0));
+ sum = context.getKeyedStateStore().getState(new ValueStateDescriptor<>("sum", Integer.class, 0));
}
}
@@ -937,7 +937,7 @@ public class RescalingITCase extends TestLogger {
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
this.counterPartitions =
- context.getManagedOperatorStateStore().getSerializableListState("counter_partitions");
+ context.getOperatorStateStore().getSerializableListState("counter_partitions");
if (context.isRestored()) {
for (int v : counterPartitions.get()) {
counter += v;