You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/12/07 13:06:31 UTC
[3/3] flink git commit: [FLINK-5041] Savepoint backwards
compatibility 1.1 -> 1.2
[FLINK-5041] Savepoint backwards compatibility 1.1 -> 1.2
This addresses Savepoint, TaskState, StateHandles, KeyedStateBackends.
This closes #2781.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/af3bf837
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/af3bf837
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/af3bf837
Branch: refs/heads/master
Commit: af3bf837af766f93b1ab21e8e2f6f8fc5fdac6a6
Parents: a6e80da
Author: Stefan Richter <s....@data-artisans.com>
Authored: Tue Nov 1 12:29:01 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Dec 7 14:05:12 2016 +0100
----------------------------------------------------------------------
.../state/RocksDBKeyedStateBackend.java | 96 ++++-
.../streaming/state/RocksDBStateBackend.java | 67 +++
flink-core/pom.xml | 2 +-
.../core/fs/AbstractMultiFSDataInputStream.java | 113 ++++++
.../memory/ByteArrayInputStreamWithPos.java | 118 ++++++
.../memory/ByteArrayOutputStreamWithPos.java | 220 ++--------
.../util/MigrationInstantiationUtil.java | 82 ++++
.../flink/migration/util/SerializedValue.java | 95 +++++
.../apache/flink/util/InstantiationUtil.java | 2 +-
.../apache/flink/migration/MigrationUtil.java | 34 ++
.../runtime/checkpoint/KeyGroupState.java | 84 ++++
.../runtime/checkpoint/SubtaskState.java | 104 +++++
.../migration/runtime/checkpoint/TaskState.java | 160 ++++++++
.../checkpoint/savepoint/SavepointV0.java | 98 +++++
.../savepoint/SavepointV0Serializer.java | 404 +++++++++++++++++++
.../runtime/state/AbstractCloseableHandle.java | 128 ++++++
.../runtime/state/AbstractStateBackend.java | 68 ++++
.../runtime/state/KvStateSnapshot.java | 28 ++
.../migration/runtime/state/StateHandle.java | 38 ++
.../migration/runtime/state/StateObject.java | 55 +++
.../runtime/state/StreamStateHandle.java | 37 ++
.../filesystem/AbstractFileStateHandle.java | 99 +++++
.../filesystem/AbstractFsStateSnapshot.java | 87 ++++
.../filesystem/FileSerializableStateHandle.java | 73 ++++
.../state/filesystem/FileStreamStateHandle.java | 84 ++++
.../state/filesystem/FsFoldingState.java | 40 ++
.../runtime/state/filesystem/FsListState.java | 42 ++
.../state/filesystem/FsReducingState.java | 40 ++
.../state/filesystem/FsStateBackend.java | 50 +++
.../runtime/state/filesystem/FsValueState.java | 40 ++
.../state/memory/AbstractMemStateSnapshot.java | 136 +++++++
.../state/memory/ByteStreamStateHandle.java | 85 ++++
.../runtime/state/memory/MemFoldingState.java | 38 ++
.../runtime/state/memory/MemListState.java | 41 ++
.../runtime/state/memory/MemReducingState.java | 45 +++
.../runtime/state/memory/MemValueState.java | 45 +++
.../state/memory/SerializedStateHandle.java | 92 +++++
.../state/MigrationKeyGroupStateHandle.java | 43 ++
.../state/MigrationStreamStateHandle.java | 56 +++
.../runtime/tasks/StreamTaskState.java | 85 ++++
.../runtime/tasks/StreamTaskStateList.java | 96 +++++
.../checkpoint/savepoint/SavepointLoader.java | 4 +-
.../savepoint/SavepointSerializer.java | 3 +-
.../savepoint/SavepointSerializers.java | 5 +-
.../checkpoint/savepoint/SavepointStore.java | 6 +-
.../savepoint/SavepointV1Serializer.java | 6 +-
.../state/AbstractKeyedStateBackend.java | 2 +-
.../runtime/state/MultiStreamStateHandle.java | 96 +++++
.../state/heap/HeapKeyedStateBackend.java | 143 ++++++-
.../flink/runtime/jobmanager/JobManager.scala | 18 +-
.../savepoint/MigrationV0ToV1Test.java | 249 ++++++++++++
.../savepoint/SavepointLoaderTest.java | 10 +-
.../savepoint/SavepointStoreTest.java | 10 +-
.../savepoint/SavepointV1SerializerTest.java | 5 +-
.../state/MultiStreamStateHandleTest.java | 135 +++++++
.../testingUtils/TestingJobManagerLike.scala | 6 +-
.../streaming/runtime/tasks/StreamTask.java | 2 +-
57 files changed, 3823 insertions(+), 227 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index bc5b17d..4db622d 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -31,11 +31,15 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.migration.MigrationUtil;
+import org.apache.flink.migration.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable;
import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -43,6 +47,7 @@ import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
@@ -62,8 +67,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
+import java.io.EOFException;
import java.io.File;
import java.io.IOException;
+import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
@@ -205,8 +212,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
try {
- RocksDBRestoreOperation restoreOperation = new RocksDBRestoreOperation(this);
- restoreOperation.doRestore(restoreState);
+ if (MigrationUtil.isOldSavepointKeyedState(restoreState)) {
+ LOG.info("Converting RocksDB state from old savepoint.");
+ restoreOldSavepointKeyedState(restoreState);
+ } else {
+ RocksDBRestoreOperation restoreOperation = new RocksDBRestoreOperation(this);
+ restoreOperation.doRestore(restoreState);
+ }
} catch (Exception ex) {
dispose();
throw ex;
@@ -1068,4 +1080,84 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
public File getInstanceBasePath() {
return instanceBasePath;
}
+
+ /**
+ * Restores keyed state from a Flink 1.1 savepoint by rewriting every entry into the key-group-prefixed key layout.
+ * For backwards compatibility, remove again later!
+ *
+ * @param restoreState state handles to restore from; may be empty, otherwise exactly one handle is expected
+ * @throws Exception if the legacy snapshot cannot be read or deserialized, or if writing to the database fails
+ */
+ @Deprecated
+ private void restoreOldSavepointKeyedState(Collection<KeyGroupsStateHandle> restoreState) throws Exception {
+ if (restoreState.isEmpty()) {
+ return;
+ }
+ Preconditions.checkState(1 == restoreState.size(), "Only one element expected here.");
+ HashMap<String, RocksDBStateBackend.FinalFullyAsyncSnapshot> namedStates;
+ try (FSDataInputStream in = restoreState.iterator().next().openInputStream()) { // release the stream once the map is read
+ namedStates = InstantiationUtil.deserializeObject(in, userCodeClassLoader);
+ }
+
+ Preconditions.checkState(1 == namedStates.size(), "Only one element expected here.");
+ DataInputView inputView = namedStates.values().iterator().next().stateHandle.getState(userCodeClassLoader);
+
+ // clear k/v state information before filling it
+ kvStateInformation.clear();
+
+ // first get the column family mapping
+ int numColumns = inputView.readInt();
+ Map<Byte, StateDescriptor> columnFamilyMapping = new HashMap<>(numColumns);
+ for (int i = 0; i < numColumns; i++) {
+ byte mappingByte = inputView.readByte();
+ ObjectInputStream ooIn =
+ new InstantiationUtil.ClassLoaderObjectInputStream(
+ new DataInputViewStream(inputView), userCodeClassLoader);
+ StateDescriptor stateDescriptor = (StateDescriptor) ooIn.readObject();
+ columnFamilyMapping.put(mappingByte, stateDescriptor);
+
+ // this will fill in the k/v state information
+ getColumnFamily(stateDescriptor);
+ }
+
+ // try and read until EOF
+ try {
+ // the EOFException will get us out of this...
+ while (true) {
+ byte mappingByte = inputView.readByte();
+ ColumnFamilyHandle handle = getColumnFamily(columnFamilyMapping.get(mappingByte));
+ byte[] keyAndNamespace = BytePrimitiveArraySerializer.INSTANCE.deserialize(inputView);
+ ByteArrayInputStreamWithPos bis = new ByteArrayInputStreamWithPos(keyAndNamespace);
+ K reconstructedKey = keySerializer.deserialize(new DataInputViewStreamWrapper(bis));
+ int len = bis.getPosition();
+
+ // keep the full int: a byte cast would sign-extend key groups above 127 and corrupt the two-byte prefix below
+ int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(reconstructedKey, numberOfKeyGroups);
+
+ if (keyGroupPrefixBytes == 1) {
+ // copy and override one byte (42) between key and namespace
+ System.arraycopy(keyAndNamespace, 0, keyAndNamespace, 1, len);
+ keyAndNamespace[0] = (byte) keyGroup;
+ } else {
+ byte[] largerKey = new byte[1 + keyAndNamespace.length]; // two prefix bytes added, one separator byte (42) dropped
+
+ // write key-group
+ largerKey[0] = (byte) ((keyGroup >> 8) & 0xFF);
+ largerKey[1] = (byte) (keyGroup & 0xFF);
+
+ // write key
+ System.arraycopy(keyAndNamespace, 0, largerKey, 2, len);
+
+ //skip one byte (42), write namespace
+ System.arraycopy(keyAndNamespace, 1 + len, largerKey, 2 + len, keyAndNamespace.length - len - 1);
+ keyAndNamespace = largerKey;
+ }
+
+ byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(inputView);
+ db.put(handle, keyAndNamespace, value);
+ }
+ } catch (EOFException e) {
+ // expected
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java
new file mode 100644
index 0000000..509eb4c
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.migration.runtime.state.AbstractStateBackend;
+import org.apache.flink.migration.runtime.state.KvStateSnapshot;
+import org.apache.flink.migration.runtime.state.StateHandle;
+
+import java.io.IOException;
+
+import static java.util.Objects.requireNonNull;
+
+@Deprecated // legacy class in the org.apache.flink.migration namespace; it declares no backend logic of its own
+public class RocksDBStateBackend extends AbstractStateBackend {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Dummy {@link KvStateSnapshot} that holds the state of our one RocksDB database; every operation delegates to the wrapped {@link StateHandle}.
+ */
+ public static class FinalFullyAsyncSnapshot implements KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>> {
+ private static final long serialVersionUID = 1L;
+
+ public final StateHandle<DataInputView> stateHandle; // never null (checked in the constructor); public so callers can read the state directly
+ final long checkpointId; // stored but not read anywhere in this class
+
+ /**
+ * Creates a new snapshot from the given state parameters. Private and never invoked in this class. NOTE(review): instances presumably only arise from Java deserialization of old savepoints -- confirm.
+ */
+ private FinalFullyAsyncSnapshot(StateHandle<DataInputView> stateHandle, long checkpointId) {
+ this.stateHandle = requireNonNull(stateHandle);
+ this.checkpointId = checkpointId;
+ }
+
+ @Override
+ public final void discardState() throws Exception {
+ stateHandle.discardState(); // forwarded unchanged to the wrapped handle
+ }
+
+ @Override
+ public final long getStateSize() throws Exception {
+ return stateHandle.getStateSize(); // size as reported by the wrapped handle
+ }
+
+ @Override
+ public void close() throws IOException {
+ stateHandle.close(); // closing the snapshot closes the wrapped handle
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index cfa2cbb..ffbfe70 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -112,7 +112,7 @@ under the License.
<artifactId>joda-convert</artifactId>
<scope>test</scope>
</dependency>
- </dependencies>
+ </dependencies>
<build>
<plugins>
http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-core/src/main/java/org/apache/flink/core/fs/AbstractMultiFSDataInputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/AbstractMultiFSDataInputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/AbstractMultiFSDataInputStream.java
new file mode 100644
index 0000000..88c0092
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/AbstractMultiFSDataInputStream.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+/**
+ * Abstract base class for wrappers over multiple {@link FSDataInputStream}, which gives a contiguous view on all inner
+ * streams and makes them look like a single stream, in which we can read, seek, etc. This class itself never assigns {@code totalAvailable} or an initial {@code delegate}; only {@code totalPos} is initialized here.
+ */
+public abstract class AbstractMultiFSDataInputStream extends FSDataInputStream {
+
+ /** Inner stream for the currently accessed segment of the virtual global stream; while {@code null}, {@link #read()} reports end of stream */
+ protected FSDataInputStream delegate;
+
+ /** Position in the virtual global stream */
+ protected long totalPos;
+
+ /** Total available bytes in the virtual global stream */
+ protected long totalAvailable;
+
+ public AbstractMultiFSDataInputStream() {
+ this.totalPos = 0L;
+ }
+ /** Moves the global position to {@code desired} and swaps the delegate for a sub-stream seeked to that offset. */
+ @Override
+ public void seek(long desired) throws IOException {
+ // already at the requested position: keep the current delegate untouched
+ if(desired == totalPos) {
+ return;
+ }
+
+ Preconditions.checkArgument(desired >= 0L);
+
+ if (desired > totalAvailable) { // seeking exactly to totalAvailable is still permitted
+ throw new EOFException();
+ }
+
+ IOUtils.closeQuietly(delegate); // the current segment is always dropped, even if the target lies inside it
+ delegate = getSeekedStreamForOffset(desired);
+
+ this.totalPos = desired;
+ }
+ /** Returns the current position in the virtual global stream. */
+ @Override
+ public long getPos() throws IOException {
+ return totalPos;
+ }
+ /** Reads one byte, moving on to the next sub-stream when the current one is exhausted; returns -1 once no delegate is left. */
+ @Override
+ public int read() throws IOException {
+
+ if (null == delegate) {
+ return -1;
+ }
+
+ int val = delegate.read();
+
+ if (-1 == val) { // current segment exhausted
+ IOUtils.closeQuietly(delegate);
+ if (totalPos < totalAvailable) {
+ delegate = getSeekedStreamForOffset(totalPos);
+ } else {
+ delegate = null;
+ }
+ return read(); // retry on the new delegate; yields -1 right away when it is null
+ }
+
+ ++totalPos;
+ return val;
+ }
+ /** Quietly closes the current delegate, if any; the {@code delegate} field itself is left as it is. */
+ @Override
+ public void close() throws IOException {
+ IOUtils.closeQuietly(delegate);
+ }
+ /** Skips by seeking to {@code totalPos + n}. NOTE(review): always reports {@code n}, and seek throws rather than clamping when the target is past the end or negative. */
+ @Override
+ public long skip(long n) throws IOException {
+ seek(totalPos + n);
+ return n;
+ }
+
+ /**
+ * Delivers the right stream for the given global stream offset. The returned stream is already seeked to the
+ * right local offset that correctly reflects the global offset.
+ *
+ * @param globalStreamOffset the global offset to which we seek
+ * @return a sub-stream, seeked to the correct local offset w.r.t. the global offset.
+ * @throws IOException NOTE(review): presumably when the sub-stream cannot be opened or seeked -- confirm in the implementations
+ */
+ protected abstract FSDataInputStream getSeekedStreamForOffset(long globalStreamOffset) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java
new file mode 100644
index 0000000..c25f491
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Un-synchronized stream similar to Java's ByteArrayInputStream that also exposes the current position. The given array is wrapped directly, not copied.
+ */
+public class ByteArrayInputStreamWithPos extends InputStream {
+
+ protected byte[] buffer; // backing array handed to the constructor
+ protected int position; // index of the next byte to read
+ protected int count; // exclusive end index of the readable range
+ protected int mark = 0; // position that reset() returns to
+ /** Creates a stream over the whole array. */
+ public ByteArrayInputStreamWithPos(byte[] buffer) {
+ this(buffer, 0, buffer.length);
+ }
+ /** Creates a stream over {@code length} bytes starting at {@code offset}; the end is clamped to the array length, the offset is not validated. */
+ public ByteArrayInputStreamWithPos(byte[] buffer, int offset, int length) {
+ this.position = offset;
+ this.buffer = buffer;
+ this.mark = offset;
+ this.count = Math.min(buffer.length, offset + length);
+ }
+ /** Returns the next byte as a value in 0..255, or -1 at the end of the readable range. */
+ @Override
+ public int read() {
+ return (position < count) ? 0xFF & (buffer[position++]) : -1;
+ }
+ /** Copies up to {@code len} bytes into {@code b} at {@code off}; returns the number copied, or -1 when nothing is left (checked before the zero-length case). */
+ @Override
+ public int read(byte[] b, int off, int len) {
+ Preconditions.checkNotNull(b);
+
+ if (off < 0 || len < 0 || len > b.length - off) {
+ throw new IndexOutOfBoundsException();
+ }
+
+ if (position >= count) {
+ return -1; // signal EOF
+ }
+
+ int available = count - position;
+ // never hand out more than what is left in the readable range
+ if (len > available) {
+ len = available;
+ }
+
+ if (len <= 0) {
+ return 0;
+ }
+
+ System.arraycopy(buffer, position, b, off, len);
+ position += len;
+ return len;
+ }
+ /** Advances by at most {@code toSkip} bytes, limited to what is left; returns the distance actually skipped. */
+ @Override
+ public long skip(long toSkip) {
+ long remain = count - position;
+
+ if (toSkip < remain) {
+ remain = toSkip < 0 ? 0 : toSkip; // negative requests skip nothing
+ }
+
+ position += remain;
+ return remain;
+ }
+ /** Marking is always supported. */
+ @Override
+ public boolean markSupported() {
+ return true;
+ }
+ /** Remembers the current position; {@code readAheadLimit} is ignored. */
+ @Override
+ public void mark(int readAheadLimit) {
+ mark = position;
+ }
+ /** Jumps back to the last marked position (the constructor offset if mark was never called). */
+ @Override
+ public void reset() {
+ position = mark;
+ }
+ /** Returns the number of bytes between the current position and the end of the readable range. */
+ @Override
+ public int available() {
+ return count - position;
+ }
+ /** Does nothing. */
+ @Override
+ public void close() throws IOException {
+ }
+ /** Returns the index of the next byte to read within the backing array. */
+ public int getPosition() {
+ return position;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
index 285e016..df5b34a 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
@@ -22,260 +22,96 @@ import org.apache.flink.util.Preconditions;
import java.io.IOException;
import java.io.OutputStream;
-import java.io.UnsupportedEncodingException;
import java.util.Arrays;
/**
- * Un-synchronized copy of Java's ByteArrayOutputStream that also exposes the current position.
+ * Un-synchronized stream similar to Java's ByteArrayOutputStream that also exposes the current position.
*/
public class ByteArrayOutputStreamWithPos extends OutputStream {
- /**
- * The buffer where data is stored.
- */
- protected byte[] buf;
-
- /**
- * The number of valid bytes in the buffer.
- */
+ protected byte[] buffer;
protected int count;
- /**
- * Creates a new byte array output stream. The buffer capacity is
- * initially 32 bytes, though its size increases if necessary.
- */
public ByteArrayOutputStreamWithPos() {
- this(32);
+ this(64);
}
- /**
- * Creates a new byte array output stream, with a buffer capacity of
- * the specified size, in bytes.
- *
- * @param size the initial size.
- * @throws IllegalArgumentException if size is negative.
- */
public ByteArrayOutputStreamWithPos(int size) {
- if (size < 0) {
- throw new IllegalArgumentException("Negative initial size: "
- + size);
- }
- buf = new byte[size];
+ Preconditions.checkArgument(size >= 0);
+ buffer = new byte[size];
}
- /**
- * Increases the capacity if necessary to ensure that it can hold
- * at least the number of elements specified by the minimum
- * capacity argument.
- *
- * @param minCapacity the desired minimum capacity
- * @throws OutOfMemoryError if {@code minCapacity < 0}. This is
- * interpreted as a request for the unsatisfiably large capacity
- * {@code (long) Integer.MAX_VALUE + (minCapacity - Integer.MAX_VALUE)}.
- */
- private void ensureCapacity(int minCapacity) {
- // overflow-conscious code
- if (minCapacity - buf.length > 0) {
- grow(minCapacity);
+ private void ensureCapacity(int requiredCapacity) {
+ if (requiredCapacity - buffer.length > 0) {
+ increaseCapacity(requiredCapacity);
}
}
- /**
- * Increases the capacity to ensure that it can hold at least the
- * number of elements specified by the minimum capacity argument.
- *
- * @param minCapacity the desired minimum capacity
- */
- private void grow(int minCapacity) {
- // overflow-conscious code
- int oldCapacity = buf.length;
+ private void increaseCapacity(int requiredCapacity) {
+ int oldCapacity = buffer.length;
int newCapacity = oldCapacity << 1;
- if (newCapacity - minCapacity < 0) {
- newCapacity = minCapacity;
+ if (newCapacity - requiredCapacity < 0) {
+ newCapacity = requiredCapacity;
}
if (newCapacity < 0) {
- if (minCapacity < 0) { // overflow
+ if (requiredCapacity < 0) {
throw new OutOfMemoryError();
}
newCapacity = Integer.MAX_VALUE;
}
- buf = Arrays.copyOf(buf, newCapacity);
+ buffer = Arrays.copyOf(buffer, newCapacity);
}
- /**
- * Writes the specified byte to this byte array output stream.
- *
- * @param b the byte to be written.
- */
+ @Override
public void write(int b) {
ensureCapacity(count + 1);
- buf[count] = (byte) b;
- count += 1;
+ buffer[count] = (byte) b;
+ ++count;
}
- /**
- * Writes <code>len</code> bytes from the specified byte array
- * starting at offset <code>off</code> to this byte array output stream.
- *
- * @param b the data.
- * @param off the start offset in the data.
- * @param len the number of bytes to write.
- */
+ @Override
public void write(byte[] b, int off, int len) {
- if ((off < 0) || (off > b.length) || (len < 0) ||
+ if ((off < 0) || (len < 0) || (off > b.length) ||
((off + len) - b.length > 0)) {
throw new IndexOutOfBoundsException();
}
+
ensureCapacity(count + len);
- System.arraycopy(b, off, buf, count, len);
- count += len;
- }
- /**
- * Writes the complete contents of this byte array output stream to
- * the specified output stream argument, as if by calling the output
- * stream's write method using <code>out.write(buf, 0, count)</code>.
- *
- * @param out the output stream to which to write the data.
- * @throws IOException if an I/O error occurs.
- */
- public void writeTo(OutputStream out) throws IOException {
- out.write(buf, 0, count);
+ System.arraycopy(b, off, buffer, count, len);
+ count += len;
}
- /**
- * Resets the <code>count</code> field of this byte array output
- * stream to zero, so that all currently accumulated output in the
- * output stream is discarded. The output stream can be used again,
- * reusing the already allocated buffer space.
- *
- * @see java.io.ByteArrayInputStream#count
- */
public void reset() {
count = 0;
}
- /**
- * Creates a newly allocated byte array. Its size is the current
- * size of this output stream and the valid contents of the buffer
- * have been copied into it.
- *
- * @return the current contents of this output stream, as a byte array.
- * @see java.io.ByteArrayOutputStream#size()
- */
public byte toByteArray()[] {
- return Arrays.copyOf(buf, count);
+ return Arrays.copyOf(buffer, count);
}
- /**
- * Returns the current size of the buffer.
- *
- * @return the value of the <code>count</code> field, which is the number
- * of valid bytes in this output stream.
- * @see java.io.ByteArrayOutputStream#count
- */
public int size() {
return count;
}
- /**
- * Converts the buffer's contents into a string decoding bytes using the
- * platform's default character set. The length of the new <tt>String</tt>
- * is a function of the character set, and hence may not be equal to the
- * size of the buffer.
- * <p>
- * <p> This method always replaces malformed-input and unmappable-character
- * sequences with the default replacement string for the platform's
- * default character set. The {@linkplain java.nio.charset.CharsetDecoder}
- * class should be used when more control over the decoding process is
- * required.
- *
- * @return String decoded from the buffer's contents.
- * @since JDK1.1
- */
public String toString() {
- return new String(buf, 0, count);
+ return new String(buffer, 0, count);
}
- /**
- * Converts the buffer's contents into a string by decoding the bytes using
- * the named {@link java.nio.charset.Charset charset}. The length of the new
- * <tt>String</tt> is a function of the charset, and hence may not be equal
- * to the length of the byte array.
- * <p>
- * <p> This method always replaces malformed-input and unmappable-character
- * sequences with this charset's default replacement string. The {@link
- * java.nio.charset.CharsetDecoder} class should be used when more control
- * over the decoding process is required.
- *
- * @param charsetName the name of a supported
- * {@link java.nio.charset.Charset charset}
- * @return String decoded from the buffer's contents.
- * @throws UnsupportedEncodingException If the named charset is not supported
- * @since JDK1.1
- */
- public String toString(String charsetName)
- throws UnsupportedEncodingException {
- return new String(buf, 0, count, charsetName);
+ private int getEndPosition() {
+ return buffer.length;
}
- /**
- * Creates a newly allocated string. Its size is the current size of
- * the output stream and the valid contents of the buffer have been
- * copied into it. Each character <i>c</i> in the resulting string is
- * constructed from the corresponding element <i>b</i> in the byte
- * array such that:
- * <blockquote><pre>
- * c == (char)(((hibyte & 0xff) << 8) | (b & 0xff))
- * </pre></blockquote>
- *
- * @param hibyte the high byte of each resulting Unicode character.
- * @return the current contents of the output stream, as a string.
- * @see java.io.ByteArrayOutputStream#size()
- * @see java.io.ByteArrayOutputStream#toString(String)
- * @see java.io.ByteArrayOutputStream#toString()
- * @deprecated This method does not properly convert bytes into characters.
- * As of JDK 1.1, the preferred way to do this is via the
- * <code>toString(String enc)</code> method, which takes an encoding-name
- * argument, or the <code>toString()</code> method, which uses the
- * platform's default character encoding.
- */
- @Deprecated
- public String toString(int hibyte) {
- return new String(buf, hibyte, 0, count);
- }
-
- /**
- * Closing a <tt>ByteArrayOutputStream</tt> has no effect. The methods in
- * this class can be called after the stream has been closed without
- * generating an <tt>IOException</tt>.
- */
- public void close() throws IOException {
- }
-
- /**
- * Returns the read/write offset position for the stream.
- * @return the current position in the stream.
- */
public int getPosition() {
return count;
}
- /**
- * Sets the read/write offset position for the stream.
- *
- * @param position the position to which the offset in the stream shall be set. Must be < getEndPosition
- */
public void setPosition(int position) {
Preconditions.checkArgument(position < getEndPosition(), "Position out of bounds.");
count = position;
}
- /**
- * Returns the size of the internal buffer, which is the current end position for all setPosition calls.
- * @return size of the internal buffer
- */
- public int getEndPosition() {
- return buf.length;
+ @Override
+ public void close() throws IOException {
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-core/src/main/java/org/apache/flink/migration/util/MigrationInstantiationUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/migration/util/MigrationInstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/migration/util/MigrationInstantiationUtil.java
new file mode 100644
index 0000000..b6c354e
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/migration/util/MigrationInstantiationUtil.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.util;
+
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectStreamClass;
+
+/**
+ * Utility class to deserialize legacy classes for migration.
+ *
+ * <p>While reading, class descriptors whose class name contains {@code apache.flink.} are
+ * redirected to the class with the same name below {@code apache.flink.migration.}, provided
+ * that such a class can be loaded and has a serialization descriptor. Otherwise the descriptor
+ * read from the stream is used unchanged.
+ */
+public final class MigrationInstantiationUtil {
+
+	/**
+	 * Object input stream that maps class descriptors of Flink classes to their counterparts
+	 * in the migration package, where such a counterpart exists.
+	 */
+	public static class ClassLoaderObjectInputStream extends InstantiationUtil.ClassLoaderObjectInputStream {
+
+		public ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
+			super(in, classLoader);
+		}
+
+		/**
+		 * Reads the next class descriptor and, if possible, replaces it with the descriptor of
+		 * the corresponding migration class.
+		 *
+		 * @return the descriptor of the migration class, or the descriptor read from the stream
+		 *         if no usable migration class exists
+		 */
+		@Override
+		protected ObjectStreamClass readClassDescriptor()
+				throws IOException, ClassNotFoundException {
+			ObjectStreamClass objectStreamClass = super.readClassDescriptor();
+			String className = objectStreamClass.getName();
+			if (className.contains("apache.flink.")) {
+				className = className.replace("apache.flink.", "apache.flink.migration.");
+				try {
+					Class<?> clazz = Class.forName(className, false, classLoader);
+					ObjectStreamClass migrated = ObjectStreamClass.lookup(clazz);
+					// lookup() yields null for classes that are not serializable; never hand a
+					// null descriptor back to the stream, keep the original one instead
+					if (migrated != null) {
+						objectStreamClass = migrated;
+					}
+				} catch (Exception ignored) {
+					// best effort: there is no loadable migration counterpart for this class,
+					// so the descriptor read from the stream is used as-is
+				}
+			}
+			return objectStreamClass;
+		}
+	}
+
+	/**
+	 * Deserializes an object from the given bytes, see {@link #deserializeObject(InputStream, ClassLoader)}.
+	 *
+	 * @param bytes the serialized object
+	 * @param cl the class loader used to resolve classes
+	 * @return the deserialized object
+	 */
+	public static <T> T deserializeObject(byte[] bytes, ClassLoader cl) throws IOException, ClassNotFoundException {
+		return deserializeObject(new ByteArrayInputStream(bytes), cl);
+	}
+
+	/**
+	 * Deserializes an object from the given stream using a {@link ClassLoaderObjectInputStream}.
+	 * The given class loader is installed as the thread's context class loader while the object
+	 * is read; the previous context class loader is restored afterwards. The stream is closed
+	 * when this method returns.
+	 *
+	 * @param in the stream to read the object from
+	 * @param cl the class loader used to resolve classes
+	 * @return the deserialized object
+	 */
+	@SuppressWarnings("unchecked")
+	public static <T> T deserializeObject(InputStream in, ClassLoader cl) throws IOException, ClassNotFoundException {
+		final ClassLoader old = Thread.currentThread().getContextClassLoader();
+		try (ObjectInputStream oois = new ClassLoaderObjectInputStream(in, cl)) {
+			Thread.currentThread().setContextClassLoader(cl);
+			return (T) oois.readObject();
+		} finally {
+			Thread.currentThread().setContextClassLoader(old);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Private constructor to prevent instantiation.
+	 */
+	private MigrationInstantiationUtil() {
+		throw new IllegalAccessError();
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-core/src/main/java/org/apache/flink/migration/util/SerializedValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/migration/util/SerializedValue.java b/flink-core/src/main/java/org/apache/flink/migration/util/SerializedValue.java
new file mode 100644
index 0000000..aab68c9
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/migration/util/SerializedValue.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.util;
+
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * This class is used to transfer (via serialization) objects whose classes are not available
+ * in the system class loader. When those objects are deserialized without access to their
+ * special class loader, the deserialization fails with a {@code ClassNotFoundException}.
+ *
+ * <p>To work around that issue, the SerializedValue serializes the data immediately into a byte
+ * array. When sent through RPC or another service that uses serialization, only the byte array
+ * is transferred. The object is deserialized later (upon access) and requires the accessor to
+ * provide the corresponding class loader.
+ *
+ * @param <T> The type of the value held.
+ */
+@Deprecated
+public class SerializedValue<T> implements java.io.Serializable {
+
+	private static final long serialVersionUID = -3564011643393683761L;
+
+	/** The serialized data, or {@code null} if no value is set. */
+	private final byte[] serializedData;
+
+	/** Wraps bytes that are already in serialized form. */
+	private SerializedValue(byte[] serializedData) {
+		this.serializedData = serializedData;
+	}
+
+	/**
+	 * Serializes the given value right away; a {@code null} value is stored as {@code null} bytes.
+	 *
+	 * @param value the value to serialize, may be {@code null}
+	 */
+	public SerializedValue(T value) throws IOException {
+		if (value == null) {
+			this.serializedData = null;
+		} else {
+			this.serializedData = InstantiationUtil.serializeObject(value);
+		}
+	}
+
+	/**
+	 * Deserializes the held value with the given class loader.
+	 *
+	 * @param loader the class loader used to resolve the value's classes
+	 * @return the deserialized value, or {@code null} if no value is set
+	 */
+	public T deserializeValue(ClassLoader loader) throws IOException, ClassNotFoundException {
+		if (serializedData == null) {
+			return null;
+		}
+		return MigrationInstantiationUtil.<T>deserializeObject(serializedData, loader);
+	}
+
+	/**
+	 * Returns the serialized value or <code>null</code> if no value is set.
+	 *
+	 * @return Serialized data.
+	 */
+	public byte[] getByteArray() {
+		return serializedData;
+	}
+
+	/**
+	 * Creates a serialized value directly from already serialized bytes.
+	 *
+	 * @param serializedData the serialized form of the value, may be {@code null}
+	 */
+	public static <T> SerializedValue<T> fromBytes(byte[] serializedData) {
+		return new SerializedValue<T>(serializedData);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public int hashCode() {
+		// Arrays.hashCode yields 0 for a null array
+		return Arrays.hashCode(serializedData);
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (!(obj instanceof SerializedValue)) {
+			return false;
+		}
+		// Arrays.equals treats two null arrays as equal and null vs. non-null as different
+		final SerializedValue<?> that = (SerializedValue<?>) obj;
+		return Arrays.equals(this.serializedData, that.serializedData);
+	}
+
+	@Override
+	public String toString() {
+		return "SerializedValue";
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
index cd5c91a..ffb5a7d 100644
--- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
@@ -51,7 +51,7 @@ public final class InstantiationUtil {
*/
public static class ClassLoaderObjectInputStream extends ObjectInputStream {
- private final ClassLoader classLoader;
+ protected final ClassLoader classLoader;
public ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
super(in);
http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/MigrationUtil.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/MigrationUtil.java b/flink-runtime/src/main/java/org/apache/flink/migration/MigrationUtil.java
new file mode 100644
index 0000000..76d4eef
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/MigrationUtil.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration;
+
+import org.apache.flink.migration.state.MigrationKeyGroupStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+
+import java.util.Collection;
+
+/**
+ * Helper methods for recognizing state that originates from an old savepoint.
+ */
+public class MigrationUtil {
+
+	/**
+	 * Checks whether the given handles consist of exactly one element that is a
+	 * {@link MigrationKeyGroupStateHandle}.
+	 *
+	 * @param keyGroupsStateHandles the handles to inspect, may be {@code null}
+	 * @return {@code true} if the collection holds a single migration handle, {@code false} otherwise
+	 */
+	public static boolean isOldSavepointKeyedState(Collection<KeyGroupsStateHandle> keyGroupsStateHandles) {
+		if (keyGroupsStateHandles == null || keyGroupsStateHandles.size() != 1) {
+			return false;
+		}
+		final KeyGroupsStateHandle onlyHandle = keyGroupsStateHandles.iterator().next();
+		return onlyHandle instanceof MigrationKeyGroupStateHandle;
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/KeyGroupState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/KeyGroupState.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/KeyGroupState.java
new file mode 100644
index 0000000..ad94993
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/KeyGroupState.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.checkpoint;
+
+import org.apache.flink.migration.runtime.state.StateHandle;
+import org.apache.flink.migration.util.SerializedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+/**
+ * Simple container class which contains the serialized state handle for a key group.
+ *
+ * <p>The key group state handle is kept in serialized form because it can contain user code
+ * classes which might not be available on the JobManager.
+ */
+@Deprecated
+public class KeyGroupState implements Serializable {
+	private static final long serialVersionUID = -5926696455438467634L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(KeyGroupState.class);
+
+	/** The serialized state handle of the key group. */
+	private final SerializedValue<StateHandle<?>> keyGroupState;
+
+	/** The state size given at construction. */
+	private final long stateSize;
+
+	/** The duration given at construction. */
+	private final long duration;
+
+	public KeyGroupState(SerializedValue<StateHandle<?>> keyGroupState, long stateSize, long duration) {
+		this.keyGroupState = keyGroupState;
+		this.stateSize = stateSize;
+		this.duration = duration;
+	}
+
+	public SerializedValue<StateHandle<?>> getKeyGroupState() {
+		return keyGroupState;
+	}
+
+	public long getDuration() {
+		return duration;
+	}
+
+	public long getStateSize() {
+		return stateSize;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (!(obj instanceof KeyGroupState)) {
+			return false;
+		}
+
+		final KeyGroupState that = (KeyGroupState) obj;
+		return keyGroupState.equals(that.keyGroupState)
+				&& stateSize == that.stateSize
+				&& duration == that.duration;
+	}
+
+	@Override
+	public int hashCode() {
+		// fold each long into an int by xor-ing its upper and lower halves
+		final int sizeHash = (int) (stateSize ^ (stateSize >>> 32));
+		final int durationHash = (int) (duration ^ (duration >>> 32));
+		return sizeHash + 31 * (durationHash + 31 * keyGroupState.hashCode());
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/SubtaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/SubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/SubtaskState.java
new file mode 100644
index 0000000..df886b3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/SubtaskState.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.checkpoint;
+
+import org.apache.flink.migration.runtime.state.StateHandle;
+import org.apache.flink.migration.util.SerializedValue;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Container for the serialized state handle of one parallel subtask, together with the state
+ * size and the checkpoint duration.
+ */
+@Deprecated
+public class SubtaskState implements Serializable {
+
+	private static final long serialVersionUID = -2394696997971923995L;
+
+	/** The state of the parallel operator */
+	private final SerializedValue<StateHandle<?>> state;
+
+	/**
+	 * The state size. This is also part of the deserialized state handle.
+	 * We store it here in order to not deserialize the state handle when
+	 * gathering stats.
+	 */
+	private final long stateSize;
+
+	/** The duration of the acknowledged (ack timestamp - trigger timestamp). */
+	private final long duration;
+
+	/**
+	 * @param state the serialized state handle, must not be {@code null}
+	 * @param stateSize the state size; negative values are stored as 0
+	 * @param duration the duration of the checkpoint acknowledgement
+	 */
+	public SubtaskState(
+			SerializedValue<StateHandle<?>> state,
+			long stateSize,
+			long duration) {
+
+		this.state = checkNotNull(state, "State");
+		// Sanity check: clamp instead of failing the checkpoint because of a negative size.
+		this.stateSize = Math.max(0L, stateSize);
+		this.duration = duration;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	public SerializedValue<StateHandle<?>> getState() {
+		return state;
+	}
+
+	public long getStateSize() {
+		return stateSize;
+	}
+
+	public long getDuration() {
+		return duration;
+	}
+
+	/** Does nothing in this class. */
+	public void discard(ClassLoader userClassLoader) throws Exception {
+
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (!(o instanceof SubtaskState)) {
+			return false;
+		}
+
+		final SubtaskState other = (SubtaskState) o;
+		return this.state.equals(other.state)
+				&& this.stateSize == other.stateSize
+				&& this.duration == other.duration;
+	}
+
+	@Override
+	public int hashCode() {
+		// fold each long into an int by xor-ing its upper and lower halves
+		final int sizeHash = (int) (stateSize ^ (stateSize >>> 32));
+		final int durationHash = (int) (duration ^ (duration >>> 32));
+		return sizeHash + 31 * (durationHash + 31 * state.hashCode());
+	}
+
+	@Override
+	public String toString() {
+		return String.format("SubtaskState(Size: %d, Duration: %d, State: %s)", stateSize, duration, state);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/TaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/TaskState.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/TaskState.java
new file mode 100644
index 0000000..798c112
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/TaskState.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.checkpoint;
+
+import org.apache.flink.migration.runtime.state.StateHandle;
+import org.apache.flink.migration.util.SerializedValue;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Container for the states of one job vertex: the subtask states indexed by subtask index and
+ * the key-value states indexed by key group index.
+ */
+@Deprecated
+public class TaskState implements Serializable {
+
+	private static final long serialVersionUID = -4845578005863201810L;
+
+	private final JobVertexID jobVertexID;
+
+	/** Map of task states which can be accessed by their sub task index */
+	private final Map<Integer, SubtaskState> subtaskStates;
+
+	/** Map of key-value states which can be accessed by their key group index */
+	private final Map<Integer, KeyGroupState> kvStates;
+
+	/** Parallelism of the operator when it was checkpointed */
+	private final int parallelism;
+
+	public TaskState(JobVertexID jobVertexID, int parallelism) {
+		this.jobVertexID = jobVertexID;
+
+		this.subtaskStates = new HashMap<>(parallelism);
+
+		this.kvStates = new HashMap<>();
+
+		this.parallelism = parallelism;
+	}
+
+	public JobVertexID getJobVertexID() {
+		return jobVertexID;
+	}
+
+	/**
+	 * Stores the state of the subtask with the given index.
+	 *
+	 * @param subtaskIndex index of the subtask, must be in {@code [0, parallelism)}
+	 * @param subtaskState the state to store
+	 * @throws IndexOutOfBoundsException if the index is negative or not smaller than the parallelism
+	 */
+	public void putState(int subtaskIndex, SubtaskState subtaskState) {
+		checkSubtaskIndex(subtaskIndex);
+		subtaskStates.put(subtaskIndex, subtaskState);
+	}
+
+	/**
+	 * Returns the state of the subtask with the given index.
+	 *
+	 * @param subtaskIndex index of the subtask, must be in {@code [0, parallelism)}
+	 * @return the stored state, or {@code null} if none was stored for that index
+	 * @throws IndexOutOfBoundsException if the index is negative or not smaller than the parallelism
+	 */
+	public SubtaskState getState(int subtaskIndex) {
+		checkSubtaskIndex(subtaskIndex);
+		return subtaskStates.get(subtaskIndex);
+	}
+
+	/** Validates that the given subtask index lies within {@code [0, parallelism)}. */
+	private void checkSubtaskIndex(int subtaskIndex) {
+		if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
+			// report the parallelism (the actual bound), not the number of states collected so far
+			throw new IndexOutOfBoundsException("The given sub task index " + subtaskIndex +
+				" exceeds the maximum number of sub tasks " + parallelism);
+		}
+	}
+
+	public Collection<SubtaskState> getStates() {
+		return subtaskStates.values();
+	}
+
+	public Map<Integer, SubtaskState> getSubtaskStatesById() {
+		return subtaskStates;
+	}
+
+	/**
+	 * Returns the sum of the state sizes of all stored subtask states and key group states.
+	 */
+	public long getStateSize() {
+		long result = 0L;
+
+		for (SubtaskState subtaskState : subtaskStates.values()) {
+			result += subtaskState.getStateSize();
+		}
+
+		for (KeyGroupState keyGroupState : kvStates.values()) {
+			result += keyGroupState.getStateSize();
+		}
+
+		return result;
+	}
+
+	public int getNumberCollectedStates() {
+		return subtaskStates.size();
+	}
+
+	public int getParallelism() {
+		return parallelism;
+	}
+
+	public void putKvState(int keyGroupId, KeyGroupState keyGroupState) {
+		kvStates.put(keyGroupId, keyGroupState);
+	}
+
+	public KeyGroupState getKvState(int keyGroupId) {
+		return kvStates.get(keyGroupId);
+	}
+
+	/**
+	 * Retrieve the set of key-value state key groups specified by the given key group partition set.
+	 * The key groups are returned as a map where the key group index maps to the serialized state
+	 * handle of the key group. Key groups without stored state are left out of the result.
+	 *
+	 * @param keyGroupPartition Set of key group indices
+	 * @return Map of serialized key group state handles indexed by their key group index.
+	 */
+	public Map<Integer, SerializedValue<StateHandle<?>>> getUnwrappedKvStates(Set<Integer> keyGroupPartition) {
+		HashMap<Integer, SerializedValue<StateHandle<?>>> result = new HashMap<>(keyGroupPartition.size());
+
+		for (Integer keyGroupId : keyGroupPartition) {
+			KeyGroupState keyGroupState = kvStates.get(keyGroupId);
+
+			if (keyGroupState != null) {
+				// reuse the value looked up above instead of querying the map a second time
+				result.put(keyGroupId, keyGroupState.getKeyGroupState());
+			}
+		}
+
+		return result;
+	}
+
+	public int getNumberCollectedKvStates() {
+		return kvStates.size();
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof TaskState) {
+			TaskState other = (TaskState) obj;
+
+			return jobVertexID.equals(other.jobVertexID) && parallelism == other.parallelism &&
+				subtaskStates.equals(other.subtaskStates) && kvStates.equals(other.kvStates);
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		return parallelism + 31 * Objects.hash(jobVertexID, subtaskStates, kvStates);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0.java
new file mode 100644
index 0000000..8aa562e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.checkpoint.savepoint;
+
+import org.apache.flink.migration.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+
+/**
+ * Savepoint version 0.
+ *
+ * <p>This format was introduced with Flink 1.1.0.
+ *
+ * <p>The task states are held in their old (migration) representation and are available via
+ * {@link #getOldTaskStates()}; {@link #getTaskStates()} is not supported by this class.
+ */
+public class SavepointV0 implements Savepoint {
+
+	/** The savepoint version. */
+	public static final int VERSION = 0;
+
+	/** The checkpoint ID */
+	private final long checkpointId;
+
+	/** The task states */
+	private final Collection<TaskState> taskStates;
+
+	/**
+	 * @param checkpointId the checkpoint ID
+	 * @param taskStates the task states in their old representation, must not be {@code null}
+	 */
+	public SavepointV0(long checkpointId, Collection<TaskState> taskStates) {
+		this.checkpointId = checkpointId;
+		this.taskStates = Preconditions.checkNotNull(taskStates, "Task States");
+	}
+
+	@Override
+	public int getVersion() {
+		return VERSION;
+	}
+
+	@Override
+	public long getCheckpointId() {
+		return checkpointId;
+	}
+
+	/**
+	 * Not supported by this class, use {@link #getOldTaskStates()} instead.
+	 *
+	 * @throws UnsupportedOperationException always
+	 */
+	@Override
+	public Collection<org.apache.flink.runtime.checkpoint.TaskState> getTaskStates() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void dispose() throws Exception {
+		//NOP
+	}
+
+	/**
+	 * Returns the task states in their old representation, as passed to the constructor.
+	 */
+	public Collection<TaskState> getOldTaskStates() {
+		return taskStates;
+	}
+
+	@Override
+	public String toString() {
+		return "Savepoint(version=" + VERSION + ")";
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		SavepointV0 that = (SavepointV0) o;
+		// compare the field directly: getTaskStates() always throws in this class,
+		// and hashCode() is based on the same field
+		return checkpointId == that.checkpointId && taskStates.equals(that.taskStates);
+	}
+
+	@Override
+	public int hashCode() {
+		int result = (int) (checkpointId ^ (checkpointId >>> 32));
+		result = 31 * result + taskStates.hashCode();
+		return result;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
new file mode 100644
index 0000000..e4125e5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.checkpoint.savepoint;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.migration.runtime.checkpoint.KeyGroupState;
+import org.apache.flink.migration.runtime.checkpoint.SubtaskState;
+import org.apache.flink.migration.runtime.checkpoint.TaskState;
+import org.apache.flink.migration.runtime.state.KvStateSnapshot;
+import org.apache.flink.migration.runtime.state.StateHandle;
+import org.apache.flink.migration.runtime.state.filesystem.AbstractFileStateHandle;
+import org.apache.flink.migration.runtime.state.memory.SerializedStateHandle;
+import org.apache.flink.migration.state.MigrationKeyGroupStateHandle;
+import org.apache.flink.migration.state.MigrationStreamStateHandle;
+import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskState;
+import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskStateList;
+import org.apache.flink.migration.util.SerializedValue;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializer;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.MultiStreamStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
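+/**
+ * Serializer for savepoints in the legacy format (version 0). It exists only for backwards
+ * compatibility and is read-only: {@link #serialize} always throws an
+ * {@link UnsupportedOperationException}, while {@link #deserialize} reads the legacy
+ * format and converts the result into a {@link SavepointV1}.
+ *
+ * <p>Note that the state handles contained in a legacy savepoint are stored as serialized
+ * values and are deserialized with the user class loader during the conversion.
+ */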
+public class SavepointV0Serializer implements SavepointSerializer<SavepointV1> {
+
+ /** The shared instance of this serializer; the constructor is private. */
+ public static final SavepointV0Serializer INSTANCE = new SavepointV0Serializer();
+ /** Marker handle holding the single byte 0; added to a merged handle list when a state part is absent. */
+ private static final StreamStateHandle SIGNAL_0 = new ByteStreamStateHandle("SIGNAL_0", new byte[]{0});
+ /** Marker handle holding the single byte 1; added to a merged handle list in front of a state part that is present. */
+ private static final StreamStateHandle SIGNAL_1 = new ByteStreamStateHandle("SIGNAL_1", new byte[]{1});
+
+ /** Size argument (4 * 1024 * 1024) passed to the in-memory checkpoint stream factory used when converting keyed state. */
+ private static final int MAX_SIZE = 4 * 1024 * 1024;
+
+ /** Private constructor; use {@link #INSTANCE}. */
+ private SavepointV0Serializer() {
+ }
+
+
+ /**
+ * Not supported: this serializer only reads the legacy format.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public void serialize(SavepointV1 savepoint, DataOutputStream dos) throws IOException {
+ throw new UnsupportedOperationException("This serializer is read-only and only exists for backwards compatibility");
+ }
+
+ /**
+ * Reads a savepoint in the legacy (version 0) binary layout and converts it into a
+ * {@link SavepointV1}.
+ *
+ * <p>Layout as read by this method: the checkpoint ID (long) and the number of task states
+ * (int); for each task state the job vertex ID (two longs), the parallelism (int), the
+ * subtask states and the key group states. Every subtask or key group entry consists of an
+ * index (int), a length-prefixed serialized state handle (a length of {@code -1} denotes a
+ * null value), the state size (long) and the duration (long).
+ *
+ * @param dis the stream to read the savepoint from
+ * @param userClassLoader the class loader used to deserialize the contained state handles
+ * @return the converted savepoint
+ * @throws IOException if reading fails, a length field is invalid, or the conversion fails
+ */
+ @Override
+ public SavepointV1 deserialize(DataInputStream dis, ClassLoader userClassLoader) throws IOException {
+
+ long checkpointId = dis.readLong();
+
+ // Task states
+ int numTaskStates = dis.readInt();
+ List<TaskState> taskStates = new ArrayList<>(numTaskStates);
+
+ for (int i = 0; i < numTaskStates; i++) {
+ JobVertexID jobVertexId = new JobVertexID(dis.readLong(), dis.readLong());
+ int parallelism = dis.readInt();
+
+ // Add task state
+ TaskState taskState = new TaskState(jobVertexId, parallelism);
+ taskStates.add(taskState);
+
+ // Sub task states
+ int numSubTaskStates = dis.readInt();
+ for (int j = 0; j < numSubTaskStates; j++) {
+ int subtaskIndex = dis.readInt();
+ SerializedValue<StateHandle<?>> serializedValue = readSerializedValue(dis);
+ long stateSize = dis.readLong();
+ long duration = dis.readLong();
+
+ taskState.putState(subtaskIndex, new SubtaskState(serializedValue, stateSize, duration));
+ }
+
+ // Key group states
+ int numKvStates = dis.readInt();
+ for (int j = 0; j < numKvStates; j++) {
+ int keyGroupIndex = dis.readInt();
+ SerializedValue<StateHandle<?>> serializedValue = readSerializedValue(dis);
+ long stateSize = dis.readLong();
+ long duration = dis.readLong();
+
+ taskState.putKvState(keyGroupIndex, new KeyGroupState(serializedValue, stateSize, duration));
+ }
+ }
+
+ try {
+ return convertSavepoint(taskStates, userClassLoader, checkpointId);
+ } catch (IOException e) {
+ // already the declared exception type, no need to wrap it a second time
+ throw e;
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Reads one length-prefixed serialized state handle. A length of -1 encodes a null value;
+ * any other negative length is rejected as invalid data.
+ */
+ private static SerializedValue<StateHandle<?>> readSerializedValue(DataInputStream dis) throws IOException {
+ int length = dis.readInt();
+
+ if (length == -1) {
+ SerializedValue<StateHandle<?>> nullValue = new SerializedValue<>(null);
+ return nullValue;
+ }
+
+ if (length < 0) {
+ throw new IOException("Corrupt savepoint: invalid serialized state length " + length);
+ }
+
+ byte[] serializedData = new byte[length];
+ dis.readFully(serializedData, 0, length);
+ SerializedValue<StateHandle<?>> serializedValue = SerializedValue.fromBytes(serializedData);
+ return serializedValue;
+ }
+
+ private SavepointV1 convertSavepoint(
+ List<TaskState> taskStates,
+ ClassLoader userClassLoader,
+ long checkpointID) throws Exception {
+
+ List<org.apache.flink.runtime.checkpoint.TaskState> newTaskStates = new ArrayList<>(taskStates.size());
+
+ for (TaskState taskState : taskStates) {
+ newTaskStates.add(convertTaskState(taskState, userClassLoader, checkpointID));
+ }
+
+ return new SavepointV1(checkpointID, newTaskStates);
+ }
+
+ private org.apache.flink.runtime.checkpoint.TaskState convertTaskState(
+ TaskState taskState,
+ ClassLoader userClassLoader,
+ long checkpointID) throws Exception {
+
+ JobVertexID jobVertexID = taskState.getJobVertexID();
+ int parallelism = taskState.getParallelism();
+ int chainLength = determineOperatorChainLength(taskState, userClassLoader);
+
+ org.apache.flink.runtime.checkpoint.TaskState newTaskState =
+ new org.apache.flink.runtime.checkpoint.TaskState(
+ jobVertexID,
+ parallelism,
+ parallelism,
+ chainLength);
+
+ if (chainLength > 0) {
+
+ Map<Integer, SubtaskState> subtaskStates = taskState.getSubtaskStatesById();
+
+ for (Map.Entry<Integer, SubtaskState> subtaskState : subtaskStates.entrySet()) {
+ int parallelInstanceIdx = subtaskState.getKey();
+ newTaskState.putState(parallelInstanceIdx, convertSubtaskState(
+ subtaskState.getValue(),
+ parallelInstanceIdx,
+ userClassLoader,
+ checkpointID));
+ }
+ }
+
+ return newTaskState;
+ }
+
+ /**
+ * Converts one legacy subtask state. The serialized state is deserialized with the user
+ * class loader into the per-operator states of the chain; each non-null entry is converted
+ * into a stream state handle at the same chain position. At most one operator of the chain
+ * may carry keyed state, otherwise an {@link IllegalStateException} is raised by the
+ * precondition check.
+ */
+ private org.apache.flink.runtime.checkpoint.SubtaskState convertSubtaskState(
+ SubtaskState subtaskState,
+ int parallelInstanceIdx,
+ ClassLoader userClassLoader,
+ long checkpointID) throws Exception {
+
+ StreamTaskStateList stateList =
+ (StreamTaskStateList) subtaskState.getState().deserializeValue(userClassLoader);
+ StreamTaskState[] chainStates = stateList.getState(userClassLoader);
+
+ // positions without legacy state stay null
+ StreamStateHandle[] convertedChain = new StreamStateHandle[chainStates.length];
+ KeyGroupsStateHandle newKeyedState = null;
+
+ for (int chainIdx = 0; chainIdx < chainStates.length; chainIdx++) {
+ StreamTaskState legacyOperatorState = chainStates[chainIdx];
+
+ if (legacyOperatorState != null) {
+ convertedChain[chainIdx] = convertOperatorAndFunctionState(legacyOperatorState);
+
+ HashMap<String, KvStateSnapshot<?, ?, ?, ?>> oldKeyedState = legacyOperatorState.getKvStates();
+ if (oldKeyedState != null) {
+ Preconditions.checkState(newKeyedState == null, "Found more than one keyed state in chain");
+ newKeyedState = convertKeyedBackendState(oldKeyedState, parallelInstanceIdx, checkpointID);
+ }
+ }
+ }
+
+ List<StreamStateHandle> newChainStateList = Arrays.asList(convertedChain);
+ ChainedStateHandle<StreamStateHandle> newChainedState = new ChainedStateHandle<>(newChainStateList);
+
+ // a chain of the same length that contains only null entries
+ ChainedStateHandle<OperatorStateHandle> nopChain =
+ new ChainedStateHandle<>(Arrays.asList(new OperatorStateHandle[newChainedState.getLength()]));
+
+ return new org.apache.flink.runtime.checkpoint.SubtaskState(
+ newChainedState,
+ nopChain,
+ nopChain,
+ newKeyedState,
+ null);
+ }
+
+ /**
+ * Merges the function state and the operator state of one legacy operator into a single
+ * handle. For each of the two parts (function state first, operator state second) the
+ * merged list contains {@code SIGNAL_1} followed by the converted handle if the part is
+ * present, or {@code SIGNAL_0} alone if it is absent.
+ */
+ private StreamStateHandle convertOperatorAndFunctionState(StreamTaskState streamTaskState) throws Exception {
+
+ StateHandle<Serializable> functionState = streamTaskState.getFunctionState();
+ StateHandle<?> operatorState = streamTaskState.getOperatorState();
+
+ List<StreamStateHandle> parts = new ArrayList<>(4);
+ appendWithSignal(parts, functionState);
+ appendWithSignal(parts, operatorState);
+
+ return new MigrationStreamStateHandle(new MultiStreamStateHandle(parts));
+ }
+
+ /** Appends the presence marker and, if the handle is non-null, its converted form. */
+ private static void appendWithSignal(List<StreamStateHandle> target, StateHandle<?> handle) throws Exception {
+ if (handle == null) {
+ target.add(SIGNAL_0);
+ return;
+ }
+
+ target.add(SIGNAL_1);
+ target.add(convertStateHandle(handle));
+ }
+
+ /**
+ * Serializes the legacy keyed state map into an in-memory checkpoint stream and wraps the
+ * resulting handle together with its start offset.
+ *
+ * @return the migration key-group handle, or null if there is no legacy keyed state or the
+ * stream produced no handle
+ */
+ private KeyGroupsStateHandle convertKeyedBackendState(
+ HashMap<String, KvStateSnapshot<?, ?, ?, ?>> oldKeyedState,
+ int parallelInstanceIdx,
+ long checkpointID) throws Exception {
+
+ if (oldKeyedState == null) {
+ return null;
+ }
+
+ CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(MAX_SIZE);
+ CheckpointStreamFactory.CheckpointStateOutputStream out =
+ streamFactory.createCheckpointStateOutputStream(checkpointID, 0L);
+
+ // remember where the serialized map starts inside the stream
+ final long offset = out.getPos();
+
+ InstantiationUtil.serializeObject(out, oldKeyedState);
+ StreamStateHandle serializedHandle = out.closeAndGetHandle();
+
+ if (serializedHandle == null) {
+ return null;
+ }
+
+ // the range covers exactly one index: the subtask index of this parallel instance
+ KeyGroupRangeOffsets rangeOffsets =
+ new KeyGroupRangeOffsets(parallelInstanceIdx, parallelInstanceIdx, new long[]{offset});
+
+ return new MigrationKeyGroupStateHandle(rangeOffsets, serializedHandle);
+ }
+
+ /**
+ * Determines the operator chain length of a legacy task state by deserializing the state
+ * of the first subtask returned by the task state.
+ *
+ * @return the number of per-operator states in that subtask state, or 0 if the task has
+ * no subtask states or the deserialized value is not a {@code StreamTaskStateList}
+ */
+ private int determineOperatorChainLength(
+ TaskState taskState,
+ ClassLoader userClassLoader) throws IOException, ClassNotFoundException {
+
+ Collection<SubtaskState> subtaskStates = taskState.getStates();
+ if (subtaskStates == null || subtaskStates.isEmpty()) {
+ return 0;
+ }
+
+ SubtaskState first = subtaskStates.iterator().next();
+ Object deserialized = first.getState().deserializeValue(userClassLoader);
+
+ if (!(deserialized instanceof StreamTaskStateList)) {
+ return 0;
+ }
+
+ StreamTaskState[] chainStates = ((StreamTaskStateList) deserialized).getState(userClassLoader);
+ return chainStates.length;
+ }
+
+ private static StreamStateHandle convertStateHandle(StateHandle<?> oldStateHandle) throws Exception {
+ if (oldStateHandle instanceof AbstractFileStateHandle) {
+ Path path = ((AbstractFileStateHandle) oldStateHandle).getFilePath();
+ return new FileStateHandle(path, oldStateHandle.getStateSize());
+ } else if (oldStateHandle instanceof SerializedStateHandle) {
+ byte[] data = ((SerializedStateHandle<?>) oldStateHandle).getSerializedData();
+ return new ByteStreamStateHandle(String.valueOf(System.identityHashCode(data)), data);
+ } else if (oldStateHandle instanceof org.apache.flink.migration.runtime.state.memory.ByteStreamStateHandle) {
+ byte[] data =
+ ((org.apache.flink.migration.runtime.state.memory.ByteStreamStateHandle) oldStateHandle).getData();
+ return new ByteStreamStateHandle(String.valueOf(System.identityHashCode(data)), data);
+ }
+ throw new IllegalArgumentException("Unknown state handle type: " + oldStateHandle);
+ }
+
+ /**
+ * Writes a {@link SavepointV0} in the layout that {@link #deserialize} reads: the
+ * checkpoint ID (long) and the number of task states (int); for each task state the job
+ * vertex ID (two longs), the parallelism (int), the subtask states and the key group
+ * states. Every entry consists of its index (int), the length-prefixed serialized state
+ * (length -1 for null), the state size (long) and the duration (long).
+ *
+ * <p>All indices are written as 4-byte ints, matching the {@code readInt()} calls on the
+ * reading side.
+ *
+ * @param savepoint the legacy savepoint to write
+ * @param dos the stream to write to
+ * @throws IOException if writing to the stream fails
+ */
+ @VisibleForTesting
+ public void serializeOld(SavepointV0 savepoint, DataOutputStream dos) throws IOException {
+ dos.writeLong(savepoint.getCheckpointId());
+
+ Collection<org.apache.flink.migration.runtime.checkpoint.TaskState> taskStates = savepoint.getOldTaskStates();
+ dos.writeInt(taskStates.size());
+
+ for (org.apache.flink.migration.runtime.checkpoint.TaskState taskState : taskStates) {
+ // Vertex ID
+ dos.writeLong(taskState.getJobVertexID().getLowerPart());
+ dos.writeLong(taskState.getJobVertexID().getUpperPart());
+
+ // Parallelism
+ int parallelism = taskState.getParallelism();
+ dos.writeInt(parallelism);
+
+ // Sub task states
+ dos.writeInt(taskState.getNumberCollectedStates());
+
+ for (int i = 0; i < parallelism; i++) {
+ SubtaskState subtaskState = taskState.getState(i);
+
+ if (subtaskState != null) {
+ dos.writeInt(i);
+ writeSerializedValue(subtaskState.getState(), dos);
+ dos.writeLong(subtaskState.getStateSize());
+ dos.writeLong(subtaskState.getDuration());
+ }
+ }
+
+ // Key group states
+ // NOTE(review): only indices below the parallelism are looked up here, while the
+ // count written is getNumberCollectedKvStates() -- confirm both always agree.
+ dos.writeInt(taskState.getNumberCollectedKvStates());
+
+ for (int i = 0; i < parallelism; i++) {
+ KeyGroupState keyGroupState = taskState.getKvState(i);
+
+ if (keyGroupState != null) {
+ // must be a 4-byte int: deserialize() reads the key group index with readInt()
+ dos.writeInt(i);
+ writeSerializedValue(keyGroupState.getKeyGroupState(), dos);
+ dos.writeLong(keyGroupState.getStateSize());
+ dos.writeLong(keyGroupState.getDuration());
+ }
+ }
+ }
+ }
+
+ /** Writes a serialized value as length-prefixed bytes; a null value is written as length -1. */
+ private static void writeSerializedValue(SerializedValue<?> serializedValue, DataOutputStream dos) throws IOException {
+ if (serializedValue == null) {
+ dos.writeInt(-1); // null
+ } else {
+ byte[] serialized = serializedValue.getByteArray();
+ dos.writeInt(serialized.length);
+ dos.write(serialized, 0, serialized.length);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/AbstractCloseableHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/AbstractCloseableHandle.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/AbstractCloseableHandle.java
new file mode 100644
index 0000000..873dab8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/AbstractCloseableHandle.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+/**
+ * A simple base for closable handles.
+ *
+ * <p>Offers to register a stream (or other closable object) that close calls are delegated to
+ * when the handle is closed. If the handle was already closed at registration time, the
+ * registered object is closed immediately and the registration fails with an exception.
+ */
+@Deprecated
+public abstract class AbstractCloseableHandle implements Closeable, Serializable {
+
+ /** Serial Version UID must be constant to maintain format compatibility */
+ private static final long serialVersionUID = 1L;
+
+ /** To atomically update the "isClosed" field without needing to add a member class like "AtomicBoolean" */
+ private static final AtomicIntegerFieldUpdater<AbstractCloseableHandle> CLOSER =
+ AtomicIntegerFieldUpdater.newUpdater(AbstractCloseableHandle.class, "isClosed");
+
+ // ------------------------------------------------------------------------
+
+ /** The closeable to close if this handle is closed late */
+ private transient volatile Closeable toClose;
+
+ /** Flag to remember if this handle was already closed (0 = open, non-zero = closed) */
+ @SuppressWarnings("unused") // this field is actually updated, but via the "CLOSER" updater
+ private transient volatile int isClosed;
+
+ // ------------------------------------------------------------------------
+
+ protected final void registerCloseable(Closeable toClose) throws IOException {
+ if (toClose == null) {
+ return;
+ }
+
+ // NOTE: The order of operations matters here:
+ // (1) first setting the closeable
+ // (2) checking the flag.
+ // Because the order in the {@link #close()} method is the opposite, and
+ // both variables are volatile (reordering barriers), we can be sure that
+ // one of the methods always notices the effect of a concurrent call to the
+ // other method.
+
+ this.toClose = toClose;
+
+ // check if we were closed early
+ if (this.isClosed != 0) {
+ toClose.close();
+ throw new IOException("handle is closed");
+ }
+ }
+
+ /**
+ * Closes the handle.
+ *
+ * <p>If a "Closeable" has been registered via {@link #registerCloseable(Closeable)},
+ * then it will be closed.
+ *
+ * <p>If any "Closeable" will be registered via {@link #registerCloseable(Closeable)} in the future,
+ * it will immediately be closed and that method will throw an exception.
+ *
+ * <p>Only the first call has an effect; later calls find the flag already set and do nothing.
+ *
+ * @throws IOException Exceptions occurring while closing an already registered {@code Closeable}
+ * are forwarded.
+ *
+ * @see #registerCloseable(Closeable)
+ */
+ @Override
+ public final void close() throws IOException {
+ // NOTE: The order of operations matters here:
+ // (1) first setting the closed flag
+ // (2) checking whether there is already a closeable
+ // Because the order in the {@link #registerCloseable(Closeable)} method is the opposite, and
+ // both variables are volatile (reordering barriers), we can be sure that
+ // one of the methods always notices the effect of a concurrent call to the
+ // other method.
+
+ if (CLOSER.compareAndSet(this, 0, 1)) {
+ final Closeable toClose = this.toClose;
+ if (toClose != null) {
+ this.toClose = null;
+ toClose.close();
+ }
+ }
+ }
+
+ /**
+ * Checks whether this handle has been closed.
+ *
+ * @return True if the handle is closed, false otherwise.
+ */
+ public boolean isClosed() {
+ return isClosed != 0;
+ }
+
+ /**
+ * This method checks whether the handle is closed and throws an exception if it is closed.
+ * If the handle is not closed, this method does nothing.
+ *
+ * @throws IOException Thrown, if the handle has been closed.
+ */
+ public void ensureNotClosed() throws IOException {
+ if (isClosed != 0) {
+ throw new IOException("handle is closed");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/AbstractStateBackend.java
new file mode 100644
index 0000000..b7932f5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/AbstractStateBackend.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * A state backend defines how state is stored and snapshotted during checkpoints.
+ *
+ * <p>This class declares no methods of its own; it only contains a private nested
+ * state handle class.
+ * NOTE(review): presumably kept so that serialized instances of the previous version can
+ * still be deserialized -- confirm before changing fields or serial version UIDs.
+ */
+@Deprecated
+public abstract class AbstractStateBackend implements Serializable {
+
+ private static final long serialVersionUID = 4620413814639220247L;
+
+ /**
+ * Simple state handle that resolves a {@link DataInputView} from a StreamStateHandle.
+ *
+ * <p>{@link #getState(ClassLoader)} wraps the stream of the underlying handle and
+ * {@link #getStateSize()} delegates to it, while {@link #discardState()} and
+ * {@link #close()} are not supported and always throw an
+ * {@link UnsupportedOperationException}.
+ */
+ private static final class DataInputViewHandle implements StateHandle<DataInputView> {
+
+ private static final long serialVersionUID = 2891559813513532079L;
+
+ private final StreamStateHandle stream;
+
+ private DataInputViewHandle(StreamStateHandle stream) {
+ this.stream = stream;
+ }
+
+ @Override
+ public DataInputView getState(ClassLoader userCodeClassLoader) throws Exception {
+ return new DataInputViewStreamWrapper(stream.getState(userCodeClassLoader));
+ }
+
+ @Override
+ public void discardState() throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getStateSize() throws Exception {
+ return stream.getStateSize();
+ }
+
+ @Override
+ public void close() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/KvStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/KvStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/KvStateSnapshot.java
new file mode 100644
index 0000000..9936ca7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/KvStateSnapshot.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+
+/**
+ * Type of a key/value state snapshot. The interface declares no methods of its own; it
+ * only inherits the contract of {@link StateObject}.
+ *
+ * @param <K> NOTE(review): presumably the key type -- confirm
+ * @param <N> NOTE(review): presumably the namespace type -- confirm
+ * @param <S> the type of {@link State} covered by the snapshot
+ * @param <SD> the type of {@link StateDescriptor} that describes the state
+ */
+@Deprecated
+public interface KvStateSnapshot<K, N, S extends State, SD extends StateDescriptor<S, ?>>
+ extends StateObject {
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/StateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/StateHandle.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/StateHandle.java
new file mode 100644
index 0000000..97d6984
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/StateHandle.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state;
+
+/**
+ * StateHandle is a general handle interface meant to abstract operator state fetching.
+ * A StateHandle implementation can for example include the state itself in cases where the state
+ * is lightweight or fetching it lazily from some external storage when the state is too large.
+ *
+ * @param <T> the type of the state that the handle resolves to
+ */
+@Deprecated
+public interface StateHandle<T> extends StateObject {
+
+ /**
+ * Retrieves and returns the state represented by the handle.
+ *
+ * @param userCodeClassLoader Class loader for deserializing user code specific classes
+ *
+ * @return The state represented by the handle.
+ * @throws Exception Thrown, if the state cannot be fetched.
+ */
+ T getState(ClassLoader userCodeClassLoader) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/StateObject.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/StateObject.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/StateObject.java
new file mode 100644
index 0000000..2f1048f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/StateObject.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.state;
+
+/**
+ * Base of all types that represent checkpointed state. Specializations are for
+ * example {@link StateHandle StateHandles} (directly resolve to state) and
+ * {@link KvStateSnapshot key/value state snapshots}.
+ *
+ * <p>State objects define how to:
+ * <ul>
+ * <li><b>Discard State</b>: The {@link #discardState()} method defines how state is permanently
+ * disposed/deleted. After that method call, state may not be recoverable any more.</li>
+
+ * <li><b>Close the current state access</b>: The {@link #close()} method defines how to
+ * stop the current access or recovery to the state. Called for example when an operation is
+ * canceled during recovery.</li>
+ * </ul>
+ */
+@Deprecated
+public interface StateObject extends java.io.Closeable, java.io.Serializable {
+
+ /**
+ * Discards the state referred to by this handle, to free up resources in
+ * the persistent storage. This method is called when the handle will not be
+ * used any more.
+ */
+ void discardState() throws Exception;
+
+ /**
+ * Returns the size of the state in bytes.
+ *
+ * <p>If the size is not known, return {@code 0}.
+ *
+ * @return Size of the state in bytes.
+ * @throws Exception If the operation fails during size retrieval.
+ */
+ long getStateSize() throws Exception;
+}