You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2014/01/16 21:40:23 UTC

git commit: Currently, any messages and state for a runnable instance are cleaned up by the instance during startup. This is to make sure things are clean if the instance is being restarted.

Updated Branches:
  refs/heads/master c019131d7 -> 1380f21c0


Currently, any messages and state for a runnable instance are cleaned up by the instance during startup. This is to make sure things are clean if the instance is being restarted.

However, this can result in lost messages to a new instance if a
message is sent after the container has been assigned to the new instance
but before the cleanup part of the code runs. This change moves the cleanup
to the application master, which performs it while launching the instance,
to fix this problem.

Signed-off-by: Terence Yim <te...@continuuity.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/1380f21c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/1380f21c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/1380f21c

Branch: refs/heads/master
Commit: 1380f21c0516d6b4655adb75206e27b5e2166d69
Parents: c019131
Author: Albert Shau <al...@continuuity.com>
Authored: Thu Jan 16 12:17:38 2014 -0800
Committer: Terence Yim <te...@continuuity.com>
Committed: Thu Jan 16 12:39:51 2014 -0800

----------------------------------------------------------------------
 .../twill/internal/TwillContainerLauncher.java   |  7 +++++++
 .../twill/internal/ZKServiceDecorator.java       | 19 +++----------------
 2 files changed, 10 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1380f21c/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
index 63f8732..dad8cbe 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
@@ -17,6 +17,7 @@
  */
 package org.apache.twill.internal;
 
+import com.google.common.util.concurrent.Futures;
 import org.apache.twill.api.LocalFile;
 import org.apache.twill.api.RunId;
 import org.apache.twill.api.RuntimeSpecification;
@@ -27,6 +28,8 @@ import org.apache.twill.launcher.TwillLauncher;
 import org.apache.twill.zookeeper.NodeData;
 import org.apache.twill.zookeeper.ZKClient;
 import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.twill.zookeeper.ZKOperations;
+import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,6 +68,10 @@ public final class TwillContainerLauncher {
     ProcessLauncher.PrepareLaunchContext.AfterResources afterResources = null;
     ProcessLauncher.PrepareLaunchContext.ResourcesAdder resourcesAdder = null;
 
+    // Clean up zookeeper path in case this is a retry and there are old messages and state there.
+    Futures.getUnchecked(ZKOperations.ignoreError(
+      ZKOperations.recursiveDelete(zkClient, "/" + runId), KeeperException.NoNodeException.class, null));
+
     // Adds all file to be localized to container
     if (!runtimeSpec.getLocalFiles().isEmpty()) {
       resourcesAdder = launchContext.withResources();

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1380f21c/twill-core/src/main/java/org/apache/twill/internal/ZKServiceDecorator.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/ZKServiceDecorator.java b/twill-core/src/main/java/org/apache/twill/internal/ZKServiceDecorator.java
index 7313d33..d434bac 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/ZKServiceDecorator.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/ZKServiceDecorator.java
@@ -107,20 +107,6 @@ public final class ZKServiceDecorator extends AbstractService {
     }
   }
 
-  /**
-   * Deletes the given ZK path recursively and create the path again.
-   */
-  private ListenableFuture<String> deleteAndCreate(final String path, final byte[] data, final CreateMode mode) {
-    return Futures.transform(ZKOperations.ignoreError(ZKOperations.recursiveDelete(zkClient, path),
-                                                      KeeperException.NoNodeException.class, null),
-                             new AsyncFunction<String, String>() {
-      @Override
-      public ListenableFuture<String> apply(String input) throws Exception {
-        return zkClient.create(path, data, mode);
-      }
-    }, Threads.SAME_THREAD_EXECUTOR);
-  }
-
   @Override
   protected void doStart() {
     callbackExecutor = Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("message-callback"));
@@ -131,8 +117,9 @@ public final class ZKServiceDecorator extends AbstractService {
         StateNode stateNode = new StateNode(ServiceController.State.STARTING);
 
         final ListenableFuture<List<String>> createFuture = Futures.allAsList(
-          deleteAndCreate(getZKPath("messages"), null, CreateMode.PERSISTENT),
-          deleteAndCreate(getZKPath("state"), encodeStateNode(stateNode), CreateMode.PERSISTENT)
+          ZKOperations.ignoreError(zkClient.create(getZKPath("messages"), null, CreateMode.PERSISTENT),
+                                   KeeperException.NodeExistsException.class, null),
+          zkClient.create(getZKPath("state"), encodeStateNode(stateNode), CreateMode.PERSISTENT)
         );
 
         createFuture.addListener(new Runnable() {