You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2015/06/15 19:22:26 UTC

[1/2] incubator-twill git commit: (TWILL-133) Make the ZKClient stoppable even if it is still trying to connect

Repository: incubator-twill
Updated Branches:
  refs/heads/feature/twill-133 [created] 74720b804


(TWILL-133) Make the ZKClient stoppable even if it is still trying to connect

- Also includes a refactoring of the DefaultZKClientService class to simplify
  the shutdown logic


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/bda99086
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/bda99086
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/bda99086

Branch: refs/heads/feature/twill-133
Commit: bda99086633241bea8c999ec7833eddd01a9e341
Parents: 0d3c5ee
Author: Terence Yim <ch...@apache.org>
Authored: Fri Jun 12 17:13:05 2015 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Fri Jun 12 17:21:35 2015 -0700

----------------------------------------------------------------------
 .../zookeeper/DefaultZKClientService.java       | 119 ++++++++++++-------
 .../apache/twill/zookeeper/ZKClientTest.java    |  85 ++++++++-----
 2 files changed, 134 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/bda99086/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
index 7b9b345..71a0118 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
@@ -58,7 +58,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 
@@ -91,11 +90,11 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
                                 Watcher connectionWatcher, Multimap<String, byte[]> authInfos) {
     this.zkStr = zkStr;
     this.sessionTimeout = sessionTimeout;
-    this.connectionWatchers = new CopyOnWriteArrayList<Watcher>();
+    this.connectionWatchers = new CopyOnWriteArrayList<>();
     this.authInfos = copyAuthInfo(authInfos);
     addConnectionWatcher(connectionWatcher);
 
-    this.zooKeeper = new AtomicReference<ZooKeeper>();
+    this.zooKeeper = new AtomicReference<>();
     serviceDelegate = new ServiceDelegate();
   }
 
@@ -111,7 +110,7 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
   }
 
   @Override
