You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ec...@apache.org on 2014/08/01 00:07:51 UTC

[15/50] [abbrv] git commit: [HBASE-11283] Roll HLog if sync time is anomalous

[HBASE-11283] Roll HLog if sync time is anomalous

Summary:
When a WAL gets slow this will schedule a roll.
This diff will also rate limit the forced rolls.

Test Plan: Added testSlowRoll to TestHLog

Reviewers: leon, elenapr, aaiyer, manukranthk

Reviewed By: manukranthk

Subscribers: daviddeng, aleksandr, hbase-prodeng@, hbase-eng@

Differential Revision: https://phabricator.fb.com/D1359586

Tasks: 4422839

git-svn-id: svn+ssh://tubbs/svnhive/hadoop/branches/titan/VENDOR.hbase/hbase-trunk@42799 e7acf4d4-3532-417f-9e73-7a9ae25a1f51


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ae31cc53
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ae31cc53
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ae31cc53

Branch: refs/heads/0.89-fb
Commit: ae31cc53050bdf656bf16f094c0b066eb5a0fc0e
Parents: b848685
Author: elliott <el...@e7acf4d4-3532-417f-9e73-7a9ae25a1f51>
Authored: Wed Jun 11 17:18:51 2014 +0000
Committer: Elliott Clark <el...@fb.com>
Committed: Thu Jul 31 14:44:22 2014 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/LogRoller.java    | 22 +++++-
 .../hadoop/hbase/regionserver/wal/HLog.java     | 20 ++++--
 .../hadoop/hbase/regionserver/wal/TestHLog.java | 71 +++++++++++++++++---
 3 files changed, 96 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ae31cc53/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
