You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by sm...@apache.org on 2014/07/11 06:17:55 UTC

[15/50] [abbrv] git commit: SLIDER-134. Provide a default ZK node for apps

SLIDER-134. Provide a default ZK node for apps


Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/8f933709
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/8f933709
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/8f933709

Branch: refs/heads/master
Commit: 8f933709d42034a541609874aaf3423f4b8904a8
Parents: a613e20
Author: Sumit Mohanty <sm...@hortonworks.com>
Authored: Sun Jul 6 22:25:15 2014 -0700
Committer: Sumit Mohanty <sm...@hortonworks.com>
Committed: Mon Jul 7 07:49:32 2014 -0700

----------------------------------------------------------------------
 app-packages/hbase/appConfig.json               |   1 +
 .../org/apache/slider/client/SliderClient.java  | 112 ++++++++++++++++++-
 .../slider/core/zk/BlockingZKWatcher.java       |   8 +-
 .../apache/slider/core/zk/ZKIntegration.java    |  23 +++-
 .../slider/providers/agent/AgentKeys.java       |   1 +
 .../providers/agent/AgentProviderService.java   |   1 +
 .../common/tools/TestZKIntegration.groovy       |  53 +++++++++
 7 files changed, 191 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8f933709/app-packages/hbase/appConfig.json
----------------------------------------------------------------------
diff --git a/app-packages/hbase/appConfig.json b/app-packages/hbase/appConfig.json
index 9a43863..81fcf2c 100644
--- a/app-packages/hbase/appConfig.json
+++ b/app-packages/hbase/appConfig.json
@@ -5,6 +5,7 @@
   "global": {
     "agent.conf": "/slider/agent/conf/agent.ini",
     "application.def": "/slider/${app.package.name}.zip",
+    "create.default.zookeeper.node": "true",
     "config_types": "core-site,hdfs-site,hbase-site",
     "java_home": "/usr/jdk64/jdk1.7.0_45",
     "package_list": "files/hbase-${hbase.version}-bin.tar.gz",

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8f933709/slider-core/src/main/java/org/apache/slider/client/SliderClient.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/client/SliderClient.java b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java
index c35a60a..286ebac 100644
--- a/slider-core/src/main/java/org/apache/slider/client/SliderClient.java
+++ b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java
@@ -94,6 +94,8 @@ import org.apache.slider.core.registry.docstore.PublishedConfigurationOutputter;
 import org.apache.slider.core.registry.info.RegisteredEndpoint;
 import org.apache.slider.core.registry.info.ServiceInstanceData;
 import org.apache.slider.core.registry.retrieve.RegistryRetriever;
+import org.apache.slider.core.zk.BlockingZKWatcher;
+import org.apache.slider.core.zk.ZKIntegration;
 import org.apache.slider.core.zk.ZKPathBuilder;
 import org.apache.slider.providers.AbstractClientProvider;
 import org.apache.slider.providers.SliderProviderFactory;
@@ -106,6 +108,10 @@ import org.apache.slider.server.services.registry.SliderRegistryService;
 import org.apache.slider.server.services.utility.AbstractSliderLaunchedService;
 
 import static org.apache.slider.common.params.SliderActions.*;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -268,6 +274,84 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
     return exitCode;
   }
 
