You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2015/04/24 11:02:40 UTC

[3/3] hbase git commit: HBASE-13529 Procedure v2 - WAL Improvements

HBASE-13529 Procedure v2 - WAL Improvements


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ba217ac0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ba217ac0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ba217ac0

Branch: refs/heads/branch-1.1
Commit: ba217ac07d4c5834216646bf953e9167135100e5
Parents: 5511ce6
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Wed Apr 22 21:23:52 2015 +0100
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Fri Apr 24 09:52:13 2015 +0100

----------------------------------------------------------------------
 .../procedure2/store/ProcedureStoreTracker.java | 16 +++--
 .../procedure2/store/wal/WALProcedureStore.java | 67 +++++++++++++++-----
 .../hbase/procedure2/util/StringUtils.java      |  4 ++
 .../store/TestProcedureStoreTracker.java        | 26 ++++++++
 4 files changed, 91 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ba217ac0/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
index 152f131..3a878cc 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
@@ -52,6 +52,7 @@ public class ProcedureStoreTracker {
     private final static int BITS_PER_WORD = 1 << ADDRESS_BITS_PER_WORD;
     private final static int MAX_NODE_SIZE = 4 << ADDRESS_BITS_PER_WORD;
 
+    private final boolean partial;
     private long[] updated;
     private long[] deleted;
     private long start;
@@ -88,6 +89,7 @@ public class ProcedureStoreTracker {
         deleted[i] = partial ? 0 : WORD_MASK;
       }
 
+      this.partial = partial;
       updateState(procId, false);
     }
 