-  public Cancellable addConnectionWatcher(Watcher watcher) {
+  public Cancellable addConnectionWatcher(final Watcher watcher) {
     if (watcher == null) {
       return new Cancellable() {
         @Override
@@ -121,12 +120,13 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
       };
     }
 
-    final Watcher wrappedWatcher = wrapWatcher(watcher);
-    connectionWatchers.add(wrappedWatcher);
+    // Invocation of connection watchers are already done inside the event thread,
+    // hence no need to wrap the watcher again.
+    connectionWatchers.add(watcher);
     return new Cancellable() {
       @Override
       public void cancel() {
-        connectionWatchers.remove(wrappedWatcher);
+        connectionWatchers.remove(watcher);
       }
     };
   }
@@ -391,25 +391,49 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
 
   private final class ServiceDelegate extends AbstractService implements Watcher {
 
-    private final AtomicBoolean stopNotified = new AtomicBoolean(false);
-    private volatile boolean executorStopped;
+    private ServiceDelegate() {
+      // Add a listener for state changes so that we can terminate the service even it is in STARTING state upoon
+      // stop is requested
+      addListener(new Listener() {
+        @Override
+        public void starting() {
+          // no-op
+        }
 
-    @Override
-    protected void doStart() {
-      // A single thread executor
-      eventExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
-                                             Threads.createDaemonThreadFactory("zk-client-EventThread")) {
         @Override
-        protected void terminated() {
-          super.terminated();
+        public void running() {
+          // no-op
+        }
 
-          // Only call notifyStopped if the executor.shutdown() returned, otherwise deadlock (TWILL-110) can occur.
-          // Also, notifyStopped() should only be called once.
-          if (executorStopped && stopNotified.compareAndSet(false, true)) {
-            notifyStopped();
+        @Override
+        public void stopping(State from) {
+          if (from == State.STARTING) {
+            // If it is still starting, just notify that it's started to transit out of the STARTING phase.
+            notifyStarted();
           }
         }
-      };
+
+        @Override
+        public void terminated(State from) {
+          // no-op
+        }
+
+        @Override
+        public void failed(State from, Throwable failure) {
+          closeZooKeeper(zooKeeper.getAndSet(null));
+        }
+      }, Threads.SAME_THREAD_EXECUTOR);
+    }
+
+    @Override
+    protected void doStart() {
+      // A single thread executor for all events
+      ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
+                                                           new LinkedBlockingQueue<Runnable>(),
+                                                           Threads.createDaemonThreadFactory("zk-client-EventThread"));
+      // Just discard the execution if the executor is closed
+      executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
+      eventExecutor = executor;
 
       try {
         zooKeeper.set(createZooKeeper());
@@ -420,23 +444,18 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
 
     @Override
     protected void doStop() {
-      ZooKeeper zk = zooKeeper.getAndSet(null);
-      if (zk != null) {
-        try {
-          zk.close();
-        } catch (InterruptedException e) {
-          notifyFailed(e);
-        } finally {
-          eventExecutor.shutdown();
-          executorStopped = true;
-
-          // If the executor state is terminated, meaning the terminate() method is triggered,
-          // call notifyStopped() if it hasn't been called yet.
-          if (eventExecutor.isTerminated() && stopNotified.compareAndSet(false, true)) {
+      eventExecutor.submit(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            closeZooKeeper(zooKeeper.getAndSet(null));
             notifyStopped();
+          } catch (Exception e) {
+            notifyFailed(e);
           }
         }
-      }
+      });
+      eventExecutor.shutdown();
     }
 
     @Override
@@ -451,23 +470,23 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
           LOG.info("ZooKeeper session expired: {}", zkStr);
 
           // When connection expired, simply reconnect again
-          Thread t = new Thread(new Runnable() {
+          eventExecutor.submit(new Runnable() {
             @Override
             public void run() {
+              if (state() != State.RUNNING) {
+                return;
+              }
               try {
                 LOG.info("Reconnect to ZooKeeper due to expiration: {}", zkStr);
-                zooKeeper.set(createZooKeeper());
+                closeZooKeeper(zooKeeper.getAndSet(createZooKeeper()));
               } catch (IOException e) {
-                zooKeeper.set(null);
                 notifyFailed(e);
               }
             }
-          }, "zk-reconnect");
-          t.setDaemon(true);
-          t.start();
+          });
         }
       } finally {
-        if (event.getType() == Event.EventType.None && !connectionWatchers.isEmpty()) {
+        if (event.getType() == Event.EventType.None) {
           for (Watcher connectionWatcher : connectionWatchers) {
             connectionWatcher.process(event);
           }
@@ -479,12 +498,26 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
      * Creates a new ZooKeeper connection.
      */
     private ZooKeeper createZooKeeper() throws IOException {
-      ZooKeeper zk = new ZooKeeper(zkStr, sessionTimeout, this);
+      ZooKeeper zk = new ZooKeeper(zkStr, sessionTimeout, wrapWatcher(this));
       for (Map.Entry<String, byte[]> authInfo : authInfos.entries()) {
         zk.addAuthInfo(authInfo.getKey(), authInfo.getValue());
       }
       return zk;
     }
+
+    /**
+     * Closes the given {@link ZooKeeper} if it is not null. If there is InterruptedException,
+     * it will get logged.
+     */
+    private void closeZooKeeper(@Nullable ZooKeeper zk) {
+      try {
+        if (zk != null) {
+          zk.close();
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted when closing ZooKeeper", e);
+      }
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/bda99086/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
index 97dec03..162d4db 100644
--- a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
+++ b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.ServerSocket;
 import java.security.NoSuchAlgorithmException;
 import java.util.Arrays;
 import java.util.List;
@@ -235,38 +236,40 @@ public class ZKClientTest {
         }
       }
     }).build(), RetryStrategies.fixDelay(0, TimeUnit.SECONDS)));
-    client.startAndWait();
-
-    zkServer.stopAndWait();
-
-    Assert.assertTrue(disconnectLatch.await(1, TimeUnit.SECONDS));
 
     final CountDownLatch createLatch = new CountDownLatch(1);
-    Futures.addCallback(client.create("/testretry/test", null, CreateMode.PERSISTENT), new FutureCallback<String>() {
-      @Override
-      public void onSuccess(String result) {
-        createLatch.countDown();
-      }
+    client.startAndWait();
+    try {
+      zkServer.stopAndWait();
 
-      @Override
-      public void onFailure(Throwable t) {
-        t.printStackTrace(System.out);
-      }
-    });
-
-    TimeUnit.SECONDS.sleep(2);
-    zkServer = InMemoryZKServer.builder()
-                               .setDataDir(dataDir)
-                               .setAutoCleanDataDir(true)
-                               .setPort(port)
-                               .setTickTime(1000)
-                               .build();
-    zkServer.startAndWait();
+      Assert.assertTrue(disconnectLatch.await(1, TimeUnit.SECONDS));
+      Futures.addCallback(client.create("/testretry/test", null, CreateMode.PERSISTENT), new FutureCallback<String>() {
+        @Override
+        public void onSuccess(String result) {
+          createLatch.countDown();
+        }
 
-    try {
-      Assert.assertTrue(createLatch.await(10, TimeUnit.SECONDS));
+        @Override
+        public void onFailure(Throwable t) {
+          t.printStackTrace(System.out);
+        }
+      });
+
+      TimeUnit.SECONDS.sleep(2);
+      zkServer = InMemoryZKServer.builder()
+                                 .setDataDir(dataDir)
+                                 .setAutoCleanDataDir(true)
+                                 .setPort(port)
+                                 .setTickTime(1000)
+                                 .build();
+      zkServer.startAndWait();
+      try {
+        Assert.assertTrue(createLatch.await(10, TimeUnit.SECONDS));
+      } finally {
+        zkServer.stopAndWait();
+      }
     } finally {
-      zkServer.stopAndWait();
+      client.stopAndWait();
     }
   }
 
@@ -353,4 +356,32 @@ public class ZKClientTest {
       zkServer.stopAndWait();
     }
   }
+
+  @Test
+  public void testStop() throws IOException, InterruptedException, ExecutionException {
+    try (final ServerSocket serverSocket = new ServerSocket(0)) {
+      // A latch to make sure at least one connection attempt from the zk client has been made
+      final CountDownLatch connectLatch = new CountDownLatch(1);
+      Thread serverThread = new Thread() {
+        public void run() {
+          try {
+            while (!interrupted()) {
+              serverSocket.accept().close();
+              connectLatch.countDown();
+            }
+          } catch (Exception e) {
+            // no-op
+          }
+        }
+      };
+      serverThread.start();
+
+      ZKClientService zkClient = ZKClientService.Builder.of("localhost:" + serverSocket.getLocalPort()).build();
+      zkClient.start();
+      Assert.assertTrue(connectLatch.await(10, TimeUnit.SECONDS));
+
+      zkClient.stopAndWait();
+      serverThread.interrupt();
+    }
+  }
 }