+  /**
+   * Remove the calling user's zookeeper node for the named cluster, children included.
+   * @return true once the recursive delete has run; false on a secure cluster, without a client, or on error
+   **/
+  protected boolean deleteZookeeperNode(String clusterName) throws YarnException, IOException {
+    String user = getUsername();
+    String zkPath = ZKIntegration.mkClusterPath(user, clusterName);
+    try {
+      if (SliderUtils.isHadoopClusterSecure(getConfig())) {
+        log.warn("Default zookeeper node is not available for secure cluster");
+        return false;
+      }
+      ZKIntegration zk = getZkClient(clusterName, user);
+      if (zk == null) {
+        return false;
+      }
+      if (zk.exists(zkPath)) {
+        log.info("Deleting zookeeper path {}", zkPath);
+      }
+      zk.deleteRecursive(zkPath);
+      return true;
+    } catch (InterruptedException e) {
+      log.warn("Unable to recursively delete zk node {}", zkPath, e);
+    } catch (KeeperException e) {
+      log.warn("Unable to recursively delete zk node {}", zkPath, e);
+    } catch (BadConfigException e) {
+      log.warn("Unable to recursively delete zk node {}", zkPath, e);
+    }
+    return false;
+  }
+
+  /**
+   * Build the zookeeper path for the calling user and cluster and, unless nameOnly is set, create it.
+   * @return the path; null if the cluster is secure, no client was available or the create failed
+   */
+  protected String createZookeeperNode(String clusterName, Boolean nameOnly) throws YarnException, IOException {
+    String user = getUsername();
+    String zkPath = ZKIntegration.mkClusterPath(user, clusterName);
+    if (nameOnly) {
+      return zkPath;
+    }
+    if (SliderUtils.isHadoopClusterSecure(getConfig())) {
+      return null;
+    }
+    ZKIntegration zk = getZkClient(clusterName, user);
+    if (zk == null) {
+      return null;
+    }
+    try {
+      zk.createPath(zkPath, "", ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+      return zkPath;
+    } catch (InterruptedException e) {
+      log.warn("Unable to create zk node {}", zkPath, e);
+    } catch (KeeperException e) {
+      log.warn("Unable to create zk node {}", zkPath, e);
+    }
+    return null;
+  }
+
+  /**
+   * Gets a zookeeper client for the quorum returned by lookupZKQuorum().
+   * @return the client, or null if connecting was interrupted, failed or timed out
+   **/
+  protected ZKIntegration getZkClient(String clusterName, String user) throws YarnException {
+    String registryQuorum = lookupZKQuorum();
+    try {
+      BlockingZKWatcher watcher = new BlockingZKWatcher();
+      ZKIntegration client = ZKIntegration.newInstance(registryQuorum, user, clusterName, true, false, watcher);
+      client.init();
+      watcher.waitForZKConnection(2 * 1000);
+      return client;
+    } catch (InterruptedException e) {
+      log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e);
+    } catch (IOException e) { // also the ConnectException raised when the wait times out
+      log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e);
+    }
+    return null;
+  }
 
   /**
    * Destroy a cluster. There's two race conditions here
@@ -297,6 +381,10 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
       log.warn("Filesystem returned false from delete() operation");
     }
 
+    if(!deleteZookeeperNode(clustername)) {
+      log.warn("Unable to perform node cleanup in Zookeeper.");
+    }
+
     List<ApplicationReport> instances = findAllLiveInstances(clustername);
     // detect any race leading to cluster creation during the check/destroy process
     // and report a problem.
@@ -382,7 +470,7 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
    */
   
   public void buildInstanceDefinition(String clustername,
-                                         AbstractClusterBuildingActionArgs buildInfo)
+                                      AbstractClusterBuildingActionArgs buildInfo)
         throws YarnException, IOException {
     // verify that a live cluster isn't there
     SliderUtils.validateClusterName(clustername);
@@ -498,11 +586,25 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
         registryQuorum,
         quorum);
     String zookeeperRoot = buildInfo.getAppZKPath();
-    
+
     if (isSet(zookeeperRoot)) {
       zkPaths.setAppPath(zookeeperRoot);
-      
+    } else {
+      String createDefaultZkNode = appConf.getGlobalOptions().getOption(AgentKeys.CREATE_DEF_ZK_NODE, "false");
+      if (createDefaultZkNode.equals("true")) {
+        String defaultZKPath = createZookeeperNode(clustername, false);
+        log.info("ZK node created for application instance: {}.", defaultZKPath);
+        if (defaultZKPath != null) {
+          zkPaths.setAppPath(defaultZKPath);
+        }
+      } else {
+        // create AppPath if default is being used
+        String defaultZKPath = createZookeeperNode(clustername, true);
+        log.info("ZK node assigned to application instance: {}.", defaultZKPath);
+        zkPaths.setAppPath(defaultZKPath);
+      }
     }
