You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by br...@apache.org on 2016/12/06 03:06:56 UTC

nifi git commit: NIFI-3150 Added logic to wait for the zk client to connect to the configured server

Repository: nifi
Updated Branches:
  refs/heads/master 2d6bba080 -> 78de10dec


NIFI-3150 Added logic to wait for the zk client to connect to the configured server

Signed-off-by: Bryan Rosander <br...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/78de10de
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/78de10de
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/78de10de

Branch: refs/heads/master
Commit: 78de10dec0127598fbbbb8ac7f7048cec3aecb6b
Parents: 2d6bba0
Author: Jeff Storck <jt...@gmail.com>
Authored: Mon Dec 5 13:19:31 2016 -0500
Committer: Bryan Rosander <br...@apache.org>
Committed: Mon Dec 5 22:06:00 2016 -0500

----------------------------------------------------------------------
 .../toolkit/zkmigrator/ZooKeeperMigrator.java   | 36 +++++++++++++++++++-
 1 file changed, 35 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/78de10de/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigrator.java
----------------------------------------------------------------------
diff --git a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigrator.java b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigrator.java
index c15286e..c108523 100644
--- a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigrator.java
+++ b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigrator.java
@@ -27,6 +27,7 @@ import com.google.gson.stream.JsonReader;
 import com.google.gson.stream.JsonWriter;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.ACL;
@@ -46,7 +47,9 @@ import java.util.List;
 import java.util.Spliterators;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -132,7 +135,7 @@ class ZooKeeperMigrator {
         LOGGER.info("Source data was obtained from ZooKeeper: {}", sourceZooKeeperEndpointConfig);
         Preconditions.checkArgument(!Strings.isNullOrEmpty(sourceZooKeeperEndpointConfig.getConnectString()) && !Strings.isNullOrEmpty(sourceZooKeeperEndpointConfig.getPath()),
                 "Source ZooKeeper %s from %s is invalid", sourceZooKeeperEndpointConfig, zkData);
-        Preconditions.checkArgument(    !(zooKeeperEndpointConfig.equals(sourceZooKeeperEndpointConfig) && !ignoreSource),
+        Preconditions.checkArgument(!(zooKeeperEndpointConfig.equals(sourceZooKeeperEndpointConfig) && !ignoreSource),
                 "Source ZooKeeper config %s for the data provided can not be the same as the configured destination ZooKeeper config %s",
                 sourceZooKeeperEndpointConfig, zooKeeperEndpointConfig);
 
@@ -284,14 +287,45 @@ class ZooKeeperMigrator {
     }
 
     private ZooKeeper getZooKeeper(ZooKeeperEndpointConfig zooKeeperEndpointConfig, AuthMode authMode, byte[] authData) throws IOException {
+        CountDownLatch connectionLatch = new CountDownLatch(1);
         ZooKeeper zooKeeper = new ZooKeeper(zooKeeperEndpointConfig.getConnectString(), 3000, watchedEvent -> {
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("ZooKeeper server state changed to {} in {}", watchedEvent.getState(), zooKeeperEndpointConfig);
+            }
+            if (watchedEvent.getType().equals(Watcher.Event.EventType.None) && watchedEvent.getState().equals(Watcher.Event.KeeperState.SyncConnected)) {
+                connectionLatch.countDown();
+            }
         });
+
+        final boolean connected;
+        try {
+            connected = connectionLatch.await(5, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            closeZooKeeper(zooKeeper);
+            Thread.currentThread().interrupt(); // preserve interrupt
+            throw new IOException(String.format("interrupted while waiting for ZooKeeper connection to %s", zooKeeperEndpointConfig), e);
+        }
+
+        if (!connected) {
+            closeZooKeeper(zooKeeper);
+            throw new IOException(String.format("unable to connect to %s, state is %s", zooKeeperEndpointConfig, zooKeeper.getState()));
+        }
+
         if (authMode.equals(AuthMode.DIGEST)) {
             zooKeeper.addAuthInfo(SCHEME_DIGEST, authData);
         }
         return zooKeeper;
     }
 
+    /**
+     * Closes the given ZooKeeper client without propagating an interrupt as an exception.
+     * If the close is interrupted, a warning is logged and the thread's interrupt flag is set again.
+     *
+     * @param zooKeeper the client to close
+     */
+    private void closeZooKeeper(final ZooKeeper zooKeeper) {
+        try {
+            zooKeeper.close();
+        } catch (final InterruptedException interrupted) {
+            LOGGER.warn("could not close ZooKeeper client due to interrupt", interrupted);
+            Thread.currentThread().interrupt(); // preserve interrupt
+        }
+    }
+
+    /**
+     * Returns the ZooKeeper endpoint configuration held by this migrator.
+     *
+     * @return the value of {@code zooKeeperEndpointConfig}
+     */
     ZooKeeperEndpointConfig getZooKeeperEndpointConfig() {
         return zooKeeperEndpointConfig;
     }