[2/2] incubator-twill git commit: Better state handling and more comments to explain some of the race conditions that may arise during stopping of the ZKClient.

Posted by ch...@apache.org.
Better state handling and more comments to explain some of the race conditions that may arise during stopping of the ZKClient.

Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/74720b80
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/74720b80
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/74720b80

Branch: refs/heads/feature/twill-133
Commit: 74720b804e5216e106b2d0f1e9cf1dbbb2d6282b
Parents: bda9908
Author: Terence Yim <ch...@apache.org>
Authored: Mon Jun 15 10:22:17 2015 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Mon Jun 15 10:22:17 2015 -0700

----------------------------------------------------------------------
 .../zookeeper/DefaultZKClientService.java       | 57 +++++++++++++++-----
 1 file changed, 45 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/74720b80/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
index 71a0118..6fae663 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
@@ -391,7 +391,13 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
 
   private final class ServiceDelegate extends AbstractService implements Watcher {
 
+    private final Runnable stopTask;
+
     private ServiceDelegate() {
+      // Creates the stop task runnable in constructor so that if the stop() method is called from shutdown hook,
+      // it won't fail with class loading error due to failure to load inner class from the shutdown thread.
+      this.stopTask = createStopTask();
+
       // Add a listener for state changes so that we can terminate the service even it is in STARTING state upoon
       // stop is requested
       addListener(new Listener() {
@@ -420,6 +426,8 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
 
         @Override
         public void failed(State from, Throwable failure) {
+          eventExecutor.shutdownNow();
+          // Close the ZK client if there is exception. It is needed because the stop task may not get executed
           closeZooKeeper(zooKeeper.getAndSet(null));
         }
       }, Threads.SAME_THREAD_EXECUTOR);
@@ -444,24 +452,21 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
 
     @Override
     protected void doStop() {
-      eventExecutor.submit(new Runnable() {
-        @Override
-        public void run() {
-          try {
-            closeZooKeeper(zooKeeper.getAndSet(null));
-            notifyStopped();
-          } catch (Exception e) {
-            notifyFailed(e);
-          }
-        }
-      });
+      // Submit a task to the executor to make sure all pending events in the executor are fired before
+      // transiting this Service into STOPPED state
+      eventExecutor.submit(stopTask);
       eventExecutor.shutdown();
     }
 
     @Override
     public void process(WatchedEvent event) {
+      State state = state();
+      if (state == State.TERMINATED || state == State.FAILED) {
+        return;
+      }
+
       try {
-        if (event.getState() == Event.KeeperState.SyncConnected && state() == State.STARTING) {
+        if (event.getState() == Event.KeeperState.SyncConnected && state == State.STARTING) {
           LOG.debug("Connected to ZooKeeper: {}", zkStr);
           notifyStarted();
           return;
@@ -473,6 +478,7 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
           eventExecutor.submit(new Runnable() {
             @Override
             public void run() {
+              // Only reconnect if the current state is running
               if (state() != State.RUNNING) {
                 return;
               }
@@ -494,6 +500,33 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
       }
     }
 
+
+    /**
+     * Creates a {@link Runnable} task that will get executed in the event executor for transiting this
+     * Service into STOPPED state.
+     */
+    private Runnable createStopTask() {
+      return new Runnable() {
+        @Override
+        public void run() {
+          try {
+            // Close the ZK connection in this task will make sure if there is ZK connection created
+            // after doStop() was called but before this task has been executed is also closed.
+            // It is possible to happen when the following sequence happens:
+            //
+            // 1. session expired, hence the expired event is triggered
+            // 2. The reconnect task executed. With Service.state() == RUNNING, it creates a new ZK client
+            // 3. Service.stop() gets called, Service.state() changed to STOPPING
+            // 4. The new ZK client created from the reconnect thread update the zooKeeper with the new one
+            closeZooKeeper(zooKeeper.getAndSet(null));
+            notifyStopped();
+          } catch (Exception e) {
+            notifyFailed(e);
+          }
+        }
+      };
+    }
+
     /**
      * Creates a new ZooKeeper connection.
      */