You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text copy).
Posted to commits@hive.apache.org by se...@apache.org on 2017/02/07 03:09:21 UTC

hive git commit: HIVE-15810 : llapstatus should wait for ZK node to become available when in wait mode (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

Repository: hive
Updated Branches:
  refs/heads/master da6f581fc -> b978c074d


HIVE-15810 : llapstatus should wait for ZK node to become available when in wait mode (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b978c074
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b978c074
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b978c074

Branch: refs/heads/master
Commit: b978c074deda11f3bf70a607e25ea35ef8b914af
Parents: da6f581
Author: Sergey Shelukhin <se...@apache.org>
Authored: Mon Feb 6 19:00:31 2017 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Mon Feb 6 19:00:31 2017 -0800

----------------------------------------------------------------------
 .../hive/llap/registry/ServiceRegistry.java     | 30 ++++-------
 .../registry/impl/LlapFixedRegistryImpl.java    |  2 +-
 .../llap/registry/impl/LlapRegistryService.java |  6 ++-
 .../impl/LlapZookeeperRegistryImpl.java         | 55 ++++++++++++++------
 .../hive/llap/cli/LlapStatusServiceDriver.java  | 11 ++--
 .../java/org/apache/hive/http/LlapServlet.java  |  2 +-
 6 files changed, 60 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/b978c074/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
index 8d7fcb7..5739d72 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
@@ -24,54 +24,42 @@ public interface ServiceRegistry {
 
   /**
    * Start the service registry
-   *
-   * @throws IOException
    */
-  public void start() throws IOException;
+  void start() throws IOException;
 
   /**
    * Stop the service registry
-   *
-   * @throws IOException
    */
-  public void stop() throws IOException;
+  void stop() throws IOException;
 
   /**
    * Register the current instance - the implementation takes care of the endpoints to register.
-   *
    * @return self identifying name
-   * 
-   * @throws IOException
    */
-  public String register() throws IOException;
+  String register() throws IOException;
 
   /**
    * Remove the current registration cleanly (implementation defined cleanup)
-   *
-   * @throws IOException
    */
-  public void unregister() throws IOException;
+  void unregister() throws IOException;
 
   /**
    * Client API to get the list of instances registered via the current registry key.
-   *
    * @param component
-   * @return
-   * @throws IOException
+   * @param clusterReadyTimeoutMs The time to wait for the cluster to be ready, if it's not
+   *                              started yet. 0 means do not wait.
    */
-  public ServiceInstanceSet getInstances(String component) throws IOException;
+  ServiceInstanceSet getInstances(String component, long clusterReadyTimeoutMs) throws IOException;
 
   /**
    * Adds state change listeners for service instances.
-   *
    * @param listener - state change listener
-   * @throws IOException
    */
-  public void registerStateChangeListener(ServiceInstanceStateChangeListener listener)
+  void registerStateChangeListener(ServiceInstanceStateChangeListener listener)
       throws IOException;
 
   /**
    * @return The application ID of the LLAP cluster.
    */
   ApplicationId getApplicationId() throws IOException;
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/b978c074/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
index 45ac5bf..ebc32a1 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
@@ -258,7 +258,7 @@ public class LlapFixedRegistryImpl implements ServiceRegistry {
   }
 
   @Override
-  public ServiceInstanceSet getInstances(String component) throws IOException {
+  public ServiceInstanceSet getInstances(String component, long timeoutMs) throws IOException {
     return new FixedServiceInstanceSet();
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/b978c074/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
index 2a5afac..5a94db9 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
@@ -128,7 +128,11 @@ public class LlapRegistryService extends AbstractService {
   }
 
   public ServiceInstanceSet getInstances() throws IOException {
-    return this.registry.getInstances("LLAP");
+    return getInstances(0);
+  }
+
+  public ServiceInstanceSet getInstances(long clusterReadyTimeoutMs) throws IOException {
+    return this.registry.getInstances("LLAP", clusterReadyTimeoutMs);
   }
 
   public void registerStateChangeListener(ServiceInstanceStateChangeListener listener)

http://git-wip-us.apache.org/repos/asf/hive/blob/b978c074/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
index ac48b67..7ae80b0 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
@@ -32,6 +32,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
@@ -76,6 +77,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.KeeperException.InvalidACLException;
 import org.apache.zookeeper.client.ZooKeeperSaslClient;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Id;
@@ -781,8 +783,9 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
   }
   
   @Override
-  public ServiceInstanceSet getInstances(String component) throws IOException {
-    checkPathChildrenCache();
+  public ServiceInstanceSet getInstances(
+      String component, long clusterReadyTimeoutMs) throws IOException {
+    checkPathChildrenCache(clusterReadyTimeoutMs);
 
     // lazily create instances
     if (instances == null) {
@@ -793,7 +796,7 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
 
   @Override
   public ApplicationId getApplicationId() throws IOException {
-    getInstances(null);
+    getInstances("LLAP", 0);
     return instances.getApplicationId();
   }
 
@@ -801,28 +804,46 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
   public synchronized void registerStateChangeListener(
       final ServiceInstanceStateChangeListener listener)
       throws IOException {
-    checkPathChildrenCache();
+    checkPathChildrenCache(0);
 
     this.stateChangeListeners.add(listener);
   }
 
-  private synchronized void checkPathChildrenCache() throws IOException {
+  private synchronized void checkPathChildrenCache(long clusterReadyTimeoutMs) throws IOException {
     Preconditions.checkArgument(zooKeeperClient != null &&
-            zooKeeperClient.getState() == CuratorFrameworkState.STARTED,
-        "client is not started");
-
+            zooKeeperClient.getState() == CuratorFrameworkState.STARTED, "client is not started");
     // lazily create PathChildrenCache
-    if (instancesCache == null) {
-      this.instancesCache = new PathChildrenCache(zooKeeperClient, workersPath, true);
-      instancesCache.getListenable().addListener(new InstanceStateChangeListener(),
-          Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
-              .setDaemon(true)
-              .setNameFormat("StateChangeNotificationHandler")
-              .build()));
+    if (instancesCache != null) return;
+    ExecutorService tp = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
+              .setDaemon(true).setNameFormat("StateChangeNotificationHandler").build());
+    long startTimeNs = System.nanoTime(), deltaNs = clusterReadyTimeoutMs * 1000000L;
+    long sleepTimeMs = Math.min(16, clusterReadyTimeoutMs);
+    while (true) {
+      PathChildrenCache instancesCache = new PathChildrenCache(zooKeeperClient, workersPath, true);
+      instancesCache.getListenable().addListener(new InstanceStateChangeListener(), tp);
       try {
-        this.instancesCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
+        instancesCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
+        this.instancesCache = instancesCache;
+        break;
+      } catch (InvalidACLException e) {
+        // PathChildrenCache tried to mkdir when the znode wasn't there, and failed.
+        CloseableUtils.closeQuietly(instancesCache);
+        long elapsedNs = System.nanoTime() - startTimeNs;
+        if (deltaNs == 0 || deltaNs <= elapsedNs) {
+          LOG.error("Unable to start curator PathChildrenCache", e);
+          throw new IOException(e);
+        }
+        LOG.warn("The cluster is not started yet (InvalidACL); will retry");
+        try {
+          Thread.sleep(Math.min(sleepTimeMs, (deltaNs - elapsedNs)/1000000L));
+        } catch (InterruptedException e1) {
+          LOG.error("Interrupted while retrying the PathChildrenCache startup");
+          throw new IOException(e1);
+        }
+        sleepTimeMs = sleepTimeMs << 1;
       } catch (Exception e) {
-        LOG.error("Unable to start curator PathChildrenCache. Exception: {}", e);
+        CloseableUtils.closeQuietly(instancesCache);
+        LOG.error("Unable to start curator PathChildrenCache", e);
         throw new IOException(e);
       }
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/b978c074/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java
index 39d542b..b30f837 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java
@@ -174,7 +174,7 @@ public class LlapStatusServiceDriver {
     }
   }
 
-  public int run(LlapStatusOptions options) {
+  public int run(LlapStatusOptions options, long watchTimeoutMs) {
     appStatusBuilder = new AppStatusBuilder();
     try {
       if (appName == null) {
@@ -253,7 +253,7 @@ public class LlapStatusServiceDriver {
         return ret.getInt();
       } else {
         try {
-          ret = populateAppStatusFromLlapRegistry(appStatusBuilder);
+          ret = populateAppStatusFromLlapRegistry(appStatusBuilder, watchTimeoutMs);
         } catch (LlapStatusCliException e) {
           logError(e);
           return e.getExitCode().getInt();
@@ -481,7 +481,8 @@ public class LlapStatusServiceDriver {
    * @return an ExitCode. An ExitCode other than ExitCode.SUCCESS implies future progress not possible
    * @throws LlapStatusCliException
    */
-  private ExitCode populateAppStatusFromLlapRegistry(AppStatusBuilder appStatusBuilder) throws
+  private ExitCode populateAppStatusFromLlapRegistry(
+      AppStatusBuilder appStatusBuilder, long watchTimeoutMs) throws
     LlapStatusCliException {
 
     if (llapRegistry == null) {
@@ -495,7 +496,7 @@ public class LlapStatusServiceDriver {
 
     Collection<ServiceInstance> serviceInstances;
     try {
-      serviceInstances = llapRegistry.getInstances().getAll();
+      serviceInstances = llapRegistry.getInstances(watchTimeoutMs).getAll();
     } catch (IOException e) {
       throw new LlapStatusCliException(ExitCode.LLAP_REGISTRY_ERROR, "Failed to get instances from llap registry", e);
     }
@@ -964,7 +965,7 @@ public class LlapStatusServiceDriver {
         numAttempts, watchMode, new DecimalFormat("#.###").format(runningNodesThreshold));
       while (numAttempts > 0) {
         try {
-          ret = statusServiceDriver.run(options);
+          ret = statusServiceDriver.run(options, watchMode ? watchTimeout : 0);
           if (ret == ExitCode.SUCCESS.getInt()) {
             if (watchMode) {
               currentState = statusServiceDriver.appStatusBuilder.state;

http://git-wip-us.apache.org/repos/asf/hive/blob/b978c074/service/src/java/org/apache/hive/http/LlapServlet.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/http/LlapServlet.java b/service/src/java/org/apache/hive/http/LlapServlet.java
index 993766b..1546522 100644
--- a/service/src/java/org/apache/hive/http/LlapServlet.java
+++ b/service/src/java/org/apache/hive/http/LlapServlet.java
@@ -98,7 +98,7 @@ public class LlapServlet extends HttpServlet {
 
         LOG.info("Retrieving info for cluster: " + clusterName);
         LlapStatusServiceDriver driver = new LlapStatusServiceDriver();
-        int ret = driver.run(new LlapStatusOptionsProcessor.LlapStatusOptions(clusterName));
+        int ret = driver.run(new LlapStatusOptionsProcessor.LlapStatusOptions(clusterName), 0);
         if (ret == LlapStatusServiceDriver.ExitCode.SUCCESS.getInt()) {
           driver.outputJson(writer);
         }