You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@hbase.apache.org by GitBox <gi...@apache.org> on 2020/12/18 15:26:37 UTC

[GitHub] [hbase] virajjasani commented on a change in pull request #2299: HBASE-24620 : Add a ClusterManager which submits command to ZooKeeper and its Agent which picks and execute those Commands.

virajjasani commented on a change in pull request #2299:
URL: https://github.com/apache/hbase/pull/2299#discussion_r545872702



##########
File path: hbase-it/src/main/java/org/apache/hadoop/hbase/chaos/ChaosService.java
##########
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.chaos;
+
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.AuthUtil;
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
+
+/**
+ * Class used to start/stop Chaos related services (currently chaosagent)
+ */
+@InterfaceAudience.Private
+public class ChaosService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ChaosService.class.getName());
+
+  public static void execute(String[] args, Configuration conf) {
+    LOG.info("arguments : " + Arrays.toString(args));
+
+    try {
+      CommandLine cmdline = new GnuParser().parse(getOptions(), args);
+      if (cmdline.hasOption(ChaosServiceName.chaosagent.toString())) {
+        String actionStr = cmdline.getOptionValue(ChaosServiceName.chaosagent.toString());
+        try {
+          ExecutorAction action = ExecutorAction.valueOf(actionStr.toLowerCase());
+          if (action == ExecutorAction.start) {
+            ChaosServiceStart(conf, ChaosServiceName.chaosagent);
+          } else if (action == ExecutorAction.stop) {
+            ChaosServiceStop();
+          }
+        } catch (IllegalArgumentException e) {
+          LOG.error("action passed:" + actionStr +
+            " . Unexpected action. Please provide only start/stop.");
+          throw new RuntimeException(e);
+        }
+      } else {
+        LOG.error("Invalid Options");
+      }
+    } catch (Exception e) {
+      LOG.error("Error while starting ChaosService : " + e);
+    }
+  }
+
+  private static void ChaosServiceStart(Configuration conf, ChaosServiceName serviceName) {
+    switch (serviceName) {
+      case chaosagent:
+        ChaosAgent.stopChaosAgent.set(false);
+        try {
+          Thread t = new Thread(new ChaosAgent(conf,
+            ChaosUtils.getZKQuorum(conf), ChaosUtils.getHostName()));
+          t.start();
+          t.join();

Review comment:
       Sure, this can be a follow-up.

##########
File path: hbase-it/src/main/java/org/apache/hadoop/hbase/chaos/ChaosAgent.java
##########
@@ -0,0 +1,596 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.chaos;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.hadoop.hbase.util.RetryCounterFactory;
+import org.apache.hadoop.util.Shell;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/***
+ * An agent for executing destructive actions for ChaosMonkey.
+ * Uses ZooKeeper Watchersc and LocalShell, to do the killing
+ * and getting status of service on targeted host without SSH.
+ * uses given ZNode Structure:
+ *  /perfChaosTest (root)
+ *              |
+ *              |
+ *              /chaosAgents (Used for registration has
+ *              hostname ephemeral nodes as children)
+ *              |
+ *              |
+ *              /chaosAgentTaskStatus (Used for task
+ *              Execution, has hostname persistent
+ *              nodes as child with tasks as their children)
+ *                          |
+ *                          |
+ *                          /hostname
+ *                                |
+ *                                |
+ *                                /task0000001 (command as data)
+ *                                (has two types of command :
+ *                                     1: starts with "exec"
+ *                                       for executing a destructive action.
+ *                                     2: starts with "bool" for getting
+ *                                       only status of service.
+ *
+ */
+@InterfaceAudience.Private
+public class ChaosAgent implements Watcher, Closeable, Runnable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ChaosAgent.class.getName());
+  static AtomicBoolean stopChaosAgent = new AtomicBoolean();
+  private ZooKeeper zk;
+  private String quorum;
+  private String agentName;
+  private Configuration conf;
+  private RetryCounterFactory retryCounterFactory;
+  private volatile boolean connected = false;
+
+  public ChaosAgent(Configuration conf, String quorum, String agentName) {
+    initChaosAgent(conf, quorum, agentName);
+  }
+
+  /***
+   * sets global params and initiates connection with ZooKeeper then does registration.
+   * @param conf initial configuration to use
+   * @param quorum ZK Quorum
+   * @param agentName AgentName to use
+   */
+  private void initChaosAgent(Configuration conf, String quorum, String agentName) {
+    this.conf = conf;
+    this.quorum = quorum;
+    this.agentName = agentName;
+    this.retryCounterFactory = new RetryCounterFactory(new RetryCounter.RetryConfig()
+      .setMaxAttempts(conf.getInt(ChaosConstants.RETRY_ATTEMPTS_KEY,
+        ChaosConstants.DEFAULT_RETRY_ATTEMPTS)).setSleepInterval(
+          conf.getLong(ChaosConstants.RETRY_SLEEP_INTERVAL_KEY,
+            ChaosConstants.DEFAULT_RETRY_SLEEP_INTERVAL)));
+    try {
+      this.createZKConnection(null);
+      this.register();
+    } catch (IOException e) {
+      LOG.error("Error Creating Connection: " + e);
+    }
+  }
+
+  /***
+   * Creates Connection with ZooKeeper.
+   * @throws IOException if something goes wrong
+   */
+  private void createZKConnection(Watcher watcher) throws IOException {
+    if(watcher == null) {
+      zk = new ZooKeeper(quorum, ChaosConstants.SESSION_TIMEOUT_ZK, this);
+    } else {
+      zk = new ZooKeeper(quorum, ChaosConstants.SESSION_TIMEOUT_ZK, watcher);
+    }
+    LOG.info("ZooKeeper Connection created for ChaosAgent: " + agentName);
+  }
+
+  //WATCHERS: Below are the Watches used by ChaosAgent
+
+  /***
+   * Watcher for notifying if any task is assigned to agent or not,
+   * by seeking if any Node is being added to agent as Child.
+   */
+  Watcher newTaskCreatedWatcher = new Watcher() {
+    @Override
+    public void process(WatchedEvent watchedEvent) {
+      if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {
+        assert (ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE +
+          ChaosConstants.ZNODE_PATH_SEPARATOR + agentName).equals(watchedEvent.getPath());

Review comment:
       @lokiore I think here we can throw a more descriptive exception instead of using `assert`. Something like:
   ```
   if (!(ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE +
             ChaosConstants.ZNODE_PATH_SEPARATOR + agentName).equals(watchedEvent.getPath())) {
       throw new RuntimeException(xxx); // or something better
   }
   ```

##########
File path: hbase-it/src/main/java/org/apache/hadoop/hbase/chaos/ChaosAgent.java
##########
@@ -0,0 +1,596 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.chaos;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.hadoop.hbase.util.RetryCounterFactory;
+import org.apache.hadoop.util.Shell;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/***
+ * An agent for executing destructive actions for ChaosMonkey.
+ * Uses ZooKeeper Watchersc and LocalShell, to do the killing
+ * and getting status of service on targeted host without SSH.
+ * uses given ZNode Structure:
+ *  /perfChaosTest (root)
+ *              |
+ *              |
+ *              /chaosAgents (Used for registration has
+ *              hostname ephemeral nodes as children)
+ *              |
+ *              |
+ *              /chaosAgentTaskStatus (Used for task
+ *              Execution, has hostname persistent
+ *              nodes as child with tasks as their children)
+ *                          |
+ *                          |
+ *                          /hostname
+ *                                |
+ *                                |
+ *                                /task0000001 (command as data)
+ *                                (has two types of command :
+ *                                     1: starts with "exec"
+ *                                       for executing a destructive action.
+ *                                     2: starts with "bool" for getting
+ *                                       only status of service.
+ *
+ */
+@InterfaceAudience.Private
+public class ChaosAgent implements Watcher, Closeable, Runnable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ChaosAgent.class.getName());
+  static AtomicBoolean stopChaosAgent = new AtomicBoolean();
+  private ZooKeeper zk;
+  private String quorum;
+  private String agentName;
+  private Configuration conf;
+  private RetryCounterFactory retryCounterFactory;
+  private volatile boolean connected = false;
+
+  public ChaosAgent(Configuration conf, String quorum, String agentName) {
+    initChaosAgent(conf, quorum, agentName);
+  }
+
+  /***
+   * sets global params and initiates connection with ZooKeeper then does registration.
+   * @param conf initial configuration to use
+   * @param quorum ZK Quorum
+   * @param agentName AgentName to use
+   */
+  private void initChaosAgent(Configuration conf, String quorum, String agentName) {
+    this.conf = conf;
+    this.quorum = quorum;
+    this.agentName = agentName;
+    this.retryCounterFactory = new RetryCounterFactory(new RetryCounter.RetryConfig()
+      .setMaxAttempts(conf.getInt(ChaosConstants.RETRY_ATTEMPTS_KEY,
+        ChaosConstants.DEFAULT_RETRY_ATTEMPTS)).setSleepInterval(
+          conf.getLong(ChaosConstants.RETRY_SLEEP_INTERVAL_KEY,
+            ChaosConstants.DEFAULT_RETRY_SLEEP_INTERVAL)));
+    try {
+      this.createZKConnection(null);
+      this.register();
+    } catch (IOException e) {
+      LOG.error("Error Creating Connection: " + e);
+    }
+  }
+
+  /***
+   * Creates Connection with ZooKeeper.
+   * @throws IOException if something goes wrong
+   */
+  private void createZKConnection(Watcher watcher) throws IOException {
+    if(watcher == null) {
+      zk = new ZooKeeper(quorum, ChaosConstants.SESSION_TIMEOUT_ZK, this);
+    } else {
+      zk = new ZooKeeper(quorum, ChaosConstants.SESSION_TIMEOUT_ZK, watcher);
+    }
+    LOG.info("ZooKeeper Connection created for ChaosAgent: " + agentName);
+  }
+
+  //WATCHERS: Below are the Watches used by ChaosAgent
+
+  /***
+   * Watcher for notifying if any task is assigned to agent or not,
+   * by seeking if any Node is being added to agent as Child.
+   */
+  Watcher newTaskCreatedWatcher = new Watcher() {
+    @Override
+    public void process(WatchedEvent watchedEvent) {
+      if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {
+        assert (ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE +
+          ChaosConstants.ZNODE_PATH_SEPARATOR + agentName).equals(watchedEvent.getPath());
+
+        LOG.info("Change in Tasks Node, checking for Tasks again.");
+        getTasks();
+      }
+
+    }
+  };
+
+  //CALLBACKS: Below are the Callbacks used by Chaos Agent
+
+  /**
+   * Callback used while setting status of a given task, Logs given status.
+   */
+  AsyncCallback.StatCallback setStatusOfTaskZNodeCallback = (rc, path, ctx, stat) -> {
+    switch (KeeperException.Code.get(rc)) {
+      case CONNECTIONLOSS:
+        // Connection to the server was lost while setting status setting again.
+        try {
+          recreateZKConnection();
+        } catch (Exception e) {
+          break;
+        }
+        setStatusOfTaskZNode(path, (String) ctx);
+        break;
+
+      case OK:
+        LOG.info("Status of Task has been set");
+        break;
+
+      case NONODE:
+        LOG.error("Chaos Agent status node does not exists: "
+          + "check for ZNode directory structure again.");
+        break;
+
+      default:
+        LOG.error("Error while setting status of task ZNode: " +
+          path, KeeperException.create(KeeperException.Code.get(rc), path));
+    }
+  };
+
+  /**
+   * Callback used while creating a Persistent ZNode tries to create
+   * ZNode again if Connection was lost in previous try.
+   */
+  AsyncCallback.StringCallback createZNodeCallback = (rc, path, ctx, name) -> {
+    switch (KeeperException.Code.get(rc)) {
+      case CONNECTIONLOSS:
+        try {
+          recreateZKConnection();
+        } catch (Exception e) {
+          break;
+        }
+        createZNode(path, (byte[]) ctx);
+        break;
+      case OK:
+        LOG.info("ZNode created : " + path);
+        break;
+      case NODEEXISTS:
+        LOG.warn("ZNode already registered: " + path);
+        break;
+      default:
+        LOG.error("Error occurred while creating Persistent ZNode: " + path,
+          KeeperException.create(KeeperException.Code.get(rc), path));
+    }
+  };
+
+  /**
+   * Callback used while creating a Ephemeral ZNode tries to create ZNode again
+   * if Connection was lost in previous try.
+   */
+  AsyncCallback.StringCallback createEphemeralZNodeCallback = (rc, path, ctx, name) -> {
+    switch (KeeperException.Code.get(rc)) {
+      case CONNECTIONLOSS:
+        try {
+          recreateZKConnection();
+        } catch (Exception e) {
+          break;
+        }
+        createEphemeralZNode(path, (byte[]) ctx);
+        break;
+      case OK:
+        LOG.info("ZNode created : " + path);
+        break;
+      case NODEEXISTS:
+        LOG.warn("ZNode already registered: " + path);
+        break;
+      default:
+        LOG.error("Error occurred while creating Ephemeral ZNode: ",
+          KeeperException.create(KeeperException.Code.get(rc), path));
+    }
+  };
+
+  /**
+   * Callback used by getTasksForAgentCallback while getting command,
+   * after getting command successfully, it executes command and
+   * set its status with respect to the command type.
+   */
+  AsyncCallback.DataCallback getTaskForExecutionCallback = new AsyncCallback.DataCallback() {
+    @Override
+    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
+      switch (KeeperException.Code.get(rc)) {
+        case CONNECTIONLOSS:
+          //Connection to the server has been lost while getting task, getting data again.
+          try {
+            recreateZKConnection();
+          } catch (Exception e) {
+            break;
+          }
+          zk.getData(path,
+            false,
+            getTaskForExecutionCallback,
+            new String(data));
+          break;
+        case OK:
+          String cmd = new String(data);
+          LOG.info("Executing command : " + cmd);
+          String status = ChaosConstants.TASK_COMPLETION_STRING;
+          try {
+            String user = conf.get(ChaosConstants.CHAOSAGENT_SHELL_USER,
+              ChaosConstants.DEFAULT_SHELL_USER);
+            switch (cmd.substring(0, 4)) {
+              case "bool":
+                String ret = execWithRetries(user, cmd.substring(4)).getSecond();
+                status = Boolean.toString(ret.length() > 0);
+                break;
+
+              case "exec":
+                execWithRetries(user, cmd.substring(4));
+                break;
+
+              default:
+                LOG.error("Unknown Command Type");
+                status = ChaosConstants.TASK_ERROR_STRING;
+            }
+          } catch (IOException e) {
+            LOG.error("Got error while executing command : " + cmd +
+              " On agent : " + agentName + " Error : " + e);
+            status = ChaosConstants.TASK_ERROR_STRING;
+          }
+
+          try {
+            setStatusOfTaskZNode(path, status);
+            Thread.sleep(ChaosConstants.SET_STATUS_SLEEP_TIME);
+          } catch (InterruptedException e) {
+            LOG.error("Error occured after setting status: " + e);
+          }
+
+        default:
+          LOG.error("Error occurred while getting data",
+            KeeperException.create(KeeperException.Code.get(rc), path));
+      }
+    }
+  };
+
+  /***
+   * Callback used while getting Tasks for agent if call executed without Exception,
+   * It creates a separate thread for each children to execute given Tasks parallely.
+   */
+  AsyncCallback.ChildrenCallback getTasksForAgentCallback = new AsyncCallback.ChildrenCallback() {
+    @Override
+    public void processResult(int rc, String path, Object ctx, List<String> children) {
+      switch (KeeperException.Code.get(rc)) {
+        case CONNECTIONLOSS:
+          // Connection to the server has been lost, getting tasks again.
+          try {
+            recreateZKConnection();
+          } catch (Exception e) {
+            break;
+          }
+          getTasks();
+          break;
+
+        case OK:
+          if (children != null) {
+            try {
+
+              LOG.info("Executing each task as a separate thread");
+              List<Thread> tasksList = new ArrayList<>();
+              for (String task : children) {
+                String threadName = agentName + "_" + task;
+                Thread t = new Thread(() -> {

Review comment:
       This can also be a follow-up.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org