@@ -95,6 +97,7 @@ public class ProcedureStoreTracker {
       this.start = start;
       this.updated = updated;
       this.deleted = deleted;
+      this.partial = false;
     }
 
     public void update(final long procId) {
@@ -223,17 +226,18 @@ public class ProcedureStoreTracker {
       int oldSize = updated.length;
 
       newBitmap = new long[oldSize + delta];
+      for (int i = 0; i < newBitmap.length; ++i) {
+        newBitmap[i] = 0;
+      }
       System.arraycopy(updated, 0, newBitmap, offset, oldSize);
       updated = newBitmap;
 
       newBitmap = new long[deleted.length + delta];
+      for (int i = 0; i < newBitmap.length; ++i) {
+        newBitmap[i] = partial ? 0 : WORD_MASK;
+      }
       System.arraycopy(deleted, 0, newBitmap, offset, oldSize);
       deleted = newBitmap;
-
-      for (int i = 0; i < delta; ++i) {
-        updated[oldSize + i] = 0;
-        deleted[oldSize + i] = WORD_MASK;
-      }
     }
 
     public void merge(final BitSetNode rightNode) {
@@ -256,7 +260,7 @@ public class ProcedureStoreTracker {
 
       for (int i = 0; i < newSize; ++i) {
         updated[offset + i] = 0;
-        deleted[offset + i] = WORD_MASK;
+        deleted[offset + i] = partial ? 0 : WORD_MASK;
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba217ac0/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index 91748fd..fe429ea 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -21,9 +21,10 @@ package org.apache.hadoop.hbase.procedure2.store.wal;
 import java.io.IOException;
 import java.io.FileNotFoundException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.LinkedTransferQueue;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.Arrays;
@@ -69,6 +70,12 @@ public class WALProcedureStore implements ProcedureStore {
   private static final String SYNC_WAIT_MSEC_CONF_KEY = "hbase.procedure.store.wal.sync.wait.msec";
   private static final int DEFAULT_SYNC_WAIT_MSEC = 100;
 
+  private static final String USE_HSYNC_CONF_KEY = "hbase.procedure.store.wal.use.hsync";
+  private static final boolean DEFAULT_USE_HSYNC = true;
+
+  private static final String ROLL_THRESHOLD_CONF_KEY = "hbase.procedure.store.wal.roll.threshold";
+  private static final long DEFAULT_ROLL_THRESHOLD = 32 * 1024 * 1024; // 32M
+
   private final CopyOnWriteArrayList<ProcedureStoreListener> listeners =
     new CopyOnWriteArrayList<ProcedureStoreListener>();
 
@@ -86,14 +93,18 @@ public class WALProcedureStore implements ProcedureStore {
   private final Path logDir;
 
   private AtomicBoolean inSync = new AtomicBoolean(false);
-  private ArrayBlockingQueue<ByteSlot> slotsCache = null;
+  private LinkedTransferQueue<ByteSlot> slotsCache = null;
   private Set<ProcedureWALFile> corruptedLogs = null;
+  private AtomicLong totalSynced = new AtomicLong(0);
   private FSDataOutputStream stream = null;
-  private long totalSynced = 0;
+  private long lastRollTs = 0;
   private long flushLogId = 0;
   private int slotIndex = 0;
   private Thread syncThread;
   private ByteSlot[] slots;
+
+  private long rollThreshold;
+  private boolean useHsync;
   private int syncWaitMsec;
 
   public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path logDir,
@@ -112,13 +123,15 @@ public class WALProcedureStore implements ProcedureStore {
 
     // Init buffer slots
     slots = new ByteSlot[numSlots];
-    slotsCache = new ArrayBlockingQueue(numSlots, true);
-    while (slotsCache.remainingCapacity() > 0) {
+    slotsCache = new LinkedTransferQueue();
+    while (slotsCache.size() < numSlots) {
       slotsCache.offer(new ByteSlot());
     }
 
     // Tunings
+    rollThreshold = conf.getLong(ROLL_THRESHOLD_CONF_KEY, DEFAULT_ROLL_THRESHOLD);
     syncWaitMsec = conf.getInt(SYNC_WAIT_MSEC_CONF_KEY, DEFAULT_SYNC_WAIT_MSEC);
+    useHsync = conf.getBoolean(USE_HSYNC_CONF_KEY, DEFAULT_USE_HSYNC);
 
     // Init sync thread
     syncThread = new Thread("WALProcedureStoreSyncThread") {
@@ -297,9 +310,7 @@ public class WALProcedureStore implements ProcedureStore {
 
     // Update the store tracker
     synchronized (storeTracker) {
-      if (logId == flushLogId) {
-        storeTracker.insert(proc, subprocs);
-      }
+      storeTracker.insert(proc, subprocs);
     }
   }
 
@@ -329,8 +340,8 @@ public class WALProcedureStore implements ProcedureStore {
     // Update the store tracker
     boolean removeOldLogs = false;
     synchronized (storeTracker) {
+      storeTracker.update(proc);
       if (logId == flushLogId) {
-        storeTracker.update(proc);
         removeOldLogs = storeTracker.isUpdated();
       }
     }
@@ -365,9 +376,9 @@ public class WALProcedureStore implements ProcedureStore {
 
     boolean removeOldLogs = false;
     synchronized (storeTracker) {
+      storeTracker.delete(procId);
       if (logId == flushLogId) {
-        storeTracker.delete(procId);
-        if (storeTracker.isEmpty()) {
+        if (storeTracker.isEmpty() && totalSynced.get() > rollThreshold) {
           removeOldLogs = rollWriterOrDie(logId + 1);
         }
       }
@@ -416,8 +427,10 @@ public class WALProcedureStore implements ProcedureStore {
 
       // Notify that the slots are full
       if (slotIndex == slots.length) {
+        waitCond.signal();
         slotCond.signal();
       }
+
       syncCond.await();
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
@@ -436,7 +449,10 @@ public class WALProcedureStore implements ProcedureStore {
         // Wait until new data is available
         if (slotIndex == 0) {
           if (LOG.isTraceEnabled()) {
-            LOG.trace("Waiting for data. flushed=" + StringUtils.humanSize(totalSynced));
+            float rollTsSec = (System.currentTimeMillis() - lastRollTs) / 1000.0f;
+            LOG.trace(String.format("Waiting for data. flushed=%s (%s/sec)",
+                      StringUtils.humanSize(totalSynced.get()),
+                      StringUtils.humanSize(totalSynced.get() / rollTsSec)));
           }
           waitCond.await();
           if (slotIndex == 0) {
@@ -446,10 +462,22 @@ public class WALProcedureStore implements ProcedureStore {
         }
 
         // Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing
-        slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS);
+        long syncWaitSt = System.currentTimeMillis();
+        if (slotIndex != slots.length) {
+          slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS);
+        }
+        long syncWaitMs = System.currentTimeMillis() - syncWaitSt;
+        if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < slots.length)) {
+          float rollSec = (System.currentTimeMillis() - lastRollTs) / 1000.0f;
+          LOG.trace("sync wait " + StringUtils.humanTimeDiff(syncWaitMs) +
+                    " slotIndex=" + slotIndex +
+                    " totalSynced=" + StringUtils.humanSize(totalSynced.get()) +
+                    " " + StringUtils.humanSize(totalSynced.get() / rollSec) + "/sec");
+        }
+
 
         inSync.set(true);
-        totalSynced += syncSlots();
+        totalSynced.addAndGet(syncSlots());
         slotIndex = 0;
         inSync.set(false);
         syncCond.signalAll();
@@ -487,7 +515,13 @@ public class WALProcedureStore implements ProcedureStore {
       data.writeTo(stream);
       totalSynced += data.size();
     }
-    stream.hsync();
+
+    if (useHsync) {
+      stream.hsync();
+    } else {
+      stream.hflush();
+    }
+
     if (LOG.isTraceEnabled()) {
       LOG.trace("Sync slots=" + count + '/' + slots.length +
                 " flushed=" + StringUtils.humanSize(totalSynced));
@@ -541,7 +575,8 @@ public class WALProcedureStore implements ProcedureStore {
       }
       stream = newStream;
       flushLogId = logId;
-      totalSynced = 0;
+      totalSynced.set(0);
+      lastRollTs = System.currentTimeMillis();
       logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos));
     } finally {
       lock.unlock();

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba217ac0/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/StringUtils.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/StringUtils.java
index 97134c2..a2c4713 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/StringUtils.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/StringUtils.java
@@ -27,6 +27,10 @@ public final class StringUtils {
   private StringUtils() {}
 
   public static String humanTimeDiff(long timeDiff) {
+    if (timeDiff < 1000) {
+      return String.format("%dmsec", timeDiff);
+    }
+
     StringBuilder buf = new StringBuilder();
     long hours = timeDiff / (60*60*1000);
     long rem = (timeDiff % (60*60*1000));

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba217ac0/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
index 9475e67..424d9ce 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
@@ -210,4 +210,30 @@ public class TestProcedureStoreTracker {
       }
     }
   }
+
+  @Test
+  public void testDelete() { // inserted ids must report NO; all other ids in [0, 256) YES
+    final ProcedureStoreTracker tracker = new ProcedureStoreTracker();
+
+    long[] procIds = new long[] { 65, 1, 193 }; // NOTE(review): presumably picked to hit separate 64-bit words, out of order - confirm
+    for (int i = 0; i < procIds.length; ++i) {
+      tracker.insert(procIds[i]);
+      tracker.dump(); // invoked after every insert; its output is not asserted on
+    }
+
+    for (int i = 0; i < (64 * 4); ++i) { // scan every id in [0, 256)
+      boolean hasProc = false; // becomes true when i is one of the inserted ids
+      for (int j = 0; j < procIds.length; ++j) {
+        if (procIds[j] == i) {
+          hasProc = true;
+          break;
+        }
+      }
+      if (hasProc) {
+        assertEquals(ProcedureStoreTracker.DeleteState.NO, tracker.isDeleted(i));
+      } else {
+        assertEquals("procId=" + i, ProcedureStoreTracker.DeleteState.YES, tracker.isDeleted(i));
+      }
+    }
+  }
 }