You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by je...@apache.org on 2013/06/05 03:13:00 UTC

svn commit: r1489679 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/master/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/regionserver/wal/ test/java/org/apache/hadoop/hbase/regionserver/

Author: jeffreyz
Date: Wed Jun  5 01:12:59 2013
New Revision: 1489679

URL: http://svn.apache.org/r1489679
Log:
HBASE-8680: distributedLogReplay performance regression

Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java?rev=1489679&r1=1489678&r2=1489679&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java Wed Jun  5 01:12:59 2013
@@ -261,7 +261,9 @@ public class RegionStates {
     RegionState regionState = new RegionState(
       hri, state, System.currentTimeMillis(), newServerName);
     RegionState oldState = regionStates.put(regionName, regionState);
-    LOG.info("Region " + hri + " transitioned from " + oldState + " to " + regionState);
+    if (oldState == null || oldState.getState() != regionState.getState()) {
+      LOG.info("Region " + hri + " transitioned from " + oldState + " to " + regionState);
+    }
     if (state != State.SPLITTING && (newServerName != null
         || (state != State.PENDING_CLOSE && state != State.CLOSING))) {
       regionsInTransition.put(regionName, regionState);

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1489679&r1=1489678&r2=1489679&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Jun  5 01:12:59 2013
@@ -1566,7 +1566,14 @@ public class HRegionServer implements Cl
     this.rpcServer.start();
 
     // Create the log splitting worker and start it
-    this.splitLogWorker = new SplitLogWorker(this.zooKeeper, this.getConfiguration(), this, this);
+    // Use a smaller retry count so that calls fail fast; otherwise the splitlogworker could be
+    // blocked for quite a while inside the HConnection layer and would not be available for
+    // other tasks even after the current task is preempted when a split task times out.
+    Configuration sinkConf = HBaseConfiguration.create(conf);
+    sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER - 2);
+    sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1);
+    this.splitLogWorker = new SplitLogWorker(this.zooKeeper, sinkConf, this, this);
     splitLogWorker.start();
   }
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java?rev=1489679&r1=1489678&r2=1489679&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java Wed Jun  5 01:12:59 2013
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SplitLogCounters;
 import org.apache.hadoop.hbase.SplitLogTask;
+import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
@@ -92,6 +93,7 @@ public class SplitLogWorker extends ZooK
   private boolean workerInGrabTask = false;
   private final int report_period;
   private RegionServerServices server = null;
+  private Configuration conf = null;
 
   public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf,
       RegionServerServices server, TaskExecutor splitTaskExecutor) {
@@ -101,6 +103,7 @@ public class SplitLogWorker extends ZooK
     this.splitTaskExecutor = splitTaskExecutor;
     report_period = conf.getInt("hbase.splitlog.report.period",
       conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 3);
+    this.conf = conf;
   }
 
   public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, ServerName serverName,
@@ -110,6 +113,7 @@ public class SplitLogWorker extends ZooK
     this.splitTaskExecutor = splitTaskExecutor;
     report_period = conf.getInt("hbase.splitlog.report.period",
       conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 3);
+    this.conf = conf;
   }
 
   public SplitLogWorker(final ZooKeeperWatcher watcher, final Configuration conf,
@@ -165,6 +169,8 @@ public class SplitLogWorker extends ZooK
     try {
       LOG.info("SplitLogWorker " + this.serverName + " starting");
       this.watcher.registerListener(this);
+      // initialize a new connection for splitlogworker configuration
+      HConnectionManager.getConnection(conf);
       int res;
       // wait for master to create the splitLogZnode
       res = -1;

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1489679&r1=1489678&r2=1489679&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Wed Jun  5 01:12:59 2013
@@ -1650,18 +1650,8 @@ public class HLogSplitter {
     private LogRecoveredEditsOutputSink logRecoveredEditsOutputSink;
     private boolean hasEditsInDisablingOrDisabledTables = false;
 
-    private Configuration sinkConf;
     public LogReplayOutputSink(int numWriters) {
       super(numWriters);
-      // set a smaller retries to fast fail otherwise splitlogworker could be blocked for
-      // quite a while inside HConnection layer. The worker won't available for other
-      // tasks even after current task is preempted after a split task times out.
-      sinkConf = HBaseConfiguration.create(conf);
-      sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
-        HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER - 2);
-      sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT / 2);
-      sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1);
-
       this.waitRegionOnlineTimeOut = conf.getInt("hbase.splitlog.manager.timeout", 
         SplitLogManager.DEFAULT_TIMEOUT);
       this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(numWriters);
@@ -1809,9 +1799,6 @@ public class HLogSplitter {
                   // skip current kv if column family doesn't exist anymore or already flushed
                   continue;
                 }
-              } else {
-                LOG.warn("Can't find store max sequence ids map for region:"
-                    + loc.getRegionInfo().getEncodedName());
               }
             }
 
