You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2016/11/11 00:23:20 UTC

reef git commit: [REEF-1659] Refactor checkpointing service for REEF Java.

Repository: reef
Updated Branches:
  refs/heads/master b7a98d795 -> 4392ab83b


[REEF-1659] Refactor checkpointing service for REEF Java.

This patch refactors the 'reef-checkpoint' package for readability.

JIRA:
  [REEF-1659](https://issues.apache.org/jira/browse/REEF-1659)

Pull Request:
  This closes #1178


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/4392ab83
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/4392ab83
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/4392ab83

Branch: refs/heads/master
Commit: 4392ab83badf077133bdaeabaaa7168f6588852b
Parents: b7a98d7
Author: Sergey Dudoladov <ss...@apache.org>
Authored: Fri Nov 4 21:55:09 2016 +0100
Committer: Markus Weimer <we...@apache.org>
Committed: Thu Nov 10 16:22:19 2016 -0800

----------------------------------------------------------------------
 .../apache/reef/io/checkpoint/CheckpointID.java |  7 ++-
 .../reef/io/checkpoint/CheckpointService.java   | 34 ++++++++------
 .../reef/io/checkpoint/RandomNameCNS.java       | 24 ++++++++--
 .../reef/io/checkpoint/SimpleNamingService.java |  5 +-
 .../fs/FSCheckPointServiceConfiguration.java    |  6 ++-
 .../reef/io/checkpoint/fs/FSCheckpointID.java   |  9 ++--
 .../io/checkpoint/fs/FSCheckpointService.java   | 49 ++++++++++++++------
 .../reef/io/checkpoint/fs/package-info.java     |  2 +-
 8 files changed, 93 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/4392ab83/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointID.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointID.java b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointID.java
index b286878..927f714 100644
--- a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointID.java
+++ b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointID.java
@@ -21,10 +21,9 @@ package org.apache.reef.io.checkpoint;
 import org.apache.hadoop.io.Writable;
 
 /**
- * This class represent the identified (memento) for a checkpoint. It is allowed
- * to contain small amount of metadata about a checkpoint and must provide sufficient
- * information to the corresponding CheckpointService to locate and retrieve the
- * data contained in the checkpoint.
+ * This interface represents an identifier of a checkpoint.
+ * The identifier must provide sufficient information to the corresponding CheckpointService to locate and retrieve the
+ * data contained in the checkpoint. The identifier can also contain a small amount of metadata about a checkpoint.
  */
 public interface CheckpointID extends Writable {
 

http://git-wip-us.apache.org/repos/asf/reef/blob/4392ab83/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointService.java b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointService.java
index d776665..99f5bf4 100644
--- a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointService.java
+++ b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointService.java
@@ -25,11 +25,14 @@ import java.nio.channels.WritableByteChannel;
 /**
  * The CheckpointService provides a simple API to store and retrieve the state of a task.
  * <p>
- * Checkpoints are atomic, single-writer, write-once, multiple-readers, ready-many type of objects.
- * This is provided by releasing the CheckpointID for a checkpoint only upon commit of the checkpoint,
- * and by preventing a checkpoint to be re-opened for writes.
+ * Checkpoints are assumed to be atomic, single-writer, write-once, multiple-readers, read-many type of objects.
+ * <p>
+ * To ensure this, any implementation of this interface must:
+ * 1) Return a CheckpointID to a client only
+ *    upon the successful {@link #commit(CheckpointWriteChannel) commit} of the checkpoint.
+ * 2) Prevent any checkpoint from being re-opened for writes.
  * <p>
- * Non-functional properties such as durability, availability, compression, garbage collection,
+ * Non-functional properties such as durability, availability, compression, garbage collection, and
  * quotas are left to the implementation.
  * <p>
  * This API is envisioned as the basic building block for a checkpoint service, on top of which richer
@@ -39,10 +42,10 @@ import java.nio.channels.WritableByteChannel;
 public interface CheckpointService {
 
   /**
-   * This method creates a checkpoint and provide a channel to write to it.
+   * Creates a checkpoint and provides a channel to write to it.
    * The name/location of the checkpoint are unknown to the user as of this time, in fact,
-   * the CheckpointID is not released to the user until commit is called. This makes enforcing
-   * atomicity of writes easy.
+   * the CheckpointID is not released to the user until {@link #commit(CheckpointWriteChannel) commit} is called.
+   * This makes enforcing atomicity of writes easy.
    *
    * @return a CheckpointWriteChannel that can be used to write to the checkpoint
    * @throws IOException
@@ -51,8 +54,11 @@ public interface CheckpointService {
   CheckpointWriteChannel create() throws IOException, InterruptedException;
 
   /**
-   * Used to finalize and existing checkpoint. It returns the CheckpointID that can be later
-   * used to access (read-only) this checkpoint. This guarantees atomicity of the checkpoint.
+   * Closes an existing checkpoint for writes and returns the CheckpointID that can later be
+   * used to get read-only access to this checkpoint.
+   * <p>
+   * Implementations must return the CheckpointID to the caller only upon the
+   * successful completion of the checkpoint, to guarantee atomicity of the checkpoint.
    *
    * @param channel the CheckpointWriteChannel to commit
    * @return a CheckpointID
@@ -62,9 +68,9 @@ public interface CheckpointService {
   CheckpointID commit(CheckpointWriteChannel channel) throws IOException, InterruptedException;
 
   /**
-   * Dual to commit, it aborts the current checkpoint. Garbage collection choices are
-   * left to the implementation. The CheckpointID is not generated nor released to the
-   * user so the checkpoint is not accessible.
+   * Aborts the current checkpoint. Garbage collection choices are
+   * left to the implementation. The CheckpointID is neither generated nor released to the
+   * client so the checkpoint is not accessible.
    *
    * @param channel the CheckpointWriteChannel to abort
    * @throws IOException
@@ -73,7 +79,7 @@ public interface CheckpointService {
   void abort(CheckpointWriteChannel channel) throws IOException, InterruptedException;
 
   /**
-   * Given a CheckpointID returns a reading channel.
+   * Returns a reading channel to a checkpoint identified by the CheckpointID.
    *
    * @param checkpointId CheckpointID for the checkpoint to be opened
    * @return a CheckpointReadChannel
@@ -83,7 +89,7 @@ public interface CheckpointService {
   CheckpointReadChannel open(CheckpointID checkpointId) throws IOException, InterruptedException;
 
   /**
-   * It discards an existing checkpoint identified by its CheckpointID.
+   * Discards an existing checkpoint identified by its CheckpointID.
    *
    * @param checkpointId CheckpointID for the checkpoint to be deleted
    * @return a boolean confirming success of the deletion

http://git-wip-us.apache.org/repos/asf/reef/blob/4392ab83/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/RandomNameCNS.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/RandomNameCNS.java b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/RandomNameCNS.java
index 7de31cd..50e44a7 100644
--- a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/RandomNameCNS.java
+++ b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/RandomNameCNS.java
@@ -26,20 +26,31 @@ import org.apache.reef.tang.annotations.Parameter;
 import javax.inject.Inject;
 
 /**
- * Simple naming service that generates a random checkpoint name.
+ * A naming service that generates a random checkpoint name by appending a random alphanumeric string (suffix)
+ * of a given length to a user-supplied prefix string.
  */
 public class RandomNameCNS implements CheckpointNamingService {
 
   private final String prefix;
+  private final int lengthOfRandomSuffix;
 
-  @Inject
+  @Deprecated
   public RandomNameCNS(@Parameter(PREFIX.class) final String prefix) {
     this.prefix = prefix;
+    this.lengthOfRandomSuffix
+            = Integer.parseInt(LengthOfRandomSuffix.class.getAnnotation(NamedParameter.class).default_value());
+  }
+
+  @Inject
+  private RandomNameCNS(@Parameter(PREFIX.class) final String prefix,
+                        @Parameter(LengthOfRandomSuffix.class) final int lengthOfRandomSuffix) {
+    this.prefix = prefix;
+    this.lengthOfRandomSuffix = lengthOfRandomSuffix;
   }
 
   @Override
   public String getNewName() {
-    return this.prefix + RandomStringUtils.randomAlphanumeric(8);
+    return this.prefix + RandomStringUtils.randomAlphanumeric(lengthOfRandomSuffix);
   }
 
   /**
@@ -49,4 +60,11 @@ public class RandomNameCNS implements CheckpointNamingService {
   public static class PREFIX implements Name<String> {
   }
 
+  /**
+   * Number of alphanumeric characters in the random part of a checkpoint name.
+   */
+  @NamedParameter(doc = "Number of alphanumeric chars in the random part of a checkpoint name.", default_value = "8")
+  public static class LengthOfRandomSuffix implements Name<Integer> {
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/4392ab83/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/SimpleNamingService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/SimpleNamingService.java b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/SimpleNamingService.java
index 7b43380..823ff1e 100644
--- a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/SimpleNamingService.java
+++ b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/SimpleNamingService.java
@@ -26,6 +26,7 @@ import javax.inject.Inject;
 
 /**
  * A naming service that simply returns the name it has been initialized with.
+ * Note that the name is always the same.
  */
 public class SimpleNamingService implements CheckpointNamingService {
 
@@ -37,7 +38,7 @@ public class SimpleNamingService implements CheckpointNamingService {
   }
 
   /**
-   * Generate a new checkpoint Name.
+   * Generate a new checkpoint name.
    *
    * @return the checkpoint name
    */
@@ -49,7 +50,7 @@ public class SimpleNamingService implements CheckpointNamingService {
   /**
    * Prefix for checkpoints.
    */
-  @NamedParameter(doc = "Checkpoint prefix.", short_name = "checkpoint_prefix", default_value = "reef")
+  @NamedParameter(doc = "Checkpoint name.", short_name = "checkpoint_name", default_value = "reef")
   public static final class CheckpointName implements Name<String> {
   }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/4392ab83/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckPointServiceConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckPointServiceConfiguration.java b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckPointServiceConfiguration.java
index b48a8b9..c8464a6 100644
--- a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckPointServiceConfiguration.java
+++ b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckPointServiceConfiguration.java
@@ -39,7 +39,7 @@ import javax.inject.Inject;
 import java.io.IOException;
 
 /**
- * ConfigurationModule for the FSCheckPointService.
+ * ConfigurationModule for the FSCheckpointService.
  * This can be used to create Evaluator-side configurations of the checkpointing service.
  */
 @DriverSide
@@ -65,11 +65,15 @@ public class FSCheckPointServiceConfiguration extends ConfigurationModuleBuilder
    * Prefix for checkpoint files (optional).
    */
   public static final OptionalParameter<String> PREFIX = new OptionalParameter<>();
+
+
   public static final ConfigurationModule CONF = new FSCheckPointServiceConfiguration()
+
       .bindImplementation(CheckpointService.class, FSCheckpointService.class) // Use the HDFS based checkpoints
       .bindImplementation(CheckpointNamingService.class, RandomNameCNS.class) // Use Random Names for the checkpoints
       .bindImplementation(CheckpointID.class, FSCheckpointID.class)
       .bindConstructor(FileSystem.class, FileSystemConstructor.class)
+
       .bindNamedParameter(FileSystemConstructor.IsLocal.class, IS_LOCAL)
       .bindNamedParameter(FSCheckpointService.PATH.class, PATH)
       .bindNamedParameter(FSCheckpointService.ReplicationFactor.class, REPLICATION_FACTOR)

http://git-wip-us.apache.org/repos/asf/reef/blob/4392ab83/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointID.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointID.java b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointID.java
index f43cc67..1b97feb 100644
--- a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointID.java
+++ b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointID.java
@@ -34,8 +34,9 @@ public class FSCheckpointID implements CheckpointID {
 
   private Path path;
 
-  public FSCheckpointID() {
-  }
+  // CheckpointID extends the Hadoop Writable interface, which handles serialization.
+  // Writables are deserialized by instantiating the class reflectively and calling readFields(), so a public no-arg constructor is required.
+  public FSCheckpointID(){}
 
   public FSCheckpointID(final Path path) {
     this.path = path;
@@ -62,8 +63,8 @@ public class FSCheckpointID implements CheckpointID {
 
   @Override
   public boolean equals(final Object other) {
-    return other instanceof FSCheckpointID
-        && path.equals(((FSCheckpointID) other).path);
+    return (other instanceof FSCheckpointID)
+            && path.equals(((FSCheckpointID) other).path);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/reef/blob/4392ab83/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointService.java b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointService.java
index b7f6ec3..fcc65a2 100644
--- a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointService.java
+++ b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointService.java
@@ -39,10 +39,12 @@ import java.nio.channels.WritableByteChannel;
 
 /**
  * A FileSystem based CheckpointService.
+ *
+ * Note that this implementation creates a temporary file first and moves it to its final destination at commit time.
  */
 public class FSCheckpointService implements CheckpointService {
 
-  private final Path base;
+  private final Path basePath;
   private final FileSystem fs;
   private final CheckpointNamingService namingPolicy;
   private final short replication;
@@ -53,7 +55,7 @@ public class FSCheckpointService implements CheckpointService {
                       final CheckpointNamingService namingPolicy,
                       @Parameter(ReplicationFactor.class) final short replication) {
     this.fs = fs;
-    this.base = new Path(basePath);
+    this.basePath = new Path(basePath);
     this.namingPolicy = namingPolicy;
     this.replication = replication;
   }
@@ -63,7 +65,7 @@ public class FSCheckpointService implements CheckpointService {
                              final CheckpointNamingService namingPolicy,
                              final short replication) {
     this.fs = fs;
-    this.base = base;
+    this.basePath = base;
     this.namingPolicy = namingPolicy;
     this.replication = replication;
   }
@@ -76,77 +78,95 @@ public class FSCheckpointService implements CheckpointService {
       throws IOException {
 
     final String name = namingPolicy.getNewName();
-
     final Path p = new Path(name);
     if (p.isUriPathAbsolute()) {
-      throw new IOException("Checkpoint cannot be an absolute path");
+      throw new IOException("Checkpoint name cannot be an absolute path.");
     }
-    return createInternal(new Path(base, p));
+
+    return createInternal(new Path(basePath, p));
   }
 
+
   CheckpointWriteChannel createInternal(final Path name) throws IOException {
 
-    //create a temp file, fail if file exists
+   /*  Create a temp file, fail if file exists.
+       Presumably this is done to check that the file is indeed writable; checking this
+       directly via a file system call may lead to a time-of-check/time-of-use race condition.
+       See the pull request for REEF-1659 for discussion.
+   */
     return new FSCheckpointWriteChannel(name, fs.create(tmpfile(name), replication));
   }
 
   @Override
   public CheckpointReadChannel open(final CheckpointID id)
       throws IOException, InterruptedException {
+
     if (!(id instanceof FSCheckpointID)) {
       throw new IllegalArgumentException(
-          "Mismatched checkpoint type: " + id.getClass());
+          "Mismatched checkpoint id type. Expected FSCheckpointID, but actually got " + id.getClass());
     }
+
     return new FSCheckpointReadChannel(
         fs.open(((FSCheckpointID) id).getPath()));
   }
 
   @Override
-  public CheckpointID commit(final CheckpointWriteChannel ch) throws IOException,
-      InterruptedException {
+  public CheckpointID commit(final CheckpointWriteChannel ch)
+          throws IOException, InterruptedException {
+
     if (ch.isOpen()) {
       ch.close();
     }
+
     final FSCheckpointWriteChannel hch = (FSCheckpointWriteChannel) ch;
     final Path dst = hch.getDestination();
+
     if (!fs.rename(tmpfile(dst), dst)) {
       // attempt to clean up
       abort(ch);
       throw new IOException("Failed to promote checkpoint" +
           tmpfile(dst) + " -> " + dst);
     }
+
     return new FSCheckpointID(hch.getDestination());
   }
 
   @Override
   public void abort(final CheckpointWriteChannel ch) throws IOException {
+
     if (ch.isOpen()) {
       ch.close();
     }
+
     final FSCheckpointWriteChannel hch = (FSCheckpointWriteChannel) ch;
     final Path tmp = tmpfile(hch.getDestination());
+
     try {
       if (!fs.delete(tmp, false)) {
-        throw new IOException("Failed to delete checkpoint during abort");
+        throw new IOException("Failed to delete a temporary checkpoint file during abort. Path: " + tmp);
       }
     } catch (final FileNotFoundException ignored) {
       // IGNORE
     }
+
   }
 
   @Override
-  public boolean delete(final CheckpointID id) throws IOException,
-      InterruptedException {
+  public boolean delete(final CheckpointID id)
+          throws IOException, InterruptedException {
+
     if (!(id instanceof FSCheckpointID)) {
       throw new IllegalArgumentException(
-          "Mismatched checkpoint type: " + id.getClass());
+              "Mismatched checkpoint id type. Expected FSCheckpointID, but actually got " + id.getClass());
     }
+
     final Path tmp = ((FSCheckpointID) id).getPath();
     try {
       return fs.delete(tmp, false);
     } catch (final FileNotFoundException ignored) {
       // IGNORE
     }
+
     return true;
   }
 
@@ -160,6 +180,7 @@ public class FSCheckpointService implements CheckpointService {
 
   private static class FSCheckpointWriteChannel
       implements CheckpointWriteChannel {
+
     private final Path finalDst;
     private final WritableByteChannel out;
     private boolean isOpen = true;

http://git-wip-us.apache.org/repos/asf/reef/blob/4392ab83/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/package-info.java b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/package-info.java
index c63e054..8eb797c 100644
--- a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/package-info.java
+++ b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/package-info.java
@@ -17,6 +17,6 @@
  * under the License.
  */
 /**
- * FileSystem based checkpoints.
+ * Provides an example implementation of a CheckpointService based on a file system.
  */
 package org.apache.reef.io.checkpoint.fs;