You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by nd...@apache.org on 2020/05/04 17:50:55 UTC
[hbase] branch branch-2.3 updated: HBASE-24260 Add a ClusterManager
that issues commands via coprocessor
This is an automated email from the ASF dual-hosted git repository.
ndimiduk pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.3 by this push:
new fa1890c HBASE-24260 Add a ClusterManager that issues commands via coprocessor
fa1890c is described below
commit fa1890c880efc2226bf66f443090f4f9ae23fb1d
Author: Nick Dimiduk <nd...@apache.org>
AuthorDate: Fri Apr 24 12:32:15 2020 -0700
HBASE-24260 Add a ClusterManager that issues commands via coprocessor
Implements `ClusterManager` that relies on the new
`ShellExecEndpointCoprocessor` for remote shell command execution.
Signed-off-by: Bharath Vissapragada <bh...@apache.org>
---
.../src/main/protobuf/ShellExecEndpoint.proto | 42 ++++++
hbase-it/pom.xml | 16 ++-
.../apache/hadoop/hbase/CoprocClusterManager.java | 136 ++++++++++++++++++
.../apache/hadoop/hbase/HBaseClusterManager.java | 27 ++--
.../hadoop/hbase/IntegrationTestingUtility.java | 1 +
.../apache/hadoop/hbase/RESTApiClusterManager.java | 18 +--
.../hadoop/hbase/ShellExecEndpointCoprocessor.java | 156 +++++++++++++++++++++
.../hbase/TestShellExecEndpointCoprocessor.java | 136 ++++++++++++++++++
.../hbase/chaos/factories/MonkeyFactory.java | 1 +
.../hadoop/hbase/chaos/util/ChaosMonkeyRunner.java | 2 +-
.../hbase/ClearUserNamespacesAndTablesRule.java | 2 +-
.../org/apache/hadoop/hbase/ConnectionRule.java | 4 +-
.../org/apache/hadoop/hbase/MiniClusterRule.java | 56 ++++++--
.../hbase/master/TestAlwaysStandByHMaster.java | 13 +-
.../hbase/master/webapp/TestMetaBrowser.java | 2 +-
15 files changed, 569 insertions(+), 43 deletions(-)
diff --git a/hbase-endpoint/src/main/protobuf/ShellExecEndpoint.proto b/hbase-endpoint/src/main/protobuf/ShellExecEndpoint.proto
new file mode 100644
index 0000000..96c72da
--- /dev/null
+++ b/hbase-endpoint/src/main/protobuf/ShellExecEndpoint.proto
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Opens a tunnel for remote shell execution on the target server. Used by `CoprocClusterManager`.
+ */
+
+syntax = "proto2";
+option java_package = "org.apache.hadoop.hbase.coprocessor.protobuf.generated";
+option java_outer_classname = "ShellExecEndpoint";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+message ShellExecRequest {
+ required string command = 1; // command line the endpoint passes to `bash -c`
+ optional bool await_response = 2; // the endpoint treats an unset value as true
+}
+
+message ShellExecResponse {
+ optional int32 exit_code = 1; // left unset in the reply to a background request
+ optional string stdout = 2; // left unset in the reply to a background request
+ optional string stderr = 3; // set only when the command fails with an exit-code error
+}
+
+service ShellExecService {
+ rpc shell_exec(ShellExecRequest) returns(ShellExecResponse); // run one shell command
+}
diff --git a/hbase-it/pom.xml b/hbase-it/pom.xml
index 4a3b1ef..99df2d4 100644
--- a/hbase-it/pom.xml
+++ b/hbase-it/pom.xml
@@ -214,6 +214,10 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-endpoint</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.hbase.thirdparty</groupId>
<artifactId>hbase-shaded-miscellaneous</artifactId>
</dependency>
@@ -271,7 +275,17 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
- <scope>compile</scope>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-library</artifactId>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/CoprocClusterManager.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/CoprocClusterManager.java
new file mode 100644
index 0000000..1778ac3
--- /dev/null
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/CoprocClusterManager.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hbase.client.AsyncAdmin;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ShellExecEndpoint.ShellExecRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ShellExecEndpoint.ShellExecResponse;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ShellExecEndpoint.ShellExecService;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Overrides commands to make use of coprocessor where possible. Only supports actions taken
+ * against Master and Region Server hosts.
+ */
+@InterfaceAudience.Private
+@SuppressWarnings("unused") // no way to test this without a distributed cluster.
+public class CoprocClusterManager extends HBaseClusterManager {
+ private static final Logger LOG = LoggerFactory.getLogger(CoprocClusterManager.class);
+ private static final Set<ServiceType> supportedServices = buildSupportedServicesSet();
+
+  /**
+   * Runs {@code cmd} through the shell-exec coprocessor endpoint of the Master, or of the Region
+   * Server on {@code hostname}. For the Master, {@code hostname} is not consulted.
+   */
+  @Override
+  protected Pair<Integer, String> exec(String hostname, ServiceType service, String... cmd)
+    throws IOException {
+    if (!supportedServices.contains(service)) {
+      throw unsupportedServiceType(service);
+    }
+
+    // The command runs inside the targeted Master or Region Server process, by way of the
+    // coprocessor loaded there, so the configured service user cannot be honored.
+    final String joined = StringUtils.join(cmd, " ");
+    LOG.info("Executing remote command: {}, hostname:{}", joined, hostname);
+
+    try (final AsyncConnection connection =
+      ConnectionFactory.createAsyncConnection(getConf()).join()) {
+      final AsyncAdmin admin = connection.getAdmin();
+      final ShellExecRequest request =
+        ShellExecRequest.newBuilder().setCommand(joined).setAwaitResponse(false).build();
+
+      final ShellExecResponse response;
+      if (service == ServiceType.HBASE_MASTER) {
+        // The target may have been a backup master, which this call cannot address. There is
+        // no `RestartBackupMasterAction` right now, so it's probably fine.
+        response = masterExec(admin, request);
+      } else if (service == ServiceType.HBASE_REGIONSERVER) {
+        response = regionServerExec(admin, request, resolveRegionServerName(admin, hostname));
+      } else {
+        throw new RuntimeException("should not happen");
+      }
+
+      final int exitCode = response.getExitCode();
+      final String stdout = response.getStdout();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Executed remote command: {}, exit code:{} , output:{}", joined, exitCode,
+          stdout);
+      } else {
+        LOG.info("Executed remote command: {}, exit code:{}", joined, exitCode);
+      }
+      return new Pair<>(exitCode, stdout);
+    }
+  }
+
+  /** Builds the read-only set of service types that {@code exec} accepts. */
+  private static Set<ServiceType> buildSupportedServicesSet() {
+    final Set<ServiceType> supported = new HashSet<>();
+    Collections.addAll(supported, ServiceType.HBASE_MASTER, ServiceType.HBASE_REGIONSERVER);
+    return Collections.unmodifiableSet(supported);
+  }
+
+  /** Sends {@code req} to the Master's endpoint and blocks until it replies. */
+  private static ShellExecResponse masterExec(final AsyncAdmin admin,
+    final ShellExecRequest req) {
+    // TODO: Admin API provides no means of sending exec to a backup master.
+    return admin.<ShellExecService.Stub, ShellExecResponse>coprocessorService(
+      ShellExecService::newStub,
+      (service, rpcController, done) -> service.shellExec(rpcController, req, done)).join();
+  }
+
+  /** Sends {@code req} to the endpoint on {@code targetHost} and blocks until it replies. */
+  private static ShellExecResponse regionServerExec(final AsyncAdmin admin,
+    final ShellExecRequest req, final ServerName targetHost) {
+    return admin.<ShellExecService.Stub, ShellExecResponse>coprocessorService(
+      ShellExecService::newStub,
+      (service, rpcController, done) -> service.shellExec(rpcController, req, done),
+      targetHost).join();
+  }
+
+  /** Returns the reported region server whose hostname equals {@code hostname}, else throws. */
+  private static ServerName resolveRegionServerName(final AsyncAdmin admin, final String hostname) {
+    for (ServerName candidate : admin.getRegionServers().join()) {
+      if (Objects.equals(candidate.getHostname(), hostname)) {
+        return candidate;
+      }
+    }
+    throw serverNotFound(hostname);
+  }
+
+ private static RuntimeException serverNotFound(final String hostname) {
+ return new RuntimeException( // thrown when resolveRegionServerName finds no match
+ String.format("Did not find %s amongst the servers known to the client.", hostname));
+ }
+
+ private static RuntimeException unsupportedServiceType(final ServiceType serviceType) {
+ return new RuntimeException( // thrown by exec for a service outside supportedServices
+ String.format("Unable to service request for service=%s", serviceType));
+ }
+}
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/HBaseClusterManager.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/HBaseClusterManager.java
index 2f75c73..957e094 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/HBaseClusterManager.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/HBaseClusterManager.java
@@ -22,7 +22,6 @@ import java.io.File;
import java.io.IOException;
import java.util.Locale;
import java.util.Map;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
@@ -45,9 +44,12 @@ import org.slf4j.LoggerFactory;
*/
@InterfaceAudience.Private
public class HBaseClusterManager extends Configured implements ClusterManager {
- private static final String SIGKILL = "SIGKILL";
- private static final String SIGSTOP = "SIGSTOP";
- private static final String SIGCONT = "SIGCONT";
+
+ protected enum Signal {
+ SIGKILL, // sent by kill()
+ SIGSTOP, // sent by suspend()
+ SIGCONT, // sent by resume()
+ }
protected static final Logger LOG = LoggerFactory.getLogger(HBaseClusterManager.class);
private String sshUserName;
@@ -107,7 +109,7 @@ public class HBaseClusterManager extends Configured implements ClusterManager {
.setSleepInterval(conf.getLong(RETRY_SLEEP_INTERVAL_KEY, DEFAULT_RETRY_SLEEP_INTERVAL)));
}
- private String getServiceUser(ServiceType service) {
+ protected String getServiceUser(ServiceType service) {
Configuration conf = getConf();
switch (service) {
case HADOOP_DATANODE:
@@ -329,9 +331,9 @@ public class HBaseClusterManager extends Configured implements ClusterManager {
* @return pair of exit code and command output
* @throws IOException if something goes wrong.
*/
- private Pair<Integer, String> exec(String hostname, ServiceType service, String... cmd)
+ protected Pair<Integer, String> exec(String hostname, ServiceType service, String... cmd)
throws IOException {
- LOG.info("Executing remote command: {} , hostname:{}", StringUtils.join(cmd, " "),
+ LOG.info("Executing remote command: {}, hostname:{}", StringUtils.join(cmd, " "),
hostname);
RemoteShell shell = new RemoteShell(hostname, getServiceUser(service), cmd);
@@ -444,8 +446,9 @@ public class HBaseClusterManager extends Configured implements ClusterManager {
exec(hostname, service, Operation.RESTART);
}
- public void signal(ServiceType service, String signal, String hostname) throws IOException {
- execWithRetries(hostname, service, getCommandProvider(service).signalCommand(service, signal));
+ public void signal(ServiceType service, Signal signal, String hostname) throws IOException {
+ execWithRetries(hostname, service,
+ getCommandProvider(service).signalCommand(service, signal.toString()));
}
@Override
@@ -457,16 +460,16 @@ public class HBaseClusterManager extends Configured implements ClusterManager {
@Override
public void kill(ServiceType service, String hostname, int port) throws IOException {
- signal(service, SIGKILL, hostname);
+ signal(service, Signal.SIGKILL, hostname);
}
@Override
public void suspend(ServiceType service, String hostname, int port) throws IOException {
- signal(service, SIGSTOP, hostname);
+ signal(service, Signal.SIGSTOP, hostname);
}
@Override
public void resume(ServiceType service, String hostname, int port) throws IOException {
- signal(service, SIGCONT, hostname);
+ signal(service, Signal.SIGCONT, hostname);
}
}
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestingUtility.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestingUtility.java
index a9e555e..d617523 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestingUtility.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestingUtility.java
@@ -147,6 +147,7 @@ public class IntegrationTestingUtility extends HBaseTestingUtility {
HConstants.HBASE_DIR, conf.get(HConstants.HBASE_DIR));
Class<? extends ClusterManager> clusterManagerClass = conf.getClass(HBASE_CLUSTER_MANAGER_CLASS,
DEFAULT_HBASE_CLUSTER_MANAGER_CLASS, ClusterManager.class);
+ LOG.info("Instantiating {}", clusterManagerClass.getName());
ClusterManager clusterManager = ReflectionUtils.newInstance(
clusterManagerClass, conf);
setHBaseCluster(new DistributedHBaseCluster(conf, clusterManager));
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/RESTApiClusterManager.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/RESTApiClusterManager.java
index 479f609..f5dd93b 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/RESTApiClusterManager.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/RESTApiClusterManager.java
@@ -74,6 +74,8 @@ public class RESTApiClusterManager extends Configured implements ClusterManager
"hbase.it.clustermanager.restapi.password";
private static final String REST_API_CLUSTER_MANAGER_CLUSTER_NAME =
"hbase.it.clustermanager.restapi.clustername";
+ private static final String REST_API_DELEGATE_CLUSTER_MANAGER =
+ "hbase.it.clustermanager.restapi.delegate";
private static final JsonParser parser = new JsonParser();
@@ -86,8 +88,6 @@ public class RESTApiClusterManager extends Configured implements ClusterManager
// Fields for the hostname, username, password, and cluster name of the cluster management server
// to be used.
private String serverHostname;
- private String serverUsername;
- private String serverPassword;
private String clusterName;
// Each version of Cloudera Manager supports a particular API versions. Version 6 of this API
@@ -103,10 +103,7 @@ public class RESTApiClusterManager extends Configured implements ClusterManager
private static final Logger LOG = LoggerFactory.getLogger(RESTApiClusterManager.class);
- RESTApiClusterManager() {
- hBaseClusterManager = ReflectionUtils.newInstance(HBaseClusterManager.class,
- new IntegrationTestingUtility().getConfiguration());
- }
+ RESTApiClusterManager() { }
@Override
public void setConf(Configuration conf) {
@@ -115,12 +112,17 @@ public class RESTApiClusterManager extends Configured implements ClusterManager
// `Configured()` constructor calls `setConf(null)` before calling again with a real value.
return;
}
+
+ final Class<? extends ClusterManager> clazz = conf.getClass(REST_API_DELEGATE_CLUSTER_MANAGER,
+ HBaseClusterManager.class, ClusterManager.class);
+ hBaseClusterManager = ReflectionUtils.newInstance(clazz, conf);
+
serverHostname = conf.get(REST_API_CLUSTER_MANAGER_HOSTNAME, DEFAULT_SERVER_HOSTNAME);
- serverUsername = conf.get(REST_API_CLUSTER_MANAGER_USERNAME, DEFAULT_SERVER_USERNAME);
- serverPassword = conf.get(REST_API_CLUSTER_MANAGER_PASSWORD, DEFAULT_SERVER_PASSWORD);
clusterName = conf.get(REST_API_CLUSTER_MANAGER_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
// Add filter to Client instance to enable server authentication.
+ String serverUsername = conf.get(REST_API_CLUSTER_MANAGER_USERNAME, DEFAULT_SERVER_USERNAME);
+ String serverPassword = conf.get(REST_API_CLUSTER_MANAGER_PASSWORD, DEFAULT_SERVER_PASSWORD);
client.register(HttpAuthenticationFeature.basic(serverUsername, serverPassword));
}
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ShellExecEndpointCoprocessor.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ShellExecEndpointCoprocessor.java
new file mode 100644
index 0000000..6ca4c19
--- /dev/null
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ShellExecEndpointCoprocessor.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.chaos.monkies.ChaosMonkey;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ShellExecEndpoint;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ShellExecEndpoint.ShellExecRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ShellExecEndpoint.ShellExecResponse;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
+import org.apache.hadoop.util.Shell;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Receives shell commands from the client and executes them blindly. Intended only for use
+ * by {@link ChaosMonkey} via {@link CoprocClusterManager}
+ */
+@InterfaceAudience.Private
+public class ShellExecEndpointCoprocessor extends ShellExecEndpoint.ShellExecService implements
+ MasterCoprocessor, RegionServerCoprocessor {
+ private static final Logger LOG = LoggerFactory.getLogger(ShellExecEndpointCoprocessor.class);
+
+ public static final String BACKGROUND_DELAY_MS_KEY = "hbase.it.shellexeccoproc.async.delay.ms";
+ public static final long DEFAULT_BACKGROUND_DELAY_MS = 1_000;
+
+ private final ExecutorService backgroundExecutor;
+ private Configuration conf;
+
+ public ShellExecEndpointCoprocessor() {
+ backgroundExecutor = Executors.newSingleThreadExecutor(
+ new ThreadFactoryBuilder()
+ .setNameFormat(ShellExecEndpointCoprocessor.class.getSimpleName() + "-{}")
+ .setDaemon(true)
+ .setUncaughtExceptionHandler((t, e) -> LOG.warn("Thread {} threw", t, e))
+ .build());
+ }
+
+ @Override
+ public Iterable<Service> getServices() {
+ return Collections.singletonList(this); // this class is itself the generated Service
+ }
+
+ @Override
+ public void start(CoprocessorEnvironment env) {
+ conf = env.getConfiguration(); // kept for the background-delay lookup
+ }
+
+  /**
+   * Runs the requested command through {@code bash -c}. Only a request that explicitly sets
+   * {@code await_response} to false takes the background path; all others block for the result.
+   */
+  @Override
+  public void shellExec(final RpcController controller, final ShellExecRequest request,
+    final RpcCallback<ShellExecResponse> done) {
+    final String command = request.getCommand();
+    if (StringUtils.isBlank(command)) {
+      throw new RuntimeException("Request contained an empty command.");
+    }
+    final boolean background = request.hasAwaitResponse() && !request.getAwaitResponse();
+    final Shell.ShellCommandExecutor shell = new Shell.ShellCommandExecutor(
+      new String[] { "/usr/bin/env", "bash", "-c", command });
+
+    final String where = background ? " on a background thread" : "";
+    LOG.info("Executing command" + where + ": {}", command);
+
+    if (background) {
+      runBackgroundTask(shell, done);
+    } else {
+      runForegroundTask(shell, controller, done);
+    }
+  }
+
+ private void runForegroundTask(
+ final Shell.ShellCommandExecutor shell,
+ final RpcController controller,
+ final RpcCallback<ShellExecResponse> done
+ ) {
+ ShellExecResponse.Builder builder = ShellExecResponse.newBuilder();
+ try {
+ doExec(shell, builder);
+ } catch (IOException e) {
+ LOG.error("Failure launching process", e);
+ CoprocessorRpcUtils.setControllerException(controller, e);
+ }
+ done.run(builder.build());
+ }
+
+  /** Replies with an empty response immediately; {@code shell} runs later in the background. */
+  private void runBackgroundTask(final Shell.ShellCommandExecutor shell,
+    final RpcCallback<ShellExecResponse> done) {
+    final long sleepDuration = conf.getLong(BACKGROUND_DELAY_MS_KEY, DEFAULT_BACKGROUND_DELAY_MS);
+    backgroundExecutor.submit(() -> {
+      try {
+        // Sleep first so the RPC reply can go out. This is still a race: nothing here can block
+        // until the client has acknowledged the IPC response.
+        Thread.sleep(sleepDuration);
+        doExec(shell, ShellExecResponse.newBuilder());
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted before launching process.", e);
+        Thread.currentThread().interrupt(); // restore the flag rather than swallowing it
+      } catch (IOException e) {
+        LOG.error("Failure launching process", e);
+      }
+    });
+    done.run(ShellExecResponse.newBuilder().build());
+  }
+
+  /**
+   * Runs {@code shell} and records the outcome in {@code builder} as a side effect: the exit
+   * code and stdout, plus the exception message as stderr when an {@code ExitCodeException}
+   * is raised.
+   *
+   * @throws IOException for any other failure of {@code shell}; {@code builder} is then untouched
+   */
+  private void doExec(final Shell.ShellCommandExecutor shell,
+    final ShellExecResponse.Builder builder) throws IOException {
+    try {
+      shell.execute();
+      builder.setExitCode(shell.getExitCode());
+      builder.setStdout(shell.getOutput());
+    } catch (Shell.ExitCodeException failure) {
+      LOG.warn("Launched process failed", failure);
+      builder.setExitCode(failure.getExitCode());
+      builder.setStdout(shell.getOutput());
+      builder.setStderr(failure.getMessage());
+    }
+  }
+}
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/TestShellExecEndpointCoprocessor.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/TestShellExecEndpointCoprocessor.java
new file mode 100644
index 0000000..81519bc
--- /dev/null
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/TestShellExecEndpointCoprocessor.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Optional;
+import java.util.function.Consumer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.AsyncAdmin;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ShellExecEndpoint.ShellExecRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ShellExecEndpoint.ShellExecResponse;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ShellExecEndpoint.ShellExecService;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test for the {@link ShellExecEndpointCoprocessor}.
+ */
+@Category(MediumTests.class)
+public class TestShellExecEndpointCoprocessor {
+
+ @ClassRule
+ public static final HBaseClassTestRule testRule =
+ HBaseClassTestRule.forClass(TestShellExecEndpointCoprocessor.class);
+
+ @ClassRule
+ public static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder()
+ .setConfiguration(createConfiguration())
+ .build();
+
+ @Rule
+ public final ConnectionRule connectionRule =
+ new ConnectionRule(miniClusterRule::createConnection);
+
+ @Test
+ public void testShellExecUnspecified() {
+ testShellExecForeground(b -> {}); // leaves await_response unset
+ }
+
+ @Test
+ public void testShellExecForeground() {
+ testShellExecForeground(b -> b.setAwaitResponse(true)); // explicit foreground request
+ }
+
+  /** Runs an {@code echo} with the customized request and checks its exit code and stdout. */
+  private void testShellExecForeground(final Consumer<ShellExecRequest.Builder> consumer) {
+    final AsyncAdmin admin = connectionRule.getConnection().getAdmin();
+
+    final ShellExecRequest.Builder requestBuilder = ShellExecRequest.newBuilder()
+      .setCommand("echo -n \"hello world\"");
+    consumer.accept(requestBuilder);
+    final ShellExecResponse response = admin
+      .<ShellExecService.Stub, ShellExecResponse>coprocessorService(
+        ShellExecService::newStub,
+        (stub, controller, callback) ->
+          stub.shellExec(controller, requestBuilder.build(), callback))
+      .join();
+    assertEquals(0, response.getExitCode());
+    assertEquals("hello world", response.getStdout());
+  }
+
+  /** A background request gets an empty reply, and the command's effect shows up afterwards. */
+  @Test
+  public void testShellExecBackground() throws IOException {
+    final AsyncConnection conn = connectionRule.getConnection();
+    final AsyncAdmin admin = conn.getAdmin();
+
+    final File testFile = new File(
+      ensureTestDataDirExists(miniClusterRule.getTestingUtility()), "shell_exec_background.txt");
+    assertTrue(testFile.createNewFile());
+    assertEquals(0, testFile.length());
+
+    final ShellExecRequest req = ShellExecRequest.newBuilder()
+      .setCommand("echo \"hello world\" >> " + testFile.getAbsolutePath())
+      .setAwaitResponse(false)
+      .build();
+    final ShellExecResponse resp = admin
+      .<ShellExecService.Stub, ShellExecResponse>coprocessorService(
+        ShellExecService::newStub,
+        (stub, controller, callback) -> stub.shellExec(controller, req, callback))
+      .join();
+
+    assertFalse("the response from a background task should have no exit code", resp.hasExitCode());
+    assertFalse("the response from a background task should have no stdout", resp.hasStdout());
+    assertFalse("the response from a background task should have no stderr", resp.hasStderr());
+
+    Waiter.waitFor(conn.getConfiguration(), 5_000, () -> testFile.length() > 0);
+    final String content = new String(Files.readAllBytes(testFile.toPath())).trim();
+    assertEquals("hello world", content);
+  }
+
+  /** Ensures the testing utility's data-test directory exists on the default file system. */
+  private static File ensureTestDataDirExists(final HBaseTestingUtility testingUtility)
+    throws IOException {
+    final Path dirPath = Optional.of(testingUtility)
+      .map(HBaseTestingUtility::getDataTestDir)
+      .map(Object::toString)
+      .map(val -> Paths.get(val))
+      .orElseThrow(() -> new RuntimeException("Unable to locate temp directory path."));
+    final File dirFile = Files.createDirectories(dirPath).toFile();
+    assertTrue(dirFile.exists());
+    return dirFile;
+  }
+
+ private static Configuration createConfiguration() {
+ final Configuration conf = HBaseConfiguration.create();
+ conf.set("hbase.coprocessor.master.classes", ShellExecEndpointCoprocessor.class.getName());
+ return conf; // only the master coprocessor key is set here
+ }
+}
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MonkeyFactory.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MonkeyFactory.java
index 73f6968..6f93715 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MonkeyFactory.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MonkeyFactory.java
@@ -102,6 +102,7 @@ public abstract class MonkeyFactory {
try {
klass = Class.forName(factoryName);
if (klass != null) {
+ LOG.info("Instantiating {}", klass.getName());
fact = (MonkeyFactory) ReflectionUtils.newInstance(klass);
}
} catch (Exception e) {
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/util/ChaosMonkeyRunner.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/util/ChaosMonkeyRunner.java
index 504bd62..24ade5d 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/util/ChaosMonkeyRunner.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/util/ChaosMonkeyRunner.java
@@ -120,7 +120,7 @@ public class ChaosMonkeyRunner extends AbstractHBaseTool {
util.createDistributedHBaseCluster();
util.checkNodeCount(1);// make sure there's at least 1 alive rs
} else {
- throw new RuntimeException("ChaosMonkeyRunner must run againt a distributed cluster,"
+ throw new RuntimeException("ChaosMonkeyRunner must run against a distributed cluster,"
+ " please check and point to the right configuration dir");
}
this.setConf(util.getConfiguration());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ClearUserNamespacesAndTablesRule.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ClearUserNamespacesAndTablesRule.java
index 6400eb8..b0ea6f4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ClearUserNamespacesAndTablesRule.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ClearUserNamespacesAndTablesRule.java
@@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory;
* <pre>{@code
* public class TestMyClass {
* @ClassRule
- * public static final MiniClusterRule miniClusterRule = new MiniClusterRule();
+ * public static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder().build();
*
* private final ConnectionRule connectionRule =
* new ConnectionRule(miniClusterRule::createConnection);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ConnectionRule.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ConnectionRule.java
index bf4c5aa..21ca35a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ConnectionRule.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ConnectionRule.java
@@ -33,13 +33,13 @@ import org.junit.rules.ExternalResource;
*
* <pre>{@code
* public class TestMyClass {
- * private static final MiniClusterRule miniClusterRule = new MiniClusterRule();
+ * private static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder().build();
* private static final ConnectionRule connectionRule =
* new ConnectionRule(miniClusterRule::createConnection);
*
* @ClassRule
* public static final TestRule rule = RuleChain
- * .outerRule(connectionRule)
+ * .outerRule(miniClusterRule)
* .around(connectionRule);
* }
* }</pre>
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniClusterRule.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniClusterRule.java
index 89fbded4..4ee068f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniClusterRule.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniClusterRule.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.junit.ClassRule;
@@ -36,7 +37,7 @@ import org.junit.rules.TestRule;
* <pre>{@code
* public class TestMyClass {
* @ClassRule
- * public static final MiniClusterRule miniClusterRule = new MiniClusterRule();
+ * public static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder().build();
*
* @Rule
* public final ConnectionRule connectionRule =
@@ -44,25 +45,54 @@ import org.junit.rules.TestRule;
* }
* }</pre>
*/
-public class MiniClusterRule extends ExternalResource {
+public final class MiniClusterRule extends ExternalResource {
+
+ /**
+ * A builder for fluent composition of a new {@link MiniClusterRule}.
+ */
+ public static class Builder {
+
+ private StartMiniClusterOption miniClusterOption;
+ private Configuration conf;
+
+ /**
+ * Use the provided {@link StartMiniClusterOption} to construct the {@link MiniHBaseCluster}.
+ */
+ public Builder setMiniClusterOption(final StartMiniClusterOption miniClusterOption) {
+ this.miniClusterOption = miniClusterOption;
+ return this;
+ }
+
+ /**
+ * Seed the underlying {@link HBaseTestingUtility} with the provided {@link Configuration}.
+ */
+ public Builder setConfiguration(final Configuration conf) {
+ this.conf = conf;
+ return this;
+ }
+
+ public MiniClusterRule build() {
+ return new MiniClusterRule(
+ conf,
+ miniClusterOption != null
+ ? miniClusterOption
+ : StartMiniClusterOption.builder().build());
+ }
+ }
+
private final HBaseTestingUtility testingUtility;
private final StartMiniClusterOption miniClusterOptions;
private MiniHBaseCluster miniCluster;
- /**
- * Create an instance over the default options provided by {@link StartMiniClusterOption}.
- */
- public MiniClusterRule() {
- this(StartMiniClusterOption.builder().build());
+ private MiniClusterRule(final Configuration conf,
+ final StartMiniClusterOption miniClusterOptions) {
+ this.testingUtility = new HBaseTestingUtility(conf);
+ this.miniClusterOptions = miniClusterOptions;
}
- /**
- * Create an instance using the provided {@link StartMiniClusterOption}.
- */
- public MiniClusterRule(final StartMiniClusterOption miniClusterOptions) {
- this.testingUtility = new HBaseTestingUtility();
- this.miniClusterOptions = miniClusterOptions;
+ public static Builder newBuilder() {
+ return new Builder();
}
/**
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAlwaysStandByHMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAlwaysStandByHMaster.java
index a49ae50..2e6af15 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAlwaysStandByHMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAlwaysStandByHMaster.java
@@ -36,13 +36,18 @@ public class TestAlwaysStandByHMaster {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestAlwaysStandByHMaster.class);
+ HBaseClassTestRule.forClass(TestAlwaysStandByHMaster.class);
- private static final StartMiniClusterOption OPTION = StartMiniClusterOption.builder().
- numAlwaysStandByMasters(1).numMasters(1).numRegionServers(1).build();
+ private static final StartMiniClusterOption OPTION = StartMiniClusterOption.builder()
+ .numAlwaysStandByMasters(1)
+ .numMasters(1)
+ .numRegionServers(1)
+ .build();
@ClassRule
- public static final MiniClusterRule miniClusterRule = new MiniClusterRule(OPTION);
+ public static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder()
+ .setMiniClusterOption(OPTION)
+ .build();
/**
* Tests that the AlwaysStandByHMaster does not transition to active state even if no active
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/webapp/TestMetaBrowser.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/webapp/TestMetaBrowser.java
index da2a4a0..389c6f0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/webapp/TestMetaBrowser.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/webapp/TestMetaBrowser.java
@@ -65,7 +65,7 @@ public class TestMetaBrowser {
public static final HBaseClassTestRule testRule =
HBaseClassTestRule.forClass(TestMetaBrowser.class);
@ClassRule
- public static final MiniClusterRule miniClusterRule = new MiniClusterRule();
+ public static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder().build();
private final ConnectionRule connectionRule =
new ConnectionRule(miniClusterRule::createConnection);