You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/12/14 16:58:03 UTC
[4/5] flink git commit: [FLINK-5041] Savepoint Backwards
Compatibility 1.1 -> 1.2
[FLINK-5041] Savepoint Backwards Compatibility 1.1 -> 1.2
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e95fe56b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e95fe56b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e95fe56b
Branch: refs/heads/master
Commit: e95fe56b60158fc4df511a1404f7346bd18b8f12
Parents: 8cda6a2
Author: Stefan Richter <s....@data-artisans.com>
Authored: Thu Nov 24 17:29:24 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Dec 14 17:50:51 2016 +0100
----------------------------------------------------------------------
.../streaming/state/RocksDBFoldingState.java | 6 ++--
.../org/apache/flink/core/io/Versioned.java | 31 ++++++++++++++++++
.../java/org/apache/flink/util/Migration.java | 25 ++++++++++++++
.../savepoint/SavepointV0Serializer.java | 7 ++--
.../runtime/state/AbstractStateBackend.java | 6 +++-
.../state/MigrationKeyGroupStateHandle.java | 3 +-
.../state/MigrationStreamStateHandle.java | 13 ++++++--
.../runtime/checkpoint/savepoint/Savepoint.java | 13 ++------
.../savepoint/SavepointV1Serializer.java | 16 ++++-----
.../savepoint/MigrationV0ToV1Test.java | 13 ++------
.../runtime/state/OperatorStateBackendTest.java | 2 +-
.../api/operators/AbstractStreamOperator.java | 34 +++++++++++---------
.../operators/AbstractUdfStreamOperator.java | 8 +++++
13 files changed, 122 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e95fe56b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
index 3018f7b..9c2bf4f 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
@@ -22,13 +22,13 @@ import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;
-import java.io.ByteArrayInputStream;
import java.io.IOException;
/**
@@ -85,7 +85,7 @@ public class RocksDBFoldingState<K, N, T, ACC>
if (valueBytes == null) {
return null;
}
- return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
+ return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
} catch (IOException|RocksDBException e) {
throw new RuntimeException("Error while retrieving data from RocksDB", e);
}
@@ -103,7 +103,7 @@ public class RocksDBFoldingState<K, N, T, ACC>
valueSerializer.serialize(foldFunction.fold(stateDesc.getDefaultValue(), value), out);
backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
} else {
- ACC oldValue = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
+ ACC oldValue = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
ACC newValue = foldFunction.fold(oldValue, value);
keySerializationStream.reset();
valueSerializer.serialize(newValue, out);
http://git-wip-us.apache.org/repos/asf/flink/blob/e95fe56b/flink-core/src/main/java/org/apache/flink/core/io/Versioned.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/Versioned.java b/flink-core/src/main/java/org/apache/flink/core/io/Versioned.java
new file mode 100644
index 0000000..b36d5e8
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/io/Versioned.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.io;
+
+/**
+ * This interface is implemented by classes that provide a version number. Versions numbers can be used to differentiate
+ * between evolving classes.
+ */
+public interface Versioned {
+
+ /**
+ * Returns the version number of the object. Versions numbers can be used to differentiate evolving classes.
+ */
+ int getVersion();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e95fe56b/flink-core/src/main/java/org/apache/flink/util/Migration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/Migration.java b/flink-core/src/main/java/org/apache/flink/util/Migration.java
new file mode 100644
index 0000000..4bd9e39
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/Migration.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+/**
+ * Tagging interface for migration related classes.
+ */
+public interface Migration {
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e95fe56b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
index dc307e2..6c6a8f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.migration.runtime.checkpoint.KeyGroupState;
import org.apache.flink.migration.runtime.checkpoint.SubtaskState;
import org.apache.flink.migration.runtime.checkpoint.TaskState;
+import org.apache.flink.migration.runtime.state.AbstractStateBackend;
import org.apache.flink.migration.runtime.state.KvStateSnapshot;
import org.apache.flink.migration.runtime.state.StateHandle;
import org.apache.flink.migration.runtime.state.filesystem.AbstractFileStateHandle;
@@ -266,10 +267,7 @@ public class SavepointV0Serializer implements SavepointSerializer<SavepointV1> {
}
if (null != operatorState) {
- mergeStateHandles.add(SIGNAL_1);
mergeStateHandles.add(convertStateHandle(operatorState));
- } else {
- mergeStateHandles.add(SIGNAL_0);
}
return new MigrationStreamStateHandle(new MultiStreamStateHandle(mergeStateHandles));
@@ -340,6 +338,9 @@ public class SavepointV0Serializer implements SavepointSerializer<SavepointV1> {
byte[] data =
((org.apache.flink.migration.runtime.state.memory.ByteStreamStateHandle) oldStateHandle).getData();
return new ByteStreamStateHandle(String.valueOf(System.identityHashCode(data)), data);
+ } else if (oldStateHandle instanceof AbstractStateBackend.DataInputViewHandle) {
+ return convertStateHandle(
+ ((AbstractStateBackend.DataInputViewHandle) oldStateHandle).getStreamStateHandle());
}
throw new IllegalArgumentException("Unknown state handle type: " + oldStateHandle);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e95fe56b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/AbstractStateBackend.java
index b7932f5..1d76c06 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/AbstractStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/AbstractStateBackend.java
@@ -35,7 +35,7 @@ public abstract class AbstractStateBackend implements Serializable {
/**
* Simple state handle that resolved a {@link DataInputView} from a StreamStateHandle.
*/
- private static final class DataInputViewHandle implements StateHandle<DataInputView> {
+ public static final class DataInputViewHandle implements StateHandle<DataInputView> {
private static final long serialVersionUID = 2891559813513532079L;
@@ -45,6 +45,10 @@ public abstract class AbstractStateBackend implements Serializable {
this.stream = stream;
}
+ public StreamStateHandle getStreamStateHandle() {
+ return stream;
+ }
+
@Override
public DataInputView getState(ClassLoader userCodeClassLoader) throws Exception {
return new DataInputViewStreamWrapper(stream.getState(userCodeClassLoader));
http://git-wip-us.apache.org/repos/asf/flink/blob/e95fe56b/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationKeyGroupStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationKeyGroupStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationKeyGroupStateHandle.java
index 1bebcb6..995d234 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationKeyGroupStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationKeyGroupStateHandle.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Migration;
@Internal
@Deprecated
@@ -29,7 +30,7 @@ import org.apache.flink.runtime.state.StreamStateHandle;
* This class is just a KeyGroupsStateHandle that is tagged as migration, to figure out which restore logic to apply,
* e.g. when restoring backend data from a state handle.
*/
-public class MigrationKeyGroupStateHandle extends KeyGroupsStateHandle {
+public class MigrationKeyGroupStateHandle extends KeyGroupsStateHandle implements Migration {
private static final long serialVersionUID = -8554427169776881697L;
http://git-wip-us.apache.org/repos/asf/flink/blob/e95fe56b/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationStreamStateHandle.java
index e2da757..e7831a9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationStreamStateHandle.java
@@ -20,7 +20,9 @@ package org.apache.flink.migration.state;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataInputStreamWrapper;
import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Migration;
import java.io.IOException;
@@ -30,7 +32,7 @@ import java.io.IOException;
* This class is just a StreamStateHandle that is tagged as migration, to figure out which restore logic to apply, e.g.
* when restoring backend data from a state handle.
*/
-public class MigrationStreamStateHandle implements StreamStateHandle {
+public class MigrationStreamStateHandle implements StreamStateHandle, Migration {
private static final long serialVersionUID = -2332113722532150112L;
private final StreamStateHandle delegate;
@@ -41,7 +43,7 @@ public class MigrationStreamStateHandle implements StreamStateHandle {
@Override
public FSDataInputStream openInputStream() throws IOException {
- return delegate.openInputStream();
+ return new MigrationFSInputStream(delegate.openInputStream());
}
@Override
@@ -53,4 +55,11 @@ public class MigrationStreamStateHandle implements StreamStateHandle {
public long getStateSize() {
return delegate.getStateSize();
}
+
+ static class MigrationFSInputStream extends FSDataInputStreamWrapper implements Migration {
+
+ public MigrationFSInputStream(FSDataInputStream inputStream) {
+ super(inputStream);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e95fe56b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
index 643f14c..baad05f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.checkpoint.savepoint;
+import org.apache.flink.core.io.Versioned;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.TaskState;
@@ -34,17 +35,7 @@ import java.util.Collection;
* <p>Savepoints are serialized via a {@link SavepointSerializer} and stored
* via a {@link SavepointStore}.
*/
-public interface Savepoint {
-
- /**
- * Returns the savepoint version.
- *
- * <p>This version is independent of the Flink version, e.g. multiple Flink
- * versions can work the same savepoint version.
- *
- * @return Savepoint version
- */
- int getVersion();
+public interface Savepoint extends Versioned {
/**
* Returns the checkpoint ID of the savepoint.
http://git-wip-us.apache.org/repos/asf/flink/blob/e95fe56b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
index cd3e87f..4d16c13 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
@@ -211,7 +211,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
duration);
}
- public static void serializeKeyGroupStateHandle(
+ private static void serializeKeyGroupStateHandle(
KeyGroupsStateHandle stateHandle, DataOutputStream dos) throws IOException {
if (stateHandle != null) {
@@ -227,7 +227,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
}
}
- public static KeyGroupsStateHandle deserializeKeyGroupStateHandle(DataInputStream dis) throws IOException {
+ private static KeyGroupsStateHandle deserializeKeyGroupStateHandle(DataInputStream dis) throws IOException {
final int type = dis.readByte();
if (NULL_HANDLE == type) {
return null;
@@ -247,7 +247,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
}
}
- public static void serializeOperatorStateHandle(
+ private static void serializeOperatorStateHandle(
OperatorStateHandle stateHandle, DataOutputStream dos) throws IOException {
if (stateHandle != null) {
@@ -258,8 +258,8 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
dos.writeUTF(entry.getKey());
long[] offsets = entry.getValue();
dos.writeInt(offsets.length);
- for (int i = 0; i < offsets.length; ++i) {
- dos.writeLong(offsets[i]);
+ for (long offset : offsets) {
+ dos.writeLong(offset);
}
}
serializeStreamStateHandle(stateHandle.getDelegateStateHandle(), dos);
@@ -268,7 +268,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
}
}
- public static OperatorStateHandle deserializeOperatorStateHandle(
+ private static OperatorStateHandle deserializeOperatorStateHandle(
DataInputStream dis) throws IOException {
final int type = dis.readByte();
@@ -292,7 +292,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
}
}
- public static void serializeStreamStateHandle(
+ private static void serializeStreamStateHandle(
StreamStateHandle stateHandle, DataOutputStream dos) throws IOException {
if (stateHandle == null) {
@@ -319,7 +319,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
dos.flush();
}
- public static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException {
+ private static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException {
int type = dis.read();
if (NULL_HANDLE == type) {
return null;
http://git-wip-us.apache.org/repos/asf/flink/blob/e95fe56b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
index 02365c7..4208fe5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
@@ -147,13 +147,8 @@ public class MigrationV0ToV1Test {
//check operator state
expTestState.f0 = 1;
- if (p % 3 != 0) {
- assertEquals(1, is.read());
- actTestState = InstantiationUtil.deserializeObject(is, cl);
- assertEquals(expTestState, actTestState);
- } else {
- assertEquals(0, is.read());
- }
+ actTestState = InstantiationUtil.deserializeObject(is, cl);
+ assertEquals(expTestState, actTestState);
}
}
@@ -210,9 +205,7 @@ public class MigrationV0ToV1Test {
state.setFunctionState(new SerializedStateHandle<Serializable>(testState));
}
testState = new Tuple4<>(1, i, j, k);
- if (j % 3 != 0) {
- state.setOperatorState(new SerializedStateHandle<>(testState));
- }
+ state.setOperatorState(new SerializedStateHandle<>(testState));
if ((0 == k) && (i % 3 != 0)) {
HashMap<String, KvStateSnapshot<?, ?, ?, ?>> testKeyedState = new HashMap<>(2);
http://git-wip-us.apache.org/repos/asf/flink/blob/e95fe56b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index 2db8735..51e3739 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -160,4 +160,4 @@ public class OperatorStateBackendTest {
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e95fe56b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 839abf8..f9b711e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -28,11 +28,11 @@ import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
@@ -58,10 +58,10 @@ import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -201,7 +201,6 @@ public abstract class AbstractStreamOperator<OUT>
if (restoring) {
- // TODO check that there is EITHER old OR new state in handles!
restoreStreamCheckpointed(stateHandles);
//pass directly
@@ -230,18 +229,23 @@ public abstract class AbstractStreamOperator<OUT>
@Deprecated
private void restoreStreamCheckpointed(OperatorStateHandles stateHandles) throws Exception {
StreamStateHandle state = stateHandles.getLegacyOperatorState();
- if (this instanceof StreamCheckpointedOperator && null != state) {
+ if (null != state) {
+ if (this instanceof StreamCheckpointedOperator) {
- LOG.debug("Restore state of task {} in chain ({}).",
- stateHandles.getOperatorChainIndex(), getContainingTask().getName());
+ LOG.debug("Restore state of task {} in chain ({}).",
+ stateHandles.getOperatorChainIndex(), getContainingTask().getName());
- FSDataInputStream is = state.openInputStream();
- try {
- getContainingTask().getCancelables().registerClosable(is);
- ((StreamCheckpointedOperator) this).restoreState(is);
- } finally {
- getContainingTask().getCancelables().unregisterClosable(is);
- is.close();
+ FSDataInputStream is = state.openInputStream();
+ try {
+ getContainingTask().getCancelables().registerClosable(is);
+ ((StreamCheckpointedOperator) this).restoreState(is);
+ } finally {
+ getContainingTask().getCancelables().unregisterClosable(is);
+ is.close();
+ }
+ } else {
+ throw new Exception(
+ "Found legacy operator state for operator that does not implement StreamCheckpointedOperator.");
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e95fe56b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index c1f783f..1404958 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -39,6 +39,7 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Migration;
import java.io.Serializable;
import java.util.ArrayList;
@@ -192,6 +193,8 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
} catch (Exception e) {
throw new Exception("Failed to draw state snapshot from function: " + e.getMessage(), e);
}
+ } else if (userFunction instanceof CheckpointedRestoring) {
+ out.write(0);
}
}
@@ -213,6 +216,11 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
}
}
}
+ } else if (in instanceof Migration) {
+ int hasUdfState = in.read();
+ if (hasUdfState == 1) {
+ throw new Exception("Found UDF state but operator is not instance of CheckpointedRestoring");
+ }
}
}