You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by br...@apache.org on 2016/12/06 03:06:56 UTC
nifi git commit: NIFI-3150 Added logic to wait for the zk client to connect to the configured server
Repository: nifi
Updated Branches:
refs/heads/master 2d6bba080 -> 78de10dec
NIFI-3150 Added logic to wait for the zk client to connect to the configured server
Signed-off-by: Bryan Rosander <br...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/78de10de
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/78de10de
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/78de10de
Branch: refs/heads/master
Commit: 78de10dec0127598fbbbb8ac7f7048cec3aecb6b
Parents: 2d6bba0
Author: Jeff Storck <jt...@gmail.com>
Authored: Mon Dec 5 13:19:31 2016 -0500
Committer: Bryan Rosander <br...@apache.org>
Committed: Mon Dec 5 22:06:00 2016 -0500
----------------------------------------------------------------------
.../toolkit/zkmigrator/ZooKeeperMigrator.java | 36 +++++++++++++++++++-
1 file changed, 35 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/78de10de/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigrator.java
----------------------------------------------------------------------
diff --git a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigrator.java b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigrator.java
index c15286e..c108523 100644
--- a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigrator.java
+++ b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigrator.java
@@ -27,6 +27,7 @@ import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
@@ -46,7 +47,9 @@ import java.util.List;
import java.util.Spliterators;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -132,7 +135,7 @@ class ZooKeeperMigrator {
LOGGER.info("Source data was obtained from ZooKeeper: {}", sourceZooKeeperEndpointConfig);
Preconditions.checkArgument(!Strings.isNullOrEmpty(sourceZooKeeperEndpointConfig.getConnectString()) && !Strings.isNullOrEmpty(sourceZooKeeperEndpointConfig.getPath()),
"Source ZooKeeper %s from %s is invalid", sourceZooKeeperEndpointConfig, zkData);
- Preconditions.checkArgument( !(zooKeeperEndpointConfig.equals(sourceZooKeeperEndpointConfig) && !ignoreSource),
+ Preconditions.checkArgument(!(zooKeeperEndpointConfig.equals(sourceZooKeeperEndpointConfig) && !ignoreSource),
"Source ZooKeeper config %s for the data provided can not be the same as the configured destination ZooKeeper config %s",
sourceZooKeeperEndpointConfig, zooKeeperEndpointConfig);
@@ -284,14 +287,45 @@ class ZooKeeperMigrator {
}
private ZooKeeper getZooKeeper(ZooKeeperEndpointConfig zooKeeperEndpointConfig, AuthMode authMode, byte[] authData) throws IOException {
+ CountDownLatch connectionLatch = new CountDownLatch(1);
ZooKeeper zooKeeper = new ZooKeeper(zooKeeperEndpointConfig.getConnectString(), 3000, watchedEvent -> {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("ZooKeeper server state changed to {} in {}", watchedEvent.getState(), zooKeeperEndpointConfig);
+ }
+ if (watchedEvent.getType().equals(Watcher.Event.EventType.None) && watchedEvent.getState().equals(Watcher.Event.KeeperState.SyncConnected)) {
+ connectionLatch.countDown();
+ }
});
+
+ final boolean connected;
+ try {
+ connected = connectionLatch.await(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ closeZooKeeper(zooKeeper);
+ Thread.currentThread().interrupt(); // preserve interrupt
+ throw new IOException(String.format("interrupted while waiting for ZooKeeper connection to %s", zooKeeperEndpointConfig), e);
+ }
+
+ if (!connected) {
+ closeZooKeeper(zooKeeper);
+ throw new IOException(String.format("unable to connect to %s, state is %s", zooKeeperEndpointConfig, zooKeeper.getState()));
+ }
+
if (authMode.equals(AuthMode.DIGEST)) {
zooKeeper.addAuthInfo(SCHEME_DIGEST, authData);
}
return zooKeeper;
}
+ private void closeZooKeeper(ZooKeeper zooKeeper) {
+ try {
+ zooKeeper.close();
+ } catch (InterruptedException e) {
+ LOGGER.warn("could not close ZooKeeper client due to interrupt", e);
+ Thread.currentThread().interrupt(); // preserve interrupt
+ }
+ }
+
ZooKeeperEndpointConfig getZooKeeperEndpointConfig() {
return zooKeeperEndpointConfig;
}