You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2012/06/12 16:48:00 UTC

svn commit: r1349377 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/master/ main/java/org/apache/hadoop/hbase/master/handler/ main/java/org/apache/hadoop/hbase/protobuf/ main/java/org/apache/hadoop/hbase/regionserver/ test/java/o...

Author: tedyu
Date: Tue Jun 12 14:47:59 2012
New Revision: 1349377

URL: http://svn.apache.org/viewvc?rev=1349377&view=rev
Log:
HBASE-6012 Handling RegionOpeningState for bulk assign (Chunhui)

Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=1349377&r1=1349376&r2=1349377&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java Tue Jun 12 14:47:59 2012
@@ -1475,7 +1475,20 @@ public class AssignmentManager extends Z
           getLong("hbase.regionserver.rpc.startup.waittime", 60000);
       while (!this.master.isStopped()) {
         try {
-          this.serverManager.sendRegionOpen(destination, regions);
+          List<RegionOpeningState> regionOpeningStateList = this.serverManager
+              .sendRegionOpen(destination, regions);
+          if (regionOpeningStateList == null) {
+            // Failed getting RPC connection to this server
+            return false;
+          }
+          for (int i = 0; i < regionOpeningStateList.size(); i++) {
+            if (regionOpeningStateList.get(i) == RegionOpeningState.ALREADY_OPENED) {
+              processAlreadyOpenedRegion(regions.get(i), destination);
+            } else if (regionOpeningStateList.get(i) == RegionOpeningState.FAILED_OPENING) {
+              // Failed opening this region, reassign it
+              assign(regions.get(i), true, true);
+            }
+          }
           break;
         } catch (RemoteException e) {
           IOException decodedException = e.unwrapRemoteException();
@@ -1534,6 +1547,9 @@ public class AssignmentManager extends Z
           failedPlans.put(e.getKey(), e.getValue());
         }
       } catch (Throwable t) {
+        LOG.warn("Failed bulking assigning " + e.getValue().size()
+            + " region(s) to " + e.getKey().getServerName()
+            + ", and continue to bulk assign others", t);
         failedPlans.put(e.getKey(), e.getValue());
       }
     }
@@ -1545,7 +1561,9 @@ public class AssignmentManager extends Z
             + " regions to server " + e.getKey() + ", reassigning them");
         reassigningRegions.addAll(e.getValue());
       }
-      assign(reassigningRegions, servers);
+      for (HRegionInfo region : reassigningRegions) {
+        assign(region, true, true);
+      }
     }
   }
 
@@ -1723,31 +1741,10 @@ public class AssignmentManager extends Z
         RegionOpeningState regionOpenState = serverManager.sendRegionOpen(plan
             .getDestination(), state.getRegion(), versionOfOfflineNode);
         if (regionOpenState == RegionOpeningState.ALREADY_OPENED) {
-          // Remove region from in-memory transition and unassigned node from ZK
-          // While trying to enable the table the regions of the table were
-          // already enabled.
-          LOG.debug("ALREADY_OPENED region " + state.getRegion().getRegionNameAsString() +
-              " to " + plan.getDestination().toString());
-          String encodedRegionName = state.getRegion()
-              .getEncodedName();
-          try {
-            ZKAssign.deleteOfflineNode(master.getZooKeeper(), encodedRegionName);
-          } catch (KeeperException.NoNodeException e) {
-            if(LOG.isDebugEnabled()){
-              LOG.debug("The unassigned node "+encodedRegionName+" doesnot exist.");
-            }
-          } catch (KeeperException e) {
-            master.abort(
-                "Error deleting OFFLINED node in ZK for transition ZK node ("
-                    + encodedRegionName + ")", e);
-          }
-          // no lock concurrent ok -> sequentially consistent
-          this.regionsInTransition.remove(plan.getRegionInfo().getEncodedName());
-
-          synchronized (this.regions) {
-            this.regions.put(plan.getRegionInfo(), plan.getDestination());
-            addToServers(plan.getDestination(), plan.getRegionInfo());
-          }
+          processAlreadyOpenedRegion(state.getRegion(), plan.getDestination());
+        } else if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
+          // Failed opening this region
+          throw new Exception("Get regionOpeningState=" + regionOpenState);
         }
         break;
       } catch (Throwable t) {
@@ -1779,6 +1776,36 @@ public class AssignmentManager extends Z
     }
   }
 
