You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/03/30 22:04:48 UTC
[37/50] [abbrv] flink git commit: [FLINK-6034] [checkpoints]
Introduce KeyedStateHandle abstraction for the snapshots in keyed streams
http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
index b1c94cb..8aa76a5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
@@ -18,7 +18,7 @@
package org.apache.flink.streaming.api.operators;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.util.ExceptionUtils;
@@ -30,8 +30,8 @@ import java.util.concurrent.RunnableFuture;
*/
public class OperatorSnapshotResult {
- private RunnableFuture<KeyGroupsStateHandle> keyedStateManagedFuture;
- private RunnableFuture<KeyGroupsStateHandle> keyedStateRawFuture;
+ private RunnableFuture<KeyedStateHandle> keyedStateManagedFuture;
+ private RunnableFuture<KeyedStateHandle> keyedStateRawFuture;
private RunnableFuture<OperatorStateHandle> operatorStateManagedFuture;
private RunnableFuture<OperatorStateHandle> operatorStateRawFuture;
@@ -40,8 +40,8 @@ public class OperatorSnapshotResult {
}
public OperatorSnapshotResult(
- RunnableFuture<KeyGroupsStateHandle> keyedStateManagedFuture,
- RunnableFuture<KeyGroupsStateHandle> keyedStateRawFuture,
+ RunnableFuture<KeyedStateHandle> keyedStateManagedFuture,
+ RunnableFuture<KeyedStateHandle> keyedStateRawFuture,
RunnableFuture<OperatorStateHandle> operatorStateManagedFuture,
RunnableFuture<OperatorStateHandle> operatorStateRawFuture) {
this.keyedStateManagedFuture = keyedStateManagedFuture;
@@ -50,19 +50,19 @@ public class OperatorSnapshotResult {
this.operatorStateRawFuture = operatorStateRawFuture;
}
- public RunnableFuture<KeyGroupsStateHandle> getKeyedStateManagedFuture() {
+ public RunnableFuture<KeyedStateHandle> getKeyedStateManagedFuture() {
return keyedStateManagedFuture;
}
- public void setKeyedStateManagedFuture(RunnableFuture<KeyGroupsStateHandle> keyedStateManagedFuture) {
+ public void setKeyedStateManagedFuture(RunnableFuture<KeyedStateHandle> keyedStateManagedFuture) {
this.keyedStateManagedFuture = keyedStateManagedFuture;
}
- public RunnableFuture<KeyGroupsStateHandle> getKeyedStateRawFuture() {
+ public RunnableFuture<KeyedStateHandle> getKeyedStateRawFuture() {
return keyedStateRawFuture;
}
- public void setKeyedStateRawFuture(RunnableFuture<KeyGroupsStateHandle> keyedStateRawFuture) {
+ public void setKeyedStateRawFuture(RunnableFuture<KeyedStateHandle> keyedStateRawFuture) {
this.keyedStateRawFuture = keyedStateRawFuture;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java
index 7abf8d9..30d07b7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TaskStateHandles;
@@ -42,16 +42,16 @@ public class OperatorStateHandles {
private final StreamStateHandle legacyOperatorState;
- private final Collection<KeyGroupsStateHandle> managedKeyedState;
- private final Collection<KeyGroupsStateHandle> rawKeyedState;
+ private final Collection<KeyedStateHandle> managedKeyedState;
+ private final Collection<KeyedStateHandle> rawKeyedState;
private final Collection<OperatorStateHandle> managedOperatorState;
private final Collection<OperatorStateHandle> rawOperatorState;
public OperatorStateHandles(
int operatorChainIndex,
StreamStateHandle legacyOperatorState,
- Collection<KeyGroupsStateHandle> managedKeyedState,
- Collection<KeyGroupsStateHandle> rawKeyedState,
+ Collection<KeyedStateHandle> managedKeyedState,
+ Collection<KeyedStateHandle> rawKeyedState,
Collection<OperatorStateHandle> managedOperatorState,
Collection<OperatorStateHandle> rawOperatorState) {
@@ -83,11 +83,11 @@ public class OperatorStateHandles {
return legacyOperatorState;
}
- public Collection<KeyGroupsStateHandle> getManagedKeyedState() {
+ public Collection<KeyedStateHandle> getManagedKeyedState() {
return managedKeyedState;
}
- public Collection<KeyGroupsStateHandle> getRawKeyedState() {
+ public Collection<KeyedStateHandle> getRawKeyedState() {
return rawKeyedState;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 76b2b98..11e8e0d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -37,7 +37,7 @@ import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateBackend;
@@ -849,8 +849,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
private final List<OperatorSnapshotResult> snapshotInProgressList;
- private RunnableFuture<KeyGroupsStateHandle> futureKeyedBackendStateHandles;
- private RunnableFuture<KeyGroupsStateHandle> futureKeyedStreamStateHandles;
+ private RunnableFuture<KeyedStateHandle> futureKeyedBackendStateHandles;
+ private RunnableFuture<KeyedStateHandle> futureKeyedStreamStateHandles;
private List<StreamStateHandle> nonPartitionedStateHandles;
@@ -892,8 +892,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
public void run() {
try {
// Keyed state handle future, currently only one (the head) operator can have this
- KeyGroupsStateHandle keyedStateHandleBackend = FutureUtil.runIfNotDoneAndGet(futureKeyedBackendStateHandles);
- KeyGroupsStateHandle keyedStateHandleStream = FutureUtil.runIfNotDoneAndGet(futureKeyedStreamStateHandles);
+ KeyedStateHandle keyedStateHandleBackend = FutureUtil.runIfNotDoneAndGet(futureKeyedBackendStateHandles);
+ KeyedStateHandle keyedStateHandleStream = FutureUtil.runIfNotDoneAndGet(futureKeyedStreamStateHandles);
List<OperatorStateHandle> operatorStatesBackend = new ArrayList<>(snapshotInProgressList.size());
List<OperatorStateHandle> operatorStatesStream = new ArrayList<>(snapshotInProgressList.size());
@@ -987,8 +987,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
ChainedStateHandle<StreamStateHandle> chainedNonPartitionedOperatorsState,
ChainedStateHandle<OperatorStateHandle> chainedOperatorStateBackend,
ChainedStateHandle<OperatorStateHandle> chainedOperatorStateStream,
- KeyGroupsStateHandle keyedStateHandleBackend,
- KeyGroupsStateHandle keyedStateHandleStream) {
+ KeyedStateHandle keyedStateHandleBackend,
+ KeyedStateHandle keyedStateHandleStream) {
boolean hasAnyState = keyedStateHandleBackend != null
|| keyedStateHandleStream != null
http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index eeee8dc..8f42c1a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -51,7 +51,7 @@ import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
@@ -559,11 +559,11 @@ public class AbstractStreamOperatorTest {
final CloseableRegistry closeableRegistry = new CloseableRegistry();
- RunnableFuture<KeyGroupsStateHandle> futureKeyGroupStateHandle = mock(RunnableFuture.class);
+ RunnableFuture<KeyedStateHandle> futureKeyedStateHandle = mock(RunnableFuture.class);
RunnableFuture<OperatorStateHandle> futureOperatorStateHandle = mock(RunnableFuture.class);
StateSnapshotContextSynchronousImpl context = mock(StateSnapshotContextSynchronousImpl.class);
- when(context.getKeyedStateStreamFuture()).thenReturn(futureKeyGroupStateHandle);
+ when(context.getKeyedStateStreamFuture()).thenReturn(futureKeyedStateHandle);
when(context.getOperatorStateStreamFuture()).thenReturn(futureOperatorStateHandle);
OperatorSnapshotResult operatorSnapshotResult = spy(new OperatorSnapshotResult());
@@ -609,9 +609,9 @@ public class AbstractStreamOperatorTest {
verify(context).close();
verify(operatorSnapshotResult).cancel();
- verify(futureKeyGroupStateHandle).cancel(anyBoolean());
+ verify(futureKeyedStateHandle).cancel(anyBoolean());
verify(futureOperatorStateHandle).cancel(anyBoolean());
- verify(futureKeyGroupStateHandle).cancel(anyBoolean());
+ verify(futureKeyedStateHandle).cancel(anyBoolean());
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java
index 490df52..f57eed1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java
@@ -18,7 +18,7 @@
package org.apache.flink.streaming.api.operators;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
@@ -41,12 +41,12 @@ public class OperatorSnapshotResultTest extends TestLogger {
operatorSnapshotResult.cancel();
- KeyGroupsStateHandle keyedManagedStateHandle = mock(KeyGroupsStateHandle.class);
- RunnableFuture<KeyGroupsStateHandle> keyedStateManagedFuture = mock(RunnableFuture.class);
+ KeyedStateHandle keyedManagedStateHandle = mock(KeyedStateHandle.class);
+ RunnableFuture<KeyedStateHandle> keyedStateManagedFuture = mock(RunnableFuture.class);
when(keyedStateManagedFuture.get()).thenReturn(keyedManagedStateHandle);
- KeyGroupsStateHandle keyedRawStateHandle = mock(KeyGroupsStateHandle.class);
- RunnableFuture<KeyGroupsStateHandle> keyedStateRawFuture = mock(RunnableFuture.class);
+ KeyedStateHandle keyedRawStateHandle = mock(KeyedStateHandle.class);
+ RunnableFuture<KeyedStateHandle> keyedStateRawFuture = mock(RunnableFuture.class);
when(keyedStateRawFuture.get()).thenReturn(keyedRawStateHandle);
OperatorStateHandle operatorManagedStateHandle = mock(OperatorStateHandle.class);
http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
index 963c42c..8e0edfc 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateInitializationContextImpl;
import org.apache.flink.runtime.state.StatePartitionStreamProvider;
@@ -75,7 +76,7 @@ public class StateInitializationContextImplTest {
ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos(64);
- List<KeyGroupsStateHandle> keyGroupsStateHandles = new ArrayList<>(NUM_HANDLES);
+ List<KeyedStateHandle> keyedStateHandles = new ArrayList<>(NUM_HANDLES);
int prev = 0;
for (int i = 0; i < NUM_HANDLES; ++i) {
out.reset();
@@ -91,10 +92,10 @@ public class StateInitializationContextImplTest {
++writtenKeyGroups;
}
- KeyGroupsStateHandle handle =
+ KeyedStateHandle handle =
new KeyGroupsStateHandle(offsets, new ByteStateHandleCloseChecking("kg-" + i, out.toByteArray()));
- keyGroupsStateHandles.add(handle);
+ keyedStateHandles.add(handle);
}
List<OperatorStateHandle> operatorStateHandles = new ArrayList<>(NUM_HANDLES);
@@ -125,7 +126,7 @@ public class StateInitializationContextImplTest {
true,
stateStore,
mock(KeyedStateStore.class),
- keyGroupsStateHandles,
+ keyedStateHandles,
operatorStateHandles,
closableRegistry);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index 58cfefd..4435247 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -51,6 +51,7 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StreamStateHandle;
@@ -186,8 +187,8 @@ public class InterruptSensitiveRestoreTest {
ChainedStateHandle<StreamStateHandle> operatorState = null;
- List<KeyGroupsStateHandle> keyGroupStateFromBackend = Collections.emptyList();
- List<KeyGroupsStateHandle> keyGroupStateFromStream = Collections.emptyList();
+ List<KeyedStateHandle> keyedStateFromBackend = Collections.emptyList();
+ List<KeyedStateHandle> keyedStateFromStream = Collections.emptyList();
List<Collection<OperatorStateHandle>> operatorStateBackend = Collections.emptyList();
List<Collection<OperatorStateHandle>> operatorStateStream = Collections.emptyList();
@@ -201,8 +202,8 @@ public class InterruptSensitiveRestoreTest {
Collection<OperatorStateHandle> operatorStateHandles =
Collections.singletonList(new OperatorStateHandle(operatorStateMetadata, state));
- List<KeyGroupsStateHandle> keyGroupsStateHandles =
- Collections.singletonList(new KeyGroupsStateHandle(keyGroupRangeOffsets, state));
+ List<KeyedStateHandle> keyedStateHandles =
+ Collections.<KeyedStateHandle>singletonList(new KeyGroupsStateHandle(keyGroupRangeOffsets, state));
switch (mode) {
case OPERATOR_MANAGED:
@@ -212,10 +213,10 @@ public class InterruptSensitiveRestoreTest {
operatorStateStream = Collections.singletonList(operatorStateHandles);
break;
case KEYED_MANAGED:
- keyGroupStateFromBackend = keyGroupsStateHandles;
+ keyedStateFromBackend = keyedStateHandles;
break;
case KEYED_RAW:
- keyGroupStateFromStream = keyGroupsStateHandles;
+ keyedStateFromStream = keyedStateHandles;
break;
case LEGACY:
operatorState = new ChainedStateHandle<>(Collections.singletonList(state));
@@ -228,8 +229,8 @@ public class InterruptSensitiveRestoreTest {
operatorState,
operatorStateBackend,
operatorStateStream,
- keyGroupStateFromBackend,
- keyGroupStateFromStream);
+ keyedStateFromBackend,
+ keyedStateFromStream);
JobInformation jobInformation = new JobInformation(
new JobID(),
http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index d7e3d6c..f34522b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -61,7 +61,7 @@ import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateBackendFactory;
@@ -458,8 +458,8 @@ public class StreamTaskTest extends TestLogger {
StreamOperator<?> streamOperator = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
- KeyGroupsStateHandle managedKeyedStateHandle = mock(KeyGroupsStateHandle.class);
- KeyGroupsStateHandle rawKeyedStateHandle = mock(KeyGroupsStateHandle.class);
+ KeyedStateHandle managedKeyedStateHandle = mock(KeyedStateHandle.class);
+ KeyedStateHandle rawKeyedStateHandle = mock(KeyedStateHandle.class);
OperatorStateHandle managedOperatorStateHandle = mock(OperatorStateHandle.class);
OperatorStateHandle rawOperatorStateHandle = mock(OperatorStateHandle.class);
@@ -563,8 +563,8 @@ public class StreamTaskTest extends TestLogger {
(ChainedStateHandle<StreamStateHandle>)invocation.getArguments()[0],
(ChainedStateHandle<OperatorStateHandle>)invocation.getArguments()[1],
(ChainedStateHandle<OperatorStateHandle>)invocation.getArguments()[2],
- (KeyGroupsStateHandle)invocation.getArguments()[3],
- (KeyGroupsStateHandle)invocation.getArguments()[4]);
+ (KeyedStateHandle)invocation.getArguments()[3],
+ (KeyedStateHandle)invocation.getArguments()[4]);
}
});
@@ -574,8 +574,8 @@ public class StreamTaskTest extends TestLogger {
StreamOperator<?> streamOperator = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
- KeyGroupsStateHandle managedKeyedStateHandle = mock(KeyGroupsStateHandle.class);
- KeyGroupsStateHandle rawKeyedStateHandle = mock(KeyGroupsStateHandle.class);
+ KeyedStateHandle managedKeyedStateHandle = mock(KeyedStateHandle.class);
+ KeyedStateHandle rawKeyedStateHandle = mock(KeyedStateHandle.class);
OperatorStateHandle managedOperatorStateHandle = mock(OperatorStateHandle.class);
OperatorStateHandle rawOperatorStateHandle = mock(OperatorStateHandle.class);
http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 945103c..912d579 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
@@ -318,7 +319,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
StreamStateHandle stateHandle = SavepointV0Serializer.convertOperatorAndFunctionState(state);
- List<KeyGroupsStateHandle> keyGroupStatesList = new ArrayList<>();
+ List<KeyedStateHandle> keyGroupStatesList = new ArrayList<>();
if (state.getKvStates() != null) {
KeyGroupsStateHandle keyedStateHandle = SavepointV0Serializer.convertKeyedBackendState(
state.getKvStates(),
@@ -331,7 +332,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
initializeState(new OperatorStateHandles(0,
stateHandle,
keyGroupStatesList,
- Collections.<KeyGroupsStateHandle>emptyList(),
+ Collections.<KeyedStateHandle>emptyList(),
Collections.<OperatorStateHandle>emptyList(),
Collections.<OperatorStateHandle>emptyList()));
}
@@ -364,16 +365,16 @@ public class AbstractStreamOperatorTestHarness<OUT> {
KeyGroupRange localKeyGroupRange =
keyGroupPartitions.get(subtaskIndex);
- List<KeyGroupsStateHandle> localManagedKeyGroupState = null;
+ List<KeyedStateHandle> localManagedKeyGroupState = null;
if (operatorStateHandles.getManagedKeyedState() != null) {
- localManagedKeyGroupState = StateAssignmentOperation.getKeyGroupsStateHandles(
+ localManagedKeyGroupState = StateAssignmentOperation.getKeyedStateHandles(
operatorStateHandles.getManagedKeyedState(),
localKeyGroupRange);
}
- List<KeyGroupsStateHandle> localRawKeyGroupState = null;
+ List<KeyedStateHandle> localRawKeyGroupState = null;
if (operatorStateHandles.getRawKeyedState() != null) {
- localRawKeyGroupState = StateAssignmentOperation.getKeyGroupsStateHandles(
+ localRawKeyGroupState = StateAssignmentOperation.getKeyedStateHandles(
operatorStateHandles.getRawKeyedState(),
localKeyGroupRange);
}
@@ -442,15 +443,15 @@ public class AbstractStreamOperatorTestHarness<OUT> {
List<OperatorStateHandle> mergedManagedOperatorState = new ArrayList<>(handles.length);
List<OperatorStateHandle> mergedRawOperatorState = new ArrayList<>(handles.length);
- List<KeyGroupsStateHandle> mergedManagedKeyedState = new ArrayList<>(handles.length);
- List<KeyGroupsStateHandle> mergedRawKeyedState = new ArrayList<>(handles.length);
+ List<KeyedStateHandle> mergedManagedKeyedState = new ArrayList<>(handles.length);
+ List<KeyedStateHandle> mergedRawKeyedState = new ArrayList<>(handles.length);
for (OperatorStateHandles handle: handles) {
Collection<OperatorStateHandle> managedOperatorState = handle.getManagedOperatorState();
Collection<OperatorStateHandle> rawOperatorState = handle.getRawOperatorState();
- Collection<KeyGroupsStateHandle> managedKeyedState = handle.getManagedKeyedState();
- Collection<KeyGroupsStateHandle> rawKeyedState = handle.getRawKeyedState();
+ Collection<KeyedStateHandle> managedKeyedState = handle.getManagedKeyedState();
+ Collection<KeyedStateHandle> rawKeyedState = handle.getRawKeyedState();
if (managedOperatorState != null) {
mergedManagedOperatorState.addAll(managedOperatorState);
@@ -502,8 +503,8 @@ public class AbstractStreamOperatorTestHarness<OUT> {
timestamp,
CheckpointOptions.forFullCheckpoint());
- KeyGroupsStateHandle keyedManaged = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateManagedFuture());
- KeyGroupsStateHandle keyedRaw = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateRawFuture());
+ KeyedStateHandle keyedManaged = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateManagedFuture());
+ KeyedStateHandle keyedRaw = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateRawFuture());
OperatorStateHandle opManaged = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateManagedFuture());
OperatorStateHandle opRaw = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateRawFuture());
http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index d45ae21..d9c7387 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
@@ -65,7 +66,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
// when we restore we keep the state here so that we can call restore
// when the operator requests the keyed state backend
- private List<KeyGroupsStateHandle> restoredKeyedState = null;
+ private List<KeyedStateHandle> restoredKeyedState = null;
public KeyedOneInputStreamOperatorTestHarness(
OneInputStreamOperator<IN, OUT> operator,
@@ -144,7 +145,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
}
if (keyedStateBackend != null) {
- RunnableFuture<KeyGroupsStateHandle> keyedSnapshotRunnable = keyedStateBackend.snapshot(
+ RunnableFuture<KeyedStateHandle> keyedSnapshotRunnable = keyedStateBackend.snapshot(
checkpointId,
timestamp,
streamFactory,
@@ -177,14 +178,14 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
byte keyedStatePresent = (byte) inStream.read();
if (keyedStatePresent == 1) {
ObjectInputStream ois = new ObjectInputStream(inStream);
- this.restoredKeyedState = Collections.singletonList((KeyGroupsStateHandle) ois.readObject());
+ this.restoredKeyedState = Collections.singletonList((KeyedStateHandle) ois.readObject());
}
}
}
- private static boolean hasMigrationHandles(Collection<KeyGroupsStateHandle> allKeyGroupsHandles) {
- for (KeyGroupsStateHandle handle : allKeyGroupsHandles) {
+ private static boolean hasMigrationHandles(Collection<KeyedStateHandle> allKeyGroupsHandles) {
+ for (KeyedStateHandle handle : allKeyGroupsHandles) {
if (handle instanceof Migration) {
return true;
}
@@ -225,17 +226,17 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
keyGroupPartitions.get(subtaskIndex);
restoredKeyedState = null;
- Collection<KeyGroupsStateHandle> managedKeyedState = operatorStateHandles.getManagedKeyedState();
+ Collection<KeyedStateHandle> managedKeyedState = operatorStateHandles.getManagedKeyedState();
if (managedKeyedState != null) {
// if we have migration handles, don't reshuffle state and preserve
// the migration tag
if (hasMigrationHandles(managedKeyedState)) {
- List<KeyGroupsStateHandle> result = new ArrayList<>(managedKeyedState.size());
+ List<KeyedStateHandle> result = new ArrayList<>(managedKeyedState.size());
result.addAll(managedKeyedState);
restoredKeyedState = result;
} else {
- restoredKeyedState = StateAssignmentOperation.getKeyGroupsStateHandles(
+ restoredKeyedState = StateAssignmentOperation.getKeyedStateHandles(
managedKeyedState,
localKeyGroupRange);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
index 8e76f70..41a083a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.mockito.invocation.InvocationOnMock;
@@ -50,7 +51,7 @@ public class KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT>
// when we restore we keep the state here so that we can call restore
// when the operator requests the keyed state backend
- private Collection<KeyGroupsStateHandle> restoredKeyedState = null;
+ private Collection<KeyedStateHandle> restoredKeyedState = null;
public KeyedTwoInputStreamOperatorTestHarness(
TwoInputStreamOperator<IN1, IN2, OUT> operator,