index 357f648..1eb0944 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.util.StringUtils;
 
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.hadoop.hbase.util.HasThread;
@@ -42,14 +43,25 @@ import org.apache.hadoop.hbase.util.HasThread;
  */
 public class LogRoller extends HasThread implements LogRollListener {
   static final Log LOG = LogFactory.getLog(LogRoller.class);
+
+  /**
+   * The minimum time in between requested log rolls.
+   * In MS.
+   */
+  public static final String LOG_ROLL_REQUEST_PERIOD_KEY =
+      "hbase.regionserver.logroll.request.period";
+  public static final long LOG_ROLL_REQUEST_PERIOD_DEFAULT = TimeUnit.SECONDS.toMillis(60);
+
   private final ReentrantLock rollLock = new ReentrantLock();
   private final AtomicBoolean rollLog = new AtomicBoolean(false);
   private final HRegionServer server;
   private volatile long lastrolltime = System.currentTimeMillis();
+  private volatile long lastrollRequestTime = System.currentTimeMillis();
   // Period to roll log.
   private final long rollperiod;
   private final int hlogIndexID;
   private final String logRollerName;
+  private final long timeBetweenRequest;
 
   /** @param server */
   public LogRoller(final HRegionServer server, int hlogIndexID) {
@@ -57,6 +69,8 @@ public class LogRoller extends HasThread implements LogRollListener {
     this.server = server;
     this.rollperiod =
       this.server.conf.getLong("hbase.regionserver.logroll.period", 3600000);
+    this.timeBetweenRequest = this.server.conf.getLong(LOG_ROLL_REQUEST_PERIOD_KEY,
+        LOG_ROLL_REQUEST_PERIOD_DEFAULT);
     this.hlogIndexID = hlogIndexID;
     this.logRollerName = "HLogRoller-" + hlogIndexID + " ";
   }
@@ -152,9 +166,13 @@ public class LogRoller extends HasThread implements LogRollListener {
   }
 
   public void logRollRequested() {
+    long currentTime = System.currentTimeMillis();
     synchronized (rollLog) {
-      rollLog.set(true);
-      rollLog.notifyAll();
+      if ((currentTime - lastrollRequestTime) > timeBetweenRequest) {
+        lastrollRequestTime = currentTime;
+        rollLog.set(true);
+        rollLog.notifyAll();
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/ae31cc53/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
index db775d9..044fef4 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -158,6 +158,7 @@ public class HLog implements Syncable {
 
   /** We include all timestamps by default */
   public static final long DEFAULT_LATEST_TS_TO_INCLUDE = Long.MAX_VALUE;
+  public static final long SECOND_IN_NS = TimeUnit.SECONDS.toNanos(1);
 
   // If enabled, old logs will be archived into hourly sub-directories instead of
   // server address sub-directories.
@@ -173,13 +174,15 @@ public class HLog implements Syncable {
   private final String prefix;
   private final AtomicLong unflushedEntries = new AtomicLong(0);
   private final Path oldLogDir;
+  private final int slowBeforeRoll;
+  private final AtomicLong slowSyncs = new AtomicLong(0);
   private volatile long syncTillHere = 0;
   private final List<LogActionsListener> actionListeners =
       Collections.synchronizedList(new ArrayList<LogActionsListener>());
 
 
-  private static Class<? extends Writer> logWriterClass;
-  private static Class<? extends Reader> logReaderClass;
+  static Class<? extends Writer> logWriterClass;
+  static Class<? extends Reader> logReaderClass;
 
   private OutputStream hdfs_out;     // OutputStream associated with the current SequenceFile.writer
   private int initialReplication;    // initial replication factor of SequenceFile.writer
@@ -458,6 +461,8 @@ public class HLog implements Syncable {
     this.optionalFlushInterval =
       conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000);
 
+    this.slowBeforeRoll = conf.getInt("hbase.hlog.slow.sync.before.roll", 5);
+
     if (!fs.exists(oldLogDir)) {
       fs.mkdirs(oldLogDir);
     }
@@ -606,6 +611,7 @@ public class HLog implements Syncable {
         newWriter.append(new HLog.Entry(key, edit));
         syncWriter(newWriter);
       }
+      this.slowSyncs.set(0);
     } catch (IOException ioe) {
       // If we fail to create a new writer, let us clean up the file.
       // Do not worry if the delete fails.
@@ -1091,11 +1097,17 @@ public class HLog implements Syncable {
     long end = System.nanoTime();
     long syncTime = end - start;
     gsyncTime.inc(syncTime);
-    if (syncTime > 1000000000) {
+    if (syncTime > SECOND_IN_NS) {
       LOG.warn(String.format(
-        "%s took %d ms appending an edit to hlog; editcount=%d, len~=%s",
+        "%s took %d ns appending an edit to hlog; editcount=%d, len~=%s",
         Thread.currentThread().getName(), syncTime, this.numEntries.get(),
         StringUtils.humanReadableInt(len)));
+      slowSyncs.incrementAndGet();
+    }
+
+    if (slowSyncs.get() >= slowBeforeRoll) {
+      this.requestLogRoll();
+      this.slowSyncs.set(0);
     }
 
     // Update the per-request profiling data

http://git-wip-us.apache.org/repos/asf/hbase/blob/ae31cc53/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
index 7916599..b7d644e 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
@@ -28,6 +28,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -47,6 +48,7 @@ import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.UnstableTests;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ExceptionUtils;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hdfs.DFSClient;
@@ -99,7 +101,10 @@ public class TestHLog  {
 
   @After
   public void tearDown() throws Exception {
+    HLog.logReaderClass = null;
+    HLog.logWriterClass = null;
   }
+
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     // Make block sizes small.
@@ -118,19 +123,16 @@ public class TestHLog  {
     TEST_UTIL.getConfiguration().setInt(
         "dfs.client.block.recovery.retries", 1);
     TEST_UTIL.getConfiguration().setBoolean(HConstants.HLOG_FORMAT_BACKWARD_COMPATIBILITY, false);
-    TEST_UTIL.startMiniCluster(3);
+    TEST_UTIL.startMiniDFSCluster(3);
 
     conf = TEST_UTIL.getConfiguration();
     cluster = TEST_UTIL.getDFSCluster();
     fs = cluster.getFileSystem();
 
-    hbaseDir = new Path(TEST_UTIL.getConfiguration().get("hbase.rootdir"));
+    hbaseDir = new Path(fs.getWorkingDirectory(), "hbase");
+    conf.set(HConstants.HBASE_DIR, hbaseDir.toString());
     oldLogDir = new Path(hbaseDir, ".oldlogs");
-    dir = new Path(hbaseDir, getName());
-  }
-  private static String getName() {
-    // TODO Auto-generated method stub
-    return "TestHLog";
+    dir = new Path(hbaseDir, TestHLog.class.getSimpleName());
   }
 
   /**
@@ -141,7 +143,7 @@ public class TestHLog  {
   @Test
   public void testSplit() throws IOException {
 
-    final byte [] tableName = Bytes.toBytes(getName());
+    final byte [] tableName = Bytes.toBytes(TestHLog.class.getSimpleName());
     final byte [] rowName = tableName;
     Path logdir = new Path(dir, HConstants.HREGION_LOGDIR_NAME);
     HLog log = new HLog(fs, logdir, oldLogDir, conf, null);
@@ -191,9 +193,9 @@ public class TestHLog  {
   @Category(UnstableTests.class)
   @Test
   public void testSync() throws Exception {
-    byte [] bytes = Bytes.toBytes(getName());
+    byte [] bytes = Bytes.toBytes(TestHLog.class.getSimpleName());
     // First verify that using streams all works.
-    Path p = new Path(dir, getName() + ".fsdos");
+    Path p = new Path(dir, TestHLog.class.getSimpleName() + ".fsdos");
     FSDataOutputStream out = fs.create(p);
     out.write(bytes);
     out.sync();
@@ -337,7 +339,7 @@ public class TestHLog  {
   // 3. HDFS-142 (on restart, maintain pendingCreates)
   @Test
   public void testAppendClose() throws Exception {
-    byte [] tableName = Bytes.toBytes(getName());
+    byte [] tableName = Bytes.toBytes(TestHLog.class.getSimpleName());
     HRegionInfo regioninfo = new HRegionInfo(new HTableDescriptor(tableName),
         HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false);
     Path subdir = new Path(dir, "hlogdir");
@@ -503,6 +505,40 @@ public class TestHLog  {
     }
   }
 
+  @Test
+  public void testSlowRoll() throws Exception {
+    // Make a copy of the conf so that things don't pollute other tests
+    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+    c.setClass("hbase.regionserver.hlog.writer.impl",
+                  SlowSequenceFileLogWriter.class,
+                  HLog.Writer.class);
+
+    int syncs = conf.getInt("hbase.hlog.slow.sync.before.roll", 5) * 2;
+
+    final AtomicInteger rollsRequests = new AtomicInteger(0);
+    HLog log = new HLog(fs, dir, oldLogDir, c, new LogRollListener() {
+      @Override public void logRollRequested() {
+        rollsRequests.incrementAndGet();
+      }
+    });
+
+    byte[] table = Bytes.toBytes("table");
+    byte[] cf = Bytes.toBytes("cf");
+    byte[] qual = Bytes.toBytes("qual");
+    byte[] val = Bytes.toBytes("val");
+
+    HRegionInfo info = new HRegionInfo(new HTableDescriptor(table),
+        null,null, false);
+
+    for (int i = 0; i < syncs; i++) {
+      WALEdit kvs = new WALEdit();
+      kvs.add(new KeyValue(Bytes.toBytes(i), cf, qual, HConstants.LATEST_TIMESTAMP, val));
+      log.append(info, table, kvs, System.currentTimeMillis());
+      log.sync(true);
+    }
+    assertEquals("Should request 2 rolls", 2, rollsRequests.get());
+  }
+
   /**
    * @throws IOException
    */
@@ -615,6 +651,19 @@ public class TestHLog  {
     }
   }
 
+
+  public static class SlowSequenceFileLogWriter extends SequenceFileLogWriter implements HLog.Writer {
+    @Override
+    public void sync() throws IOException {
+      try {
+        Thread.sleep(1100);
+      } catch (InterruptedException e) {
+        ExceptionUtils.toIOException(e);
+      }
+      super.sync();
+    }
+  }
+
   static class DumbLogEntriesVisitor implements LogEntryVisitor {
 
     int increments = 0;