You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2014/05/28 10:54:53 UTC
git commit: HBASE-11220 Add listeners to ServerManager and
AssignmentManager
Repository: hbase
Updated Branches:
refs/heads/master ef995efb1 -> ab896f05d
HBASE-11220 Add listeners to ServerManager and AssignmentManager
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ab896f05
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ab896f05
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ab896f05
Branch: refs/heads/master
Commit: ab896f05d1d84f3e634c364a0772ada4c6cd6b9d
Parents: ef995ef
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Wed May 28 09:47:08 2014 +0100
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Wed May 28 09:47:08 2014 +0100
----------------------------------------------------------------------
.../hadoop/hbase/master/AssignmentListener.java | 45 ++++
.../hadoop/hbase/master/AssignmentManager.java | 52 +++-
.../hadoop/hbase/master/ServerListener.java | 42 +++
.../hadoop/hbase/master/ServerManager.java | 35 +++
.../hbase/master/TestAssignmentListener.java | 260 +++++++++++++++++++
5 files changed, 430 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/ab896f05/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentListener.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentListener.java
new file mode 100644
index 0000000..8680e19
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentListener.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+
+/**
+ * Get notification of assignment events. The invocations are inline,
+ * so make sure your implementation is fast or you will slow down HBase.
+ */
+@InterfaceAudience.Private
+public interface AssignmentListener {
+ /**
+ * The region was opened on the specified server.
+ * @param regionInfo The opened region.
+ * @param serverName The name of the server the region was opened on.
+ */
+ void regionOpened(final HRegionInfo regionInfo, final ServerName serverName);
+
+ /**
+ * The region was closed on the region server.
+ * No server name is passed to this callback.
+ * @param regionInfo The closed region.
+ */
+ void regionClosed(final HRegionInfo regionInfo);
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ab896f05/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index ba9e103..4dcd3e9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -32,6 +32,7 @@ import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -222,6 +223,9 @@ public class AssignmentManager extends ZooKeeperListener {
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
public static boolean TEST_SKIP_SPLIT_HANDLING = false;
+ /** Listeners that are called on assignment events. */
+ private List<AssignmentListener> listeners = new CopyOnWriteArrayList<AssignmentListener>();
+
/**
* Constructs a new assignment manager.
*
@@ -284,6 +288,22 @@ public class AssignmentManager extends ZooKeeperListener {
}
/**
+ * Add the listener to the list notified of region opened/closed events.
+ * @param listener The AssignmentListener to register
+ */
+ public void registerListener(final AssignmentListener listener) {
+ this.listeners.add(listener);
+ }
+
+ /**
+ * Remove the listener from the notification list.
+ * @param listener The AssignmentListener to unregister
+ */
+ public boolean unregisterListener(final AssignmentListener listener) {
+ return this.listeners.remove(listener); // List.remove(Object): true if the listener was in the list
+ }
+
+ /**
* @return Instance of ZKTableStateManager.
*/
public TableStateManager getTableStateManager() {
@@ -600,6 +620,7 @@ public class AssignmentManager extends ZooKeeperListener {
// server. If that server is online, when we reload the meta, the
// region is put back to online, we need to offline it.
regionStates.regionOffline(regionInfo);
+ sendRegionClosedNotification(regionInfo);
}
// Put it back in transition so that SSH can re-assign it
regionStates.updateRegionState(regionInfo, State.OFFLINE, sn);
@@ -1246,6 +1267,9 @@ public class AssignmentManager extends ZooKeeperListener {
// Remove plan if one.
clearRegionPlan(regionInfo);
balancer.regionOnline(regionInfo, sn);
+
+ // Tell our listeners that a region was opened
+ sendRegionOpenedNotification(regionInfo, sn);
}
/**
@@ -1628,12 +1652,12 @@ public class AssignmentManager extends ZooKeeperListener {
regionOffline(region);
}
return;
- } else if ((t instanceof FailedServerException) || (state != null &&
+ } else if ((t instanceof FailedServerException) || (state != null &&
t instanceof RegionAlreadyInTransitionException)) {
long sleepTime = 0;
Configuration conf = this.server.getConfiguration();
if(t instanceof FailedServerException) {
- sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
+ sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
} else {
// RS is already processing this region, only need to update the timestamp
@@ -1981,9 +2005,9 @@ public class AssignmentManager extends ZooKeeperListener {
} else if(plan.getDestination().equals(newPlan.getDestination()) &&
previousException instanceof FailedServerException) {
try {
- LOG.info("Trying to re-assign " + region.getRegionNameAsString() +
+ LOG.info("Trying to re-assign " + region.getRegionNameAsString() +
" to the same failed server.");
- Thread.sleep(1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
+ Thread.sleep(1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
RpcClient.FAILED_SERVER_EXPIRY_DEFAULT));
} catch (InterruptedException ie) {
LOG.warn("Failed to assign "
@@ -3289,6 +3313,26 @@ public class AssignmentManager extends ZooKeeperListener {
// remove the region plan as well just in case.
clearRegionPlan(regionInfo);
balancer.regionOffline(regionInfo);
+
+ // Tell our listeners that a region was closed
+ sendRegionClosedNotification(regionInfo);
+ }
+
+ private void sendRegionOpenedNotification(final HRegionInfo regionInfo,
+ final ServerName serverName) { // tells every registered AssignmentListener the region was opened
+ if (!this.listeners.isEmpty()) {
+ for (AssignmentListener listener : this.listeners) {
+ listener.regionOpened(regionInfo, serverName); // direct call: a slow listener delays the caller
+ }
+ }
+ }
+
+ private void sendRegionClosedNotification(final HRegionInfo regionInfo) { // tells every registered AssignmentListener the region was closed
+ if (!this.listeners.isEmpty()) {
+ for (AssignmentListener listener : this.listeners) {
+ listener.regionClosed(regionInfo); // direct call: a slow listener delays the caller
+ }
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/ab896f05/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerListener.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerListener.java
new file mode 100644
index 0000000..bce3712
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerListener.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ServerName;
+
+/**
+ * Get notification of server events. The invocations are inline,
+ * so make sure your implementation is fast or you will slow down HBase.
+ */
+@InterfaceAudience.Private
+public interface ServerListener {
+ /**
+ * The server has joined the cluster.
+ * @param serverName The name of the server that was added.
+ */
+ void serverAdded(final ServerName serverName);
+
+ /**
+ * The server was removed from the cluster.
+ * @param serverName The name of the server that was removed.
+ */
+ void serverRemoved(final ServerName serverName);
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ab896f05/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index 63b48f5..e209162 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -32,6 +32,7 @@ import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -178,6 +179,9 @@ public class ServerManager {
*/
private Map<ServerName, Boolean> requeuedDeadServers = new HashMap<ServerName, Boolean>();
+ /** Listeners that are called on server events. */
+ private List<ServerListener> listeners = new CopyOnWriteArrayList<ServerListener>();
+
/**
* Constructor.
* @param master
@@ -212,6 +216,22 @@ public class ServerManager {
}
/**
+ * Add the listener to the list notified of server added/removed events.
+ * @param listener The ServerListener to register
+ */
+ public void registerListener(final ServerListener listener) {
+ this.listeners.add(listener);
+ }
+
+ /**
+ * Remove the listener from the notification list.
+ * @param listener The ServerListener to unregister
+ */
+ public boolean unregisterListener(final ServerListener listener) {
+ return this.listeners.remove(listener); // List.remove(Object): true if the listener was in the list
+ }
+
+ /**
* Let the server manager know a new regionserver has come online
* @param ia The remote address
* @param port The remote port
@@ -305,6 +325,14 @@ public class ServerManager {
}
recordNewServerWithLock(serverName, sl);
}
+
+ // Tell our listeners that a server was added
+ if (!this.listeners.isEmpty()) {
+ for (ServerListener listener : this.listeners) {
+ listener.serverAdded(serverName);
+ }
+ }
+
// Note that we assume that same ts means same server, and don't expire in that case.
// TODO: ts can theoretically collide due to clock shifts, so this is a bit hacky.
if (existingServer != null && (existingServer.getStartcode() < serverName.getStartcode())) {
@@ -582,6 +610,13 @@ public class ServerManager {
}
LOG.debug("Added=" + serverName +
" to dead servers, submitted shutdown handler to be executed meta=" + carryingMeta);
+
+ // Tell our listeners that a server was removed
+ if (!this.listeners.isEmpty()) {
+ for (ServerListener listener : this.listeners) {
+ listener.serverRemoved(serverName);
+ }
+ }
}
public synchronized void processDeadServer(final ServerName serverName) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/ab896f05/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentListener.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentListener.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentListener.java
new file mode 100644
index 0000000..e960945
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentListener.java
@@ -0,0 +1,260 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.base.Joiner;
+import org.junit.experimental.categories.Category;
+
+@Category(MediumTests.class)
+public class TestAssignmentListener {
+ private static final Log LOG = LogFactory.getLog(TestAssignmentListener.class);
+
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ static class DummyListener { // shared base of the test listeners: counts callbacks in 'modified'
+ protected AtomicInteger modified = new AtomicInteger(0); // incremented by every subclass callback
+
+ public void awaitModifications(int count) throws InterruptedException {
+ while (!modified.compareAndSet(count, 0)) { // succeeds, and zeroes the counter, only when it equals 'count'
+ Thread.sleep(100); // NOTE(review): if the counter overshoots 'count' the CAS never matches; only the test timeout ends the wait
+ }
+ }
+ }
+
+ static class DummyAssignmentListener extends DummyListener implements AssignmentListener {
+ private AtomicInteger closeCount = new AtomicInteger(0); // regionClosed() callbacks since the last reset()
+ private AtomicInteger openCount = new AtomicInteger(0); // regionOpened() callbacks since the last reset()
+
+ public DummyAssignmentListener() {
+ }
+
+ public void regionOpened(final HRegionInfo regionInfo, final ServerName serverName) {
+ LOG.info("Assignment open region=" + regionInfo + " server=" + serverName);
+ openCount.incrementAndGet();
+ modified.incrementAndGet();
+ }
+
+ public void regionClosed(final HRegionInfo regionInfo) {
+ LOG.info("Assignment close region=" + regionInfo);
+ closeCount.incrementAndGet();
+ modified.incrementAndGet();
+ }
+
+ public void reset() { // clears the open/close counters only; 'modified' is zeroed by awaitModifications()
+ openCount.set(0);
+ closeCount.set(0);
+ }
+
+ public int getLoadCount() { // returns the number of regionOpened() callbacks since the last reset()
+ return openCount.get();
+ }
+
+ public int getCloseCount() {
+ return closeCount.get();
+ }
+ }
+
+ static class DummyServerListener extends DummyListener implements ServerListener {
+ private AtomicInteger removedCount = new AtomicInteger(0); // serverRemoved() callbacks since the last reset()
+ private AtomicInteger addedCount = new AtomicInteger(0); // serverAdded() callbacks since the last reset()
+
+ public DummyServerListener() {
+ }
+
+ public void serverAdded(final ServerName serverName) {
+ LOG.info("Server added " + serverName);
+ addedCount.incrementAndGet();
+ modified.incrementAndGet();
+ }
+
+ public void serverRemoved(final ServerName serverName) {
+ LOG.info("Server removed " + serverName);
+ removedCount.incrementAndGet();
+ modified.incrementAndGet();
+ }
+
+ public void reset() { // clears the added/removed counters only; 'modified' is zeroed by awaitModifications()
+ addedCount.set(0);
+ removedCount.set(0);
+ }
+
+ public int getAddedCount() {
+ return addedCount.get();
+ }
+
+ public int getRemovedCount() {
+ return removedCount.get();
+ }
+ }
+
+ @BeforeClass
+ public static void beforeAllTests() throws Exception {
+ TEST_UTIL.startMiniCluster(2);
+ }
+
+ @AfterClass
+ public static void afterAllTests() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Test(timeout=60000)
+ public void testServerListener() throws IOException, InterruptedException {
+ ServerManager serverManager = TEST_UTIL.getHBaseCluster().getMaster().getServerManager();
+
+ DummyServerListener listener = new DummyServerListener();
+ serverManager.registerListener(listener);
+ try {
+ MiniHBaseCluster miniCluster = TEST_UTIL.getMiniHBaseCluster();
+
+ // Start a new Region Server and expect exactly one serverAdded event
+ miniCluster.startRegionServer();
+ listener.awaitModifications(1);
+ assertEquals(1, listener.getAddedCount());
+ assertEquals(0, listener.getRemovedCount());
+
+ // Start another Region Server
+ listener.reset();
+ miniCluster.startRegionServer();
+ listener.awaitModifications(1);
+ assertEquals(1, listener.getAddedCount());
+ assertEquals(0, listener.getRemovedCount());
+
+ int nrs = miniCluster.getRegionServerThreads().size();
+
+ // Stop a Region Server and expect exactly one serverRemoved event
+ listener.reset();
+ miniCluster.stopRegionServer(nrs - 1);
+ listener.awaitModifications(1);
+ assertEquals(0, listener.getAddedCount());
+ assertEquals(1, listener.getRemovedCount());
+
+ // Stop another Region Server
+ listener.reset();
+ miniCluster.stopRegionServer(nrs - 2);
+ listener.awaitModifications(1);
+ assertEquals(0, listener.getAddedCount());
+ assertEquals(1, listener.getRemovedCount());
+ } finally {
+ serverManager.unregisterListener(listener);
+ }
+ }
+
+ @Test(timeout=60000)
+ public void testAssignmentListener() throws IOException, InterruptedException {
+ AssignmentManager am = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager();
+ HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+
+ DummyAssignmentListener listener = new DummyAssignmentListener();
+ am.registerListener(listener);
+ try {
+ final String TABLE_NAME_STR = "testtb";
+ final TableName TABLE_NAME = TableName.valueOf(TABLE_NAME_STR);
+ final byte[] FAMILY = Bytes.toBytes("cf");
+
+ // Create a new table, with a single region
+ LOG.info("Create Table");
+ TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+ listener.awaitModifications(1);
+ assertEquals(1, listener.getLoadCount());
+ assertEquals(0, listener.getCloseCount());
+
+ // Add some data
+ HTable table = new HTable(TEST_UTIL.getConfiguration(), TABLE_NAME);
+ try {
+ for (int i = 0; i < 10; ++i) {
+ byte[] key = Bytes.toBytes("row-" + i);
+ Put put = new Put(key);
+ put.add(FAMILY, null, key);
+ table.put(put);
+ }
+ } finally {
+ table.close();
+ }
+
+ // Split the table in two
+ LOG.info("Split Table");
+ listener.reset();
+ admin.split(TABLE_NAME_STR, "row-3");
+ listener.awaitModifications(3); // 2 opens + 1 close, checked below
+ assertEquals(2, listener.getLoadCount()); // daughters added
+ assertEquals(1, listener.getCloseCount()); // parent removed
+
+ // Wait for the Regions to be mergeable
+ MiniHBaseCluster miniCluster = TEST_UTIL.getMiniHBaseCluster();
+ int mergeable = 0;
+ while (mergeable < 2) {
+ Thread.sleep(100);
+ admin.majorCompact(TABLE_NAME_STR);
+ mergeable = 0; // recount from scratch on every poll
+ for (JVMClusterUtil.RegionServerThread regionThread: miniCluster.getRegionServerThreads()) {
+ for (HRegion region: regionThread.getRegionServer().getOnlineRegions(TABLE_NAME)) {
+ mergeable += region.isMergeable() ? 1 : 0;
+ }
+ }
+ }
+
+ // Merge the two regions
+ LOG.info("Merge Regions");
+ listener.reset();
+ List<HRegionInfo> regions = admin.getTableRegions(TABLE_NAME);
+ assertEquals(2, regions.size());
+ admin.mergeRegions(regions.get(0).getEncodedNameAsBytes(),
+ regions.get(1).getEncodedNameAsBytes(), true);
+ listener.awaitModifications(3); // 1 open + 2 closes, checked below
+ assertEquals(1, admin.getTableRegions(TABLE_NAME).size());
+ assertEquals(1, listener.getLoadCount()); // new merged region added
+ assertEquals(2, listener.getCloseCount()); // daughters removed
+
+ // Delete the table
+ LOG.info("Drop Table");
+ listener.reset();
+ TEST_UTIL.deleteTable(TABLE_NAME);
+ listener.awaitModifications(1);
+ assertEquals(0, listener.getLoadCount());
+ assertEquals(1, listener.getCloseCount());
+ } finally {
+ am.unregisterListener(listener);
+ }
+ }
+}