You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2016/11/11 00:23:20 UTC
reef git commit: [REEF-1659] Refactor checkpointing service for REEF Java.
Repository: reef
Updated Branches:
refs/heads/master b7a98d795 -> 4392ab83b
[REEF-1659] Refactor checkpointing service for REEF Java.
This patch refactors the 'reef-checkpoint' package for readability.
JIRA:
[REEF-1659](https://issues.apache.org/jira/browse/REEF-1659)
Pull Request:
This closes #1178
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/4392ab83
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/4392ab83
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/4392ab83
Branch: refs/heads/master
Commit: 4392ab83badf077133bdaeabaaa7168f6588852b
Parents: b7a98d7
Author: Sergey Dudoladov <ss...@apache.org>
Authored: Fri Nov 4 21:55:09 2016 +0100
Committer: Markus Weimer <we...@apache.org>
Committed: Thu Nov 10 16:22:19 2016 -0800
----------------------------------------------------------------------
.../apache/reef/io/checkpoint/CheckpointID.java | 7 ++-
.../reef/io/checkpoint/CheckpointService.java | 34 ++++++++------
.../reef/io/checkpoint/RandomNameCNS.java | 24 ++++++++--
.../reef/io/checkpoint/SimpleNamingService.java | 5 +-
.../fs/FSCheckPointServiceConfiguration.java | 6 ++-
.../reef/io/checkpoint/fs/FSCheckpointID.java | 9 ++--
.../io/checkpoint/fs/FSCheckpointService.java | 49 ++++++++++++++------
.../reef/io/checkpoint/fs/package-info.java | 2 +-
8 files changed, 93 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/4392ab83/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointID.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointID.java b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointID.java
index b286878..927f714 100644
--- a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointID.java
+++ b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointID.java
@@ -21,10 +21,9 @@ package org.apache.reef.io.checkpoint;
import org.apache.hadoop.io.Writable;
/**
- * This class represent the identified (memento) for a checkpoint. It is allowed
- * to contain small amount of metadata about a checkpoint and must provide sufficient
- * information to the corresponding CheckpointService to locate and retrieve the
- * data contained in the checkpoint.
+ * This class represent an identifier of a checkpoint.
+ * The identifier must provide sufficient information to the corresponding CheckpointService to locate and retrieve the
+ * data contained in the checkpoint. The identifier can also contain small amount of metadata about a checkpoint.
*/
public interface CheckpointID extends Writable {
http://git-wip-us.apache.org/repos/asf/reef/blob/4392ab83/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointService.java b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointService.java
index d776665..99f5bf4 100644
--- a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointService.java
+++ b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointService.java
@@ -25,11 +25,14 @@ import java.nio.channels.WritableByteChannel;
/**
* The CheckpointService provides a simple API to store and retrieve the state of a task.
* <p>
- * Checkpoints are atomic, single-writer, write-once, multiple-readers, ready-many type of objects.
- * This is provided by releasing the CheckpointID for a checkpoint only upon commit of the checkpoint,
- * and by preventing a checkpoint to be re-opened for writes.
+ * Checkpoints are assumed to be atomic, single-writer, write-once, multiple-readers, ready-many type of objects.
+ *
+ * To ensure this, any implementation of this interface must.
+ * 1) Return a CheckpointID to a client only
+ * upon the successful {@link #commit(CheckpointWriteChannel) commit} of the checkpoint.
+ * 2) Prevent any checkpoint from being re-opened for writes.
* <p>
- * Non-functional properties such as durability, availability, compression, garbage collection,
+ * Non-functional properties such as durability, availability, compression, garbage collection, and
* quotas are left to the implementation.
* <p>
* This API is envisioned as the basic building block for a checkpoint service, on top of which richer
@@ -39,10 +42,10 @@ import java.nio.channels.WritableByteChannel;
public interface CheckpointService {
/**
- * This method creates a checkpoint and provide a channel to write to it.
+ * Creates a checkpoint and provides a channel to write to it.
* The name/location of the checkpoint are unknown to the user as of this time, in fact,
- * the CheckpointID is not released to the user until commit is called. This makes enforcing
- * atomicity of writes easy.
+ * the CheckpointID is not released to the user until {@link #commit(CheckpointWriteChannel) commit} is called.
+ * This makes enforcing atomicity of writes easy.
*
* @return a CheckpointWriteChannel that can be used to write to the checkpoint
* @throws IOException
@@ -51,8 +54,11 @@ public interface CheckpointService {
CheckpointWriteChannel create() throws IOException, InterruptedException;
/**
- * Used to finalize and existing checkpoint. It returns the CheckpointID that can be later
- * used to access (read-only) this checkpoint. This guarantees atomicity of the checkpoint.
+ * Closes an existing checkpoint for writes and returns the CheckpointID that can be later
+ * used to get the read-only access to this checkpoint.
+ *
+ * Implementation is supposed to return the CheckpointID to the caller only on the
+ * successful completion of checkpoint to guarantee atomicity of the checkpoint.
*
* @param channel the CheckpointWriteChannel to commit
* @return a CheckpointID
@@ -62,9 +68,9 @@ public interface CheckpointService {
CheckpointID commit(CheckpointWriteChannel channel) throws IOException, InterruptedException;
/**
- * Dual to commit, it aborts the current checkpoint. Garbage collection choices are
- * left to the implementation. The CheckpointID is not generated nor released to the
- * user so the checkpoint is not accessible.
+ * Aborts the current checkpoint. Garbage collection choices are
+ * left to the implementation. The CheckpointID is neither generated nor released to the
+ * client so the checkpoint is not accessible.
*
* @param channel the CheckpointWriteChannel to abort
* @throws IOException
@@ -73,7 +79,7 @@ public interface CheckpointService {
void abort(CheckpointWriteChannel channel) throws IOException, InterruptedException;
/**
- * Given a CheckpointID returns a reading channel.
+ * Returns a reading channel to a checkpoint identified by the CheckpointID.
*
* @param checkpointId CheckpointID for the checkpoint to be opened
* @return a CheckpointReadChannel
@@ -83,7 +89,7 @@ public interface CheckpointService {
CheckpointReadChannel open(CheckpointID checkpointId) throws IOException, InterruptedException;
/**
- * It discards an existing checkpoint identified by its CheckpointID.
+ * Discards an existing checkpoint identified by its CheckpointID.
*
* @param checkpointId CheckpointID for the checkpoint to be deleted
* @return a boolean confirming success of the deletion
http://git-wip-us.apache.org/repos/asf/reef/blob/4392ab83/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/RandomNameCNS.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/RandomNameCNS.java b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/RandomNameCNS.java
index 7de31cd..50e44a7 100644
--- a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/RandomNameCNS.java
+++ b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/RandomNameCNS.java
@@ -26,20 +26,31 @@ import org.apache.reef.tang.annotations.Parameter;
import javax.inject.Inject;
/**
- * Simple naming service that generates a random checkpoint name.
+ * A naming service that generates a random checkpoint name by appending a random alphanumeric string (suffix)
+ * of a given length to a user-supplied prefix string.
*/
public class RandomNameCNS implements CheckpointNamingService {
private final String prefix;
+ private final int lengthOfRandomSuffix;
- @Inject
+ @Deprecated
public RandomNameCNS(@Parameter(PREFIX.class) final String prefix) {
this.prefix = prefix;
+ this.lengthOfRandomSuffix
+ = Integer.parseInt(LengthOfRandomSuffix.class.getAnnotation(NamedParameter.class).default_value());
+ }
+
+ @Inject
+ private RandomNameCNS(@Parameter(PREFIX.class) final String prefix,
+ @Parameter(LengthOfRandomSuffix.class) final int lengthOfRandomSuffix) {
+ this.prefix = prefix;
+ this.lengthOfRandomSuffix = lengthOfRandomSuffix;
}
@Override
public String getNewName() {
- return this.prefix + RandomStringUtils.randomAlphanumeric(8);
+ return this.prefix + RandomStringUtils.randomAlphanumeric(lengthOfRandomSuffix);
}
/**
@@ -49,4 +60,11 @@ public class RandomNameCNS implements CheckpointNamingService {
public static class PREFIX implements Name<String> {
}
+ /**
+ * Number of alphanumeric characters in the random part of a checkpoint name.
+ */
+ @NamedParameter(doc = "Number of alphanumeric chars in the random part of a checkpoint name.", default_value = "8")
+ public static class LengthOfRandomSuffix implements Name<Integer> {
+ }
+
}
http://git-wip-us.apache.org/repos/asf/reef/blob/4392ab83/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/SimpleNamingService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/SimpleNamingService.java b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/SimpleNamingService.java
index 7b43380..823ff1e 100644
--- a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/SimpleNamingService.java
+++ b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/SimpleNamingService.java
@@ -26,6 +26,7 @@ import javax.inject.Inject;
/**
* A naming service that simply returns the name it has been initialized with.
+ * Note that the name is always the same.
*/
public class SimpleNamingService implements CheckpointNamingService {
@@ -37,7 +38,7 @@ public class SimpleNamingService implements CheckpointNamingService {
}
/**
- * Generate a new checkpoint Name.
+ * Generate a new checkpoint name.
*
* @return the checkpoint name
*/
@@ -49,7 +50,7 @@ public class SimpleNamingService implements CheckpointNamingService {
/**
* Prefix for checkpoints.
*/
- @NamedParameter(doc = "Checkpoint prefix.", short_name = "checkpoint_prefix", default_value = "reef")
+ @NamedParameter(doc = "Checkpoint name.", short_name = "checkpoint_name", default_value = "reef")
public static final class CheckpointName implements Name<String> {
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/4392ab83/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckPointServiceConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckPointServiceConfiguration.java b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckPointServiceConfiguration.java
index b48a8b9..c8464a6 100644
--- a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckPointServiceConfiguration.java
+++ b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckPointServiceConfiguration.java
@@ -39,7 +39,7 @@ import javax.inject.Inject;
import java.io.IOException;
/**
- * ConfigurationModule for the FSCheckPointService.
+ * ConfigurationModule for the FSCheckpointService.
* This can be used to create Evaluator-side configurations of the checkpointing service.
*/
@DriverSide
@@ -65,11 +65,15 @@ public class FSCheckPointServiceConfiguration extends ConfigurationModuleBuilder
* Prefix for checkpoint files (optional).
*/
public static final OptionalParameter<String> PREFIX = new OptionalParameter<>();
+
+
public static final ConfigurationModule CONF = new FSCheckPointServiceConfiguration()
+
.bindImplementation(CheckpointService.class, FSCheckpointService.class) // Use the HDFS based checkpoints
.bindImplementation(CheckpointNamingService.class, RandomNameCNS.class) // Use Random Names for the checkpoints
.bindImplementation(CheckpointID.class, FSCheckpointID.class)
.bindConstructor(FileSystem.class, FileSystemConstructor.class)
+
.bindNamedParameter(FileSystemConstructor.IsLocal.class, IS_LOCAL)
.bindNamedParameter(FSCheckpointService.PATH.class, PATH)
.bindNamedParameter(FSCheckpointService.ReplicationFactor.class, REPLICATION_FACTOR)
http://git-wip-us.apache.org/repos/asf/reef/blob/4392ab83/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointID.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointID.java b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointID.java
index f43cc67..1b97feb 100644
--- a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointID.java
+++ b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointID.java
@@ -34,8 +34,9 @@ public class FSCheckpointID implements CheckpointID {
private Path path;
- public FSCheckpointID() {
- }
+ // CheckpointID extends Hadoop Writable interface that enables serialization
+ // Java serialization requires a (potentially empty) public default constructor
+ public FSCheckpointID(){}
public FSCheckpointID(final Path path) {
this.path = path;
@@ -62,8 +63,8 @@ public class FSCheckpointID implements CheckpointID {
@Override
public boolean equals(final Object other) {
- return other instanceof FSCheckpointID
- && path.equals(((FSCheckpointID) other).path);
+ return (other instanceof FSCheckpointID)
+ && path.equals(((FSCheckpointID) other).path);
}
@Override
http://git-wip-us.apache.org/repos/asf/reef/blob/4392ab83/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointService.java b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointService.java
index b7f6ec3..fcc65a2 100644
--- a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointService.java
+++ b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointService.java
@@ -39,10 +39,12 @@ import java.nio.channels.WritableByteChannel;
/**
* A FileSystem based CheckpointService.
+ *
+ * Note that this implementation creates a temporary file first and moves it to final destination at commit time.
*/
public class FSCheckpointService implements CheckpointService {
- private final Path base;
+ private final Path basePath;
private final FileSystem fs;
private final CheckpointNamingService namingPolicy;
private final short replication;
@@ -53,7 +55,7 @@ public class FSCheckpointService implements CheckpointService {
final CheckpointNamingService namingPolicy,
@Parameter(ReplicationFactor.class) final short replication) {
this.fs = fs;
- this.base = new Path(basePath);
+ this.basePath = new Path(basePath);
this.namingPolicy = namingPolicy;
this.replication = replication;
}
@@ -63,7 +65,7 @@ public class FSCheckpointService implements CheckpointService {
final CheckpointNamingService namingPolicy,
final short replication) {
this.fs = fs;
- this.base = base;
+ this.basePath = base;
this.namingPolicy = namingPolicy;
this.replication = replication;
}
@@ -76,77 +78,95 @@ public class FSCheckpointService implements CheckpointService {
throws IOException {
final String name = namingPolicy.getNewName();
-
final Path p = new Path(name);
if (p.isUriPathAbsolute()) {
- throw new IOException("Checkpoint cannot be an absolute path");
+ throw new IOException("Checkpoint name cannot be an absolute path.");
}
- return createInternal(new Path(base, p));
+
+ return createInternal(new Path(basePath, p));
}
+
CheckpointWriteChannel createInternal(final Path name) throws IOException {
- //create a temp file, fail if file exists
+ /* Create a temp file, fail if file exists.
+ The likely reason to do so (I am not the original author) is to check that the file is indeed writable.
+ Checking this directly via a file system call may lead to a time-of-check/time-of-use race condition.
+ See the pull request for REEF-1659 for discussion.
+ */
return new FSCheckpointWriteChannel(name, fs.create(tmpfile(name), replication));
}
@Override
public CheckpointReadChannel open(final CheckpointID id)
throws IOException, InterruptedException {
+
if (!(id instanceof FSCheckpointID)) {
throw new IllegalArgumentException(
- "Mismatched checkpoint type: " + id.getClass());
+ "Mismatched checkpoint id type. Expected FSCheckpointID, but actually got " + id.getClass());
}
+
return new FSCheckpointReadChannel(
fs.open(((FSCheckpointID) id).getPath()));
}
@Override
- public CheckpointID commit(final CheckpointWriteChannel ch) throws IOException,
- InterruptedException {
+ public CheckpointID commit(final CheckpointWriteChannel ch)
+ throws IOException, InterruptedException {
+
if (ch.isOpen()) {
ch.close();
}
+
final FSCheckpointWriteChannel hch = (FSCheckpointWriteChannel) ch;
final Path dst = hch.getDestination();
+
if (!fs.rename(tmpfile(dst), dst)) {
// attempt to clean up
abort(ch);
throw new IOException("Failed to promote checkpoint" +
tmpfile(dst) + " -> " + dst);
}
+
return new FSCheckpointID(hch.getDestination());
}
@Override
public void abort(final CheckpointWriteChannel ch) throws IOException {
+
if (ch.isOpen()) {
ch.close();
}
+
final FSCheckpointWriteChannel hch = (FSCheckpointWriteChannel) ch;
final Path tmp = tmpfile(hch.getDestination());
+
try {
if (!fs.delete(tmp, false)) {
- throw new IOException("Failed to delete checkpoint during abort");
+ throw new IOException("Failed to delete a temporary checkpoint file during abort. Path: " + tmp);
}
} catch (final FileNotFoundException ignored) {
// IGNORE
}
+
}
@Override
- public boolean delete(final CheckpointID id) throws IOException,
- InterruptedException {
+ public boolean delete(final CheckpointID id)
+ throws IOException, InterruptedException {
+
if (!(id instanceof FSCheckpointID)) {
throw new IllegalArgumentException(
- "Mismatched checkpoint type: " + id.getClass());
+ "Mismatched checkpoint id type. Expected FSCheckpointID, but actually got " + id.getClass());
}
+
final Path tmp = ((FSCheckpointID) id).getPath();
try {
return fs.delete(tmp, false);
} catch (final FileNotFoundException ignored) {
// IGNORE
}
+
return true;
}
@@ -160,6 +180,7 @@ public class FSCheckpointService implements CheckpointService {
private static class FSCheckpointWriteChannel
implements CheckpointWriteChannel {
+
private final Path finalDst;
private final WritableByteChannel out;
private boolean isOpen = true;
http://git-wip-us.apache.org/repos/asf/reef/blob/4392ab83/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/package-info.java b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/package-info.java
index c63e054..8eb797c 100644
--- a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/package-info.java
+++ b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/package-info.java
@@ -17,6 +17,6 @@
* under the License.
*/
/**
- * FileSystem based checkpoints.
+ * Provides an example implementation of a CheckpointService based on a file system.
*/
package org.apache.reef.io.checkpoint.fs;