You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2015/06/13 02:13:14 UTC

incubator-twill git commit: (TWILL-133) Make the ZKClient stoppable even if it is still trying to connect

Repository: incubator-twill
Updated Branches:
  refs/heads/feature/twill-133 [created] a50bbb601


(TWILL-133) Make the ZKClient stoppable even if it is still trying to connect

- Also includes a refactoring of the DefaultZKClientService class to simplify
  the shutdown logic

Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/a50bbb60
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/a50bbb60
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/a50bbb60

Branch: refs/heads/feature/twill-133
Commit: a50bbb6012a45b4c0fc9aae539ea663160263a5f
Parents: 0d3c5ee
Author: Terence Yim <ch...@apache.org>
Authored: Fri Jun 12 17:13:05 2015 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Fri Jun 12 17:13:05 2015 -0700

----------------------------------------------------------------------
 .../zookeeper/DefaultZKClientService.java       | 115 ++++++++++++-------
 .../apache/twill/zookeeper/ZKClientTest.java    |  85 +++++++++-----
 2 files changed, 130 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/a50bbb60/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
index 7b9b345..aeb6499 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
@@ -58,7 +58,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 
@@ -91,11 +90,11 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
                                 Watcher connectionWatcher, Multimap<String, byte[]> authInfos) {
     this.zkStr = zkStr;
     this.sessionTimeout = sessionTimeout;
-    this.connectionWatchers = new CopyOnWriteArrayList<Watcher>();
+    this.connectionWatchers = new CopyOnWriteArrayList<>();
     this.authInfos = copyAuthInfo(authInfos);
     addConnectionWatcher(connectionWatcher);
 
-    this.zooKeeper = new AtomicReference<ZooKeeper>();
+    this.zooKeeper = new AtomicReference<>();
     serviceDelegate = new ServiceDelegate();
   }
 
@@ -111,7 +110,7 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
   }
 
   @Override
