You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by vj...@apache.org on 2020/12/21 10:52:20 UTC
[hbase] branch branch-1 updated: HBASE-24620 : Add a ClusterManager
which submits command to ZooKeeper and its Agent which picks and execute
those Commands (#2299)
This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-1 by this push:
new e250c7f HBASE-24620 : Add a ClusterManager which submits command to ZooKeeper and its Agent which picks and execute those Commands (#2299)
e250c7f is described below
commit e250c7fc63308820f1dd5a3e266825ec09e7f204
Author: Lokesh Khurana <kh...@gmail.com>
AuthorDate: Mon Dec 21 15:33:36 2020 +0530
HBASE-24620 : Add a ClusterManager which submits command to ZooKeeper and its Agent which picks and execute those Commands (#2299)
Co-authored-by: Viraj Jasani <vj...@apache.org>
Signed-off-by: Aman Poonia <ap...@salesforce.com>
Signed-off-by: Viraj Jasani <vj...@apache.org>
---
bin/chaos-daemon.sh | 140 +++++
...ent_Which_Submits_Command_Through_ZooKeeper.pdf | Bin 0 -> 270679 bytes
.../org/apache/hadoop/hbase/chaos/ChaosAgent.java | 597 +++++++++++++++++++++
.../apache/hadoop/hbase/chaos/ChaosConstants.java | 77 +++
.../apache/hadoop/hbase/chaos/ChaosService.java | 138 +++++
.../org/apache/hadoop/hbase/chaos/ChaosUtils.java | 49 ++
.../org/apache/hadoop/hbase/ChaosZKClient.java | 339 ++++++++++++
.../apache/hadoop/hbase/ZNodeClusterManager.java | 123 +++++
8 files changed, 1463 insertions(+)
diff --git a/bin/chaos-daemon.sh b/bin/chaos-daemon.sh
new file mode 100644
index 0000000..084e519
--- /dev/null
+++ b/bin/chaos-daemon.sh
@@ -0,0 +1,140 @@
+#!/usr/bin/env bash
+#
+#/**
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements. See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership. The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License. You may obtain a copy of the License at
+# *
+# * http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+#
+
+usage="Usage: chaos-daemon.sh (start|stop) chaosagent"
+
+# Both an action and a service name are required; otherwise print usage and bail out.
+if [ $# -lt 2 ]; then
+  echo "$usage"
+  exit 1
+fi
+
+# First argument: action (start|stop). Second argument: service name (e.g. chaosagent).
+startStop=$1
+command=$2
+shift 2
+
+# Exits with status 1 when the pid file names a process that is still alive;
+# otherwise returns normally. Also makes sure the pid directory exists.
+check_before_start(){
+  mkdir -p "$HBASE_PID_DIR"
+  if [ ! -f "$CHAOS_PID" ]; then
+    return 0
+  fi
+  local running_pid
+  running_pid="$(cat "$CHAOS_PID")"
+  # kill -0 only probes for the existence of the process, it sends no signal
+  if kill -0 "$running_pid" > /dev/null 2>&1; then
+    echo "$command" running as process "$running_pid". Stop it first.
+    exit 1
+  fi
+}
+
+bin=`dirname "${BASH_SOURCE-$0}"`
+bin=$(cd "$bin">/dev/null || exit; pwd)
+
+. "$bin"/hbase-config.sh
+. "$bin"/hbase-common.sh
+
+CLASSPATH=$HBASE_CONF_DIR
+for f in ../lib/*.jar; do
+ CLASSPATH=${CLASSPATH}:$f
+done
+
+# Directory for log files; defaults to $HBASE_HOME/logs.
+if [ -z "$HBASE_LOG_DIR" ]; then
+  export HBASE_LOG_DIR="$HBASE_HOME/logs"
+fi
+
+# Directory for the pid file; defaults to /tmp.
+if [ -z "$HBASE_PID_DIR" ]; then
+  HBASE_PID_DIR=/tmp
+fi
+
+# Identity string used in log and pid file names; defaults to the current user.
+if [ -z "$HBASE_IDENT_STRING" ]; then
+  export HBASE_IDENT_STRING="$USER"
+fi
+
+# A JVM is mandatory.
+if [ -z "$JAVA_HOME" ]; then
+  echo "Error: JAVA_HOME is not set."
+  exit 1
+fi
+
+export HBASE_LOG_PREFIX=hbase-$HBASE_IDENT_STRING-$command-$HOSTNAME
+export CHAOS_LOGFILE=$HBASE_LOG_PREFIX.log
+
+# Logger defaults are only applied (and exported) when the caller did not set them.
+if [ -z "${HBASE_ROOT_LOGGER}" ]; then
+  export HBASE_ROOT_LOGGER="INFO,RFA"
+fi
+
+if [ -z "${HBASE_SECURITY_LOGGER}" ]; then
+  export HBASE_SECURITY_LOGGER="INFO,RFAS"
+fi
+
+CHAOS_LOGLOG=${CHAOS_LOGLOG:-"${HBASE_LOG_DIR}/${CHAOS_LOGFILE}"}
+CHAOS_PID=$HBASE_PID_DIR/hbase-$HBASE_IDENT_STRING-$command.pid
+
+# Default JVM heap settings for the chaos service.
+if [ -z "$CHAOS_JAVA_OPTS" ]; then
+  CHAOS_JAVA_OPTS="-Xms1024m -Xmx4096m"
+fi
+
+# Dispatch on the requested action.
+case $startStop in
+
+(start)
+  check_before_start
+  echo running "$command"
+  # The service is launched in the background with stdout/stderr appended to the log file.
+  CMD="${JAVA_HOME}/bin/java -Dapp.home=${HBASE_CONF_DIR}/../ ${CHAOS_JAVA_OPTS} -cp ${CLASSPATH} org.apache.hadoop.hbase.chaos.ChaosService -$command start &>> ${CHAOS_LOGLOG} &"
+
+  eval "$CMD"
+  # $! is the pid of the background job started by the eval above
+  PID=$!
+  echo "${PID}" > "${CHAOS_PID}"
+
+  # Both positional arguments were shifted away earlier, so the service name has to
+  # come from $command rather than from $1.
+  echo "Chaos ${command} process Started with ${PID} !"
+  now=$(date)
+  echo "${now} Chaos ${command} process Started with ${PID} !" >> "${CHAOS_LOGLOG}"
+  ;;
+
+(stop)
+  echo stopping "$command"
+  if [ -f "$CHAOS_PID" ]; then
+    pidToKill=$(cat "$CHAOS_PID")
+    # kill -0 == see if the PID exists
+    if kill -0 "$pidToKill" > /dev/null 2>&1; then
+      echo -n stopping "$command"
+      echo "$(date) Terminating $command" >> "$CHAOS_LOGLOG"
+      kill "$pidToKill" > /dev/null 2>&1
+      # waitForProcessEnd is expected to come from the sourced hbase-common.sh
+      waitForProcessEnd "$pidToKill" "$command"
+    else
+      retval=$?
+      echo no "$command" to stop because kill -0 of pid "$pidToKill" failed with status $retval
+    fi
+  else
+    echo no "$command" to stop because no pid file "$CHAOS_PID"
+  fi
+  rm -f "$CHAOS_PID"
+  ;;
+
+(*)
+  echo "$usage"
+  exit 1
+  ;;
+
+esac
diff --git a/dev-support/design-docs/HBASE-24620_New_ClusterManager_And_Agent_Which_Submits_Command_Through_ZooKeeper.pdf b/dev-support/design-docs/HBASE-24620_New_ClusterManager_And_Agent_Which_Submits_Command_Through_ZooKeeper.pdf
new file mode 100644
index 0000000..fe35c04
Binary files /dev/null and b/dev-support/design-docs/HBASE-24620_New_ClusterManager_And_Agent_Which_Submits_Command_Through_ZooKeeper.pdf differ
diff --git a/hbase-it/src/main/java/org/apache/hadoop/hbase/chaos/ChaosAgent.java b/hbase-it/src/main/java/org/apache/hadoop/hbase/chaos/ChaosAgent.java
new file mode 100644
index 0000000..573e757
--- /dev/null
+++ b/hbase-it/src/main/java/org/apache/hadoop/hbase/chaos/ChaosAgent.java
@@ -0,0 +1,597 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.chaos;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.hadoop.hbase.util.RetryCounterFactory;
+import org.apache.hadoop.util.Shell;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An agent for executing destructive actions for ChaosMonkey.
+ * Uses ZooKeeper watchers and a local shell to kill services and to get the
+ * status of services on the targeted host without SSH.
+ * Uses the following ZNode structure:
+ * <pre>
+ * /hbase (root, see ChaosConstants.CHAOS_TEST_ROOT_ZNODE)
+ *   |
+ *   |-- /chaosAgents           (used for registration; has hostname
+ *   |                           ephemeral nodes as children)
+ *   |
+ *   |-- /chaosAgentTaskStatus  (used for task execution; has hostname
+ *         |                     persistent nodes as children, with tasks
+ *         |                     as their children)
+ *         |
+ *         |-- /hostname
+ *               |
+ *               |-- /task0000001 (command as data)
+ * </pre>
+ * There are two types of command:
+ * <ol>
+ *   <li>starts with "exec": executes a destructive action.</li>
+ *   <li>starts with "bool": only gets the status of a service.</li>
+ * </ol>
+ */
+@InterfaceAudience.Private
+public class ChaosAgent implements Watcher, Closeable, Runnable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ChaosAgent.class);
+ static AtomicBoolean stopChaosAgent = new AtomicBoolean();
+ private ZooKeeper zk;
+ private String quorum;
+ private String agentName;
+ private Configuration conf;
+ private RetryCounterFactory retryCounterFactory;
+ private volatile boolean connected = false;
+
+ public ChaosAgent(Configuration conf, String quorum, String agentName) {
+ initChaosAgent(conf, quorum, agentName);
+ }
+
+ /***
+ * sets global params and initiates connection with ZooKeeper then does registration.
+ * @param conf initial configuration to use
+ * @param quorum ZK Quorum
+ * @param agentName AgentName to use
+ */
+ private void initChaosAgent(Configuration conf, String quorum, String agentName) {
+ this.conf = conf;
+ this.quorum = quorum;
+ this.agentName = agentName;
+ this.retryCounterFactory = new RetryCounterFactory(new RetryCounter.RetryConfig()
+ .setMaxAttempts(conf.getInt(ChaosConstants.RETRY_ATTEMPTS_KEY,
+ ChaosConstants.DEFAULT_RETRY_ATTEMPTS)).setSleepInterval(
+ conf.getLong(ChaosConstants.RETRY_SLEEP_INTERVAL_KEY,
+ ChaosConstants.DEFAULT_RETRY_SLEEP_INTERVAL)));
+ try {
+ this.createZKConnection(null);
+ this.register();
+ } catch (IOException e) {
+ LOG.error("Error Creating Connection: " + e);
+ }
+ }
+
+ /***
+ * Creates Connection with ZooKeeper.
+ * @throws IOException if something goes wrong
+ */
+ private void createZKConnection(Watcher watcher) throws IOException {
+ if(watcher == null) {
+ zk = new ZooKeeper(quorum, ChaosConstants.SESSION_TIMEOUT_ZK, this);
+ } else {
+ zk = new ZooKeeper(quorum, ChaosConstants.SESSION_TIMEOUT_ZK, watcher);
+ }
+ LOG.info("ZooKeeper Connection created for ChaosAgent: " + agentName);
+ }
+
+ //WATCHERS: Below are the Watches used by ChaosAgent
+
+  /**
+   * Watcher set on this agent's task ZNode. When the children of that node change it
+   * fetches the task list again (which also re-arms the watch). A children-changed
+   * event for any other path is treated as a data inconsistency.
+   */
+  Watcher newTaskCreatedWatcher = new Watcher() {
+    @Override
+    public void process(WatchedEvent watchedEvent) {
+      // Only child-list changes are of interest here.
+      if (watchedEvent.getType() != Event.EventType.NodeChildrenChanged) {
+        return;
+      }
+
+      final String agentTaskZNode = ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE
+        + ChaosConstants.ZNODE_PATH_SEPARATOR + agentName;
+      if (!agentTaskZNode.equals(watchedEvent.getPath())) {
+        throw new RuntimeException(
+          KeeperException.create(KeeperException.Code.DATAINCONSISTENCY));
+      }
+
+      LOG.info("Change in Tasks Node, checking for Tasks again.");
+      getTasks();
+    }
+  };
+
+ //CALLBACKS: Below are the Callbacks used by Chaos Agent
+
+ /**
+ * Callback used while setting status of a given task, Logs given status.
+ */
+ AsyncCallback.StatCallback setStatusOfTaskZNodeCallback = new AsyncCallback.StatCallback() {
+ @Override public void processResult(int rc, String path, Object ctx, Stat stat) {
+ switch (KeeperException.Code.get(rc)) {
+ case CONNECTIONLOSS:
+ // Connection to the server was lost while setting status setting again.
+ try {
+ ChaosAgent.this.recreateZKConnection();
+ } catch (Exception e) {
+ break;
+ }
+ ChaosAgent.this.setStatusOfTaskZNode(path, (String) ctx);
+ break;
+
+ case OK:
+ LOG.info("Status of Task has been set");
+ break;
+
+ case NONODE:
+ LOG.error("Chaos Agent status node does not exists: "
+ + "check for ZNode directory structure again.");
+ break;
+
+ default:
+ LOG.error("Error while setting status of task ZNode: " + path,
+ KeeperException.create(KeeperException.Code.get(rc), path));
+ }
+ }
+ };
+
+  /**
+   * Callback for the asynchronous creation of a persistent ZNode. If the connection was
+   * lost, the connection is recreated and the creation is attempted again with the data
+   * carried in the callback context.
+   */
+  AsyncCallback.StringCallback createZNodeCallback = new AsyncCallback.StringCallback() {
+    @Override
+    public void processResult(int rc, String path, Object ctx, String name) {
+      final KeeperException.Code code = KeeperException.Code.get(rc);
+      if (code == KeeperException.Code.OK) {
+        LOG.info("ZNode created : " + path);
+      } else if (code == KeeperException.Code.NODEEXISTS) {
+        LOG.warn("ZNode already registered: " + path);
+      } else if (code == KeeperException.Code.CONNECTIONLOSS) {
+        try {
+          ChaosAgent.this.recreateZKConnection();
+        } catch (Exception ignored) {
+          // no new connection, so there is nothing to retry on
+          return;
+        }
+        ChaosAgent.this.createZNode(path, (byte[]) ctx);
+      } else {
+        LOG.error("Error occurred while creating Persistent ZNode: " + path,
+          KeeperException.create(code, path));
+      }
+    }
+  };
+
+  /**
+   * Callback for the asynchronous creation of an ephemeral ZNode. If the connection was
+   * lost, the connection is recreated and the creation is attempted again with the data
+   * carried in the callback context.
+   */
+  AsyncCallback.StringCallback createEphemeralZNodeCallback = new AsyncCallback.StringCallback() {
+    @Override
+    public void processResult(int rc, String path, Object ctx, String name) {
+      final KeeperException.Code code = KeeperException.Code.get(rc);
+      if (code == KeeperException.Code.OK) {
+        LOG.info("ZNode created : " + path);
+      } else if (code == KeeperException.Code.NODEEXISTS) {
+        LOG.warn("ZNode already registered: " + path);
+      } else if (code == KeeperException.Code.CONNECTIONLOSS) {
+        try {
+          ChaosAgent.this.recreateZKConnection();
+        } catch (Exception ignored) {
+          // no new connection, so there is nothing to retry on
+          return;
+        }
+        ChaosAgent.this.createEphemeralZNode(path, (byte[]) ctx);
+      } else {
+        LOG.error("Error occurred while creating Ephemeral ZNode: ",
+          KeeperException.create(code, path));
+      }
+    }
+  };
+
+ /**
+ * Callback used by getTasksForAgentCallback while getting command,
+ * after getting command successfully, it executes command and
+ * set its status with respect to the command type.
+ */
+ AsyncCallback.DataCallback getTaskForExecutionCallback = new AsyncCallback.DataCallback() {
+ @Override
+ public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
+ switch (KeeperException.Code.get(rc)) {
+ case CONNECTIONLOSS:
+ //Connection to the server has been lost while getting task, getting data again.
+ try {
+ recreateZKConnection();
+ } catch (Exception e) {
+ break;
+ }
+ zk.getData(path,
+ false,
+ getTaskForExecutionCallback,
+ new String(data));
+ break;
+ case OK:
+ String cmd = new String(data);
+ LOG.info("Executing command : " + cmd);
+ String status = ChaosConstants.TASK_COMPLETION_STRING;
+ try {
+ String user = conf.get(ChaosConstants.CHAOSAGENT_SHELL_USER,
+ ChaosConstants.DEFAULT_SHELL_USER);
+ switch (cmd.substring(0, 4)) {
+ case "bool":
+ String ret = execWithRetries(user, cmd.substring(4)).getSecond();
+ status = Boolean.toString(ret.length() > 0);
+ break;
+
+ case "exec":
+ execWithRetries(user, cmd.substring(4));
+ break;
+
+ default:
+ LOG.error("Unknown Command Type");
+ status = ChaosConstants.TASK_ERROR_STRING;
+ }
+ } catch (IOException e) {
+ LOG.error("Got error while executing command : " + cmd +
+ " On agent : " + agentName + " Error : " + e);
+ status = ChaosConstants.TASK_ERROR_STRING;
+ }
+
+ try {
+ setStatusOfTaskZNode(path, status);
+ Thread.sleep(ChaosConstants.SET_STATUS_SLEEP_TIME);
+ } catch (InterruptedException e) {
+ LOG.error("Error occured after setting status: " + e);
+ }
+
+ default:
+ LOG.error("Error occurred while getting data",
+ KeeperException.create(KeeperException.Code.get(rc), path));
+ }
+ }
+ };
+
+ /***
+ * Callback used while getting Tasks for agent if call executed without Exception,
+ * It creates a separate thread for each children to execute given Tasks parallely.
+ */
+ AsyncCallback.ChildrenCallback getTasksForAgentCallback = new AsyncCallback.ChildrenCallback() {
+ @Override
+ public void processResult(int rc, String path, Object ctx, List<String> children) {
+ switch (KeeperException.Code.get(rc)) {
+ case CONNECTIONLOSS: {
+ // Connection to the server has been lost, getting tasks again.
+ try {
+ recreateZKConnection();
+ } catch (Exception e) {
+ break;
+ }
+ getTasks();
+ break;
+ }
+
+ case OK: {
+ if (children != null) {
+ try {
+
+ LOG.info("Executing each task as a separate thread");
+ List<Thread> tasksList = new ArrayList<>();
+ for (final String task : children) {
+ String threadName = agentName + "_" + task;
+ Thread t = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ LOG.info("Executing task : " + task + " of agent : " + agentName);
+ zk.getData(ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE
+ + ChaosConstants.ZNODE_PATH_SEPARATOR + agentName
+ + ChaosConstants.ZNODE_PATH_SEPARATOR + task, false,
+ getTaskForExecutionCallback, task);
+
+ }
+ });
+ t.setName(threadName);
+ t.start();
+ tasksList.add(t);
+
+ for (Thread thread : tasksList) {
+ thread.join();
+ }
+ }
+ } catch (InterruptedException e) {
+ LOG.error("Error scheduling next task : " +
+ " for agent : " + agentName + " Error : " + e);
+ }
+ }
+ break;
+ }
+
+ default:
+ LOG.error("Error occurred while getting task",
+ KeeperException.create(KeeperException.Code.get(rc), path));
+ }
+ }
+ };
+
+  /**
+   * Asynchronously creates a PERSISTENT ZNode. The outcome is reported to
+   * {@code createZNodeCallback}; the data is also passed as callback context so the
+   * callback can repeat the request.
+   * @param path Path at which ZNode to create
+   * @param data Data to put under ZNode
+   */
+  public void createZNode(String path, byte[] data) {
+    final Object retryContext = data;
+    zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
+      createZNodeCallback, retryContext);
+  }
+
+  /**
+   * Asynchronously creates an EPHEMERAL ZNode. The outcome is reported to
+   * {@code createEphemeralZNodeCallback}; the data is also passed as callback context
+   * so the callback can repeat the request.
+   * @param path Path at which Ephemeral ZNode to create
+   * @param data Data to put under ZNode
+   */
+  public void createEphemeralZNode(String path, byte[] data) {
+    final Object retryContext = data;
+    zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,
+      createEphemeralZNodeCallback, retryContext);
+  }
+
+  /**
+   * Checks if given ZNode exists, if not creates a PERSISTENT ZNODE (with empty data)
+   * for same. Failures of the existence check are logged and not propagated.
+   *
+   * @param path Path to check for ZNode
+   */
+  private void createIfZNodeNotExists(String path) {
+    try {
+      if (zk.exists(path, false) == null) {
+        createZNode(path, new byte[0]);
+      }
+    } catch (KeeperException e) {
+      LOG.error("Error checking given node : " + path, e);
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted while checking given node : " + path, e);
+      // keep the interrupt visible to the caller
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Asynchronously writes the given status as data of the task ZNode, regardless of
+   * the node's current version. The result is handled by
+   * {@code setStatusOfTaskZNodeCallback}.
+   *
+   * @param taskZNode ZNode to set status
+   * @param status Status value
+   */
+  public void setStatusOfTaskZNode(String taskZNode, String status) {
+    LOG.info("Setting status of Task ZNode: " + taskZNode + " status : " + status);
+    // The status doubles as callback context: the callback reads it back from there
+    // when it has to repeat the write after a connection loss.
+    zk.setData(taskZNode,
+      status.getBytes(),
+      -1,
+      setStatusOfTaskZNodeCallback,
+      status);
+  }
+
+ /**
+ * registration of ChaosAgent by checking and creating necessary ZNodes.
+ */
+ private void register() {
+ createIfZNodeNotExists(ChaosConstants.CHAOS_TEST_ROOT_ZNODE);
+ createIfZNodeNotExists(ChaosConstants.CHAOS_AGENT_REGISTRATION_EPIMERAL_ZNODE);
+ createIfZNodeNotExists(ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE);
+ createIfZNodeNotExists(ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE +
+ ChaosConstants.ZNODE_PATH_SEPARATOR + agentName);
+
+ createEphemeralZNode(ChaosConstants.CHAOS_AGENT_REGISTRATION_EPIMERAL_ZNODE +
+ ChaosConstants.ZNODE_PATH_SEPARATOR + agentName, new byte[0]);
+ }
+
+  /**
+   * Asynchronously fetches the tasks (children) of this agent's task ZNode and sets
+   * {@code newTaskCreatedWatcher} on it so the agent is notified about new tasks.
+   * The fetched tasks are handled by {@code getTasksForAgentCallback}.
+   */
+  private void getTasks() {
+    LOG.info("Getting Tasks for Agent: " + agentName + " and setting watch for new Tasks");
+    zk.getChildren(ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE +
+        ChaosConstants.ZNODE_PATH_SEPARATOR + agentName,
+      newTaskCreatedWatcher,
+      getTasksForAgentCallback,
+      null);
+  }
+
+  /**
+   * Executes the command through the local shell as the given user, retrying failed
+   * executions according to the configured retry policy.
+   *
+   * @param user user name, default none
+   * @param cmd Command to execute
+   * @return A pair of Exit Code and Shell output
+   * @throws IOException if the command still fails once the retries are used up, or
+   *   (as {@link java.io.InterruptedIOException}) if the thread is interrupted while
+   *   waiting for the next attempt
+   */
+  private Pair<Integer, String> execWithRetries(String user, String cmd) throws IOException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    while (true) {
+      try {
+        return exec(user, cmd);
+      } catch (IOException e) {
+        // rethrows e when no attempts are left
+        retryOrThrow(retryCounter, e, user, cmd);
+      }
+      try {
+        retryCounter.sleepUntilNextRetry();
+      } catch (InterruptedException e) {
+        // Stop retrying on interruption instead of continuing without any back-off.
+        LOG.warn("Sleep Interrupted: " + e);
+        Thread.currentThread().interrupt();
+        java.io.InterruptedIOException interrupted = new java.io.InterruptedIOException(
+          "Interrupted while waiting to retry command: " + cmd);
+        interrupted.initCause(e);
+        throw interrupted;
+      }
+    }
+  }
+
+  /**
+   * Executes the command once through {@link LocalShell}.
+   *
+   * @param user user to run the command as
+   * @param cmd Command to execute
+   * @return A pair of Exit Code and Shell output
+   * @throws IOException if the shell fails; for a non-zero exit code the exception
+   *   message carries both the original message and the captured output
+   */
+  private Pair<Integer, String> exec(String user, String cmd) throws IOException {
+    LOG.info("Executing Shell command: " + cmd + " , user: " + user);
+
+    LocalShell shell = new LocalShell(user, cmd);
+    try {
+      shell.execute();
+    } catch (Shell.ExitCodeException e) {
+      String output = shell.getOutput();
+      throw new Shell.ExitCodeException(e.getExitCode(), "stderr: " + e.getMessage()
+        + ", stdout: " + output);
+    }
+    LOG.info("Executed Shell command, exit code: {}, output:\n{}", shell.getExitCode(),
+      shell.getOutput());
+
+    return new Pair<>(shell.getExitCode(), shell.getOutput());
+  }
+
+  /**
+   * Decides what happens after a failed command execution: while the retry counter
+   * allows another attempt the failure is only logged, otherwise the given exception
+   * is rethrown.
+   *
+   * @param retryCounter counter tracking the attempts of the current command
+   * @param ex the failure of the last attempt
+   * @param user user the command was run as (for logging)
+   * @param cmd the command that failed (for logging)
+   * @throws E the given exception when no retry is left
+   */
+  private <E extends Exception> void retryOrThrow(RetryCounter retryCounter, E ex,
+    String user, String cmd) throws E {
+    if (retryCounter.shouldRetry()) {
+      LOG.warn("Local command: {}, user: {}, failed at attempt {}. Retrying until maxAttempts: {}."
+          + " Exception {}", cmd, user, retryCounter.getAttemptTimes(),
+        retryCounter.getMaxAttempts(), ex.getMessage());
+      return;
+    }
+    throw ex;
+  }
+
+  /**
+   * @return the value of the {@code connected} flag, which is maintained by
+   *   {@link #process(WatchedEvent)} from the connection state events it receives
+   */
+ private boolean isConnected() {
+ return connected;
+ }
+
+ @Override
+ public void close() throws IOException {
+ LOG.info("Closing ZooKeeper Connection for Chaos Agent : " + agentName);
+ try {
+ zk.close();
+ } catch (InterruptedException e) {
+ LOG.error("Error while closing ZooKeeper Connection.");
+ }
+ }
+
+  /**
+   * Main loop of the agent: waits until the ZooKeeper connection is established,
+   * fetches the tasks (which also sets the watch for new tasks) and then stays alive
+   * until {@code stopChaosAgent} is set or the thread is interrupted.
+   */
+  @Override
+  public void run() {
+    try {
+      LOG.info("Running Chaos Agent on : " + agentName);
+      while (!this.isConnected()) {
+        Thread.sleep(100);
+      }
+      this.getTasks();
+      while (!stopChaosAgent.get()) {
+        Thread.sleep(500);
+      }
+    } catch (InterruptedException e) {
+      LOG.error("Error while running Chaos Agent", e);
+      // keep the interrupt visible to whoever owns this thread
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Handles connection state events of the ZooKeeper client: tracks whether the agent
+   * is connected and opens a new connection when the session has expired. Events that
+   * carry a node event type are only logged.
+   */
+  @Override
+  public void process(WatchedEvent watchedEvent) {
+    LOG.info("Processing event: " + watchedEvent.toString());
+    if (watchedEvent.getType() == Event.EventType.None) {
+      switch (watchedEvent.getState()) {
+        case SyncConnected:
+          connected = true;
+          break;
+        case Disconnected:
+          connected = false;
+          break;
+        case Expired:
+          connected = false;
+          LOG.error("Session expired creating again");
+          try {
+            // NOTE(review): only the connection is recreated here; the ephemeral
+            // registration node and the task watch are not set up again - confirm
+            // whether that is intended.
+            createZKConnection(null);
+          } catch (IOException e) {
+            LOG.error("Error creating Zookeeper connection", e);
+          }
+          // Expiry is a known state; do not fall through to the "Unknown State" log.
+          break;
+        default:
+          LOG.error("Unknown State");
+          break;
+      }
+    }
+  }
+
+  /**
+   * Closes the current ZooKeeper connection, opens a new one with
+   * {@code newTaskCreatedWatcher} as its watcher and creates the agent's ephemeral
+   * registration node again.
+   *
+   * @throws Exception if closing the old connection or creating the new one fails
+   */
+  private void recreateZKConnection() throws Exception {
+    try {
+      zk.close();
+      createZKConnection(newTaskCreatedWatcher);
+      createEphemeralZNode(ChaosConstants.CHAOS_AGENT_REGISTRATION_EPIMERAL_ZNODE +
+        ChaosConstants.ZNODE_PATH_SEPARATOR + agentName, new byte[0]);
+    } catch (IOException e) {
+      // Pass the exception as the last argument so its stack trace is logged.
+      LOG.error("Error creating new ZK COnnection for agent: {}", agentName, e);
+      throw e;
+    }
+  }
+
+ /**
+ * Executes Command locally.
+ */
+ protected static class LocalShell extends Shell.ShellCommandExecutor {
+
+ private String user;
+ private String execCommand;
+
+ public LocalShell(String user, String execCommand) {
+ super(new String[]{execCommand});
+ this.user = user;
+ this.execCommand = execCommand;
+ }
+
+ @Override
+ public String[] getExecString() {
+ // TODO: Considering Agent is running with same user.
+ if(!user.equals(ChaosConstants.DEFAULT_SHELL_USER)){
+ execCommand = String.format("su -u %1$s %2$s", user, execCommand);
+ }
+ return new String[]{"/usr/bin/env", "bash", "-c", execCommand};
+ }
+
+ @Override
+ public void execute() throws IOException {
+ super.execute();
+ }
+ }
+}
diff --git a/hbase-it/src/main/java/org/apache/hadoop/hbase/chaos/ChaosConstants.java b/hbase-it/src/main/java/org/apache/hadoop/hbase/chaos/ChaosConstants.java
new file mode 100644
index 0000000..18d9d06
--- /dev/null
+++ b/hbase-it/src/main/java/org/apache/hadoop/hbase/chaos/ChaosConstants.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.chaos;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/***
+ * ChaosConstant holds a bunch of Choas-related Constants
+ */
+@InterfaceAudience.Public
+public final class ChaosConstants {
+
+ /*Base ZNode for whole Chaos Testing*/
+ public static final String CHAOS_TEST_ROOT_ZNODE = "/hbase";
+
+ /*Just a / used for path separator*/
+ public static final String ZNODE_PATH_SEPARATOR = "/";
+
+ /*ZNode used for ChaosAgents registration.*/
+ public static final String CHAOS_AGENT_REGISTRATION_EPIMERAL_ZNODE =
+ CHAOS_TEST_ROOT_ZNODE + ZNODE_PATH_SEPARATOR + "chaosAgents";
+
+ /*ZNode used for getting status of tasks assigned*/
+ public static final String CHAOS_AGENT_STATUS_PERSISTENT_ZNODE =
+ CHAOS_TEST_ROOT_ZNODE + ZNODE_PATH_SEPARATOR + "chaosAgentTaskStatus";
+
+ /*Config property for getting number of retries to execute a command*/
+ public static final String RETRY_ATTEMPTS_KEY = "hbase.it.clustermanager.retry.attempts";
+
+ /*Default value for number of retries*/
+ public static final int DEFAULT_RETRY_ATTEMPTS = 5;
+
+ /*Config property to sleep in between retries*/
+ public static final String RETRY_SLEEP_INTERVAL_KEY =
+ "hbase.it.clustermanager.retry.sleep.interval";
+
+ /*Default Sleep time between each retry*/
+ public static final int DEFAULT_RETRY_SLEEP_INTERVAL = 5000;
+
+ /*Config property for executing command as specific user*/
+ public static final String CHAOSAGENT_SHELL_USER = "hbase.it.clustermanager.ssh.user";
+
+ /*default user for executing local commands*/
+ public static final String DEFAULT_SHELL_USER = "";
+
+ /*timeout used while creating ZooKeeper connection*/
+ public static final int SESSION_TIMEOUT_ZK = 60000 * 10;
+
+ /*Time given to ChaosAgent to set status*/
+ public static final int SET_STATUS_SLEEP_TIME = 30 * 1000;
+
+ /*Status String when you get an ERROR while executing task*/
+ public static final String TASK_ERROR_STRING = "error";
+
+ /*Status String when your command gets executed correctly*/
+ public static final String TASK_COMPLETION_STRING = "done";
+
+ /*Name of ChoreService to use*/
+ public static final String CHORE_SERVICE_PREFIX = "ChaosService";
+
+}
diff --git a/hbase-it/src/main/java/org/apache/hadoop/hbase/chaos/ChaosService.java b/hbase-it/src/main/java/org/apache/hadoop/hbase/chaos/ChaosService.java
new file mode 100644
index 0000000..2038444
--- /dev/null
+++ b/hbase-it/src/main/java/org/apache/hadoop/hbase/chaos/ChaosService.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.chaos;
+
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.AuthUtil;
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class used to start/stop Chaos related services (currently chaosagent)
+ */
+@InterfaceAudience.Private
+public class ChaosService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ChaosService.class.getName());
+
+  /**
+   * Parses the command line and starts or stops the requested chaos service.
+   * Parsing and execution errors are logged and not propagated.
+   * @param args command line arguments, expected to carry {@code -chaosagent start|stop}
+   * @param conf configuration handed to the started service
+   */
+  public static void execute(String[] args, Configuration conf) {
+    LOG.info("arguments : " + Arrays.toString(args));
+
+    try {
+      CommandLine cmdline = new GnuParser().parse(getOptions(), args);
+      String agentOption = ChaosServiceName.CHAOSAGENT.toString().toLowerCase();
+      if (cmdline.hasOption(agentOption)) {
+        String actionStr = cmdline.getOptionValue(agentOption);
+        try {
+          ExecutorAction action = ExecutorAction.valueOf(actionStr.toUpperCase());
+          if (action == ExecutorAction.START) {
+            ChaosServiceStart(conf, ChaosServiceName.CHAOSAGENT);
+          } else if (action == ExecutorAction.STOP) {
+            ChaosServiceStop();
+          }
+        } catch (IllegalArgumentException e) {
+          LOG.error("action passed: {} Unexpected action. Please provide only start/stop.",
+            actionStr, e);
+          throw new RuntimeException(e);
+        }
+      } else {
+        LOG.error("Invalid Options");
+      }
+    } catch (Exception e) {
+      LOG.error("Error while starting ChaosService : ", e);
+    }
+  }
+
+  /**
+   * Starts the given service and blocks until it terminates. For the chaos agent this
+   * clears the stop flag and runs a {@link ChaosAgent} in its own thread.
+   */
+  private static void ChaosServiceStart(Configuration conf, ChaosServiceName serviceName) {
+    switch (serviceName) {
+      case CHAOSAGENT:
+        ChaosAgent.stopChaosAgent.set(false);
+        try {
+          Thread t = new Thread(new ChaosAgent(conf,
+            ChaosUtils.getZKQuorum(conf), ChaosUtils.getHostName()));
+          t.start();
+          t.join();
+        } catch (InterruptedException e) {
+          LOG.error("Failed while executing next task execution of ChaosAgent on : {}",
+            serviceName, e);
+          // keep the interrupt visible to the caller
+          Thread.currentThread().interrupt();
+        } catch (UnknownHostException e) {
+          LOG.error("Failed while executing next task execution of ChaosAgent on : {}",
+            serviceName, e);
+        }
+        break;
+      default:
+        LOG.error("Service Name not known : " + serviceName.toString());
+    }
+  }
+
+  /** Asks a chaos agent running in this JVM to stop by setting its stop flag. */
+  private static void ChaosServiceStop() {
+    ChaosAgent.stopChaosAgent.set(true);
+  }
+
+  /**
+   * @return the supported command line options: the chaos agent option (taking a
+   *   start/stop argument) and a generic D option
+   */
+  private static Options getOptions() {
+    Options options = new Options();
+    options.addOption(new Option("c", ChaosServiceName.CHAOSAGENT.toString().toLowerCase(),
+      true, "expecting a start/stop argument"));
+    options.addOption(new Option("D", ChaosServiceName.GENERIC.toString(),
+      true, "generic D param"));
+    LOG.info(Arrays.toString(new Collection[] { options.getOptions() }));
+    return options;
+  }
+
+  /**
+   * Entry point: builds the configuration (applying generic options), schedules the
+   * auth chore when one is available and runs the requested service.
+   */
+  public static void main(String[] args) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    new GenericOptionsParser(conf, args);
+
+    ChoreService choreChaosService = null;
+    ScheduledChore authChore = AuthUtil.getAuthChore(conf);
+
+    try {
+      if (authChore != null) {
+        choreChaosService = new ChoreService(ChaosConstants.CHORE_SERVICE_PREFIX);
+        choreChaosService.scheduleChore(authChore);
+      }
+
+      execute(args, conf);
+    } finally {
+      // Guard on the service itself: it is still null when its construction failed.
+      if (choreChaosService != null) {
+        choreChaosService.shutdown();
+      }
+    }
+  }
+
+  /** Services (and option names) known to this class. */
+  enum ChaosServiceName {
+    CHAOSAGENT,
+    GENERIC
+  }
+
+  /** Actions that can be requested for a service. */
+  enum ExecutorAction {
+    START,
+    STOP
+  }
+}
diff --git a/hbase-it/src/main/java/org/apache/hadoop/hbase/chaos/ChaosUtils.java b/hbase-it/src/main/java/org/apache/hadoop/hbase/chaos/ChaosUtils.java
new file mode 100644
index 0000000..c84ed04
--- /dev/null
+++ b/hbase-it/src/main/java/org/apache/hadoop/hbase/chaos/ChaosUtils.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.chaos;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Static helpers shared by the chaos tooling: resolving the local hostname and building the
+ * ZooKeeper quorum connect string from a configuration.
+ */
+@InterfaceAudience.Private
+public class ChaosUtils {
+
+  /**
+   * Resolves the hostname of the local machine.
+   * @return hostname reported for the local host address
+   * @throws UnknownHostException if the local host address cannot be resolved
+   */
+  public static String getHostName() throws UnknownHostException {
+    final InetAddress localAddress = InetAddress.getLocalHost();
+    return localAddress.getHostName();
+  }
+
+  /**
+   * Builds a comma separated {@code host:port} list from the configured ZooKeeper quorum hosts
+   * and client port. Defaults are {@code localhost} for the hosts and {@code 2181} for the port.
+   * The same port is appended to every configured host entry.
+   * @param conf configuration to read the quorum hosts and the client port from
+   * @return quorum string such as {@code host1:2181,host2:2181}
+   */
+  public static String getZKQuorum(Configuration conf) {
+    final int clientPort = conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2181);
+    final String[] quorumHosts = conf.getStrings(HConstants.ZOOKEEPER_QUORUM, "localhost");
+    final StringBuilder quorum = new StringBuilder();
+    for (String host : quorumHosts) {
+      // Every appended entry ends in ":port", so a non-empty builder means "not the first".
+      if (quorum.length() > 0) {
+        quorum.append(',');
+      }
+      quorum.append(host).append(':').append(clientPort);
+    }
+    return quorum.toString();
+  }
+
+}
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ChaosZKClient.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ChaosZKClient.java
new file mode 100644
index 0000000..5be37cd
--- /dev/null
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ChaosZKClient.java
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Client used to hand a command to the ChaosAgent of a given host through ZooKeeper.
+ *
+ * <p>A task is an ephemeral sequential ZNode created under
+ * {@code /hbase/chaosAgentTaskStatus/<hostname>/task_} whose data is the command. After creating
+ * it the client sets a watch on the ZNode; on a data change it reads the new data as the task
+ * status, publishes it to the thread blocked in {@link #submitTask(TaskObject)}, deletes the
+ * ZNode and closes the ZooKeeper connection. The asynchronous operations are retried on a fresh
+ * connection when they fail with connection loss.
+ */
+@InterfaceAudience.Private
+public class ChaosZKClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ChaosZKClient.class.getName());
+  private static final String CHAOS_AGENT_PARENT_ZNODE = "/hbase/chaosAgents";
+  private static final String CHAOS_AGENT_STATUS_ZNODE = "/hbase/chaosAgentTaskStatus";
+  private static final String ZNODE_PATH_SEPARATOR = "/";
+  private static final String TASK_PREFIX = "task_";
+  private static final String TASK_ERROR_STRING = "error";
+  private static final String TASK_COMPLETION_STRING = "done";
+  private static final String TASK_BOOLEAN_TRUE = "true";
+  private static final String TASK_BOOLEAN_FALSE = "false";
+  private static final int SESSION_TIMEOUT_ZK = 10 * 60 * 1000;
+  private static final int TASK_EXECUTION_TIMEOUT = 5 * 60 * 1000;
+
+  /**
+   * Status read from the task ZNode, {@code null} until one has been read. It is written by
+   * {@link #getStatusCallback} and polled by {@link #submitTask(TaskObject)}, hence volatile.
+   */
+  private volatile String taskStatus = null;
+
+  private final String quorum;
+  private ZooKeeper zk;
+
+  /**
+   * Creates the client and opens a ZooKeeper connection to the given quorum.
+   * A failure to create the connection is only logged; in that case no connection is set and
+   * later operations on this client will fail.
+   * @param quorum ZooKeeper connect string
+   */
+  public ChaosZKClient(String quorum) {
+    this.quorum = quorum;
+    try {
+      this.createNewZKConnection();
+    } catch (IOException e) {
+      LOG.error("Error creating ZooKeeper Connection: ", e);
+    }
+  }
+
+  /**
+   * Creates connection with ZooKeeper
+   * @throws IOException when not able to create connection properly
+   */
+  public void createNewZKConnection() throws IOException {
+    Watcher watcher = new Watcher() {
+      @Override
+      public void process(WatchedEvent watchedEvent) {
+        LOG.info("Created ZooKeeper Connection For executing task");
+      }
+    };
+
+    this.zk = new ZooKeeper(quorum, SESSION_TIMEOUT_ZK, watcher);
+  }
+
+  /**
+   * Checks if ChaosAgent is running or not on target host by checking its ZNode.
+   * On connection loss the connection is recreated and the check is retried once. Any other
+   * failure is logged and reported as "not running".
+   * @param hostname hostname to check for chaosagent
+   * @return true/false whether agent is running or not
+   */
+  private boolean isChaosAgentRunning(String hostname) {
+    final String agentZNode = CHAOS_AGENT_PARENT_ZNODE + ZNODE_PATH_SEPARATOR + hostname;
+    try {
+      return zk.exists(agentZNode, false) != null;
+    } catch (KeeperException e) {
+      // Compare the error code instead of searching the exception text.
+      if (e.code() == KeeperException.Code.CONNECTIONLOSS) {
+        recreateZKConnection();
+        try {
+          return zk.exists(agentZNode, false) != null;
+        } catch (KeeperException ke) {
+          LOG.error("ERROR ", ke);
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+          LOG.error("ERROR ", ie);
+        }
+      } else {
+        // Previously dropped silently; at least leave a trace of why the check failed.
+        LOG.error("Error checking for given hostname: {} ERROR: ", hostname, e);
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.error("Error checking for given hostname: {} ERROR: ", hostname, e);
+    }
+    return false;
+  }
+
+  /**
+   * Creates tasks for target hosts by creating ZNodes.
+   * Waits for a limited amount of time to complete task to execute.
+   * <p>
+   * When no agent is registered for the target host the connection is closed, because no
+   * callback will run later to close it. Waiting stops early if the waiting thread is
+   * interrupted.
+   * @param taskObject Object data represents command
+   * @return the status read from the task ZNode, or {@code "error"} when the agent is not
+   *   running, the wait was interrupted or no status arrived within the timeout
+   */
+  public String submitTask(final TaskObject taskObject) {
+    if (isChaosAgentRunning(taskObject.getTaskHostname())) {
+      LOG.info("Creating task node");
+      zk.create(CHAOS_AGENT_STATUS_ZNODE + ZNODE_PATH_SEPARATOR +
+          taskObject.getTaskHostname() + ZNODE_PATH_SEPARATOR + TASK_PREFIX,
+        taskObject.getCommand().getBytes(),
+        ZooDefs.Ids.OPEN_ACL_UNSAFE,
+        CreateMode.EPHEMERAL_SEQUENTIAL,
+        submitTaskCallback,
+        taskObject);
+      long start = System.currentTimeMillis();
+
+      // Poll for the status published by getStatusCallback. Give up early when this thread has
+      // been interrupted instead of polling until the timeout.
+      while ((System.currentTimeMillis() - start) < TASK_EXECUTION_TIMEOUT
+          && !Thread.currentThread().isInterrupted()) {
+        if (taskStatus != null) {
+          return taskStatus;
+        }
+        Threads.sleep(500);
+      }
+    } else {
+      LOG.info("EHHHHH! ChaosAgent Not running");
+      // Nothing was submitted, so no delete callback will ever close this connection.
+      closeZKConnection();
+    }
+    return TASK_ERROR_STRING;
+  }
+
+  /**
+   * To get status of task submitted
+   * @param path path at which to get status
+   * @param ctx path context; {@code null} makes {@link #getStatusCallback} ignore the data read
+   */
+  private void getStatus(String path , Object ctx) {
+    LOG.info("Getting Status of task: " + path);
+    zk.getData(path,
+      false,
+      getStatusCallback,
+      ctx);
+  }
+
+  /**
+   * Set a watch on task submitted
+   * @param name ZNode name to set a watch
+   * @param taskObject context for ZNode name
+   */
+  private void setStatusWatch(String name, TaskObject taskObject) {
+    LOG.info("Checking for ZNode and Setting watch for task : " + name);
+    zk.exists(name,
+      setStatusWatcher,
+      setStatusWatchCallback,
+      taskObject);
+  }
+
+  /**
+   * Delete task after getting its status
+   * @param path path to delete ZNode
+   */
+  private void deleteTask(String path) {
+    LOG.info("Deleting task: " + path);
+    zk.delete(path,
+      -1,
+      taskDeleteCallback,
+      null);
+  }
+
+  //WATCHERS:
+
+  /**
+   * Watcher to get notification whenever status of task changes.
+   * On a data change of a task ZNode it triggers a status read with a non-null context, which
+   * tells {@link #getStatusCallback} to treat the data as the task status.
+   */
+  Watcher setStatusWatcher = new Watcher() {
+    @Override
+    public void process(WatchedEvent watchedEvent) {
+      LOG.info("Setting status watch for task: " + watchedEvent.getPath());
+      if (watchedEvent.getType() == Event.EventType.NodeDataChanged) {
+        if (!watchedEvent.getPath().contains(TASK_PREFIX)) {
+          throw new RuntimeException(KeeperException.create(
+            KeeperException.Code.DATAINCONSISTENCY));
+        }
+        getStatus(watchedEvent.getPath(), (Object) watchedEvent.getPath());
+
+      }
+    }
+  };
+
+  //CALLBACKS
+
+  /**
+   * Handles the result of reading a task ZNode. With a non-null context the data is published as
+   * the task status and the task ZNode is deleted; with a null context the data is ignored.
+   */
+  AsyncCallback.DataCallback getStatusCallback = new AsyncCallback.DataCallback() {
+    @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
+      switch (KeeperException.Code.get(rc)) {
+        case CONNECTIONLOSS:
+          //Connectionloss while getting status of task, getting again
+          ChaosZKClient.this.recreateZKConnection();
+          ChaosZKClient.this.getStatus(path, ctx);
+          break;
+
+        case OK:
+          if (ctx != null) {
+
+            String status = new String(data);
+            taskStatus = status;
+            switch (status) {
+              case TASK_COMPLETION_STRING:
+              case TASK_BOOLEAN_TRUE:
+              case TASK_BOOLEAN_FALSE:
+                LOG.info("Task executed completely : Status --> " + status);
+                break;
+
+              case TASK_ERROR_STRING:
+                LOG.info("There was error while executing task : Status --> " + status);
+                break;
+
+              default:
+                LOG.warn("Status of task is undefined!! : Status --> " + status);
+            }
+
+            ChaosZKClient.this.deleteTask(path);
+          }
+          break;
+
+        default:
+          LOG.error("ERROR while getting status of task: " + path + " ERROR: " + KeeperException
+            .create(KeeperException.Code.get(rc)));
+      }
+    }
+  };
+
+  /**
+   * Handles the result of setting the watch on a task ZNode. When the ZNode exists, a read with
+   * a null context is issued, so the data read at that point is not taken as the status.
+   */
+  AsyncCallback.StatCallback setStatusWatchCallback = new AsyncCallback.StatCallback() {
+    @Override public void processResult(int rc, String path, Object ctx, Stat stat) {
+      switch (KeeperException.Code.get(rc)) {
+        case CONNECTIONLOSS:
+          //ConnectionLoss while setting watch on status ZNode, setting again.
+          ChaosZKClient.this.recreateZKConnection();
+          ChaosZKClient.this.setStatusWatch(path, (TaskObject) ctx);
+          break;
+
+        case OK:
+          if (stat != null) {
+            ChaosZKClient.this.getStatus(path, null);
+          }
+          break;
+
+        default:
+          LOG.error(
+            "ERROR while setting watch on task ZNode: " + path + " ERROR: " + KeeperException
+              .create(KeeperException.Code.get(rc)));
+      }
+    }
+  };
+
+  /**
+   * Handles the result of creating the task ZNode: on success a watch is set on the created
+   * ZNode, on connection loss the task is submitted again on a new connection.
+   */
+  AsyncCallback.StringCallback submitTaskCallback = new AsyncCallback.StringCallback() {
+    @Override public void processResult(int rc, String path, Object ctx, String name) {
+      switch (KeeperException.Code.get(rc)) {
+        case CONNECTIONLOSS:
+          // Connection to server was lost while submitting task, submitting again.
+          ChaosZKClient.this.recreateZKConnection();
+          ChaosZKClient.this.submitTask((TaskObject) ctx);
+          break;
+
+        case OK:
+          LOG.info("Task created : " + name);
+          ChaosZKClient.this.setStatusWatch(name, (TaskObject) ctx);
+          break;
+
+        default:
+          LOG.error("Error submitting task: " + name + " ERROR:" + KeeperException
+            .create(KeeperException.Code.get(rc)));
+      }
+    }
+  };
+
+  /**
+   * Handles the result of deleting the task ZNode: on success the ZooKeeper connection is
+   * closed, on connection loss the delete is retried on a new connection.
+   */
+  AsyncCallback.VoidCallback taskDeleteCallback = new AsyncCallback.VoidCallback() {
+    @Override
+    public void processResult(int rc, String path, Object ctx) {
+      switch (KeeperException.Code.get(rc)) {
+        case CONNECTIONLOSS:
+          //Connectionloss while deleting task, deleting again
+          recreateZKConnection();
+          deleteTask(path);
+          break;
+
+        case OK:
+          LOG.info("Task Deleted successfully!");
+          LOG.info("Closing ZooKeeper Connection");
+          closeZKConnection();
+          break;
+
+        default:
+          LOG.error("ERROR while deleting task: " + path + " ERROR: " +
+            KeeperException.create(KeeperException.Code.get(rc)));
+      }
+    }
+  };
+
+  /**
+   * Closes the current ZooKeeper connection. An interruption while closing is logged with its
+   * cause and the interrupt status of the current thread is restored.
+   */
+  private void closeZKConnection() {
+    try {
+      zk.close();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.error("Error while closing ZooKeeper Connection.", e);
+    }
+  }
+
+  /**
+   * Closes the current ZooKeeper connection and opens a new one. The new connection is attempted
+   * even when closing the old one was interrupted; failures are logged.
+   */
+  private void recreateZKConnection() {
+    try {
+      zk.close();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.error("Error closing ZK connection : ", e);
+    } finally {
+      try {
+        createNewZKConnection();
+      } catch (IOException e) {
+        LOG.error("Error creating new ZK COnnection for agent: ", e);
+      }
+    }
+  }
+
+  /**
+   * Immutable description of a task: the command to run and the host whose agent should run it.
+   */
+  static class TaskObject {
+    private final String command;
+    private final String taskHostname;
+
+    /**
+     * @param command command written as the data of the task ZNode
+     * @param taskHostname host the task is addressed to
+     */
+    public TaskObject(String command, String taskHostname) {
+      this.command = command;
+      this.taskHostname = taskHostname;
+    }
+
+    /** @return the command of this task */
+    public String getCommand() {
+      return this.command;
+    }
+
+    /** @return the host this task is addressed to */
+    public String getTaskHostname() {
+      return taskHostname;
+    }
+  }
+
+}
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ZNodeClusterManager.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ZNodeClusterManager.java
new file mode 100644
index 0000000..998acb9
--- /dev/null
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ZNodeClusterManager.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link ClusterManager} that does not run commands itself but submits them as tasks through
+ * {@link ChaosZKClient}. Every command string is prefixed with a {@link CmdType} name. The
+ * {@code port} argument of the interface methods is not used by this implementation.
+ */
+@InterfaceAudience.Private
+public class ZNodeClusterManager extends Configured implements ClusterManager {
+  private static final Logger LOG = LoggerFactory.getLogger(ZNodeClusterManager.class.getName());
+  private static final String SIGKILL = "SIGKILL";
+  private static final String SIGSTOP = "SIGSTOP";
+  private static final String SIGCONT = "SIGCONT";
+
+  public ZNodeClusterManager() {
+  }
+
+  /**
+   * Builds the ZooKeeper connect string from this manager's configuration.
+   * Delegates to the shared helper instead of repeating the same host:port joining logic.
+   * @return comma separated {@code host:port} list
+   */
+  private String getZKQuorumServersStringFromHbaseConfig() {
+    return org.apache.hadoop.hbase.chaos.ChaosUtils.getZKQuorum(getConf());
+  }
+
+  /**
+   * Submits a command for the given host through a new {@link ChaosZKClient}.
+   * @param hostname host the command is addressed to
+   * @param cmd command string, already prefixed with its {@link CmdType}
+   * @return status string returned by {@link ChaosZKClient#submitTask}
+   * @throws IOException declared for callers; not thrown by the code in this method
+   */
+  private String createZNode(String hostname, String cmd) throws IOException {
+    LOG.info("Zookeeper Mode enabled sending command to zookeeper: {} hostname: {}",
+      cmd, hostname);
+    ChaosZKClient chaosZKClient = new ChaosZKClient(getZKQuorumServersStringFromHbaseConfig());
+    return chaosZKClient.submitTask(new ChaosZKClient.TaskObject(cmd, hostname));
+  }
+
+  /**
+   * Selects the shell command provider for a service type: Hadoop for datanode and namenode,
+   * ZooKeeper for the ZooKeeper server and HBase for everything else.
+   * @param service service type to get a provider for
+   * @return command provider for the service
+   * @throws IOException declared for overriding implementations
+   */
+  protected HBaseClusterManager.CommandProvider getCommandProvider(ServiceType service)
+    throws IOException {
+    switch (service) {
+      case HADOOP_DATANODE:
+      case HADOOP_NAMENODE:
+        return new HBaseClusterManager.HadoopShellCommandProvider(getConf());
+      case ZOOKEEPER_SERVER:
+        return new HBaseClusterManager.ZookeeperShellCommandProvider(getConf());
+      default:
+        return new HBaseClusterManager.HBaseShellCommandProvider(getConf());
+    }
+  }
+
+  /**
+   * Submits an {@code exec} task that sends the given signal to the service on the given host.
+   * The status returned for the task is not inspected.
+   * @param service service to signal
+   * @param signal signal name, for example {@code SIGKILL}
+   * @param hostname host the service runs on
+   * @throws IOException if the command cannot be built
+   */
+  public void signal(ServiceType service, String signal, String hostname) throws IOException {
+    createZNode(hostname, CmdType.exec.toString() +
+      getCommandProvider(service).signalCommand(service, signal));
+  }
+
+  /**
+   * Submits an {@code exec} task for a start, stop or restart operation on the given host.
+   * The status returned for the task is not inspected.
+   */
+  private void createOpCommand(String hostname, ServiceType service,
+    HBaseClusterManager.CommandProvider.Operation op) throws IOException {
+    createZNode(hostname, CmdType.exec.toString() +
+      getCommandProvider(service).getCommand(service, op));
+  }
+
+  @Override
+  public void start(ServiceType service, String hostname, int port) throws IOException {
+    createOpCommand(hostname, service, HBaseClusterManager.CommandProvider.Operation.START);
+  }
+
+  @Override
+  public void stop(ServiceType service, String hostname, int port) throws IOException {
+    createOpCommand(hostname, service, HBaseClusterManager.CommandProvider.Operation.STOP);
+  }
+
+  @Override
+  public void restart(ServiceType service, String hostname, int port) throws IOException {
+    createOpCommand(hostname, service, HBaseClusterManager.CommandProvider.Operation.RESTART);
+  }
+
+  @Override
+  public void kill(ServiceType service, String hostname, int port) throws IOException {
+    signal(service, SIGKILL, hostname);
+  }
+
+  @Override
+  public void suspend(ServiceType service, String hostname, int port) throws IOException {
+    signal(service, SIGSTOP, hostname);
+  }
+
+  @Override
+  public void resume(ServiceType service, String hostname, int port) throws IOException {
+    signal(service, SIGCONT, hostname);
+  }
+
+  /**
+   * Submits a {@code bool} task with the provider's is-running command and parses the returned
+   * status as a boolean, so any status other than "true" (ignoring case) yields {@code false}.
+   */
+  @Override
+  public boolean isRunning(ServiceType service, String hostname, int port) throws IOException {
+    return Boolean.parseBoolean(createZNode(hostname, CmdType.bool.toString() +
+      getCommandProvider(service).isRunningCommand(service)));
+  }
+
+  /**
+   * Prefix put directly in front of a command string, with no separator: {@code exec} for
+   * commands whose status is ignored here, {@code bool} for commands whose status is parsed as
+   * a boolean.
+   */
+  enum CmdType {
+    exec,
+    bool
+  }
+}