You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by sm...@apache.org on 2014/07/11 06:17:55 UTC
[15/50] [abbrv] git commit: SLIDER-134. Provide a default ZK node for
apps
SLIDER-134. Provide a default ZK node for apps
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/8f933709
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/8f933709
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/8f933709
Branch: refs/heads/master
Commit: 8f933709d42034a541609874aaf3423f4b8904a8
Parents: a613e20
Author: Sumit Mohanty <sm...@hortonworks.com>
Authored: Sun Jul 6 22:25:15 2014 -0700
Committer: Sumit Mohanty <sm...@hortonworks.com>
Committed: Mon Jul 7 07:49:32 2014 -0700
----------------------------------------------------------------------
app-packages/hbase/appConfig.json | 1 +
.../org/apache/slider/client/SliderClient.java | 112 ++++++++++++++++++-
.../slider/core/zk/BlockingZKWatcher.java | 8 +-
.../apache/slider/core/zk/ZKIntegration.java | 23 +++-
.../slider/providers/agent/AgentKeys.java | 1 +
.../providers/agent/AgentProviderService.java | 1 +
.../common/tools/TestZKIntegration.groovy | 53 +++++++++
7 files changed, 191 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8f933709/app-packages/hbase/appConfig.json
----------------------------------------------------------------------
diff --git a/app-packages/hbase/appConfig.json b/app-packages/hbase/appConfig.json
index 9a43863..81fcf2c 100644
--- a/app-packages/hbase/appConfig.json
+++ b/app-packages/hbase/appConfig.json
@@ -5,6 +5,7 @@
"global": {
"agent.conf": "/slider/agent/conf/agent.ini",
"application.def": "/slider/${app.package.name}.zip",
+ "create.default.zookeeper.node": "true",
"config_types": "core-site,hdfs-site,hbase-site",
"java_home": "/usr/jdk64/jdk1.7.0_45",
"package_list": "files/hbase-${hbase.version}-bin.tar.gz",
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8f933709/slider-core/src/main/java/org/apache/slider/client/SliderClient.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/client/SliderClient.java b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java
index c35a60a..286ebac 100644
--- a/slider-core/src/main/java/org/apache/slider/client/SliderClient.java
+++ b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java
@@ -94,6 +94,8 @@ import org.apache.slider.core.registry.docstore.PublishedConfigurationOutputter;
import org.apache.slider.core.registry.info.RegisteredEndpoint;
import org.apache.slider.core.registry.info.ServiceInstanceData;
import org.apache.slider.core.registry.retrieve.RegistryRetriever;
+import org.apache.slider.core.zk.BlockingZKWatcher;
+import org.apache.slider.core.zk.ZKIntegration;
import org.apache.slider.core.zk.ZKPathBuilder;
import org.apache.slider.providers.AbstractClientProvider;
import org.apache.slider.providers.SliderProviderFactory;
@@ -106,6 +108,10 @@ import org.apache.slider.server.services.registry.SliderRegistryService;
import org.apache.slider.server.services.utility.AbstractSliderLaunchedService;
import static org.apache.slider.common.params.SliderActions.*;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -268,6 +274,84 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
return exitCode;
}
+ /**
+  * Remove the ZooKeeper node that belongs to the calling user and the named
+  * cluster, along with everything underneath it.
+  *
+  * Failures are logged rather than propagated: ZooKeeper, interruption and
+  * bad-configuration errors all result in a {@code false} return.
+  *
+  * @param clusterName name of the cluster whose node is to be removed
+  * @return true once the recursive delete has been issued without error;
+  *         false if the cluster is secure, no ZooKeeper client was available,
+  *         or the operation failed
+  */
+ protected boolean deleteZookeeperNode(String clusterName) throws YarnException, IOException {
+   String owner = getUsername();
+   String nodePath = ZKIntegration.mkClusterPath(owner, clusterName);
+   try {
+     if (SliderUtils.isHadoopClusterSecure(getConfig())) {
+       log.warn("Default zookeeper node is not available for secure cluster");
+       return false;
+     }
+     ZKIntegration zk = getZkClient(clusterName, owner);
+     if (zk == null) {
+       return false;
+     }
+     // the existence probe only decides whether the deletion is announced
+     if (zk.exists(nodePath)) {
+       log.info("Deleting zookeeper path {}", nodePath);
+     }
+     zk.deleteRecursive(nodePath);
+     return true;
+   } catch (InterruptedException e) {
+     log.warn("Unable to recursively delete zk node {}", nodePath, e);
+   } catch (KeeperException e) {
+     log.warn("Unable to recursively delete zk node {}", nodePath, e);
+   } catch (BadConfigException e) {
+     log.warn("Unable to recursively delete zk node {}", nodePath, e);
+   }
+   return false;
+ }
+
+ /**
+  * Work out, and optionally create, the ZooKeeper node associated with the
+  * calling user and the named cluster.
+  *
+  * @param clusterName name of the cluster
+  * @param nameOnly when true the path is only computed and nothing is created;
+  *        the value is unboxed directly, so it must not be null
+  * @return the node path; null if creation was requested but the cluster is
+  *         secure, no ZooKeeper client was available, or the create failed
+  */
+ protected String createZookeeperNode(String clusterName, Boolean nameOnly) throws YarnException, IOException {
+   String owner = getUsername();
+   String nodePath = ZKIntegration.mkClusterPath(owner, clusterName);
+   if (nameOnly) {
+     return nodePath;
+   }
+   if (SliderUtils.isHadoopClusterSecure(getConfig())) {
+     return null;
+   }
+   ZKIntegration zk = getZkClient(clusterName, owner);
+   if (zk == null) {
+     return null;
+   }
+   try {
+     zk.createPath(nodePath, "", ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+     return nodePath;
+   } catch (InterruptedException e) {
+     log.warn("Unable to create zk node {}", nodePath, e);
+   } catch (KeeperException e) {
+     log.warn("Unable to create zk node {}", nodePath, e);
+   }
+   return null;
+ }
+
+ /**
+  * Gets a zookeeper client, returns null if it cannot connect to zookeeper.
+  *
+  * The client is bound to the registry quorum returned by
+  * {@code lookupZKQuorum()} and the connection is awaited for two seconds.
+  *
+  * @param clusterName name of the cluster the client is created for
+  * @param user user the client is created for
+  * @return an initialised client, or null if the wait was interrupted or
+  *         set-up/connection failed with an IOException
+  * @throws YarnException if the quorum lookup fails
+  */
+ protected ZKIntegration getZkClient(String clusterName, String user) throws YarnException {
+   String registryQuorum = lookupZKQuorum();
+   ZKIntegration client = null;
+   try {
+     BlockingZKWatcher watcher = new BlockingZKWatcher();
+     client = ZKIntegration.newInstance(registryQuorum, user, clusterName, true, false, watcher);
+     client.init();
+     watcher.waitForZKConnection(2 * 1000);
+   } catch (InterruptedException e) {
+     client = null;
+     log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e);
+   } catch (IOException e) {
+     // Also covers the ConnectException thrown when the connection wait times
+     // out; drop the unconnected client so callers see null, as documented.
+     client = null;
+     log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e);
+   }
+   return client;
+ }
/**
* Destroy a cluster. There's two race conditions here
@@ -297,6 +381,10 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
log.warn("Filesystem returned false from delete() operation");
}
+ if(!deleteZookeeperNode(clustername)) {
+ log.warn("Unable to perform node cleanup in Zookeeper.");
+ }
+
List<ApplicationReport> instances = findAllLiveInstances(clustername);
// detect any race leading to cluster creation during the check/destroy process
// and report a problem.
@@ -382,7 +470,7 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
*/
public void buildInstanceDefinition(String clustername,
- AbstractClusterBuildingActionArgs buildInfo)
+ AbstractClusterBuildingActionArgs buildInfo)
throws YarnException, IOException {
// verify that a live cluster isn't there
SliderUtils.validateClusterName(clustername);
@@ -498,11 +586,25 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
registryQuorum,
quorum);
String zookeeperRoot = buildInfo.getAppZKPath();
-
+
if (isSet(zookeeperRoot)) {
zkPaths.setAppPath(zookeeperRoot);
-
+ } else {
+ String createDefaultZkNode = appConf.getGlobalOptions().getOption(AgentKeys.CREATE_DEF_ZK_NODE, "false");
+ if (createDefaultZkNode.equals("true")) {
+ String defaultZKPath = createZookeeperNode(clustername, false);
+ log.info("ZK node created for application instance: {}.", defaultZKPath);
+ if (defaultZKPath != null) {
+ zkPaths.setAppPath(defaultZKPath);
+ }
+ } else {
+ // create AppPath if default is being used
+ String defaultZKPath = createZookeeperNode(clustername, true);
+ log.info("ZK node assigned to application instance: {}.", defaultZKPath);
+ zkPaths.setAppPath(defaultZKPath);
+ }
}
+
builder.addZKBinding(zkPaths);
//then propagate any package URI
@@ -646,8 +748,8 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
*/
public LaunchedApplication launchApplication(String clustername,
Path clusterDirectory,
- AggregateConf instanceDefinition,
- boolean debugAM)
+ AggregateConf instanceDefinition,
+ boolean debugAM)
throws YarnException, IOException {
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8f933709/slider-core/src/main/java/org/apache/slider/core/zk/BlockingZKWatcher.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/zk/BlockingZKWatcher.java b/slider-core/src/main/java/org/apache/slider/core/zk/BlockingZKWatcher.java
index 62ebff3..ca49888 100644
--- a/slider-core/src/main/java/org/apache/slider/core/zk/BlockingZKWatcher.java
+++ b/slider-core/src/main/java/org/apache/slider/core/zk/BlockingZKWatcher.java
@@ -23,6 +23,7 @@ import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.ConnectException;
import java.util.concurrent.atomic.AtomicBoolean;
public class BlockingZKWatcher implements Watcher {
@@ -49,7 +50,8 @@ public class BlockingZKWatcher implements Watcher {
* @param timeout timeout in millis
*/
- public void waitForZKConnection(int timeout) throws InterruptedException {
+ public void waitForZKConnection(int timeout)
+ throws InterruptedException, ConnectException {
synchronized (connectedFlag) {
if (!connectedFlag.get()) {
log.info("waiting for ZK event");
@@ -57,7 +59,9 @@ public class BlockingZKWatcher implements Watcher {
connectedFlag.wait(timeout);
}
}
- assert connectedFlag.get();
+ if (!connectedFlag.get()) {
+ throw new ConnectException("Unable to connect to ZK quorum");
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8f933709/slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java b/slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java
index 6270123..54aeb4f 100644
--- a/slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java
+++ b/slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java
@@ -250,13 +250,34 @@ public class ZKIntegration implements Watcher {
KeeperException {
try {
zookeeper.delete(path, -1);
+ log.debug("Deleting {}", path);
return true;
} catch (KeeperException.NoNodeException ignored) {
return false;
}
}
-/**
+ /**
+  * Recursively delete a node, does not throw exception if any node does not exist.
+  * Children are removed depth-first before the node itself.
+  * @param path path of the node to delete
+  * @return true if the node at {@code path} was deleted; false if it did not
+  *         exist, either at the start or by the time the final delete ran
+  * @throws KeeperException on ZooKeeper failures other than a missing node
+  * @throws InterruptedException if the ZooKeeper call is interrupted
+  */
+ public boolean deleteRecursive(String path) throws KeeperException, InterruptedException {
+   try {
+     List<String> children = zookeeper.getChildren(path, false);
+     for (String child : children) {
+       deleteRecursive(path + "/" + child);
+     }
+     // report the outcome of the actual delete rather than assuming success
+     return delete(path);
+   } catch (KeeperException.NoNodeException ignored) {
+     // the node disappeared (or never existed): nothing left to delete
+     return false;
+   }
+ }
+
+ /**
* Build the path to a cluster; exists once the cluster has come up.
* Even before that, a ZK watcher could wait for it.
* @param username user
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8f933709/slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java
index 333058d..019a8e6 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java
@@ -88,6 +88,7 @@ public interface AgentKeys {
String PACKAGE_LIST = "package_list";
String WAIT_HEARTBEAT = "wait.heartbeat";
String PYTHON_EXE = "python";
+ String CREATE_DEF_ZK_NODE = "create.default.zookeeper.node";
String HEARTBEAT_MONITOR_INTERVAL = "heartbeat.monitor.interval";
String AGENT_INSTANCE_DEBUG_DATA = "agent.instance.debug.data";
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8f933709/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
index 85aa8db..6cd3d9e 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
@@ -898,6 +898,7 @@ public class AgentProviderService extends AbstractProviderService implements
tokens.put("${NN_URI}", nnuri);
tokens.put("${NN_HOST}", URI.create(nnuri).getHost());
tokens.put("${ZK_HOST}", appConf.get(OptionKeys.ZOOKEEPER_HOSTS));
+ tokens.put("${DEF_ZK_PATH}", appConf.get(OptionKeys.ZOOKEEPER_PATH));
tokens.put("${DEFAULT_DATA_DIR}", getAmState()
.getInternalsSnapshot()
.getGlobalOptions()
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8f933709/slider-core/src/test/groovy/org/apache/slider/common/tools/TestZKIntegration.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/common/tools/TestZKIntegration.groovy b/slider-core/src/test/groovy/org/apache/slider/common/tools/TestZKIntegration.groovy
index 3930864..fe3bef7 100644
--- a/slider-core/src/test/groovy/org/apache/slider/common/tools/TestZKIntegration.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/common/tools/TestZKIntegration.groovy
@@ -20,6 +20,7 @@ package org.apache.slider.common.tools
import groovy.util.logging.Slf4j
import org.apache.hadoop.conf.Configuration
+import org.apache.slider.client.SliderClient
import org.apache.slider.core.zk.ZKIntegration
import org.apache.slider.test.KeysForTests
import org.apache.slider.test.YarnZKMiniClusterTestBase
@@ -88,10 +89,62 @@ class TestZKIntegration extends YarnZKMiniClusterTestBase implements KeysForTest
(c1.endsWith(clusters[1]) && c2.endsWith(clusters[0]))
}
+ @Test
+ public void testCreateAndDeleteDefaultZKPath() throws Throwable {
+ MockSliderClient client = new MockSliderClient()
+
+ String path = client.createZookeeperNode("cl1", true) // nameOnly=true: the path is computed, nothing is created
+ ZKIntegration zki = client.getLastZKIntegration() // last client the mock handed out from getZkClient(), if any
+
+ String zkPath = ZKIntegration.mkClusterPath(USERNAME, "cl1")
+ assert zkPath == "/services/slider/users/" + USERNAME + "/cl1", "zkPath must be as expected"
+ assert path == zkPath
+ assert zki == null, "ZKIntegration should be null."
+ zki = createZKIntegrationInstance(getZKBinding(), "cl1", true, false, 5000); // independent client used to inspect ZK directly
+ assert false == zki.exists(zkPath), "zkPath should not exist"
+
+ path = client.createZookeeperNode("cl1", false) // nameOnly=false: the node is really created this time
+ zki = client.getLastZKIntegration()
+ assert zkPath == "/services/slider/users/" + USERNAME + "/cl1", "zkPath must be as expected"
+ assert path == zkPath
+ assert true == zki.exists(zkPath), "zkPath must exist"
+ zki.createPath(zkPath, "/cn", ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT) // add a child so the delete has to recurse
+ assert true == zki.exists(zkPath + "/cn"), "zkPath with child node must exist"
+ client.deleteZookeeperNode("cl1") // expected to remove the node together with its child
+ assert false == zki.exists(zkPath), "zkPath must not exist"
+
+ }
+
/**
 * Create an ephemeral, sequentially numbered child under the given user path,
 * using the "/cluster-" prefix and an open ACL.
 * @param zki ZK integration instance to create the node through
 * @param userPath parent path
 * @return whatever {@code createPath} returns for the new node
 */
public String createEphemeralChild(ZKIntegration zki, String userPath) {
  def openAcl = ZooDefs.Ids.OPEN_ACL_UNSAFE
  def mode = CreateMode.EPHEMERAL_SEQUENTIAL
  String created = zki.createPath(userPath, "/cluster-", openAcl, mode)
  return created
}
+ /**
+  * SliderClient stub for the ZK tests: it reports a fixed user name, uses a
+  * fresh default Configuration, and builds its ZK client from the test's own
+  * ZK binding while remembering the last client it handed out.
+  */
+ class MockSliderClient extends SliderClient {
+   // most recent client returned by getZkClient(); null until it is first called
+   private ZKIntegration zki;
+
+   @Override
+   public String getUsername() {
+     return USERNAME
+   }
+
+   @Override
+   protected ZKIntegration getZkClient(String clusterName, String user) {
+     // honour the requested cluster name instead of a hard-coded "cl1"
+     zki = createZKIntegrationInstance(getZKBinding(), clusterName, true, false, 5000)
+     return zki;
+   }
+
+   @Override
+   public synchronized Configuration getConfig() {
+     Configuration conf = new Configuration();
+     return conf;
+   }
+
+   /**
+    * @return the client created by the latest getZkClient() call, or null
+    */
+   public ZKIntegration getLastZKIntegration() {
+     return zki
+   }
+
+ }
+
}