You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by dd...@apache.org on 2013/08/06 01:10:37 UTC
svn commit: r1510800 - in /hbase/branches/0.95/hbase-server/src:
main/java/org/apache/hadoop/hbase/master/
test/java/org/apache/hadoop/hbase/master/
Author: ddas
Date: Mon Aug 5 23:10:37 2013
New Revision: 1510800
URL: http://svn.apache.org/r1510800
Log:
HBASE-9095. AssignmentManager's handleRegion should respect the single threaded nature of the processing
Modified:
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java
hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=1510800&r1=1510799&r2=1510800&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java Mon Aug 5 23:10:37 2013
@@ -159,6 +159,17 @@ public class AssignmentManager extends Z
private final ExecutorService executorService;
+ // For unit tests, keep track of calls to ClosedRegionHandler
+ private Map<HRegionInfo, AtomicBoolean> closedRegionHandlerCalled =
+ new HashMap<HRegionInfo, AtomicBoolean>();
+
+ // For unit tests, keep track of calls to OpenedRegionHandler
+ private Map<HRegionInfo, AtomicBoolean> openedRegionHandlerCalled =
+ new HashMap<HRegionInfo, AtomicBoolean>();
+
+ // For unit tests, keep track of calls to SplitRegionHandler
+ private AtomicBoolean splitRegionHandlerCalled = new AtomicBoolean(false);
+
//Thread pool executor service for timeout monitor
private java.util.concurrent.ExecutorService threadPoolExecutorService;
@@ -836,8 +847,8 @@ public class AssignmentManager extends Z
break;
}
// Run handler to do the rest of the SPLIT handling.
- this.executorService.submit(new SplitRegionHandler(server, this,
- regionState.getRegion(), sn, daughters));
+ new SplitRegionHandler(server, this, regionState.getRegion(), sn, daughters).process();
+ splitRegionHandlerCalled.set(true);
break;
case RS_ZK_REGION_MERGING:
@@ -872,8 +883,7 @@ public class AssignmentManager extends Z
+ merge_a + ", rs_b=" + merge_b);
}
// Run handler to do the rest of the MERGED handling.
- this.executorService.submit(new MergedRegionHandler(
- server, this, sn, mergeRegions));
+ new MergedRegionHandler(server, this, sn, mergeRegions).process();
break;
case M_ZK_REGION_CLOSING:
@@ -907,8 +917,8 @@ public class AssignmentManager extends Z
regionState = regionStates.updateRegionState(rt, RegionState.State.CLOSED);
if (regionState != null) {
removeClosedRegion(regionState.getRegion());
- this.executorService.submit(new ClosedRegionHandler(server,
- this, regionState.getRegion()));
+ new ClosedRegionHandler(server, this, regionState.getRegion()).process();
+ closedRegionHandlerCalled.put(regionState.getRegion(), new AtomicBoolean(true));
}
break;
@@ -941,8 +951,7 @@ public class AssignmentManager extends Z
// When there are more than one region server a new RS is selected as the
// destination and the same is updated in the regionplan. (HBASE-5546)
getRegionPlan(regionState.getRegion(), sn, true);
- this.executorService.submit(new ClosedRegionHandler(server,
- this, regionState.getRegion()));
+ new ClosedRegionHandler(server, this, regionState.getRegion()).process();
}
}
break;
@@ -980,8 +989,9 @@ public class AssignmentManager extends Z
regionState = regionStates.updateRegionState(rt, RegionState.State.OPEN);
if (regionState != null) {
failedOpenTracker.remove(encodedName); // reset the count, if any
- this.executorService.submit(new OpenedRegionHandler(
- server, this, regionState.getRegion(), sn, expectedVersion));
+ new OpenedRegionHandler(
+ server, this, regionState.getRegion(), sn, expectedVersion).process();
+ openedRegionHandlerCalled.put(regionState.getRegion(), new AtomicBoolean(true));
}
break;
@@ -993,6 +1003,32 @@ public class AssignmentManager extends Z
}
}
+ //For unit tests only
+ boolean wasClosedHandlerCalled(HRegionInfo hri) {
+ AtomicBoolean b = closedRegionHandlerCalled.get(hri);
+ //compareAndSet to be sure that unit tests don't see stale values. Means,
+ //we will return true exactly once unless the handler code resets to true
+ //this value.
+ return b == null ? false : b.compareAndSet(true, false);
+ }
+
+ //For unit tests only
+ boolean wasOpenedHandlerCalled(HRegionInfo hri) {
+ AtomicBoolean b = openedRegionHandlerCalled.get(hri);
+ //compareAndSet to be sure that unit tests don't see stale values. Means,
+ //we will return true exactly once unless the handler code resets to true
+ //this value.
+ return b == null ? false : b.compareAndSet(true, false);
+ }
+
+ //For unit tests only
+ boolean wasSplitHandlerCalled() {
+ //compareAndSet to be sure that unit tests don't see stale values. Means,
+ //we will return true exactly once unless the handler code resets to true
+ //this value.
+ return splitRegionHandlerCalled.compareAndSet(true, false);
+ }
+
/**
* @return Returns true if this RegionState is splittable; i.e. the
* RegionState is currently in splitting state or pending_close or
Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java?rev=1510800&r1=1510799&r2=1510800&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java Mon Aug 5 23:10:37 2013
@@ -24,16 +24,11 @@ import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.executor.EventHandler;
-import org.apache.hadoop.hbase.executor.EventHandler.EventHandlerListener;
-import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import java.io.IOException;
import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -86,35 +81,27 @@ public class TestMaster {
tableRegions.get(0).getFirst().getEndKey());
// Now trigger a split and stop when the split is in progress
- CountDownLatch split = new CountDownLatch(1);
- CountDownLatch proceed = new CountDownLatch(1);
- RegionSplitListener list = new RegionSplitListener(split, proceed);
- cluster.getMaster().executorService.
- registerListener(EventType.RS_ZK_REGION_SPLIT, list);
-
LOG.info("Splitting table");
TEST_UTIL.getHBaseAdmin().split(TABLENAME);
LOG.info("Waiting for split result to be about to open");
- split.await(60, TimeUnit.SECONDS);
- try {
- LOG.info("Making sure we can call getTableRegions while opening");
- tableRegions = MetaReader.getTableRegionsAndLocations(m.getCatalogTracker(),
+ while (!m.assignmentManager.wasSplitHandlerCalled()) {
+ Thread.sleep(100);
+ }
+ LOG.info("Making sure we can call getTableRegions while opening");
+ tableRegions = MetaReader.getTableRegionsAndLocations(m.getCatalogTracker(),
TABLENAME, false);
- LOG.info("Regions: " + Joiner.on(',').join(tableRegions));
- // We have three regions because one is split-in-progress
- assertEquals(3, tableRegions.size());
- LOG.info("Making sure we can call getTableRegionClosest while opening");
- Pair<HRegionInfo, ServerName> pair =
+ LOG.info("Regions: " + Joiner.on(',').join(tableRegions));
+ // We have three regions because one is split-in-progress
+ assertEquals(3, tableRegions.size());
+ LOG.info("Making sure we can call getTableRegionClosest while opening");
+ Pair<HRegionInfo, ServerName> pair =
m.getTableRegionForRow(TABLENAME, Bytes.toBytes("cde"));
- LOG.info("Result is: " + pair);
- Pair<HRegionInfo, ServerName> tableRegionFromName =
+ LOG.info("Result is: " + pair);
+ Pair<HRegionInfo, ServerName> tableRegionFromName =
MetaReader.getRegion(m.getCatalogTracker(),
pair.getFirst().getRegionName());
- assertEquals(tableRegionFromName.getFirst(), pair.getFirst());
- } finally {
- proceed.countDown();
- }
+ assertEquals(tableRegionFromName.getFirst(), pair.getFirst());
}
@Test
@@ -175,33 +162,5 @@ public class TestMaster {
TEST_UTIL.deleteTable(tableName);
}
}
-
- static class RegionSplitListener implements EventHandlerListener {
- CountDownLatch split, proceed;
-
- public RegionSplitListener(CountDownLatch split, CountDownLatch proceed) {
- this.split = split;
- this.proceed = proceed;
- }
-
- @Override
- public void afterProcess(EventHandler event) {
- if (event.getEventType() != EventType.RS_ZK_REGION_SPLIT) {
- return;
- }
- try {
- split.countDown();
- proceed.await(60, TimeUnit.SECONDS);
- } catch (InterruptedException ie) {
- throw new RuntimeException(ie);
- }
- return;
- }
-
- @Override
- public void beforeProcess(EventHandler event) {
- }
- }
-
}
Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java?rev=1510800&r1=1510799&r2=1510800&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java Mon Aug 5 23:10:37 2013
@@ -26,7 +26,6 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Collection;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -43,10 +42,6 @@ import org.apache.hadoop.hbase.client.Re
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.executor.EventHandler;
-import org.apache.hadoop.hbase.executor.EventHandler.EventHandlerListener;
-import org.apache.hadoop.hbase.executor.EventType;
-import org.apache.hadoop.hbase.master.handler.TotesHRegionInfo;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
@@ -116,29 +111,14 @@ public class TestZKBasedOpenCloseRegion
HRegionInfo hri = getNonMetaRegion(ProtobufUtil.getOnlineRegions(regionServer));
LOG.debug("Asking RS to close region " + hri.getRegionNameAsString());
- AtomicBoolean closeEventProcessed = new AtomicBoolean(false);
- AtomicBoolean reopenEventProcessed = new AtomicBoolean(false);
-
- EventHandlerListener closeListener =
- new ReopenEventListener(hri.getRegionNameAsString(),
- closeEventProcessed, EventType.RS_ZK_REGION_CLOSED);
- cluster.getMaster().executorService.
- registerListener(EventType.RS_ZK_REGION_CLOSED, closeListener);
-
- EventHandlerListener openListener =
- new ReopenEventListener(hri.getRegionNameAsString(),
- reopenEventProcessed, EventType.RS_ZK_REGION_OPENED);
- cluster.getMaster().executorService.
- registerListener(EventType.RS_ZK_REGION_OPENED, openListener);
-
LOG.info("Unassign " + hri.getRegionNameAsString());
cluster.getMaster().assignmentManager.unassign(hri);
- while (!closeEventProcessed.get()) {
+ while (!cluster.getMaster().assignmentManager.wasClosedHandlerCalled(hri)) {
Threads.sleep(100);
}
- while (!reopenEventProcessed.get()) {
+ while (!cluster.getMaster().assignmentManager.wasOpenedHandlerCalled(hri)) {
Threads.sleep(100);
}
@@ -157,83 +137,6 @@ public class TestZKBasedOpenCloseRegion
return hri;
}
- public static class ReopenEventListener implements EventHandlerListener {
- private static final Log LOG = LogFactory.getLog(ReopenEventListener.class);
- String regionName;
- AtomicBoolean eventProcessed;
- EventType eventType;
-
- public ReopenEventListener(String regionName,
- AtomicBoolean eventProcessed, EventType eventType) {
- this.regionName = regionName;
- this.eventProcessed = eventProcessed;
- this.eventType = eventType;
- }
-
- @Override
- public void beforeProcess(EventHandler event) {
- if(event.getEventType() == eventType) {
- LOG.info("Received " + eventType + " and beginning to process it");
- }
- }
-
- @Override
- public void afterProcess(EventHandler event) {
- LOG.info("afterProcess(" + event + ")");
- if(event.getEventType() == eventType) {
- LOG.info("Finished processing " + eventType);
- String regionName = "";
- if(eventType == EventType.RS_ZK_REGION_OPENED) {
- TotesHRegionInfo hriCarrier = (TotesHRegionInfo)event;
- regionName = hriCarrier.getHRegionInfo().getRegionNameAsString();
- } else if(eventType == EventType.RS_ZK_REGION_CLOSED) {
- TotesHRegionInfo hriCarrier = (TotesHRegionInfo)event;
- regionName = hriCarrier.getHRegionInfo().getRegionNameAsString();
- }
- if(this.regionName.equals(regionName)) {
- eventProcessed.set(true);
- }
- synchronized(eventProcessed) {
- eventProcessed.notifyAll();
- }
- }
- }
- }
-
- public static class CloseRegionEventListener implements EventHandlerListener {
- private static final Log LOG = LogFactory.getLog(CloseRegionEventListener.class);
- String regionToClose;
- AtomicBoolean closeEventProcessed;
-
- public CloseRegionEventListener(String regionToClose,
- AtomicBoolean closeEventProcessed) {
- this.regionToClose = regionToClose;
- this.closeEventProcessed = closeEventProcessed;
- }
-
- @Override
- public void afterProcess(EventHandler event) {
- LOG.info("afterProcess(" + event + ")");
- if(event.getEventType() == EventType.RS_ZK_REGION_CLOSED) {
- LOG.info("Finished processing CLOSE REGION");
- TotesHRegionInfo hriCarrier = (TotesHRegionInfo)event;
- if (regionToClose.equals(hriCarrier.getHRegionInfo().getRegionNameAsString())) {
- LOG.info("Setting closeEventProcessed flag");
- closeEventProcessed.set(true);
- } else {
- LOG.info("Region to close didn't match");
- }
- }
- }
-
- @Override
- public void beforeProcess(EventHandler event) {
- if(event.getEventType() == EventType.M_RS_CLOSE_REGION) {
- LOG.info("Received CLOSE RPC and beginning to process it");
- }
- }
- }
-
/**
* This test shows how a region won't be able to be assigned to a RS
* if it's already "processing" it.
@@ -253,13 +156,6 @@ public class TestZKBasedOpenCloseRegion
// fake that hr1 is processing the region
hr1.getRegionsInTransitionInRS().putIfAbsent(hri.getEncodedNameAsBytes(), true);
- AtomicBoolean reopenEventProcessed = new AtomicBoolean(false);
- EventHandlerListener openListener =
- new ReopenEventListener(hri.getRegionNameAsString(),
- reopenEventProcessed, EventType.RS_ZK_REGION_OPENED);
- cluster.getMaster().executorService.
- registerListener(EventType.RS_ZK_REGION_OPENED, openListener);
-
// now ask the master to move the region to hr1, will fail
TEST_UTIL.getHBaseAdmin().move(hri.getEncodedNameAsBytes(),
Bytes.toBytes(hr1.getServerName().toString()));
@@ -269,22 +165,14 @@ public class TestZKBasedOpenCloseRegion
// remove the block and reset the boolean
hr1.getRegionsInTransitionInRS().remove(hri.getEncodedNameAsBytes());
- reopenEventProcessed.set(false);
// now try moving a region when there is no region in transition.
hri = getNonMetaRegion(ProtobufUtil.getOnlineRegions(hr1));
- openListener =
- new ReopenEventListener(hri.getRegionNameAsString(),
- reopenEventProcessed, EventType.RS_ZK_REGION_OPENED);
-
- cluster.getMaster().executorService.
- registerListener(EventType.RS_ZK_REGION_OPENED, openListener);
-
TEST_UTIL.getHBaseAdmin().move(hri.getEncodedNameAsBytes(),
Bytes.toBytes(hr0.getServerName().toString()));
- while (!reopenEventProcessed.get()) {
+ while (!cluster.getMaster().assignmentManager.wasOpenedHandlerCalled(hri)) {
Threads.sleep(100);
}
@@ -304,15 +192,9 @@ public class TestZKBasedOpenCloseRegion
HRegionInfo hri = getNonMetaRegion(ProtobufUtil.getOnlineRegions(regionServer));
LOG.debug("Asking RS to close region " + hri.getRegionNameAsString());
- AtomicBoolean closeEventProcessed = new AtomicBoolean(false);
- EventHandlerListener listener =
- new CloseRegionEventListener(hri.getRegionNameAsString(),
- closeEventProcessed);
- cluster.getMaster().executorService.registerListener(EventType.RS_ZK_REGION_CLOSED, listener);
-
cluster.getMaster().assignmentManager.unassign(hri);
- while (!closeEventProcessed.get()) {
+ while (!cluster.getMaster().assignmentManager.wasClosedHandlerCalled(hri)) {
Threads.sleep(100);
}
LOG.info("Done with testCloseRegion");