You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2017/01/04 07:39:21 UTC
[19/50] [abbrv] hbase git commit: HBASE-16010 Put draining function
through Admin API (Matt Warhaftig)
http://git-wip-us.apache.org/repos/asf/hbase/blob/992e5717/hbase-protocol-shaded/src/main/protobuf/Master.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/Master.proto b/hbase-protocol-shaded/src/main/protobuf/Master.proto
index f4e7da6..0a000ee 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Master.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Master.proto
@@ -568,6 +568,27 @@ message SecurityCapabilitiesResponse {
repeated Capability capabilities = 1;
}
+message ListDrainingRegionServersRequest {
+}
+
+message ListDrainingRegionServersResponse {
+ repeated ServerName server_name = 1;
+}
+
+message DrainRegionServersRequest {
+ repeated ServerName server_name = 1;
+}
+
+message DrainRegionServersResponse {
+}
+
+message RemoveDrainFromRegionServersRequest {
+ repeated ServerName server_name = 1;
+}
+
+message RemoveDrainFromRegionServersResponse {
+}
+
service MasterService {
/** Used by the client to get the number of regions that have received the updated schema */
rpc GetSchemaAlterStatus(GetSchemaAlterStatusRequest)
@@ -863,4 +884,16 @@ service MasterService {
/** Disable a replication peer */
rpc DisableReplicationPeer(DisableReplicationPeerRequest)
returns(DisableReplicationPeerResponse);
+
+ /** Returns a list of ServerNames marked as draining. */
+ rpc listDrainingRegionServers(ListDrainingRegionServersRequest)
+ returns(ListDrainingRegionServersResponse);
+
+ /** Mark a list of ServerNames as draining. */
+ rpc drainRegionServers(DrainRegionServersRequest)
+ returns(DrainRegionServersResponse);
+
+ /** Unmark a list of ServerNames marked as draining. */
+ rpc removeDrainFromRegionServers(RemoveDrainFromRegionServersRequest)
+ returns(RemoveDrainFromRegionServersResponse);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/992e5717/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 6b135d9..613c5c1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -3196,4 +3196,54 @@ public class HMaster extends HRegionServer implements MasterServices {
cpHost.postDisableReplicationPeer(peerId);
}
}
+
+ @Override
+ public void drainRegionServer(final ServerName server) {
+ String parentZnode = getZooKeeper().znodePaths.drainingZNode;
+ try {
+ String node = ZKUtil.joinZNode(parentZnode, server.getServerName());
+ ZKUtil.createAndFailSilent(getZooKeeper(), node);
+ } catch (KeeperException ke) {
+ LOG.warn(this.zooKeeper.prefix("Unable to add drain for '" + server.getServerName() + "'."),
+ ke);
+ }
+ }
+
+ @Override
+ public List<ServerName> listDrainingRegionServers() {
+ String parentZnode = getZooKeeper().znodePaths.drainingZNode;
+ List<ServerName> serverNames = new ArrayList<ServerName>();
+ List<String> serverStrs = null;
+ try {
+ serverStrs = ZKUtil.listChildrenNoWatch(getZooKeeper(), parentZnode);
+ } catch (KeeperException ke) {
+ LOG.warn(this.zooKeeper.prefix("Unable to list draining servers."), ke);
+ }
+ // No nodes is empty draining list or ZK connectivity issues.
+ if (serverStrs == null) {
+ return serverNames;
+ }
+
+ // Skip invalid ServerNames in result
+ for (String serverStr : serverStrs) {
+ try {
+ serverNames.add(ServerName.parseServerName(serverStr));
+ } catch (IllegalArgumentException iae) {
+ LOG.warn("Unable to cast '" + serverStr + "' to ServerName.", iae);
+ }
+ }
+ return serverNames;
+ }
+
+ @Override
+ public void removeDrainFromRegionServer(ServerName server) {
+ String parentZnode = getZooKeeper().znodePaths.drainingZNode;
+ String node = ZKUtil.joinZNode(parentZnode, server.getServerName());
+ try {
+ ZKUtil.deleteNodeFailSilent(getZooKeeper(), node);
+ } catch (KeeperException ke) {
+ LOG.warn(
+ this.zooKeeper.prefix("Unable to remove drain for '" + server.getServerName() + "'."), ke);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/992e5717/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index 8ee72c6..76da838 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -1693,4 +1693,55 @@ public class MasterRpcServices extends RSRpcServices
throw new ServiceException(e);
}
}
+
+ @Override
+ public ListDrainingRegionServersResponse listDrainingRegionServers(RpcController controller,
+ ListDrainingRegionServersRequest request) throws ServiceException {
+ ListDrainingRegionServersResponse.Builder response =
+ ListDrainingRegionServersResponse.newBuilder();
+ try {
+ master.checkInitialized();
+ List<ServerName> servers = master.listDrainingRegionServers();
+ for (ServerName server : servers) {
+ response.addServerName(ProtobufUtil.toServerName(server));
+ }
+ } catch (IOException io) {
+ throw new ServiceException(io);
+ }
+
+ return response.build();
+ }
+
+ @Override
+ public DrainRegionServersResponse drainRegionServers(RpcController controller,
+ DrainRegionServersRequest request) throws ServiceException {
+ DrainRegionServersResponse.Builder response = DrainRegionServersResponse.newBuilder();
+ try {
+ master.checkInitialized();
+ for (HBaseProtos.ServerName pbServer : request.getServerNameList()) {
+ master.drainRegionServer(ProtobufUtil.toServerName(pbServer));
+ }
+ } catch (IOException io) {
+ throw new ServiceException(io);
+ }
+
+ return response.build();
+ }
+
+ @Override
+ public RemoveDrainFromRegionServersResponse removeDrainFromRegionServers(RpcController controller,
+ RemoveDrainFromRegionServersRequest request) throws ServiceException {
+ RemoveDrainFromRegionServersResponse.Builder response =
+ RemoveDrainFromRegionServersResponse.newBuilder();
+ try {
+ master.checkInitialized();
+ for (HBaseProtos.ServerName pbServer : request.getServerNameList()) {
+ master.removeDrainFromRegionServer(ProtobufUtil.toServerName(pbServer));
+ }
+ } catch (IOException io) {
+ throw new ServiceException(io);
+ }
+
+ return response.build();
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/992e5717/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
index a7395bb..869e7ac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ProcedureInfo;
import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
@@ -443,4 +444,23 @@ public interface MasterServices extends Server {
* @param peerId a short name that identifies the peer
*/
void disableReplicationPeer(String peerId) throws ReplicationException, IOException;
+
+ /**
+ * Mark a region server as draining to prevent additional regions from getting assigned to it.
+ * @param server Region servers to drain.
+ */
+ void drainRegionServer(final ServerName server);
+
+ /**
+ * List region servers marked as draining to not get additional regions assigned to them.
+ * @return List of draining servers.
+ */
+ List<ServerName> listDrainingRegionServers();
+
+ /**
+ * Remove drain from a region server to allow additional regions assignments.
+ * @param server Region server to remove drain from.
+ */
+ void removeDrainFromRegionServer(final ServerName server);
+
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/992e5717/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
index b1cf1d2..62fde74 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
@@ -24,8 +24,14 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
@@ -758,4 +764,75 @@ public class TestAdmin2 {
ProcedureInfo[] procList = admin.listProcedures();
assertTrue(procList.length >= 0);
}
+
+ /*
+ * Test that invalid draining server names (invalid start code) don't get added to drain list.
+ */
+ @Test(timeout = 10000, expected = IllegalArgumentException.class)
+ public void testCheckDrainServerName() throws Exception {
+ List<ServerName> servers = new ArrayList<ServerName>();
+ servers.add(ServerName.parseServerName("127.0.0.1:123"));
+ admin.drainRegionServers(servers);
+ }
+
+ /*
+ * This test drains all regions so cannot be run in parallel with other tests.
+ */
+ @Test(timeout = 30000)
+ public void testDrainRegionServers() throws Exception {
+ List<ServerName> drainingServers = admin.listDrainingRegionServers();
+ assertTrue(drainingServers.isEmpty());
+
+ // Drain all region servers.
+ Collection<ServerName> clusterServers = admin.getClusterStatus().getServers();
+ drainingServers = new ArrayList<ServerName>();
+ for (ServerName server : clusterServers) {
+ drainingServers.add(server);
+ }
+ admin.drainRegionServers(drainingServers);
+
+ // Check that drain lists all region servers.
+ drainingServers = admin.listDrainingRegionServers();
+ assertEquals(clusterServers.size(), drainingServers.size());
+ for (ServerName server : clusterServers) {
+ assertTrue(drainingServers.contains(server));
+ }
+
+ // Try for 20 seconds to create table (new region). Will not complete because all RSs draining.
+ TableName hTable = TableName.valueOf("testDrainRegionServer");
+ final HTableDescriptor htd = new HTableDescriptor(hTable);
+ htd.addFamily(new HColumnDescriptor("cf"));
+
+ final Runnable createTable = new Thread() {
+ @Override
+ public void run() {
+ try {
+ admin.createTable(htd);
+ } catch (IOException ioe) {
+ assertTrue(false); // Should not get IOException.
+ }
+ }
+ };
+
+ final ExecutorService executor = Executors.newSingleThreadExecutor();
+ final java.util.concurrent.Future<?> future = executor.submit(createTable);
+ executor.shutdown();
+ try {
+ future.get(20, TimeUnit.SECONDS);
+ } catch (TimeoutException ie) {
+ assertTrue(true); // Expecting timeout to happen.
+ }
+
+ // Kill executor if still processing.
+ if (!executor.isTerminated()) {
+ executor.shutdownNow();
+ assertTrue(true);
+ }
+
+ // Remove drain list.
+ admin.removeDrainFromRegionServers(drainingServers);
+ drainingServers = admin.listDrainingRegionServers();
+ assertTrue(drainingServers.isEmpty());
+
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/992e5717/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
index 4e85d29..2a5be12 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
@@ -399,4 +399,19 @@ public class MockNoopMasterServices implements MasterServices, Server {
@Override
public void disableReplicationPeer(String peerId) throws ReplicationException, IOException {
}
+
+ @Override
+ public void drainRegionServer(ServerName server) {
+ return;
+ }
+
+ @Override
+ public List<ServerName> listDrainingRegionServers() {
+ return null;
+ }
+
+ @Override
+ public void removeDrainFromRegionServer(ServerName servers) {
+ return;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/992e5717/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperACL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperACL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperACL.java
index 7326327..485c1f5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperACL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperACL.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import javax.security.auth.login.AppConfigurationEntry;
@@ -319,5 +320,25 @@ public class TestZooKeeperACL {
}
}
+ @Test(timeout = 10000)
+ public void testAdminDrainAllowedOnSecureZK() throws Exception {
+ if (!secureZKAvailable) {
+ return;
+ }
+ List<ServerName> drainingServers = new ArrayList<ServerName>();
+ drainingServers.add(ServerName.parseServerName("ZZZ,123,123"));
+
+ // If unable to connect to secure ZK cluster then this operation would fail.
+ TEST_UTIL.getAdmin().drainRegionServers(drainingServers);
+
+ drainingServers = TEST_UTIL.getAdmin().listDrainingRegionServers();
+ assertEquals(1, drainingServers.size());
+ assertEquals(ServerName.parseServerName("ZZZ,123,123"), drainingServers.get(0));
+
+ TEST_UTIL.getAdmin().removeDrainFromRegionServers(drainingServers);
+ drainingServers = TEST_UTIL.getAdmin().listDrainingRegionServers();
+ assertEquals(0, drainingServers.size());
+ }
+
}