You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2017/02/07 03:09:21 UTC
hive git commit: HIVE-15810 : llapstatus should wait for ZK node to
become available when in wait mode (Sergey Shelukhin,
reviewed by Prasanth Jayachandran)
Repository: hive
Updated Branches:
refs/heads/master da6f581fc -> b978c074d
HIVE-15810 : llapstatus should wait for ZK node to become available when in wait mode (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b978c074
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b978c074
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b978c074
Branch: refs/heads/master
Commit: b978c074deda11f3bf70a607e25ea35ef8b914af
Parents: da6f581
Author: Sergey Shelukhin <se...@apache.org>
Authored: Mon Feb 6 19:00:31 2017 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Mon Feb 6 19:00:31 2017 -0800
----------------------------------------------------------------------
.../hive/llap/registry/ServiceRegistry.java | 30 ++++-------
.../registry/impl/LlapFixedRegistryImpl.java | 2 +-
.../llap/registry/impl/LlapRegistryService.java | 6 ++-
.../impl/LlapZookeeperRegistryImpl.java | 55 ++++++++++++++------
.../hive/llap/cli/LlapStatusServiceDriver.java | 11 ++--
.../java/org/apache/hive/http/LlapServlet.java | 2 +-
6 files changed, 60 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/b978c074/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
index 8d7fcb7..5739d72 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
@@ -24,54 +24,42 @@ public interface ServiceRegistry {
/**
* Start the service registry
- *
- * @throws IOException
*/
- public void start() throws IOException;
+ void start() throws IOException;
/**
* Stop the service registry
- *
- * @throws IOException
*/
- public void stop() throws IOException;
+ void stop() throws IOException;
/**
* Register the current instance - the implementation takes care of the endpoints to register.
- *
* @return self identifying name
- *
- * @throws IOException
*/
- public String register() throws IOException;
+ String register() throws IOException;
/**
* Remove the current registration cleanly (implementation defined cleanup)
- *
- * @throws IOException
*/
- public void unregister() throws IOException;
+ void unregister() throws IOException;
/**
* Client API to get the list of instances registered via the current registry key.
- *
* @param component
- * @return
- * @throws IOException
+ * @param clusterReadyTimeoutMs The time to wait for the cluster to be ready, if it's not
+ * started yet. 0 means do not wait.
*/
- public ServiceInstanceSet getInstances(String component) throws IOException;
+ ServiceInstanceSet getInstances(String component, long clusterReadyTimeoutMs) throws IOException;
/**
* Adds state change listeners for service instances.
- *
* @param listener - state change listener
- * @throws IOException
*/
- public void registerStateChangeListener(ServiceInstanceStateChangeListener listener)
+ void registerStateChangeListener(ServiceInstanceStateChangeListener listener)
throws IOException;
/**
* @return The application ID of the LLAP cluster.
*/
ApplicationId getApplicationId() throws IOException;
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/b978c074/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
index 45ac5bf..ebc32a1 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
@@ -258,7 +258,7 @@ public class LlapFixedRegistryImpl implements ServiceRegistry {
}
@Override
- public ServiceInstanceSet getInstances(String component) throws IOException {
+ public ServiceInstanceSet getInstances(String component, long timeoutMs) throws IOException {
return new FixedServiceInstanceSet();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b978c074/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
index 2a5afac..5a94db9 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
@@ -128,7 +128,11 @@ public class LlapRegistryService extends AbstractService {
}
public ServiceInstanceSet getInstances() throws IOException {
- return this.registry.getInstances("LLAP");
+ return getInstances(0);
+ }
+
+ public ServiceInstanceSet getInstances(long clusterReadyTimeoutMs) throws IOException {
+ return this.registry.getInstances("LLAP", clusterReadyTimeoutMs);
}
public void registerStateChangeListener(ServiceInstanceStateChangeListener listener)
http://git-wip-us.apache.org/repos/asf/hive/blob/b978c074/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
index ac48b67..7ae80b0 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
@@ -32,6 +32,7 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
@@ -76,6 +77,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.KeeperException.InvalidACLException;
import org.apache.zookeeper.client.ZooKeeperSaslClient;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
@@ -781,8 +783,9 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
}
@Override
- public ServiceInstanceSet getInstances(String component) throws IOException {
- checkPathChildrenCache();
+ public ServiceInstanceSet getInstances(
+ String component, long clusterReadyTimeoutMs) throws IOException {
+ checkPathChildrenCache(clusterReadyTimeoutMs);
// lazily create instances
if (instances == null) {
@@ -793,7 +796,7 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
@Override
public ApplicationId getApplicationId() throws IOException {
- getInstances(null);
+ getInstances("LLAP", 0);
return instances.getApplicationId();
}
@@ -801,28 +804,46 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
public synchronized void registerStateChangeListener(
final ServiceInstanceStateChangeListener listener)
throws IOException {
- checkPathChildrenCache();
+ checkPathChildrenCache(0);
this.stateChangeListeners.add(listener);
}
- private synchronized void checkPathChildrenCache() throws IOException {
+ private synchronized void checkPathChildrenCache(long clusterReadyTimeoutMs) throws IOException {
Preconditions.checkArgument(zooKeeperClient != null &&
- zooKeeperClient.getState() == CuratorFrameworkState.STARTED,
- "client is not started");
-
+ zooKeeperClient.getState() == CuratorFrameworkState.STARTED, "client is not started");
// lazily create PathChildrenCache
- if (instancesCache == null) {
- this.instancesCache = new PathChildrenCache(zooKeeperClient, workersPath, true);
- instancesCache.getListenable().addListener(new InstanceStateChangeListener(),
- Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat("StateChangeNotificationHandler")
- .build()));
+ if (instancesCache != null) return;
+ ExecutorService tp = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
+ .setDaemon(true).setNameFormat("StateChangeNotificationHandler").build());
+ long startTimeNs = System.nanoTime(), deltaNs = clusterReadyTimeoutMs * 1000000L;
+ long sleepTimeMs = Math.min(16, clusterReadyTimeoutMs);
+ while (true) {
+ PathChildrenCache instancesCache = new PathChildrenCache(zooKeeperClient, workersPath, true);
+ instancesCache.getListenable().addListener(new InstanceStateChangeListener(), tp);
try {
- this.instancesCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
+ instancesCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
+ this.instancesCache = instancesCache;
+ break;
+ } catch (InvalidACLException e) {
+ // PathChildrenCache tried to mkdir when the znode wasn't there, and failed.
+ CloseableUtils.closeQuietly(instancesCache);
+ long elapsedNs = System.nanoTime() - startTimeNs;
+ if (deltaNs == 0 || deltaNs <= elapsedNs) {
+ LOG.error("Unable to start curator PathChildrenCache", e);
+ throw new IOException(e);
+ }
+ LOG.warn("The cluster is not started yet (InvalidACL); will retry");
+ try {
+ Thread.sleep(Math.min(sleepTimeMs, (deltaNs - elapsedNs)/1000000L));
+ } catch (InterruptedException e1) {
+ LOG.error("Interrupted while retrying the PathChildrenCache startup");
+ throw new IOException(e1);
+ }
+ sleepTimeMs = sleepTimeMs << 1;
} catch (Exception e) {
- LOG.error("Unable to start curator PathChildrenCache. Exception: {}", e);
+ CloseableUtils.closeQuietly(instancesCache);
+ LOG.error("Unable to start curator PathChildrenCache", e);
throw new IOException(e);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b978c074/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java
index 39d542b..b30f837 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java
@@ -174,7 +174,7 @@ public class LlapStatusServiceDriver {
}
}
- public int run(LlapStatusOptions options) {
+ public int run(LlapStatusOptions options, long watchTimeoutMs) {
appStatusBuilder = new AppStatusBuilder();
try {
if (appName == null) {
@@ -253,7 +253,7 @@ public class LlapStatusServiceDriver {
return ret.getInt();
} else {
try {
- ret = populateAppStatusFromLlapRegistry(appStatusBuilder);
+ ret = populateAppStatusFromLlapRegistry(appStatusBuilder, watchTimeoutMs);
} catch (LlapStatusCliException e) {
logError(e);
return e.getExitCode().getInt();
@@ -481,7 +481,8 @@ public class LlapStatusServiceDriver {
* @return an ExitCode. An ExitCode other than ExitCode.SUCCESS implies future progress not possible
* @throws LlapStatusCliException
*/
- private ExitCode populateAppStatusFromLlapRegistry(AppStatusBuilder appStatusBuilder) throws
+ private ExitCode populateAppStatusFromLlapRegistry(
+ AppStatusBuilder appStatusBuilder, long watchTimeoutMs) throws
LlapStatusCliException {
if (llapRegistry == null) {
@@ -495,7 +496,7 @@ public class LlapStatusServiceDriver {
Collection<ServiceInstance> serviceInstances;
try {
- serviceInstances = llapRegistry.getInstances().getAll();
+ serviceInstances = llapRegistry.getInstances(watchTimeoutMs).getAll();
} catch (IOException e) {
throw new LlapStatusCliException(ExitCode.LLAP_REGISTRY_ERROR, "Failed to get instances from llap registry", e);
}
@@ -964,7 +965,7 @@ public class LlapStatusServiceDriver {
numAttempts, watchMode, new DecimalFormat("#.###").format(runningNodesThreshold));
while (numAttempts > 0) {
try {
- ret = statusServiceDriver.run(options);
+ ret = statusServiceDriver.run(options, watchMode ? watchTimeout : 0);
if (ret == ExitCode.SUCCESS.getInt()) {
if (watchMode) {
currentState = statusServiceDriver.appStatusBuilder.state;
http://git-wip-us.apache.org/repos/asf/hive/blob/b978c074/service/src/java/org/apache/hive/http/LlapServlet.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/http/LlapServlet.java b/service/src/java/org/apache/hive/http/LlapServlet.java
index 993766b..1546522 100644
--- a/service/src/java/org/apache/hive/http/LlapServlet.java
+++ b/service/src/java/org/apache/hive/http/LlapServlet.java
@@ -98,7 +98,7 @@ public class LlapServlet extends HttpServlet {
LOG.info("Retrieving info for cluster: " + clusterName);
LlapStatusServiceDriver driver = new LlapStatusServiceDriver();
- int ret = driver.run(new LlapStatusOptionsProcessor.LlapStatusOptions(clusterName));
+ int ret = driver.run(new LlapStatusOptionsProcessor.LlapStatusOptions(clusterName), 0);
if (ret == LlapStatusServiceDriver.ExitCode.SUCCESS.getInt()) {
driver.outputJson(writer);
}