You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/23 19:10:30 UTC

[2/7] flink git commit: [FLINK-5763] [checkpoints] Followup on adding CheckpointOptions

[FLINK-5763] [checkpoints] Followup on adding CheckpointOptions

  - Add a test that validates the checkpoint type ordinals are not changed
  - Change target location writing from 'writeUtf' to 'StringUtils.write'.
  - Pull out the coding charset as a constant in 'EventSerializer'
  - Simplify the directory creation in 'SavepointStore'


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/df16e50b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/df16e50b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/df16e50b

Branch: refs/heads/master
Commit: df16e50bbf01d26f75b7745dacd5779ad47dcce5
Parents: 6e7a917
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Feb 22 14:11:49 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Feb 23 18:39:49 2017 +0100

----------------------------------------------------------------------
 .../flink/core/io/IOReadableWritable.java       |  7 +-
 .../java/org/apache/flink/util/StringUtils.java | 47 +++++++++++--
 .../checkpoint/savepoint/SavepointStore.java    | 70 ++++++++++----------
 .../io/network/api/CheckpointBarrier.java       | 16 +++--
 .../api/serialization/EventSerializer.java      | 17 ++---
 .../runtime/util/DataInputDeserializer.java     |  6 +-
 .../runtime/checkpoint/CheckpointTypeTest.java  | 42 ++++++++++++
 .../io/network/api/CheckpointBarrierTest.java   |  2 +-
 8 files changed, 145 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java b/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java
index a192a21..a38952e 100644
--- a/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java
+++ b/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java
@@ -27,9 +27,10 @@ import org.apache.flink.core.memory.DataOutputView;
 /**
  * This interface must be implemented by every class whose objects have to be serialized to their binary representation
  * and vice-versa. In particular, records have to implement this interface in order to specify how their data can be
- * transfered
- * to a binary representation.
- * When implementing this Interface make sure that the implementing class has a default (zero-argument) constructor!
+ * transferred to a binary representation.
+ * 
+ * <p>When implementing this Interface make sure that the implementing class has a default
+ * (zero-argument) constructor!
  */
 @Public
 public interface IOReadableWritable {

http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
index 3c32d77..fc945c6 100644
--- a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
@@ -27,6 +27,11 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.StringValue;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Utility class to convert objects into strings in vice-versa.
  */
@@ -302,19 +307,46 @@ public final class StringUtils {
 		}
 		return new String(data);
 	}
-	
+
+	/**
+	 * Writes a String to the given output.
+	 * The written string can be read with {@link #readString(DataInputView)}.
+	 *
+	 * @param str The string to write
+	 * @param out The output to write to
+	 *   
+	 * @throws IOException Thrown, if the writing or the serialization fails.
+	 */
+	public static void writeString(@Nonnull String str, DataOutputView out) throws IOException {
+		checkNotNull(str);
+		StringValue.writeString(str, out);
+	}
+
+	/**
+	 * Reads a non-null String from the given input.
+	 *
+	 * @param in The input to read from
+	 * @return The deserialized String
+	 * 
+	 * @throws IOException Thrown, if the reading or the deserialization fails.
+	 */
+	public static String readString(DataInputView in) throws IOException {
+		return StringValue.readString(in);
+	}
+
 	/**
 	 * Writes a String to the given output. The string may be null.
 	 * The written string can be read with {@link #readNullableString(DataInputView)}-
 	 * 
 	 * @param str The string to write, or null.
 	 * @param out The output to write to.
-	 * @throws IOException Throws if the writing or the serialization fails.
+	 * 
+	 * @throws IOException Thrown, if the writing or the serialization fails.
 	 */
