You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2011/08/19 05:59:44 UTC

svn commit: r1159498 - in /hbase/branches/0.90: CHANGES.txt src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java

Author: stack
Date: Fri Aug 19 03:59:44 2011
New Revision: 1159498

URL: http://svn.apache.org/viewvc?rev=1159498&view=rev
Log:
HBASE-4095 Hlog may not be rolled in a long time if checkLowReplication's request of LogRoll is blocked

Modified:
    hbase/branches/0.90/CHANGES.txt
    hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java

Modified: hbase/branches/0.90/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/CHANGES.txt?rev=1159498&r1=1159497&r2=1159498&view=diff
==============================================================================
--- hbase/branches/0.90/CHANGES.txt (original)
+++ hbase/branches/0.90/CHANGES.txt Fri Aug 19 03:59:44 2011
@@ -14,6 +14,8 @@ Release 0.90.5 - Unreleased
                Campbell)
    HBASE-4161  Incorrect use of listStatus() in HBase region initialization
                (Pritam Damania)
+   HBASE-4095  Hlog may not be rolled in a long time if checkLowReplication's
+               request of LogRoll is blocked (Jieshan Bean)
 
   IMPROVEMENT
    HBASE-4205  Enhance HTable javadoc (Eric Charles)

Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1159498&r1=1159497&r2=1159498&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Fri Aug 19 03:59:44 2011
@@ -29,11 +29,11 @@ import java.lang.reflect.InvocationTarge
 import java.lang.reflect.Method;
 import java.net.URLEncoder;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
