You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2012/06/12 16:48:00 UTC
svn commit: r1349377 - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/master/
main/java/org/apache/hadoop/hbase/master/handler/
main/java/org/apache/hadoop/hbase/protobuf/
main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/master/
Author: tedyu
Date: Tue Jun 12 14:47:59 2012
New Revision: 1349377
URL: http://svn.apache.org/viewvc?rev=1349377&view=rev
Log:
HBASE-6012 Handling RegionOpeningState for bulk assign (Chunhui)
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=1349377&r1=1349376&r2=1349377&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java Tue Jun 12 14:47:59 2012
@@ -1475,7 +1475,20 @@ public class AssignmentManager extends Z
getLong("hbase.regionserver.rpc.startup.waittime", 60000);
while (!this.master.isStopped()) {
try {
- this.serverManager.sendRegionOpen(destination, regions);
+ List<RegionOpeningState> regionOpeningStateList = this.serverManager
+ .sendRegionOpen(destination, regions);
+ if (regionOpeningStateList == null) {
+ // Failed getting RPC connection to this server
+ return false;
+ }
+ for (int i = 0; i < regionOpeningStateList.size(); i++) {
+ if (regionOpeningStateList.get(i) == RegionOpeningState.ALREADY_OPENED) {
+ processAlreadyOpenedRegion(regions.get(i), destination);
+ } else if (regionOpeningStateList.get(i) == RegionOpeningState.FAILED_OPENING) {
+ // Failed opening this region, reassign it
+ assign(regions.get(i), true, true);
+ }
+ }
break;
} catch (RemoteException e) {
IOException decodedException = e.unwrapRemoteException();
@@ -1534,6 +1547,9 @@ public class AssignmentManager extends Z
failedPlans.put(e.getKey(), e.getValue());
}
} catch (Throwable t) {
+ LOG.warn("Failed bulking assigning " + e.getValue().size()
+ + " region(s) to " + e.getKey().getServerName()
+ + ", and continue to bulk assign others", t);
failedPlans.put(e.getKey(), e.getValue());
}
}
@@ -1545,7 +1561,9 @@ public class AssignmentManager extends Z
+ " regions to server " + e.getKey() + ", reassigning them");
reassigningRegions.addAll(e.getValue());
}
- assign(reassigningRegions, servers);
+ for (HRegionInfo region : reassigningRegions) {
+ assign(region, true, true);
+ }
}
}
@@ -1723,31 +1741,10 @@ public class AssignmentManager extends Z
RegionOpeningState regionOpenState = serverManager.sendRegionOpen(plan
.getDestination(), state.getRegion(), versionOfOfflineNode);
if (regionOpenState == RegionOpeningState.ALREADY_OPENED) {
- // Remove region from in-memory transition and unassigned node from ZK
- // While trying to enable the table the regions of the table were
- // already enabled.
- LOG.debug("ALREADY_OPENED region " + state.getRegion().getRegionNameAsString() +
- " to " + plan.getDestination().toString());
- String encodedRegionName = state.getRegion()
- .getEncodedName();
- try {
- ZKAssign.deleteOfflineNode(master.getZooKeeper(), encodedRegionName);
- } catch (KeeperException.NoNodeException e) {
- if(LOG.isDebugEnabled()){
- LOG.debug("The unassigned node "+encodedRegionName+" doesnot exist.");
- }
- } catch (KeeperException e) {
- master.abort(
- "Error deleting OFFLINED node in ZK for transition ZK node ("
- + encodedRegionName + ")", e);
- }
- // no lock concurrent ok -> sequentially consistent
- this.regionsInTransition.remove(plan.getRegionInfo().getEncodedName());
-
- synchronized (this.regions) {
- this.regions.put(plan.getRegionInfo(), plan.getDestination());
- addToServers(plan.getDestination(), plan.getRegionInfo());
- }
+ processAlreadyOpenedRegion(state.getRegion(), plan.getDestination());
+ } else if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
+ // Failed opening this region
+ throw new Exception("Get regionOpeningState=" + regionOpenState);
}
break;
} catch (Throwable t) {
@@ -1779,6 +1776,36 @@ public class AssignmentManager extends Z
}
}
+ /**
+ * Handles a region that the destination server reported as ALREADY_OPENED.
+ * Deletes the region's OFFLINE unassigned node in ZooKeeper (if present),
+ * drops the region from regions-in-transition, and records the region as
+ * assigned to {@code sn}. Used by both the single and the bulk assign paths.
+ *
+ * @param region the region the server reported as already opened
+ * @param sn the server the region was being opened on
+ */
+ private void processAlreadyOpenedRegion(HRegionInfo region, ServerName sn) {
+ // Remove region from in-memory transition and unassigned node from ZK.
+ // While trying to enable the table the regions of the table were
+ // already enabled.
+ String encodedRegionName = region.getEncodedName();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ALREADY_OPENED region " + region.getRegionNameAsString()
+ + " to " + sn);
+ }
+ try {
+ ZKAssign.deleteOfflineNode(master.getZooKeeper(), encodedRegionName);
+ } catch (KeeperException.NoNodeException e) {
+ // Nothing to delete: the unassigned node is not present.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The unassigned node " + encodedRegionName
+ + " does not exist.");
+ }
+ } catch (KeeperException e) {
+ master.abort(
+ "Error deleting OFFLINED node in ZK for transition ZK node ("
+ + encodedRegionName + ")", e);
+ }
+ // no lock concurrent ok -> sequentially consistent
+ this.regionsInTransition.remove(encodedRegionName);
+
+ // Region map and per-server bookkeeping are updated together under the
+ // regions lock.
+ synchronized (this.regions) {
+ this.regions.put(region, sn);
+ addToServers(sn, region);
+ }
+ }
+
private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
String tableName = region.getTableNameAsString();
boolean disabled = this.zkTable.isDisabledTable(tableName);
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=1349377&r1=1349376&r2=1349377&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java Tue Jun 12 14:47:59 2012
@@ -511,16 +511,20 @@ public class ServerManager {
* <p>
* @param server server to open a region
* @param regions regions to open
+ * @return a list of region opening states
*/
- public void sendRegionOpen(ServerName server, List<HRegionInfo> regions)
+ public List<RegionOpeningState> sendRegionOpen(ServerName server,
+ List<HRegionInfo> regions)
throws IOException {
AdminProtocol admin = getServerConnection(server);
if (admin == null) {
LOG.warn("Attempting to send OPEN RPC to server " + server.toString() +
" failed because no RPC connection found to this server");
- return;
+ return null;
}
- ProtobufUtil.openRegion(admin, regions);
+
+ OpenRegionResponse response = ProtobufUtil.openRegion(admin, regions);
+ return ResponseConverter.getRegionOpeningStateList(response);
}
/**
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java?rev=1349377&r1=1349376&r2=1349377&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java Tue Jun 12 14:47:59 2012
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.master.De
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.zookeeper.KeeperException;
/**
@@ -305,6 +306,16 @@ public class ServerShutdownHandler exten
+ " because it has been opened in "
+ addressFromAM.getServerName());
} else {
+ if (rit != null) {
+ //clean zk node
+ try{
+ LOG.info("Reassigning region with rs =" + rit + " and deleting zk node if exists");
+ ZKAssign.deleteNodeFailSilent(services.getZooKeeper(), e.getKey());
+ }catch (KeeperException ke) {
+ this.server.abort("Unexpected ZK exception deleting unassigned node " + e.getKey(), ke);
+ return;
+ }
+ }
toAssignRegions.add(e.getKey());
}
} else if (rit != null && (rit.isSplitting() || rit.isSplit())) {
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java?rev=1349377&r1=1349376&r2=1349377&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java Tue Jun 12 14:47:59 2012
@@ -78,6 +78,7 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
@@ -1286,7 +1287,6 @@ public final class ProtobufUtil {
/**
* A helper to open a region using admin protocol.
- *
* @param admin
* @param region
* @throws IOException
@@ -1304,17 +1304,18 @@ public final class ProtobufUtil {
/**
* A helper to open a list of regions using admin protocol.
- *
+ *
* @param admin
* @param regions
+ * @return OpenRegionResponse
* @throws IOException
*/
- public static void openRegion(final AdminProtocol admin,
+ public static OpenRegionResponse openRegion(final AdminProtocol admin,
final List<HRegionInfo> regions) throws IOException {
OpenRegionRequest request =
RequestConverter.buildOpenRegionRequest(regions);
try {
- admin.openRegion(null, request);
+ return admin.openRegion(null, request);
} catch (ServiceException se) {
throw getRemoteException(se);
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java?rev=1349377&r1=1349376&r2=1349377&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java Tue Jun 12 14:47:59 2012
@@ -157,6 +157,23 @@ public final class ResponseConverter {
}
/**
+ * Converts an OpenRegionResponse into the list of region opening states it
+ * carries: one entry per state in the response, in the same order.
+ *
+ * @param proto the OpenRegionResponse; may be null
+ * @return the list of region opening states, or null if {@code proto} is null
+ */
+ public static List<RegionOpeningState> getRegionOpeningStateList(
+ final OpenRegionResponse proto) {
+ if (proto == null) {
+ return null;
+ }
+ int count = proto.getOpeningStateCount();
+ // Presize: the result has exactly one entry per state in the response.
+ List<RegionOpeningState> regionOpeningStates =
+ new ArrayList<RegionOpeningState>(count);
+ for (int i = 0; i < count; i++) {
+ // Map the protobuf enum value onto RegionOpeningState by constant name.
+ regionOpeningStates.add(RegionOpeningState.valueOf(
+ proto.getOpeningState(i).name()));
+ }
+ return regionOpeningStates;
+ }
+
+ /**
* Check if the region is closed from a CloseRegionResponse
*
* @param proto the CloseRegionResponse
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1349377&r1=1349376&r2=1349377&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Jun 12 14:47:59 2012
@@ -3363,56 +3363,74 @@ public class HRegionServer implements C
}
try {
checkOpen();
- requestCount.incrementAndGet();
- OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
- Map<String, HTableDescriptor> htds =
- new HashMap<String, HTableDescriptor>(request.getRegionList().size());
- for (RegionInfo regionInfo: request.getRegionList()) {
- HRegionInfo region = HRegionInfo.convert(regionInfo);
+ } catch (IOException ie) {
+ throw new ServiceException(ie);
+ }
+ requestCount.incrementAndGet();
+ OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
+ Map<String, HTableDescriptor> htds = new HashMap<String, HTableDescriptor>(
+ request.getRegionList().size());
+ boolean isBulkAssign = request.getRegionList().size() > 1;
+ for (RegionInfo regionInfo : request.getRegionList()) {
+ HRegionInfo region = HRegionInfo.convert(regionInfo);
+ try {
checkIfRegionInTransition(region, OPEN);
-
HRegion onlineRegion = getFromOnlineRegions(region.getEncodedName());
if (null != onlineRegion) {
- // See HBASE-5094. Cross check with META if still this RS is owning the
- // region.
+ // See HBASE-5094. Cross check with META if still this RS is owning
+ // the region.
Pair<HRegionInfo, ServerName> p = MetaReader.getRegion(
- this.catalogTracker, region.getRegionName());
+ this.catalogTracker, region.getRegionName());
if (this.getServerName().equals(p.getSecond())) {
LOG.warn("Attempted open of " + region.getEncodedName()
- + " but already online on this server");
+ + " but already online on this server");
builder.addOpeningState(RegionOpeningState.ALREADY_OPENED);
continue;
} else {
LOG.warn("The region " + region.getEncodedName()
- + " is online on this server but META does not have this server.");
+ + " is online on this server but META does not have this server.");
removeFromOnlineRegions(region.getEncodedName(), null);
}
}
- LOG.info("Received request to open region: "
- + region.getRegionNameAsString() + " on "+this.serverNameFromMasterPOV);
+ LOG.info("Received request to open region: " + region.getRegionNameAsString() + " on "
+ + this.serverNameFromMasterPOV);
HTableDescriptor htd = htds.get(region.getTableNameAsString());
if (htd == null) {
htd = this.tableDescriptors.get(region.getTableName());
htds.put(region.getTableNameAsString(), htd);
}
- this.regionsInTransitionInRS.putIfAbsent(region.getEncodedNameAsBytes(), true);
+ this.regionsInTransitionInRS.putIfAbsent(
+ region.getEncodedNameAsBytes(), true);
// Need to pass the expected version in the constructor.
if (region.isRootRegion()) {
this.service.submit(new OpenRootHandler(this, this, region, htd,
- versionOfOfflineNode));
+ versionOfOfflineNode));
} else if (region.isMetaRegion()) {
this.service.submit(new OpenMetaHandler(this, this, region, htd,
- versionOfOfflineNode));
+ versionOfOfflineNode));
} else {
this.service.submit(new OpenRegionHandler(this, this, region, htd,
- versionOfOfflineNode));
+ versionOfOfflineNode));
}
builder.addOpeningState(RegionOpeningState.OPENED);
+ } catch (RegionAlreadyInTransitionException rie) {
+ LOG.warn("Region is already in transition", rie);
+ if (isBulkAssign) {
+ builder.addOpeningState(RegionOpeningState.OPENED);
+ } else {
+ throw new ServiceException(rie);
+ }
+ } catch (IOException ie) {
+ LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
+ if (isBulkAssign) {
+ builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
+ } else {
+ throw new ServiceException(ie);
+ }
}
- return builder.build();
- } catch (IOException ie) {
- throw new ServiceException(ie);
}
+ return builder.build();
+
}
/**
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java?rev=1349377&r1=1349376&r2=1349377&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java Tue Jun 12 14:47:59 2012
@@ -580,6 +580,7 @@ public class TestAssignmentManager {
MasterServices services = Mockito.mock(MasterServices.class);
Mockito.when(services.getAssignmentManager()).thenReturn(am);
Mockito.when(services.getServerManager()).thenReturn(this.serverManager);
+ Mockito.when(services.getZooKeeper()).thenReturn(this.watcher);
ServerShutdownHandler handler = new ServerShutdownHandler(this.server,
services, deadServers, SERVERNAME_A, false);
handler.process();