-	public static void writeNullableString(String str, DataOutputView out) throws IOException {
+	public static void writeNullableString(@Nullable String str, DataOutputView out) throws IOException {
 		if (str != null) {
 			out.writeBoolean(true);
-			StringValue.writeString(str, out);
+			writeString(str, out);
 		} else {
 			out.writeBoolean(false);
 		}
@@ -326,11 +358,12 @@ public final class StringUtils {
 	 * 
 	 * @param in The input to read from.
 	 * @return The deserialized string, or null.
-	 * @throws IOException Throws if the reading or the deserialization fails.
+	 * 
+	 * @throws IOException Thrown, if the reading or the deserialization fails.
 	 */
-	public static String readNullableString(DataInputView in) throws IOException {
+	public static @Nullable String readNullableString(DataInputView in) throws IOException {
 		if (in.readBoolean()) {
-			return StringValue.readString(in);
+			return readString(in);
 		} else {
 			return null;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
index 0caf5b2..95370a5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
@@ -18,24 +18,27 @@
 
 package org.apache.flink.runtime.checkpoint.savepoint;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Utilities for storing and loading savepoint meta data files.
  *
@@ -65,7 +68,10 @@ public class SavepointStore {
 	 * @throws IOException FileSystem operation failures are forwarded
 	 */
 	public static String createSavepointDirectory(@Nonnull String baseDirectory, @Nullable JobID jobId) throws IOException {
-		String prefix;
+		final Path basePath = new Path(baseDirectory);
+		final FileSystem fs = basePath.getFileSystem();
+
+		final String prefix;
 		if (jobId == null) {
 			prefix = "savepoint-";
 		} else {
@@ -73,33 +79,21 @@ public class SavepointStore {
 		}
 
 		Exception latestException = null;
-		Path savepointDirectory = null;
-
-		FileSystem fs = null;
 
 		// Try to create a FS output stream
 		for (int attempt = 0; attempt < 10; attempt++) {
-			Path path = new Path(baseDirectory, FileUtils.getRandomFilename(prefix));
-
-			if (fs == null) {
-				fs = FileSystem.get(path.toUri());
-			}
+			Path path = new Path(basePath, FileUtils.getRandomFilename(prefix));
 
 			try {
 				if (fs.mkdirs(path)) {
-					savepointDirectory = path;
-					break;
+					return path.toString();
 				}
 			} catch (Exception e) {
 				latestException = e;
 			}
 		}
 
-		if (savepointDirectory == null) {
-			throw new IOException("Failed to create savepoint directory at " + baseDirectory, latestException);
-		} else {
-			return savepointDirectory.getPath();
-		}
+		throw new IOException("Failed to create savepoint directory at " + baseDirectory, latestException);
 	}
 
 	/**
@@ -121,20 +115,22 @@ public class SavepointStore {
 	 * @param directory Target directory to store savepoint in
 	 * @param savepoint Savepoint to be stored
 	 * @return Path of stored savepoint
-	 * @throws Exception Failures during store are forwarded
+	 * @throws IOException Failures during store are forwarded
 	 */
 	public static <T extends Savepoint> String storeSavepoint(String directory, T savepoint) throws IOException {
 		checkNotNull(directory, "Target directory");
 		checkNotNull(savepoint, "Savepoint");
 
-		Path basePath = new Path(directory);
-		FileSystem fs = FileSystem.get(basePath.toUri());
+		final Path basePath = new Path(directory);
+		final Path metadataFilePath = new Path(basePath, META_DATA_FILE);
 
-		Path path = new Path(basePath, META_DATA_FILE);
-		FSDataOutputStream fdos = fs.create(path, false);
+		final FileSystem fs = FileSystem.get(basePath.toUri());
 
 		boolean success = false;
-		try (DataOutputStream dos = new DataOutputStream(fdos)) {
+		try (FSDataOutputStream fdos = fs.create(metadataFilePath, WriteMode.NO_OVERWRITE); 
+				DataOutputStream dos = new DataOutputStream(fdos))
+		{
+
 			// Write header
 			dos.writeInt(MAGIC_NUMBER);
 			dos.writeInt(savepoint.getVersion());
@@ -143,14 +139,18 @@ public class SavepointStore {
 			SavepointSerializer<T> serializer = SavepointSerializers.getSerializer(savepoint);
 			serializer.serialize(savepoint, dos);
 			success = true;
-		} finally {
-			if (!success && fs.exists(path)) {
-				if (!fs.delete(path, true)) {
-					LOG.warn("Failed to delete file {} after failed write.", path);
+		}
+		finally {
+			if (!success && fs.exists(metadataFilePath)) {
+				if (!fs.delete(metadataFilePath, true)) {
+					LOG.warn("Failed to delete file {} after failed metadata write.", metadataFilePath);
 				}
 			}
 		}
 
+		// we return the savepoint directory path here!
+		// The directory path also works to resume from and is more elegant than the direct
+		// metadata file pointer
 		return basePath.toString();
 	}
 
@@ -159,7 +159,7 @@ public class SavepointStore {
 	 *
 	 * @param savepointFileOrDirectory Path to the parent savepoint directory or the meta data file.
 	 * @return The loaded savepoint
-	 * @throws Exception Failures during load are forwared
+	 * @throws IOException Failures during load are forwarded
 	 */
 	public static Savepoint loadSavepoint(String savepointFileOrDirectory, ClassLoader userClassLoader) throws IOException {
 		Preconditions.checkNotNull(savepointFileOrDirectory, "Path");
@@ -207,7 +207,7 @@ public class SavepointStore {
 	 * Removes the savepoint meta data w/o loading and disposing it.
 	 *
 	 * @param path Path of savepoint to remove
-	 * @throws Exception Failures during disposal are forwarded
+	 * @throws IOException Failures during disposal are forwarded
 	 */
 	public static void removeSavepointFile(String path) throws IOException {
 		Preconditions.checkNotNull(path, "Path");

http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
index 0752897..a42c25d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.api;
 
 import static org.apache.flink.util.Preconditions.checkElementIndex;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 import java.io.IOException;
 
@@ -28,6 +29,7 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
 import org.apache.flink.runtime.event.RuntimeEvent;
+import org.apache.flink.util.StringUtils;
 
 /**
  * Checkpoint barriers are used to align checkpoints throughout the streaming topology. The
@@ -37,12 +39,12 @@ import org.apache.flink.runtime.event.RuntimeEvent;
  * 
  * <p>Once an operator has received a checkpoint barrier from all its input channels, it
  * knows that a certain checkpoint is complete. It can trigger the operator specific checkpoint
- * behavior and broadcast the barrier to downstream operators.</p>
+ * behavior and broadcast the barrier to downstream operators.
  * 
  * <p>Depending on the semantic guarantees, may hold off post-checkpoint data until the checkpoint
- * is complete (exactly once)</p>
+ * is complete (exactly once).
  * 
- * <p>The checkpoint barrier IDs are strictly monotonous increasing.</p>
+ * <p>The checkpoint barrier IDs are strictly monotonically increasing.
  */
 public class CheckpointBarrier extends RuntimeEvent {
 
@@ -86,8 +88,8 @@ public class CheckpointBarrier extends RuntimeEvent {
 			return;
 		} else if (checkpointType == CheckpointType.SAVEPOINT) {
 			String targetLocation = checkpointOptions.getTargetLocation();
-			assert(targetLocation != null);
-			out.writeUTF(targetLocation);
+			checkState(targetLocation != null);
+			StringUtils.writeString(targetLocation, out);
 		} else {
 			throw new IOException("Unknown CheckpointType " + checkpointType);
 		}
@@ -99,13 +101,13 @@ public class CheckpointBarrier extends RuntimeEvent {
 		timestamp = in.readLong();
 
 		int typeOrdinal = in.readInt();
-		checkElementIndex(typeOrdinal, CheckpointType.values().length, "Unknown CheckpointType ordinal " + typeOrdinal);
+		checkElementIndex(typeOrdinal, CheckpointType.values().length, "Unknown CheckpointType ordinal");
 		CheckpointType checkpointType = CheckpointType.values()[typeOrdinal];
 
 		if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
 			checkpointOptions = CheckpointOptions.forFullCheckpoint();
 		} else if (checkpointType == CheckpointType.SAVEPOINT) {
-			String targetLocation = in.readUTF();
+			String targetLocation = StringUtils.readString(in);
 			checkpointOptions = CheckpointOptions.forSavepoint(targetLocation);
 		} else {
 			throw new IOException("Illegal CheckpointType " + checkpointType);

http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
index 223cbfe..3adf864 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
@@ -44,6 +44,8 @@ import org.apache.flink.util.Preconditions;
  */
 public class EventSerializer {
 
+	private static final Charset STRING_CODING_CHARSET = Charset.forName("UTF-8");
+
 	private static final int END_OF_PARTITION_EVENT = 0;
 
 	private static final int CHECKPOINT_BARRIER_EVENT = 1;
@@ -77,16 +79,16 @@ public class EventSerializer {
 			} else if (checkpointType == CheckpointType.SAVEPOINT) {
 				String targetLocation = checkpointOptions.getTargetLocation();
 				assert(targetLocation != null);
-				byte[] bytes = targetLocation.getBytes(Charset.forName("UTF-8"));
+				byte[] locationBytes = targetLocation.getBytes(STRING_CODING_CHARSET);
 
-				buf = ByteBuffer.allocate(24 + 4 + bytes.length);
+				buf = ByteBuffer.allocate(24 + 4 + locationBytes.length);
 				buf.putInt(0, CHECKPOINT_BARRIER_EVENT);
 				buf.putLong(4, barrier.getId());
 				buf.putLong(12, barrier.getTimestamp());
 				buf.putInt(20, checkpointType.ordinal());
-				buf.putInt(24, bytes.length);
-				for (int i = 0; i < bytes.length; i++) {
-					buf.put(28 + i, bytes[i]);
+				buf.putInt(24, locationBytes.length);
+				for (int i = 0; i < locationBytes.length; i++) {
+					buf.put(28 + i, locationBytes[i]);
 				}
 			} else {
 				throw new IOException("Unknown checkpoint type: " + checkpointType);
@@ -204,8 +206,7 @@ public class EventSerializer {
 				CheckpointOptions checkpointOptions;
 
 				int checkpointTypeOrdinal = buffer.getInt();
-				Preconditions.checkElementIndex(type, CheckpointType.values().length,
-					"Illegal CheckpointType ordinal " + checkpointTypeOrdinal);
+				Preconditions.checkElementIndex(type, CheckpointType.values().length, "Illegal CheckpointType ordinal");
 				CheckpointType checkpointType = CheckpointType.values()[checkpointTypeOrdinal];
 
 				if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
@@ -214,7 +215,7 @@ public class EventSerializer {
 					int len = buffer.getInt();
 					byte[] bytes = new byte[len];
 					buffer.get(bytes);
-					String targetLocation = new String(bytes, Charset.forName("UTF-8"));
+					String targetLocation = new String(bytes, STRING_CODING_CHARSET);
 
 					checkpointOptions = CheckpointOptions.forSavepoint(targetLocation);
 				} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
index 0f99496..4e8871a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
@@ -45,7 +45,11 @@ public class DataInputDeserializer implements DataInputView, java.io.Serializabl
 	// ------------------------------------------------------------------------
 	
 	public DataInputDeserializer() {}
-	
+
+	public DataInputDeserializer(byte[] buffer) {
+		setBuffer(buffer, 0, buffer.length);
+	}
+
 	public DataInputDeserializer(byte[] buffer, int start, int len) {
 		setBuffer(buffer, start, len);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java
new file mode 100644
index 0000000..dfbde5e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class CheckpointTypeTest {
+
+	/**
+	 * This test validates that the order of enumeration constants is not changed, because the
+	 * ordinal of that enum is used in serialization.
+	 *
+	 * <p>It is still possible to edit both the ordinal and this test, but the test adds
+	 * a level of safety, and should make developers stumble over this when attempting
+	 * to adjust the enumeration.
+	 */
+	@Test
+	public void testOrdinalsAreConstant() {
+		assertEquals(0, CheckpointType.FULL_CHECKPOINT.ordinal());
+		assertEquals(1, CheckpointType.SAVEPOINT.ordinal());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
index dd5b0b6..ad9fc16 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
@@ -49,7 +49,7 @@ public class CheckpointBarrierTest {
 		DataOutputSerializer out = new DataOutputSerializer(1024);
 		barrier.write(out);
 
-		DataInputDeserializer in = new DataInputDeserializer(out.wrapAsByteBuffer());
+		DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
 		CheckpointBarrier deserialized = new CheckpointBarrier();
 		deserialized.read(in);