-import java.util.Arrays;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
@@ -41,7 +41,6 @@ import java.util.concurrent.ConcurrentSk
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Matcher;
@@ -129,7 +128,7 @@ public class HLog implements Syncable {
   private final long blocksize;
   private final String prefix;
   private final Path oldLogDir;
-  private boolean logRollRequested;
+  private boolean logRollRunning;
 
 
   private static Class<? extends Writer> logWriterClass;
@@ -140,7 +139,10 @@ public class HLog implements Syncable {
   }
 
   private OutputStream hdfs_out;     // OutputStream associated with the current SequenceFile.writer
-  private int initialReplication;    // initial replication factor of SequenceFile.writer
+  
+  // Minimum tolerable replicas, if the actual value is lower than it, 
+  // rollWriter will be triggered
+  private int minTolerableReplication; 
   private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas
   final static Object [] NO_ARGS = new Object []{};
 
@@ -189,9 +191,19 @@ public class HLog implements Syncable {
   // The timestamp (in ms) when the log file was created.
   private volatile long filenum = -1;
 
-  //number of transactions in the current Hlog.
+  // number of transactions in the current Hlog.
   private final AtomicInteger numEntries = new AtomicInteger(0);
 
+  // If live datanode count is lower than the default replicas value,
+  // RollWriter will be triggered in each sync(So the RollWriter will be
+  // triggered one by one in a short time). Using it as a workaround to slow
+  // down the roll frequency triggered by checkLowReplication().
+  private volatile int consecutiveLogRolls = 0;
+  private final int lowReplicationRollLimit;
+  // If consecutiveLogRolls is larger than lowReplicationRollLimit,
+  // then disable the rolling in checkLowReplication().
+  // Enable it if the replications recover.
+  private volatile boolean lowReplicationRollEnabled = true;
   // If > than this size, roll the log. This is typically 0.95 times the size
   // of the default Hdfs block size.
   private final long logrollsize;
@@ -358,6 +370,11 @@ public class HLog implements Syncable {
       }
     }
     this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
+    this.minTolerableReplication = conf.getInt(
+        "hbase.regionserver.hlog.tolerable.lowreplication",
+        this.fs.getDefaultReplication());
+    this.lowReplicationRollLimit = conf.getInt(
+        "hbase.regionserver.hlog.lowreplication.rolllimit", 5);
     this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true);
     LOG.info("HLog configuration: blocksize=" +
       StringUtils.byteDesc(this.blocksize) +
@@ -470,6 +487,7 @@ public class HLog implements Syncable {
     }
     byte [][] regionsToFlush = null;
     this.cacheFlushLock.lock();
+    this.logRollRunning = true;
     try {
       if (closed) {
         return regionsToFlush;
@@ -480,7 +498,6 @@ public class HLog implements Syncable {
       this.filenum = System.currentTimeMillis();
       Path newPath = computeFilename();
       HLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
-      int nextInitialReplication = fs.getFileStatus(newPath).getReplication();
       // Can we get at the dfsclient outputstream?  If an instance of
       // SFLW, it'll have done the necessary reflection to get at the
       // protected field name.
@@ -500,7 +517,6 @@ public class HLog implements Syncable {
         // Clean up current writer.
         Path oldFile = cleanupCurrentWriter(currentFilenum);
         this.writer = nextWriter;
-        this.initialReplication = nextInitialReplication;
         this.hdfs_out = nextHdfsOut;
 
         LOG.info((oldFile != null?
@@ -510,7 +526,6 @@ public class HLog implements Syncable {
             this.fs.getFileStatus(oldFile).getLen() + ". ": "") +
           "New hlog " + FSUtils.getPath(newPath));
         this.numEntries.set(0);
-        this.logRollRequested = false;
       }
       // Can we delete any of the old log files?
       if (this.outputfiles.size() > 0) {
@@ -528,6 +543,7 @@ public class HLog implements Syncable {
         }
       }
     } finally {
+      this.logRollRunning = false;
       this.cacheFlushLock.unlock();
     }
     return regionsToFlush;
@@ -982,7 +998,7 @@ public class HLog implements Syncable {
       synchronized (this.updateLock) {
         syncTime += System.currentTimeMillis() - now;
         syncOps++;
-        if (!logRollRequested) {
+        if (!logRollRunning) {
           checkLowReplication();
           if (this.writer.getLength() > this.logrollsize) {
             requestLogRoll();
@@ -998,18 +1014,43 @@ public class HLog implements Syncable {
   }
 
   private void checkLowReplication() {
-    // if the number of replicas in HDFS has fallen below the initial
+    // if the number of replicas in HDFS has fallen below the configured
     // value, then roll logs.
     try {
       int numCurrentReplicas = getLogReplication();
-      if (numCurrentReplicas != 0 &&
-          numCurrentReplicas < this.initialReplication) {
-        LOG.warn("HDFS pipeline error detected. " +
-            "Found " + numCurrentReplicas + " replicas but expecting " +
-            this.initialReplication + " replicas. " +
-            " Requesting close of hlog.");
-        requestLogRoll();
-        logRollRequested = true;
+      if (numCurrentReplicas != 0
+          && numCurrentReplicas < this.minTolerableReplication) {
+        if (this.lowReplicationRollEnabled) {
+          if (this.consecutiveLogRolls < this.lowReplicationRollLimit) {
+            LOG.warn("HDFS pipeline error detected. " + "Found "
+                + numCurrentReplicas + " replicas but expecting no less than "
+                + this.minTolerableReplication + " replicas. "
+                + " Requesting close of hlog.");
+            requestLogRoll();
+            // If rollWriter is requested, increase consecutiveLogRolls. Once it
+            // is larger than lowReplicationRollLimit, disable the
+            // LowReplication-Roller
+            this.consecutiveLogRolls++;
+          } else {
+            LOG.warn("Too many consecutive RollWriter requests, it's a sign of "
+                + "the total number of live datanodes is lower than the tolerable replicas.");
+            this.consecutiveLogRolls = 0;
+            this.lowReplicationRollEnabled = false;
+          }
+        }
+      } else if (numCurrentReplicas >= this.minTolerableReplication) {
+        if (!this.lowReplicationRollEnabled) {
+          // The new writer's log replicas is always the default value.
+          // So we should not enable LowReplication-Roller. If numEntries
+          // is lower than or equals 1, we consider it as a new writer.
+          if (this.numEntries.get() <= 1) {
+            return;
+          }
+          // Once the live datanode number and the replicas return to normal,
+          // enable the LowReplication-Roller.
+          this.lowReplicationRollEnabled = true;
+          LOG.info("LowReplication-Roller was enabled.");
+        }
       }
     } catch (Exception e) {
       LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +
@@ -1302,6 +1343,15 @@ public class HLog implements Syncable {
   }
 
   /**
+   * Get LowReplication-Roller status
+   * 
+   * @return lowReplicationRollEnabled
+   */
+  public boolean isLowReplicationRollEnabled() {
+    return lowReplicationRollEnabled;
+  }
+
+  /**
    * Get the directory we are making logs in.
    * 
    * @return dir

Modified: hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java?rev=1159498&r1=1159497&r2=1159498&view=diff
==============================================================================
--- hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java (original)
+++ hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java Fri Aug 19 03:59:44 2011
@@ -19,6 +19,8 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
 import java.io.OutputStream;
 import java.lang.reflect.InvocationTargetException;
@@ -29,23 +31,21 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
-
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@@ -55,8 +55,6 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import static org.junit.Assert.assertTrue;
-
 /**
  * Test log deletion as logs are rolled.
  */
@@ -137,6 +135,10 @@ public class TestLogRolling  {
    // the namenode might still try to choose the recently-dead datanode
    // for a pipeline, so try to a new pipeline multiple times
     TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30);
+    TEST_UTIL.getConfiguration().setInt(
+        "hbase.regionserver.hlog.tolerable.lowreplication", 2);
+    TEST_UTIL.getConfiguration().setInt(
+        "hbase.regionserver.hlog.lowreplication.rolllimit", 3);
     TEST_UTIL.startMiniCluster(2);
 
     cluster = TEST_UTIL.getHBaseCluster();
@@ -224,6 +226,30 @@ public class TestLogRolling  {
       // continue
     }
   }
+  
+  void batchWriteAndWait(HTable table, int start, boolean expect, int timeout)
+      throws IOException {
+    for (int i = 0; i < 10; i++) {
+      Put put = new Put(Bytes.toBytes("row"
+          + String.format("%1$04d", (start + i))));
+      put.add(HConstants.CATALOG_FAMILY, null, value);
+      table.put(put);
+    }
+    long startTime = System.currentTimeMillis();
+    long remaining = timeout;
+    while (remaining > 0) {
+      if (log.isLowReplicationRollEnabled() == expect) {
+        break;
+      } else {
+        try {
+          Thread.sleep(200);
+        } catch (InterruptedException e) {
+          // continue
+        }
+        remaining = timeout - (System.currentTimeMillis() - startTime);
+      }
+    }
+  }
 
   /**
    * Give me the HDFS pipeline for this log file
@@ -267,7 +293,7 @@ public class TestLogRolling  {
 
     this.server = cluster.getRegionServer(0);
     this.log = server.getWAL();
-    
+
     // Create the test table and open it
     String tableName = getName();
     HTableDescriptor desc = new HTableDescriptor(tableName);
@@ -320,7 +346,23 @@ public class TestLogRolling  {
     // write some more log data (this should use a new hdfs_out)
     writeData(table, 3);
     assertTrue("The log should not roll again.", log.getFilenum() == newFilenum);
-    assertTrue("New log file should have the default replication", log
-        .getLogReplication() == fs.getDefaultReplication());
+    // kill another datanode in the pipeline, so the replicas will be lower than
+    // the configured value 2.
+    assertTrue(dfsCluster.stopDataNode(pipeline[1].getName()) != null);
+    Thread.sleep(10000);
+    batchWriteAndWait(table, 4, false, 10000);
+    assertTrue("LowReplication Roller should've been disabled",
+        !log.isLowReplicationRollEnabled());
+    dfsCluster
+        .startDataNodes(TEST_UTIL.getConfiguration(), 1, true, null, null);
+    dfsCluster.waitActive();
+    // Force roll writer. The new log file will have the default replication,
+    // and the lowReplication roller will be enabled.
+    log.rollWriter();
+    batchWriteAndWait(table, 14, true, 10000);
+    assertTrue("LowReplication Roller should've been enabled",
+        log.isLowReplicationRollEnabled());
+    assertTrue("New log file should have the default replication",
+        log.getLogReplication() == fs.getDefaultReplication());
   }
 }