+
     builder.addZKBinding(zkPaths);
 
     //then propagate any package URI
@@ -646,8 +748,8 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
    */
   public LaunchedApplication launchApplication(String clustername,
                                                Path clusterDirectory,
-                               AggregateConf instanceDefinition,
-                               boolean debugAM)
+                                               AggregateConf instanceDefinition,
+                                               boolean debugAM)
     throws YarnException, IOException {
 
 

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8f933709/slider-core/src/main/java/org/apache/slider/core/zk/BlockingZKWatcher.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/zk/BlockingZKWatcher.java b/slider-core/src/main/java/org/apache/slider/core/zk/BlockingZKWatcher.java
index 62ebff3..ca49888 100644
--- a/slider-core/src/main/java/org/apache/slider/core/zk/BlockingZKWatcher.java
+++ b/slider-core/src/main/java/org/apache/slider/core/zk/BlockingZKWatcher.java
@@ -23,6 +23,7 @@ import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.ConnectException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class BlockingZKWatcher implements Watcher {
@@ -49,7 +50,8 @@ public class BlockingZKWatcher implements Watcher {
    * @param timeout timeout in millis
    */
 
-  public void waitForZKConnection(int timeout) throws InterruptedException {
+  public void waitForZKConnection(int timeout)
+      throws InterruptedException, ConnectException {
     synchronized (connectedFlag) {
       if (!connectedFlag.get()) {
         log.info("waiting for ZK event");
@@ -57,7 +59,9 @@ public class BlockingZKWatcher implements Watcher {
         connectedFlag.wait(timeout);
       }
     }
-    assert connectedFlag.get();
+    if (!connectedFlag.get()) {
+      throw new ConnectException("Unable to connect to ZK quorum");
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8f933709/slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java b/slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java
index 6270123..54aeb4f 100644
--- a/slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java
+++ b/slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java
@@ -250,13 +250,34 @@ public class ZKIntegration implements Watcher {
                                      KeeperException {
     try {
       zookeeper.delete(path, -1);
+      log.debug("Deleting {}", path);
       return true;
     } catch (KeeperException.NoNodeException ignored) {
       return false;
     }
   }
 
-/**
+  /**
+   * Delete a node together with all of its descendants, deepest nodes first.
+   * A node that no longer exists is skipped rather than reported as an error.
+   * @param path path of the node to remove
+   * @return false if a NoNodeException surfaced for this path, true otherwise
+   * @throws KeeperException on any other zookeeper failure
+   * @throws InterruptedException propagated from the zookeeper client calls
+   */
+  public boolean deleteRecursive(String path) throws KeeperException, InterruptedException {
+    try {
+      for (String childName : zookeeper.getChildren(path, false)) {
+        deleteRecursive(path + "/" + childName);
+      }
+      delete(path);
+      return true;
+    } catch (KeeperException.NoNodeException missing) {
+      return false;
+    }
+  }
+
+  /**
  * Build the path to a cluster; exists once the cluster has come up.
  * Even before that, a ZK watcher could wait for it.
  * @param username user

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8f933709/slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java
index 333058d..019a8e6 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java
@@ -88,6 +88,7 @@ public interface AgentKeys {
   String PACKAGE_LIST = "package_list";
   String WAIT_HEARTBEAT = "wait.heartbeat";
   String PYTHON_EXE = "python";
+  String CREATE_DEF_ZK_NODE = "create.default.zookeeper.node";
   String HEARTBEAT_MONITOR_INTERVAL = "heartbeat.monitor.interval";
   String AGENT_INSTANCE_DEBUG_DATA = "agent.instance.debug.data";
 }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8f933709/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
index 85aa8db..6cd3d9e 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
@@ -898,6 +898,7 @@ public class AgentProviderService extends AbstractProviderService implements
     tokens.put("${NN_URI}", nnuri);
     tokens.put("${NN_HOST}", URI.create(nnuri).getHost());
     tokens.put("${ZK_HOST}", appConf.get(OptionKeys.ZOOKEEPER_HOSTS));
+    tokens.put("${DEF_ZK_PATH}", appConf.get(OptionKeys.ZOOKEEPER_PATH));
     tokens.put("${DEFAULT_DATA_DIR}", getAmState()
         .getInternalsSnapshot()
         .getGlobalOptions()

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8f933709/slider-core/src/test/groovy/org/apache/slider/common/tools/TestZKIntegration.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/common/tools/TestZKIntegration.groovy b/slider-core/src/test/groovy/org/apache/slider/common/tools/TestZKIntegration.groovy
index 3930864..fe3bef7 100644
--- a/slider-core/src/test/groovy/org/apache/slider/common/tools/TestZKIntegration.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/common/tools/TestZKIntegration.groovy
@@ -20,6 +20,7 @@ package org.apache.slider.common.tools
 
 import groovy.util.logging.Slf4j
 import org.apache.hadoop.conf.Configuration
+import org.apache.slider.client.SliderClient
 import org.apache.slider.core.zk.ZKIntegration
 import org.apache.slider.test.KeysForTests
 import org.apache.slider.test.YarnZKMiniClusterTestBase
@@ -88,10 +89,62 @@ class TestZKIntegration extends YarnZKMiniClusterTestBase implements KeysForTest
            (c1.endsWith(clusters[1]) && c2.endsWith(clusters[0]))
   }
 
+  @Test
+  public void testCreateAndDeleteDefaultZKPath() throws Throwable {
+    MockSliderClient client = new MockSliderClient()
+    // nameOnly=true: the path is only computed, so the mock must not have opened a ZK client
+    String path = client.createZookeeperNode("cl1", true)
+    ZKIntegration zki = client.getLastZKIntegration()
+
+    String zkPath = ZKIntegration.mkClusterPath(USERNAME, "cl1")
+    assert zkPath == "/services/slider/users/" + USERNAME + "/cl1", "zkPath must be as expected"
+    assert path == zkPath
+    assert zki == null, "ZKIntegration should be null."
+    zki = createZKIntegrationInstance(getZKBinding(), "cl1", true, false, 5000);
+    assert false == zki.exists(zkPath), "zkPath should not exist"
+    // nameOnly=false: the node is created through the mock's client, then removed with a child under it
+    path = client.createZookeeperNode("cl1", false)
+    zki = client.getLastZKIntegration()
+    assert zkPath == "/services/slider/users/" + USERNAME + "/cl1", "zkPath must be as expected"
+    assert path == zkPath
+    assert true == zki.exists(zkPath), "zkPath must exist"
+    zki.createPath(zkPath, "/cn", ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
+    assert true == zki.exists(zkPath + "/cn"), "zkPath with child node must exist"
+    client.deleteZookeeperNode("cl1")
+    assert false == zki.exists(zkPath), "zkPath must not exist"
+
+  }
+
   public String createEphemeralChild(ZKIntegration zki, String userPath) {
     return zki.createPath(userPath, "/cluster-",
                           ZooDefs.Ids.OPEN_ACL_UNSAFE,
                           CreateMode.EPHEMERAL_SEQUENTIAL)
   }
 
+  class MockSliderClient extends SliderClient {
+    private ZKIntegration zki;
+    // fixed user so that paths match ZKIntegration.mkClusterPath(USERNAME, ...) in the test
+    @Override
+    public String getUsername() {
+      return USERNAME
+    }
+    // uses the binding from getZKBinding() and records the client for getLastZKIntegration()
+    @Override
+    protected ZKIntegration getZkClient(String clusterName, String user) {
+      zki = createZKIntegrationInstance(getZKBinding(), clusterName, true, false, 5000)
+      return zki;
+    }
+    // a fresh default Configuration on every call
+    @Override
+    public synchronized Configuration getConfig() {
+      Configuration conf = new Configuration();
+      return conf;
+    }
+    // most recent client handed out by getZkClient(), or null if none was requested
+    public ZKIntegration getLastZKIntegration() {
+      return zki
+    }
+
+  }
+
 }