You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/23 19:10:30 UTC
[2/7] flink git commit: [FLINK-5763] [checkpoints] Followup on adding
CheckpointOptions
[FLINK-5763] [checkpoints] Followup on adding CheckpointOptions
- Add a test that validates the checkpoint type ordinals are not changed
- Change target location writing from 'writeUTF' to 'StringUtils.writeString'.
- Pull out the string encoding charset as a constant in 'EventSerializer'
- Simplify the directory creation in 'SavepointStore'
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/df16e50b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/df16e50b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/df16e50b
Branch: refs/heads/master
Commit: df16e50bbf01d26f75b7745dacd5779ad47dcce5
Parents: 6e7a917
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Feb 22 14:11:49 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Feb 23 18:39:49 2017 +0100
----------------------------------------------------------------------
.../flink/core/io/IOReadableWritable.java | 7 +-
.../java/org/apache/flink/util/StringUtils.java | 47 +++++++++++--
.../checkpoint/savepoint/SavepointStore.java | 70 ++++++++++----------
.../io/network/api/CheckpointBarrier.java | 16 +++--
.../api/serialization/EventSerializer.java | 17 ++---
.../runtime/util/DataInputDeserializer.java | 6 +-
.../runtime/checkpoint/CheckpointTypeTest.java | 42 ++++++++++++
.../io/network/api/CheckpointBarrierTest.java | 2 +-
8 files changed, 145 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java b/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java
index a192a21..a38952e 100644
--- a/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java
+++ b/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java
@@ -27,9 +27,10 @@ import org.apache.flink.core.memory.DataOutputView;
/**
* This interface must be implemented by every class whose objects have to be serialized to their binary representation
* and vice-versa. In particular, records have to implement this interface in order to specify how their data can be
- * transfered
- * to a binary representation.
- * When implementing this Interface make sure that the implementing class has a default (zero-argument) constructor!
+ * transferred to a binary representation.
+ *
+ * <p>When implementing this Interface make sure that the implementing class has a default
+ * (zero-argument) constructor!
*/
@Public
public interface IOReadableWritable {
http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
index 3c32d77..fc945c6 100644
--- a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
@@ -27,6 +27,11 @@ import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.StringValue;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* Utility class to convert objects into strings in vice-versa.
*/
@@ -302,19 +307,46 @@ public final class StringUtils {
}
return new String(data);
}
-
+
+ /**
+ * Writes a String to the given output.
+ * The written string can be read with {@link #readString(DataInputView)}.
+ *
+ * @param str The string to write
+ * @param out The output to write to
+ *
+ * @throws IOException Thrown, if the writing or the serialization fails.
+ */
+ public static void writeString(@Nonnull String str, DataOutputView out) throws IOException {
+ checkNotNull(str);
+ StringValue.writeString(str, out);
+ }
+
+ /**
+ * Reads a non-null String from the given input.
+ *
+ * @param in The input to read from
+ * @return The deserialized String
+ *
+ * @throws IOException Thrown, if the reading or the deserialization fails.
+ */
+ public static String readString(DataInputView in) throws IOException {
+ return StringValue.readString(in);
+ }
+
/**
* Writes a String to the given output. The string may be null.
* The written string can be read with {@link #readNullableString(DataInputView)}-
*
* @param str The string to write, or null.
* @param out The output to write to.
- * @throws IOException Throws if the writing or the serialization fails.
+ *
+ * @throws IOException Thrown, if the writing or the serialization fails.
*/
- public static void writeNullableString(String str, DataOutputView out) throws IOException {
+ public static void writeNullableString(@Nullable String str, DataOutputView out) throws IOException {
if (str != null) {
out.writeBoolean(true);
- StringValue.writeString(str, out);
+ writeString(str, out);
} else {
out.writeBoolean(false);
}
@@ -326,11 +358,12 @@ public final class StringUtils {
*
* @param in The input to read from.
* @return The deserialized string, or null.
- * @throws IOException Throws if the reading or the deserialization fails.
+ *
+ * @throws IOException Thrown, if the reading or the deserialization fails.
*/
- public static String readNullableString(DataInputView in) throws IOException {
+ public static @Nullable String readNullableString(DataInputView in) throws IOException {
if (in.readBoolean()) {
- return StringValue.readString(in);
+ return readString(in);
} else {
return null;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
index 0caf5b2..95370a5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
@@ -18,24 +18,27 @@
package org.apache.flink.runtime.checkpoint.savepoint;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Preconditions;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* Utilities for storing and loading savepoint meta data files.
*
@@ -65,7 +68,10 @@ public class SavepointStore {
* @throws IOException FileSystem operation failures are forwarded
*/
public static String createSavepointDirectory(@Nonnull String baseDirectory, @Nullable JobID jobId) throws IOException {
- String prefix;
+ final Path basePath = new Path(baseDirectory);
+ final FileSystem fs = basePath.getFileSystem();
+
+ final String prefix;
if (jobId == null) {
prefix = "savepoint-";
} else {
@@ -73,33 +79,21 @@ public class SavepointStore {
}
Exception latestException = null;
- Path savepointDirectory = null;
-
- FileSystem fs = null;
// Try to create a FS output stream
for (int attempt = 0; attempt < 10; attempt++) {
- Path path = new Path(baseDirectory, FileUtils.getRandomFilename(prefix));
-
- if (fs == null) {
- fs = FileSystem.get(path.toUri());
- }
+ Path path = new Path(basePath, FileUtils.getRandomFilename(prefix));
try {
if (fs.mkdirs(path)) {
- savepointDirectory = path;
- break;
+ return path.toString();
}
} catch (Exception e) {
latestException = e;
}
}
- if (savepointDirectory == null) {
- throw new IOException("Failed to create savepoint directory at " + baseDirectory, latestException);
- } else {
- return savepointDirectory.getPath();
- }
+ throw new IOException("Failed to create savepoint directory at " + baseDirectory, latestException);
}
/**
@@ -121,20 +115,22 @@ public class SavepointStore {
* @param directory Target directory to store savepoint in
* @param savepoint Savepoint to be stored
* @return Path of stored savepoint
- * @throws Exception Failures during store are forwarded
+ * @throws IOException Failures during store are forwarded
*/
public static <T extends Savepoint> String storeSavepoint(String directory, T savepoint) throws IOException {
checkNotNull(directory, "Target directory");
checkNotNull(savepoint, "Savepoint");
- Path basePath = new Path(directory);
- FileSystem fs = FileSystem.get(basePath.toUri());
+ final Path basePath = new Path(directory);
+ final Path metadataFilePath = new Path(basePath, META_DATA_FILE);
- Path path = new Path(basePath, META_DATA_FILE);
- FSDataOutputStream fdos = fs.create(path, false);
+ final FileSystem fs = FileSystem.get(basePath.toUri());
boolean success = false;
- try (DataOutputStream dos = new DataOutputStream(fdos)) {
+ try (FSDataOutputStream fdos = fs.create(metadataFilePath, WriteMode.NO_OVERWRITE);
+ DataOutputStream dos = new DataOutputStream(fdos))
+ {
+
// Write header
dos.writeInt(MAGIC_NUMBER);
dos.writeInt(savepoint.getVersion());
@@ -143,14 +139,18 @@ public class SavepointStore {
SavepointSerializer<T> serializer = SavepointSerializers.getSerializer(savepoint);
serializer.serialize(savepoint, dos);
success = true;
- } finally {
- if (!success && fs.exists(path)) {
- if (!fs.delete(path, true)) {
- LOG.warn("Failed to delete file {} after failed write.", path);
+ }
+ finally {
+ if (!success && fs.exists(metadataFilePath)) {
+ if (!fs.delete(metadataFilePath, true)) {
+ LOG.warn("Failed to delete file {} after failed metadata write.", metadataFilePath);
}
}
}
+ // We return the savepoint directory path here, not the metadata file path:
+ // the directory path also works for resuming, and is more elegant than a
+ // direct pointer to the metadata file.
return basePath.toString();
}
@@ -159,7 +159,7 @@ public class SavepointStore {
*
* @param savepointFileOrDirectory Path to the parent savepoint directory or the meta data file.
* @return The loaded savepoint
- * @throws Exception Failures during load are forwared
+ * @throws IOException Failures during load are forwarded
*/
public static Savepoint loadSavepoint(String savepointFileOrDirectory, ClassLoader userClassLoader) throws IOException {
Preconditions.checkNotNull(savepointFileOrDirectory, "Path");
@@ -207,7 +207,7 @@ public class SavepointStore {
* Removes the savepoint meta data w/o loading and disposing it.
*
* @param path Path of savepoint to remove
- * @throws Exception Failures during disposal are forwarded
+ * @throws IOException Failures during disposal are forwarded
*/
public static void removeSavepointFile(String path) throws IOException {
Preconditions.checkNotNull(path, "Path");
http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
index 0752897..a42c25d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.api;
import static org.apache.flink.util.Preconditions.checkElementIndex;
import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
import java.io.IOException;
@@ -28,6 +29,7 @@ import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
import org.apache.flink.runtime.event.RuntimeEvent;
+import org.apache.flink.util.StringUtils;
/**
* Checkpoint barriers are used to align checkpoints throughout the streaming topology. The
@@ -37,12 +39,12 @@ import org.apache.flink.runtime.event.RuntimeEvent;
*
* <p>Once an operator has received a checkpoint barrier from all its input channels, it
* knows that a certain checkpoint is complete. It can trigger the operator specific checkpoint
- * behavior and broadcast the barrier to downstream operators.</p>
+ * behavior and broadcast the barrier to downstream operators.
*
* <p>Depending on the semantic guarantees, may hold off post-checkpoint data until the checkpoint
- * is complete (exactly once)</p>
+ * is complete (exactly once).
*
- * <p>The checkpoint barrier IDs are strictly monotonous increasing.</p>
+ * <p>The checkpoint barrier IDs are strictly monotonically increasing.
*/
public class CheckpointBarrier extends RuntimeEvent {
@@ -86,8 +88,8 @@ public class CheckpointBarrier extends RuntimeEvent {
return;
} else if (checkpointType == CheckpointType.SAVEPOINT) {
String targetLocation = checkpointOptions.getTargetLocation();
- assert(targetLocation != null);
- out.writeUTF(targetLocation);
+ checkState(targetLocation != null);
+ StringUtils.writeString(targetLocation, out);
} else {
throw new IOException("Unknown CheckpointType " + checkpointType);
}
@@ -99,13 +101,13 @@ public class CheckpointBarrier extends RuntimeEvent {
timestamp = in.readLong();
int typeOrdinal = in.readInt();
- checkElementIndex(typeOrdinal, CheckpointType.values().length, "Unknown CheckpointType ordinal " + typeOrdinal);
+ checkElementIndex(typeOrdinal, CheckpointType.values().length, "Unknown CheckpointType ordinal");
CheckpointType checkpointType = CheckpointType.values()[typeOrdinal];
if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
checkpointOptions = CheckpointOptions.forFullCheckpoint();
} else if (checkpointType == CheckpointType.SAVEPOINT) {
- String targetLocation = in.readUTF();
+ String targetLocation = StringUtils.readString(in);
checkpointOptions = CheckpointOptions.forSavepoint(targetLocation);
} else {
throw new IOException("Illegal CheckpointType " + checkpointType);
http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
index 223cbfe..3adf864 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
@@ -44,6 +44,8 @@ import org.apache.flink.util.Preconditions;
*/
public class EventSerializer {
+ private static final Charset STRING_CODING_CHARSET = Charset.forName("UTF-8");
+
private static final int END_OF_PARTITION_EVENT = 0;
private static final int CHECKPOINT_BARRIER_EVENT = 1;
@@ -77,16 +79,16 @@ public class EventSerializer {
} else if (checkpointType == CheckpointType.SAVEPOINT) {
String targetLocation = checkpointOptions.getTargetLocation();
assert(targetLocation != null);
- byte[] bytes = targetLocation.getBytes(Charset.forName("UTF-8"));
+ byte[] locationBytes = targetLocation.getBytes(STRING_CODING_CHARSET);
- buf = ByteBuffer.allocate(24 + 4 + bytes.length);
+ buf = ByteBuffer.allocate(24 + 4 + locationBytes.length);
buf.putInt(0, CHECKPOINT_BARRIER_EVENT);
buf.putLong(4, barrier.getId());
buf.putLong(12, barrier.getTimestamp());
buf.putInt(20, checkpointType.ordinal());
- buf.putInt(24, bytes.length);
- for (int i = 0; i < bytes.length; i++) {
- buf.put(28 + i, bytes[i]);
+ buf.putInt(24, locationBytes.length);
+ for (int i = 0; i < locationBytes.length; i++) {
+ buf.put(28 + i, locationBytes[i]);
}
} else {
throw new IOException("Unknown checkpoint type: " + checkpointType);
@@ -204,8 +206,7 @@ public class EventSerializer {
CheckpointOptions checkpointOptions;
int checkpointTypeOrdinal = buffer.getInt();
- Preconditions.checkElementIndex(type, CheckpointType.values().length,
- "Illegal CheckpointType ordinal " + checkpointTypeOrdinal);
+ Preconditions.checkElementIndex(type, CheckpointType.values().length, "Illegal CheckpointType ordinal");
CheckpointType checkpointType = CheckpointType.values()[checkpointTypeOrdinal];
if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
@@ -214,7 +215,7 @@ public class EventSerializer {
int len = buffer.getInt();
byte[] bytes = new byte[len];
buffer.get(bytes);
- String targetLocation = new String(bytes, Charset.forName("UTF-8"));
+ String targetLocation = new String(bytes, STRING_CODING_CHARSET);
checkpointOptions = CheckpointOptions.forSavepoint(targetLocation);
} else {
http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
index 0f99496..4e8871a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
@@ -45,7 +45,11 @@ public class DataInputDeserializer implements DataInputView, java.io.Serializabl
// ------------------------------------------------------------------------
public DataInputDeserializer() {}
-
+
+ public DataInputDeserializer(byte[] buffer) {
+ setBuffer(buffer, 0, buffer.length);
+ }
+
public DataInputDeserializer(byte[] buffer, int start, int len) {
setBuffer(buffer, start, len);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java
new file mode 100644
index 0000000..dfbde5e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class CheckpointTypeTest {
+
+ /**
+ * This test validates that the order of enumeration constants is not changed, because the
+ * ordinal of that enum is used in serialization.
+ *
+ * <p>It is still possible to edit both the ordinal and this test, but the test adds
+ * a level of safety, and should make developers stumble over this when attempting
+ * to adjust the enumeration.
+ */
+ @Test
+ public void testOrdinalsAreConstant() {
+ assertEquals(0, CheckpointType.FULL_CHECKPOINT.ordinal());
+ assertEquals(1, CheckpointType.SAVEPOINT.ordinal());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
index dd5b0b6..ad9fc16 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
@@ -49,7 +49,7 @@ public class CheckpointBarrierTest {
DataOutputSerializer out = new DataOutputSerializer(1024);
barrier.write(out);
- DataInputDeserializer in = new DataInputDeserializer(out.wrapAsByteBuffer());
+ DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
CheckpointBarrier deserialized = new CheckpointBarrier();
deserialized.read(in);