+  /**
+   * Records a region that the target server reported as ALREADY_OPENED:
+   * deletes its OFFLINE znode, removes it from regionsInTransition and
+   * maps it to {@code sn} in the regions/servers bookkeeping.
+   */
+  private void processAlreadyOpenedRegion(HRegionInfo region, ServerName sn) {
+    String encodedRegionName = region.getEncodedName();
+    // The table's regions may already be enabled (e.g. while enabling the
+    // table), so drop the transition state and unassigned node we created.
+    LOG.debug("ALREADY_OPENED region " + region.getRegionNameAsString()
+        + " to " + sn);
+    try {
+      ZKAssign.deleteOfflineNode(master.getZooKeeper(), encodedRegionName);
+    } catch (KeeperException.NoNodeException e) {
+      // Nothing to clean up; the node is already gone.
+      LOG.debug("The unassigned node " + encodedRegionName + " does not exist.");
+    } catch (KeeperException e) {
+      master.abort(
+          "Error deleting OFFLINED node in ZK for transition ZK node ("
+              + encodedRegionName + ")", e);
+    }
+    // no lock concurrent ok -> sequentially consistent
+    this.regionsInTransition.remove(encodedRegionName);
+
+    synchronized (this.regions) {
+      this.regions.put(region, sn);
+      addToServers(sn, region);
+    }
+  }
+
   private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
     String tableName = region.getTableNameAsString();
     boolean disabled = this.zkTable.isDisabledTable(tableName);

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=1349377&r1=1349376&r2=1349377&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java Tue Jun 12 14:47:59 2012
@@ -511,16 +511,20 @@ public class ServerManager {
    * <p>
    * @param server server to open a region
    * @param regions regions to open
+   * @return a list of region opening states
    */
-  public void sendRegionOpen(ServerName server, List<HRegionInfo> regions)
+  public List<RegionOpeningState> sendRegionOpen(ServerName server,
+      List<HRegionInfo> regions)
   throws IOException {
     AdminProtocol admin = getServerConnection(server);
     if (admin == null) {
       LOG.warn("Attempting to send OPEN RPC to server " + server.toString() +
         " failed because no RPC connection found to this server");
-      return;
+      return null; // null = OPEN RPC not sent; callers must check for it
     }
-    ProtobufUtil.openRegion(admin, regions);
+    // Expected to hold one RegionOpeningState per region, in request order.
+    OpenRegionResponse response = ProtobufUtil.openRegion(admin, regions);
+    return ResponseConverter.getRegionOpeningStateList(response);
   }
 
   /**

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java?rev=1349377&r1=1349376&r2=1349377&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java Tue Jun 12 14:47:59 2012
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.master.De
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.ServerManager;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKAssign;
 import org.apache.zookeeper.KeeperException;
 
 /**
@@ -305,6 +306,16 @@ public class ServerShutdownHandler exten
                     + " because it has been opened in "
                     + addressFromAM.getServerName());
               } else {
+                if (rit != null) {
+                  //clean zk node
+                  try{
+                    LOG.info("Reassigning region with rs =" + rit + " and deleting zk node if exists");
+                    ZKAssign.deleteNodeFailSilent(services.getZooKeeper(), e.getKey());
+                  }catch (KeeperException ke) {
+                    this.server.abort("Unexpected ZK exception deleting unassigned node " + e.getKey(), ke);
+                    return;
+                  }
+                }
                 toAssignRegions.add(e.getKey());
               }
           } else if (rit != null && (rit.isSplitting() || rit.isSplit())) {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java?rev=1349377&r1=1349376&r2=1349377&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java Tue Jun 12 14:47:59 2012
@@ -78,6 +78,7 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileResponse;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
@@ -1286,7 +1287,6 @@ public final class ProtobufUtil {
 
   /**
    * A helper to open a region using admin protocol.
-   *
    * @param admin
    * @param region
    * @throws IOException
@@ -1304,17 +1304,18 @@ public final class ProtobufUtil {
 
   /**
    * A helper to open a list of regions using admin protocol.
-   *
+   * 
    * @param admin
    * @param regions
+   * @return OpenRegionResponse
    * @throws IOException
    */
-  public static void openRegion(final AdminProtocol admin,
+  public static OpenRegionResponse openRegion(final AdminProtocol admin,
       final List<HRegionInfo> regions) throws IOException {
     OpenRegionRequest request =
       RequestConverter.buildOpenRegionRequest(regions);
     try {
-      admin.openRegion(null, request);
+      return admin.openRegion(null, request);
     } catch (ServiceException se) {
       throw getRemoteException(se);
     }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java?rev=1349377&r1=1349376&r2=1349377&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java Tue Jun 12 14:47:59 2012
@@ -157,6 +157,23 @@ public final class ResponseConverter {
   }
 
   /**
+   * Converts the opening states in an OpenRegionResponse, in order, into
+   * {@link RegionOpeningState} values (matched by enum constant name).
+   * @param proto the OpenRegionResponse to convert; may be null
+   * @return one state per entry in the response, or null if proto is null
+   */
+  public static List<RegionOpeningState> getRegionOpeningStateList(
+      final OpenRegionResponse proto) {
+    if (proto == null) return null;
+    int count = proto.getOpeningStateCount();
+    List<RegionOpeningState> regionOpeningStates = new ArrayList<RegionOpeningState>(count);
+    for (int i = 0; i < count; i++) {
+      regionOpeningStates.add(RegionOpeningState.valueOf(proto.getOpeningState(i).name()));
+    }
+    return regionOpeningStates;
+  }
+
+  /**
    * Check if the region is closed from a CloseRegionResponse
    *
    * @param proto the CloseRegionResponse

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1349377&r1=1349376&r2=1349377&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Jun 12 14:47:59 2012
@@ -3363,56 +3363,74 @@ public class  HRegionServer implements C
     }
     try {
       checkOpen();
-      requestCount.incrementAndGet();
-      OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
-      Map<String, HTableDescriptor> htds =
-        new HashMap<String, HTableDescriptor>(request.getRegionList().size());
-      for (RegionInfo regionInfo: request.getRegionList()) {
-        HRegionInfo region = HRegionInfo.convert(regionInfo);
+    } catch (IOException ie) {
+      throw new ServiceException(ie);
+    }
+    requestCount.incrementAndGet();
+    OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
+    Map<String, HTableDescriptor> htds = new HashMap<String, HTableDescriptor>(
+        request.getRegionList().size());
+    boolean isBulkAssign = request.getRegionList().size() > 1;
+    for (RegionInfo regionInfo : request.getRegionList()) {
+      HRegionInfo region = HRegionInfo.convert(regionInfo);
+      try {
         checkIfRegionInTransition(region, OPEN);
-
         HRegion onlineRegion = getFromOnlineRegions(region.getEncodedName());
         if (null != onlineRegion) {
-          // See HBASE-5094. Cross check with META if still this RS is owning the
-          // region.
+          // See HBASE-5094. Cross check with META if still this RS is owning
+          // the region.
           Pair<HRegionInfo, ServerName> p = MetaReader.getRegion(
-            this.catalogTracker, region.getRegionName());
+              this.catalogTracker, region.getRegionName());
           if (this.getServerName().equals(p.getSecond())) {
             LOG.warn("Attempted open of " + region.getEncodedName()
-              + " but already online on this server");
+                + " but already online on this server");
             builder.addOpeningState(RegionOpeningState.ALREADY_OPENED);
             continue;
           } else {
             LOG.warn("The region " + region.getEncodedName()
-              + " is online on this server but META does not have this server.");
+                + " is online on this server but META does not have this server.");
             removeFromOnlineRegions(region.getEncodedName(), null);
           }
         }
-        LOG.info("Received request to open region: "
-          + region.getRegionNameAsString() + " on "+this.serverNameFromMasterPOV);
+        LOG.info("Received request to open region: " + region.getRegionNameAsString() + " on "
+            + this.serverNameFromMasterPOV);
         HTableDescriptor htd = htds.get(region.getTableNameAsString());
         if (htd == null) {
           htd = this.tableDescriptors.get(region.getTableName());
           htds.put(region.getTableNameAsString(), htd);
         }
-        this.regionsInTransitionInRS.putIfAbsent(region.getEncodedNameAsBytes(), true);
+        this.regionsInTransitionInRS.putIfAbsent(
+            region.getEncodedNameAsBytes(), true);
         // Need to pass the expected version in the constructor.
         if (region.isRootRegion()) {
           this.service.submit(new OpenRootHandler(this, this, region, htd,
-            versionOfOfflineNode));
+              versionOfOfflineNode));
         } else if (region.isMetaRegion()) {
           this.service.submit(new OpenMetaHandler(this, this, region, htd,
-            versionOfOfflineNode));
+              versionOfOfflineNode));
         } else {
           this.service.submit(new OpenRegionHandler(this, this, region, htd,
-            versionOfOfflineNode));
+              versionOfOfflineNode));
         }
         builder.addOpeningState(RegionOpeningState.OPENED);
+      } catch (RegionAlreadyInTransitionException rie) {
+        LOG.warn("Region is already in transition", rie);
+        if (isBulkAssign) {
+          builder.addOpeningState(RegionOpeningState.OPENED);
+        } else {
+          throw new ServiceException(rie);
+        }
+      } catch (IOException ie) {
+        LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
+        if (isBulkAssign) {
+          builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
+        } else {
+          throw new ServiceException(ie);
+        }
       }
-      return builder.build();
-    } catch (IOException ie) {
-      throw new ServiceException(ie);
     }
+    return builder.build();
+
   }
 
   /**

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java?rev=1349377&r1=1349376&r2=1349377&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java Tue Jun 12 14:47:59 2012
@@ -580,6 +580,7 @@ public class TestAssignmentManager {
     MasterServices services = Mockito.mock(MasterServices.class);
     Mockito.when(services.getAssignmentManager()).thenReturn(am);
     Mockito.when(services.getServerManager()).thenReturn(this.serverManager);
+    Mockito.when(services.getZooKeeper()).thenReturn(this.watcher);
     ServerShutdownHandler handler = new ServerShutdownHandler(this.server,
       services, deadServers, SERVERNAME_A, false);
     handler.process();