-  public Cancellable addConnectionWatcher(Watcher watcher) {
+  public Cancellable addConnectionWatcher(final Watcher watcher) {
     if (watcher == null) {
       return new Cancellable() {
         @Override
@@ -121,12 +120,11 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
       };
     }
 
-    final Watcher wrappedWatcher = wrapWatcher(watcher);
-    connectionWatchers.add(wrappedWatcher);
+    connectionWatchers.add(watcher);
     return new Cancellable() {
       @Override
       public void cancel() {
-        connectionWatchers.remove(wrappedWatcher);
+        connectionWatchers.remove(watcher);
       }
     };
   }
@@ -391,25 +389,47 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
 
   private final class ServiceDelegate extends AbstractService implements Watcher {
 
-    private final AtomicBoolean stopNotified = new AtomicBoolean(false);
-    private volatile boolean executorStopped;
+    private ServiceDelegate() {
+      addListener(new Listener() {
+        @Override
+        public void starting() {
+          // no-op
+        }
 
-    @Override
-    protected void doStart() {
-      // A single thread executor
-      eventExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
-                                             Threads.createDaemonThreadFactory("zk-client-EventThread")) {
         @Override
-        protected void terminated() {
-          super.terminated();
+        public void running() {
+          // no-op
+        }
 
-          // Only call notifyStopped if the executor.shutdown() returned, otherwise deadlock (TWILL-110) can occur.
-          // Also, notifyStopped() should only be called once.
-          if (executorStopped && stopNotified.compareAndSet(false, true)) {
-            notifyStopped();
+        @Override
+        public void stopping(State from) {
+          if (from == State.STARTING) {
+            // If it is still starting, just notify that it's started to transit out of the STARTING phase.
+            notifyStarted();
           }
         }
-      };
+
+        @Override
+        public void terminated(State from) {
+          // no-op
+        }
+
+        @Override
+        public void failed(State from, Throwable failure) {
+          closeZooKeeper(zooKeeper.getAndSet(null));
+        }
+      }, Threads.SAME_THREAD_EXECUTOR);
+    }
+
+    @Override
+    protected void doStart() {
+      // A single thread executor for all events
+      ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
+                                                           new LinkedBlockingQueue<Runnable>(),
+                                                           Threads.createDaemonThreadFactory("zk-client-EventThread"));
+      // Just discard the execution if the executor is closed
+      executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
+      eventExecutor = executor;
 
       try {
         zooKeeper.set(createZooKeeper());
@@ -420,23 +440,18 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
 
     @Override
     protected void doStop() {
-      ZooKeeper zk = zooKeeper.getAndSet(null);
-      if (zk != null) {
-        try {
-          zk.close();
-        } catch (InterruptedException e) {
-          notifyFailed(e);
-        } finally {
-          eventExecutor.shutdown();
-          executorStopped = true;
-
-          // If the executor state is terminated, meaning the terminate() method is triggered,
-          // call notifyStopped() if it hasn't been called yet.
-          if (eventExecutor.isTerminated() && stopNotified.compareAndSet(false, true)) {
+      eventExecutor.submit(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            closeZooKeeper(zooKeeper.getAndSet(null));
             notifyStopped();
+          } catch (Exception e) {
+            notifyFailed(e);
           }
         }
-      }
+      });
+      eventExecutor.shutdown();
     }
 
     @Override
@@ -451,23 +466,23 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
           LOG.info("ZooKeeper session expired: {}", zkStr);
 
           // When connection expired, simply reconnect again
-          Thread t = new Thread(new Runnable() {
+          eventExecutor.submit(new Runnable() {
             @Override
             public void run() {
+              if (state() != State.RUNNING) {
+                return;
+              }
               try {
                 LOG.info("Reconnect to ZooKeeper due to expiration: {}", zkStr);
-                zooKeeper.set(createZooKeeper());
+                closeZooKeeper(zooKeeper.getAndSet(createZooKeeper()));
               } catch (IOException e) {
-                zooKeeper.set(null);
                 notifyFailed(e);
               }
             }
-          }, "zk-reconnect");
-          t.setDaemon(true);
-          t.start();
+          });
         }
       } finally {
-        if (event.getType() == Event.EventType.None && !connectionWatchers.isEmpty()) {
+        if (event.getType() == Event.EventType.None) {
           for (Watcher connectionWatcher : connectionWatchers) {
             connectionWatcher.process(event);
           }
@@ -479,12 +494,26 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
      * Creates a new ZooKeeper connection.
      */
     private ZooKeeper createZooKeeper() throws IOException {
-      ZooKeeper zk = new ZooKeeper(zkStr, sessionTimeout, this);
+      ZooKeeper zk = new ZooKeeper(zkStr, sessionTimeout, wrapWatcher(this));
       for (Map.Entry<String, byte[]> authInfo : authInfos.entries()) {
         zk.addAuthInfo(authInfo.getKey(), authInfo.getValue());
       }
       return zk;
     }
+
+    /**
+     * Closes the given {@link ZooKeeper} if it is not null. If there is InterruptedException,
+     * it will get logged.
+     */
+    private void closeZooKeeper(@Nullable ZooKeeper zk) {
+      try {
+        if (zk != null) {
+          zk.close();
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted when closing ZooKeeper", e);
+      }
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/a50bbb60/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
index 97dec03..9c7e788 100644
--- a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
+++ b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.ServerSocket;
 import java.security.NoSuchAlgorithmException;
 import java.util.Arrays;
 import java.util.List;
@@ -235,38 +236,40 @@ public class ZKClientTest {
         }
       }
     }).build(), RetryStrategies.fixDelay(0, TimeUnit.SECONDS)));
-    client.startAndWait();
-
-    zkServer.stopAndWait();
-
-    Assert.assertTrue(disconnectLatch.await(1, TimeUnit.SECONDS));
 
     final CountDownLatch createLatch = new CountDownLatch(1);
-    Futures.addCallback(client.create("/testretry/test", null, CreateMode.PERSISTENT), new FutureCallback<String>() {
-      @Override
-      public void onSuccess(String result) {
-        createLatch.countDown();
-      }
+    client.startAndWait();
+    try {
+      zkServer.stopAndWait();
 
-      @Override
-      public void onFailure(Throwable t) {
-        t.printStackTrace(System.out);
-      }
-    });
-
-    TimeUnit.SECONDS.sleep(2);
-    zkServer = InMemoryZKServer.builder()
-                               .setDataDir(dataDir)
-                               .setAutoCleanDataDir(true)
-                               .setPort(port)
-                               .setTickTime(1000)
-                               .build();
-    zkServer.startAndWait();
+      Assert.assertTrue(disconnectLatch.await(1, TimeUnit.SECONDS));
+      Futures.addCallback(client.create("/testretry/test", null, CreateMode.PERSISTENT), new FutureCallback<String>() {
+        @Override
+        public void onSuccess(String result) {
+          createLatch.countDown();
+        }
 
-    try {
-      Assert.assertTrue(createLatch.await(10, TimeUnit.SECONDS));
+        @Override
+        public void onFailure(Throwable t) {
+          t.printStackTrace(System.out);
+        }
+      });
+
+      TimeUnit.SECONDS.sleep(2);
+      zkServer = InMemoryZKServer.builder()
+                                 .setDataDir(dataDir)
+                                 .setAutoCleanDataDir(true)
+                                 .setPort(port)
+                                 .setTickTime(1000)
+                                 .build();
+      zkServer.startAndWait();
+      try {
+        Assert.assertTrue(createLatch.await(10, TimeUnit.SECONDS));
+      } finally {
+        zkServer.stopAndWait();
+      }
     } finally {
-      zkServer.stopAndWait();
+      client.stopAndWait();
     }
   }
 
@@ -353,4 +356,32 @@ public class ZKClientTest {
       zkServer.stopAndWait();
     }
   }
+
+  @Test
+  public void testStop() throws IOException, InterruptedException, ExecutionException {
+    try (final ServerSocket serverSocket = new ServerSocket(0)){
+      // A latch to make sure at least one connection attempt from the zk client has been made
+      final CountDownLatch connectLatch = new CountDownLatch(1);
+      Thread serverThread = new Thread() {
+        public void run() {
+          try {
+            while (!interrupted()) {
+              serverSocket.accept().close();
+              connectLatch.countDown();
+            }
+          } catch (Exception e) {
+            // no-op
+          }
+        }
+      };
+      serverThread.start();
+
+      ZKClientService zkClient = ZKClientService.Builder.of("localhost:" + serverSocket.getLocalPort()).build();
+      zkClient.start();
+      Assert.assertTrue(connectLatch.await(10, TimeUnit.SECONDS));
+
+      zkClient.stopAndWait();
+      serverThread.interrupt();
+    }
+  }
 }