@@ -1861,21 +1848,29 @@ public class HLogSplitter {
      */
     private HRegionLocation locateRegionAndRefreshLastFlushedSequenceId(HConnection hconn,
         byte[] table, byte[] row, String originalEncodedRegionName) throws IOException {
+
+      // fetch location from cache
       HRegionLocation loc = onlineRegions.get(originalEncodedRegionName);
       if(loc != null) return loc;
-      
+      // fetch location from .META.
       loc = hconn.getRegionLocation(table, row, false);
       if (loc == null) {
         throw new IOException("Can't locate location for row:" + Bytes.toString(row)
             + " of table:" + Bytes.toString(table));
       }
+      // check if current row moves to a different region due to region merge/split
+      if (!originalEncodedRegionName.equalsIgnoreCase(loc.getRegionInfo().getEncodedName())) {
+        // the region named by originalEncodedRegionName should have already been flushed
+        lastFlushedSequenceIds.put(originalEncodedRegionName, Long.MAX_VALUE);
+        HRegionLocation tmpLoc = onlineRegions.get(loc.getRegionInfo().getEncodedName());
+        if (tmpLoc != null) return tmpLoc;
+      }
 
       Long lastFlushedSequenceId = -1l;
       loc = waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut);
       Long cachedLastFlushedSequenceId = lastFlushedSequenceIds.get(loc.getRegionInfo()
           .getEncodedName());
 
-      onlineRegions.put(loc.getRegionInfo().getEncodedName(), loc);
       // retrieve last flushed sequence Id from ZK. Because region postOpenDeployTasks will
       // update the value for the region
       RegionStoreSequenceIds ids =
@@ -1901,7 +1896,8 @@ public class HLogSplitter {
         LOG.info("logReplay skip region: " + loc.getRegionInfo().getEncodedName()
             + " because it's not in recovering.");
       }
-      
+
+      onlineRegions.put(loc.getRegionInfo().getEncodedName(), loc);
       return loc;
     }
 
@@ -2068,6 +2064,7 @@ public class HLogSplitter {
             for (byte[] tableName : this.tableNameToHConnectionMap.keySet()) {
               HConnection hconn = this.tableNameToHConnectionMap.get(tableName);
               try {
+                hconn.clearRegionCache();
                 hconn.close();
               } catch (IOException ioe) {
                 result.add(ioe);
@@ -2128,7 +2125,7 @@ public class HLogSplitter {
         synchronized (this.tableNameToHConnectionMap) {
           hconn = this.tableNameToHConnectionMap.get(tableName);
           if (hconn == null) {
-            hconn = HConnectionManager.createConnection(sinkConf);
+            hconn = HConnectionManager.getConnection(conf);
             this.tableNameToHConnectionMap.put(tableName, hconn);
           }
         }

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java?rev=1489679&r1=1489678&r2=1489679&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java Wed Jun  5 01:12:59 2013
@@ -200,14 +200,14 @@ public class TestSplitLogWorker {
     slw.start();
     try {
       Thread.yield(); // let the worker start
-      Thread.sleep(100);
+      Thread.sleep(1000);
 
       // this time create a task node after starting the splitLogWorker
       zkw.getRecoverableZooKeeper().create(PATH,
         new SplitLogTask.Unassigned(MANAGER).toByteArray(), Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT);
 
-      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 1500);
+      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 8000);
       assertEquals(1, slw.taskReadySeq);
       byte [] bytes = ZKUtil.getData(zkw, PATH);
       SplitLogTask slt = SplitLogTask.parseFrom(bytes);