You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2014/05/28 10:54:53 UTC

git commit: HBASE-11220 Add listeners to ServerManager and AssignmentManager

Repository: hbase
Updated Branches:
  refs/heads/master ef995efb1 -> ab896f05d


HBASE-11220 Add listeners to ServerManager and AssignmentManager


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ab896f05
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ab896f05
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ab896f05

Branch: refs/heads/master
Commit: ab896f05d1d84f3e634c364a0772ada4c6cd6b9d
Parents: ef995ef
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Wed May 28 09:47:08 2014 +0100
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Wed May 28 09:47:08 2014 +0100

----------------------------------------------------------------------
 .../hadoop/hbase/master/AssignmentListener.java |  45 ++++
 .../hadoop/hbase/master/AssignmentManager.java  |  52 +++-
 .../hadoop/hbase/master/ServerListener.java     |  42 +++
 .../hadoop/hbase/master/ServerManager.java      |  35 +++
 .../hbase/master/TestAssignmentListener.java    | 260 +++++++++++++++++++
 5 files changed, 430 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ab896f05/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentListener.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentListener.java
new file mode 100644
index 0000000..8680e19
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentListener.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+
+/**
+ * Get notification of assignment events. The invocations are inline
+ * so make sure your implementation is fast else you'll slow hbase.
+ */
+@InterfaceAudience.Private
+public interface AssignmentListener {
+  /**
+   * The region was opened on the specified server.
+   * @param regionInfo The opened region.
+   * @param serverName The remote servers name.
+   */
+  void regionOpened(final HRegionInfo regionInfo, final ServerName serverName);
+
+  /**
+   * The region was closed on the region server.
+   * @param regionInfo The closed region.
+   * @param serverName The remote servers name.
+   */
+  void regionClosed(final HRegionInfo regionInfo);
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ab896f05/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index ba9e103..4dcd3e9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -32,6 +32,7 @@ import java.util.NavigableMap;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -222,6 +223,9 @@ public class AssignmentManager extends ZooKeeperListener {
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
   public static boolean TEST_SKIP_SPLIT_HANDLING = false;
 
+  /** Listeners that are called on assignment events. */
+  private List<AssignmentListener> listeners = new CopyOnWriteArrayList<AssignmentListener>();
+
   /**
    * Constructs a new assignment manager.
    *
@@ -284,6 +288,22 @@ public class AssignmentManager extends ZooKeeperListener {
   }
 
   /**
+   * Add the listener to the notification list.
+   * @param listener The AssignmentListener to register
+   */
+  public void registerListener(final AssignmentListener listener) {
+    this.listeners.add(listener);
+  }
+
+  /**
+   * Remove the listener from the notification list.
+   * @param listener The AssignmentListener to unregister
+   */
+  public boolean unregisterListener(final AssignmentListener listener) {
+    return this.listeners.remove(listener);
+  }
+
+  /**
    * @return Instance of ZKTableStateManager.
    */
   public TableStateManager getTableStateManager() {
@@ -600,6 +620,7 @@ public class AssignmentManager extends ZooKeeperListener {
         // server. If that server is online, when we reload the meta, the
         // region is put back to online, we need to offline it.
         regionStates.regionOffline(regionInfo);
+        sendRegionClosedNotification(regionInfo);
       }
       // Put it back in transition so that SSH can re-assign it
       regionStates.updateRegionState(regionInfo, State.OFFLINE, sn);
@@ -1246,6 +1267,9 @@ public class AssignmentManager extends ZooKeeperListener {
     // Remove plan if one.
     clearRegionPlan(regionInfo);
     balancer.regionOnline(regionInfo, sn);
+
+    // Tell our listeners that a region was opened
+    sendRegionOpenedNotification(regionInfo, sn);
   }
 
   /**
@@ -1628,12 +1652,12 @@ public class AssignmentManager extends ZooKeeperListener {
             regionOffline(region);
           }
           return;
-        } else if ((t instanceof FailedServerException) || (state != null && 
+        } else if ((t instanceof FailedServerException) || (state != null &&
             t instanceof RegionAlreadyInTransitionException)) {
           long sleepTime = 0;
           Configuration conf = this.server.getConfiguration();
           if(t instanceof FailedServerException) {
-            sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 
+            sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
                   RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
           } else {
             // RS is already processing this region, only need to update the timestamp
@@ -1981,9 +2005,9 @@ public class AssignmentManager extends ZooKeeperListener {
           } else if(plan.getDestination().equals(newPlan.getDestination()) &&
               previousException instanceof FailedServerException) {
             try {
-              LOG.info("Trying to re-assign " + region.getRegionNameAsString() + 
+              LOG.info("Trying to re-assign " + region.getRegionNameAsString() +
                 " to the same failed server.");
-              Thread.sleep(1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 
+              Thread.sleep(1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
                 RpcClient.FAILED_SERVER_EXPIRY_DEFAULT));
             } catch (InterruptedException ie) {
               LOG.warn("Failed to assign "
@@ -3289,6 +3313,26 @@ public class AssignmentManager extends ZooKeeperListener {
     // remove the region plan as well just in case.
     clearRegionPlan(regionInfo);
     balancer.regionOffline(regionInfo);
+
+    // Tell our listeners that a region was closed
+    sendRegionClosedNotification(regionInfo);
+  }
+
+  private void sendRegionOpenedNotification(final HRegionInfo regionInfo,
+      final ServerName serverName) {
+    if (!this.listeners.isEmpty()) {
+      for (AssignmentListener listener : this.listeners) {
+        listener.regionOpened(regionInfo, serverName);
+      }
+    }
+  }
+
+  private void sendRegionClosedNotification(final HRegionInfo regionInfo) {
+    if (!this.listeners.isEmpty()) {
+      for (AssignmentListener listener : this.listeners) {
+        listener.regionClosed(regionInfo);
+      }
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/ab896f05/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerListener.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerListener.java
new file mode 100644
index 0000000..bce3712
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerListener.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ServerName;
+
+/**
+ * Get notification of server events. The invocations are inline
+ * so make sure your implementation is fast else you'll slow hbase.
+ */
+@InterfaceAudience.Private
+public interface ServerListener {
+  /**
+   * The server has joined the cluster.
+   * @param serverName The remote servers name.
+   */
+  void serverAdded(final ServerName serverName);
+
+  /**
+   * The server was removed from the cluster.
+   * @param serverName The remote servers name.
+   */
+  void serverRemoved(final ServerName serverName);
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ab896f05/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index 63b48f5..e209162 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -32,6 +32,7 @@ import java.util.Set;
 import java.util.SortedMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -178,6 +179,9 @@ public class ServerManager {
    */
   private Map<ServerName, Boolean> requeuedDeadServers = new HashMap<ServerName, Boolean>();
 
+  /** Listeners that are called on server events. */
+  private List<ServerListener> listeners = new CopyOnWriteArrayList<ServerListener>();
+
   /**
    * Constructor.
    * @param master
@@ -212,6 +216,22 @@ public class ServerManager {
   }
 
   /**
+   * Add the listener to the notification list.
+   * @param listener The ServerListener to register
+   */
+  public void registerListener(final ServerListener listener) {
+    this.listeners.add(listener);
+  }
+
+  /**
+   * Remove the listener from the notification list.
+   * @param listener The ServerListener to unregister
+   */
+  public boolean unregisterListener(final ServerListener listener) {
+    return this.listeners.remove(listener);
+  }
+
+  /**
    * Let the server manager know a new regionserver has come online
    * @param ia The remote address
    * @param port The remote port
@@ -305,6 +325,14 @@ public class ServerManager {
       }
       recordNewServerWithLock(serverName, sl);
     }
+
+    // Tell our listeners that a server was added
+    if (!this.listeners.isEmpty()) {
+      for (ServerListener listener : this.listeners) {
+        listener.serverAdded(serverName);
+      }
+    }
+
     // Note that we assume that same ts means same server, and don't expire in that case.
     //  TODO: ts can theoretically collide due to clock shifts, so this is a bit hacky.
     if (existingServer != null && (existingServer.getStartcode() < serverName.getStartcode())) {
@@ -582,6 +610,13 @@ public class ServerManager {
     }
     LOG.debug("Added=" + serverName +
       " to dead servers, submitted shutdown handler to be executed meta=" + carryingMeta);
+
+    // Tell our listeners that a server was removed
+    if (!this.listeners.isEmpty()) {
+      for (ServerListener listener : this.listeners) {
+        listener.serverRemoved(serverName);
+      }
+    }
   }
 
   public synchronized void processDeadServer(final ServerName serverName) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/ab896f05/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentListener.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentListener.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentListener.java
new file mode 100644
index 0000000..e960945
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentListener.java
@@ -0,0 +1,260 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.base.Joiner;
+import org.junit.experimental.categories.Category;
+
+@Category(MediumTests.class)
+public class TestAssignmentListener {
+  private static final Log LOG = LogFactory.getLog(TestAssignmentListener.class);
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  static class DummyListener {
+    protected AtomicInteger modified = new AtomicInteger(0);
+
+    public void awaitModifications(int count) throws InterruptedException {
+      while (!modified.compareAndSet(count, 0)) {
+        Thread.sleep(100);
+      }
+    }
+  }
+
+  static class DummyAssignmentListener extends DummyListener implements AssignmentListener {
+    private AtomicInteger closeCount = new AtomicInteger(0);
+    private AtomicInteger openCount = new AtomicInteger(0);
+
+    public DummyAssignmentListener() {
+    }
+
+    public void regionOpened(final HRegionInfo regionInfo, final ServerName serverName) {
+      LOG.info("Assignment open region=" + regionInfo + " server=" + serverName);
+      openCount.incrementAndGet();
+      modified.incrementAndGet();
+    }
+
+    public void regionClosed(final HRegionInfo regionInfo) {
+      LOG.info("Assignment close region=" + regionInfo);
+      closeCount.incrementAndGet();
+      modified.incrementAndGet();
+    }
+
+    public void reset() {
+      openCount.set(0);
+      closeCount.set(0);
+    }
+
+    public int getLoadCount() {
+      return openCount.get();
+    }
+
+    public int getCloseCount() {
+      return closeCount.get();
+    }
+  }
+
+  static class DummyServerListener extends DummyListener implements ServerListener {
+    private AtomicInteger removedCount = new AtomicInteger(0);
+    private AtomicInteger addedCount = new AtomicInteger(0);
+
+    public DummyServerListener() {
+    }
+
+    public void serverAdded(final ServerName serverName) {
+      LOG.info("Server added " + serverName);
+      addedCount.incrementAndGet();
+      modified.incrementAndGet();
+    }
+
+    public void serverRemoved(final ServerName serverName) {
+      LOG.info("Server removed " + serverName);
+      removedCount.incrementAndGet();
+      modified.incrementAndGet();
+    }
+
+    public void reset() {
+      addedCount.set(0);
+      removedCount.set(0);
+    }
+
+    public int getAddedCount() {
+      return addedCount.get();
+    }
+
+    public int getRemovedCount() {
+      return removedCount.get();
+    }
+  }
+
+  @BeforeClass
+  public static void beforeAllTests() throws Exception {
+    TEST_UTIL.startMiniCluster(2);
+  }
+
+  @AfterClass
+  public static void afterAllTests() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test(timeout=60000)
+  public void testServerListener() throws IOException, InterruptedException {
+    ServerManager serverManager = TEST_UTIL.getHBaseCluster().getMaster().getServerManager();
+
+    DummyServerListener listener = new DummyServerListener();
+    serverManager.registerListener(listener);
+    try {
+      MiniHBaseCluster miniCluster = TEST_UTIL.getMiniHBaseCluster();
+
+      // Start a new Region Server
+      miniCluster.startRegionServer();
+      listener.awaitModifications(1);
+      assertEquals(1, listener.getAddedCount());
+      assertEquals(0, listener.getRemovedCount());
+
+      // Start another Region Server
+      listener.reset();
+      miniCluster.startRegionServer();
+      listener.awaitModifications(1);
+      assertEquals(1, listener.getAddedCount());
+      assertEquals(0, listener.getRemovedCount());
+
+      int nrs = miniCluster.getRegionServerThreads().size();
+
+      // Stop a Region Server
+      listener.reset();
+      miniCluster.stopRegionServer(nrs - 1);
+      listener.awaitModifications(1);
+      assertEquals(0, listener.getAddedCount());
+      assertEquals(1, listener.getRemovedCount());
+
+      // Stop another Region Server
+      listener.reset();
+      miniCluster.stopRegionServer(nrs - 2);
+      listener.awaitModifications(1);
+      assertEquals(0, listener.getAddedCount());
+      assertEquals(1, listener.getRemovedCount());
+    } finally {
+      serverManager.unregisterListener(listener);
+    }
+  }
+
+  @Test(timeout=60000)
+  public void testAssignmentListener() throws IOException, InterruptedException {
+    AssignmentManager am = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager();
+    HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+
+    DummyAssignmentListener listener = new DummyAssignmentListener();
+    am.registerListener(listener);
+    try {
+      final String TABLE_NAME_STR = "testtb";
+      final TableName TABLE_NAME = TableName.valueOf(TABLE_NAME_STR);
+      final byte[] FAMILY = Bytes.toBytes("cf");
+
+      // Create a new table, with a single region
+      LOG.info("Create Table");
+      TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+      listener.awaitModifications(1);
+      assertEquals(1, listener.getLoadCount());
+      assertEquals(0, listener.getCloseCount());
+
+      // Add some data
+      HTable table = new HTable(TEST_UTIL.getConfiguration(), TABLE_NAME);
+      try {
+        for (int i = 0; i < 10; ++i) {
+          byte[] key = Bytes.toBytes("row-" + i);
+          Put put = new Put(key);
+          put.add(FAMILY, null, key);
+          table.put(put);
+        }
+      } finally {
+        table.close();
+      }
+
+      // Split the table in two
+      LOG.info("Split Table");
+      listener.reset();
+      admin.split(TABLE_NAME_STR, "row-3");
+      listener.awaitModifications(3);
+      assertEquals(2, listener.getLoadCount());     // daughters added
+      assertEquals(1, listener.getCloseCount());    // parent removed
+
+      // Wait for the Regions to be mergeable
+      MiniHBaseCluster miniCluster = TEST_UTIL.getMiniHBaseCluster();
+      int mergeable = 0;
+      while (mergeable < 2) {
+        Thread.sleep(100);
+        admin.majorCompact(TABLE_NAME_STR);
+        mergeable = 0;
+        for (JVMClusterUtil.RegionServerThread regionThread: miniCluster.getRegionServerThreads()) {
+          for (HRegion region: regionThread.getRegionServer().getOnlineRegions(TABLE_NAME)) {
+            mergeable += region.isMergeable() ? 1 : 0;
+          }
+        }
+      }
+
+      // Merge the two regions
+      LOG.info("Merge Regions");
+      listener.reset();
+      List<HRegionInfo> regions = admin.getTableRegions(TABLE_NAME);
+      assertEquals(2, regions.size());
+      admin.mergeRegions(regions.get(0).getEncodedNameAsBytes(),
+        regions.get(1).getEncodedNameAsBytes(), true);
+      listener.awaitModifications(3);
+      assertEquals(1, admin.getTableRegions(TABLE_NAME).size());
+      assertEquals(1, listener.getLoadCount());     // new merged region added
+      assertEquals(2, listener.getCloseCount());    // daughters removed
+
+      // Delete the table
+      LOG.info("Drop Table");
+      listener.reset();
+      TEST_UTIL.deleteTable(TABLE_NAME);
+      listener.awaitModifications(1);
+      assertEquals(0, listener.getLoadCount());
+      assertEquals(1, listener.getCloseCount());
+    } finally {
+      am.unregisterListener(listener);
+    }
+  }
+}