You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2016/12/28 00:19:39 UTC

hbase git commit: HBASE-17090 Procedure v2 - fast wake if nothing else is running (Matteo Bertozzi)

Repository: hbase
Updated Branches:
  refs/heads/master 306ef83c9 -> da97569ea


HBASE-17090 Procedure v2 - fast wake if nothing else is running (Matteo Bertozzi)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/da97569e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/da97569e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/da97569e

Branch: refs/heads/master
Commit: da97569eae662ad90fd3afd98ef148c94eee4ac1
Parents: 306ef83
Author: Michael Stack <st...@apache.org>
Authored: Tue Dec 27 16:19:32 2016 -0800
Committer: Michael Stack <st...@apache.org>
Committed: Tue Dec 27 16:19:32 2016 -0800

----------------------------------------------------------------------
 .../hbase/procedure2/ProcedureExecutor.java     |  4 +-
 .../procedure2/store/NoopProcedureStore.java    |  5 +++
 .../hbase/procedure2/store/ProcedureStore.java  |  6 +++
 .../procedure2/store/wal/WALProcedureStore.java | 42 ++++++++++++++------
 .../wal/ProcedureWALPerformanceEvaluation.java  | 17 +++-----
 5 files changed, 48 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/da97569e/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index 80c3804..c65f3fb 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -1536,7 +1536,7 @@ public class ProcedureExecutor<TEnvironment> {
         final Procedure procedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
         if (procedure == null) continue;
 
-        activeExecutorCount.incrementAndGet();
+        store.setRunningProcedureCount(activeExecutorCount.incrementAndGet());
         executionStartTime.set(EnvironmentEdgeManager.currentTime());
         try {
           if (isTraceEnabled) {
@@ -1544,7 +1544,7 @@ public class ProcedureExecutor<TEnvironment> {
           }
           executeProcedure(procedure);
         } finally {
-          activeExecutorCount.decrementAndGet();
+          store.setRunningProcedureCount(activeExecutorCount.decrementAndGet());
           lastUpdate = EnvironmentEdgeManager.currentTime();
           executionStartTime.set(Long.MAX_VALUE);
         }

http://git-wip-us.apache.org/repos/asf/hbase/blob/da97569e/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
index f248dc3..c03e326 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
@@ -52,6 +52,11 @@ public class NoopProcedureStore extends ProcedureStoreBase {
   }
 
   @Override
+  public void setRunningProcedureCount(final int count) {
+    // no-op
+  }
+
+  @Override
   public void load(final ProcedureLoader loader) throws IOException {
     loader.setMaxProcId(0);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/da97569e/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
index e47ed63..032c8fc 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
@@ -151,6 +151,12 @@ public interface ProcedureStore {
   int getNumThreads();
 
   /**
+   * Set the number of procedure running.
+   * This can be used, for example, by the store to know how long to wait before a sync.
+   */
+  void setRunningProcedureCount(int count);
+
+  /**
    * Acquire the lease for the procedure store.
    */
   void recoverLease() throws IOException;

http://git-wip-us.apache.org/repos/asf/hbase/blob/da97569e/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index 922b681..4465993 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -136,7 +136,9 @@ public class WALProcedureStore extends ProcedureStoreBase {
   private LinkedTransferQueue<ByteSlot> slotsCache = null;
   private Set<ProcedureWALFile> corruptedLogs = null;
   private FSDataOutputStream stream = null;
+  private int runningProcCount = 1;
   private long flushLogId = 0;
+  private int syncMaxSlot = 1;
   private int slotIndex = 0;
   private Thread syncThread;
   private ByteSlot[] slots;
@@ -198,6 +200,8 @@ public class WALProcedureStore extends ProcedureStoreBase {
 
     // Init buffer slots
     loading.set(true);
+    runningProcCount = numSlots;
+    syncMaxSlot = numSlots;
     slots = new ByteSlot[numSlots];
     slotsCache = new LinkedTransferQueue();
     while (slotsCache.size() < numSlots) {
@@ -288,6 +292,12 @@ public class WALProcedureStore extends ProcedureStoreBase {
     return slots == null ? 0 : slots.length;
   }
 
+  @Override
+  public void setRunningProcedureCount(final int count) {
+    LOG.debug("set running procedure count=" + count + " slots=" + slots.length);
+    this.runningProcCount = count > 0 ? Math.min(count, slots.length) : slots.length;
+  }
+
   public ProcedureStoreTracker getStoreTracker() {
     return storeTracker;
   }
@@ -623,7 +633,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
           throw new RuntimeException("sync aborted", syncException.get());
         } else if (inSync.get()) {
           syncCond.await();
-        } else if (slotIndex == slots.length) {
+        } else if (slotIndex >= syncMaxSlot) {
           slotCond.signal();
           syncCond.await();
         } else {
@@ -642,7 +652,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
       }
 
       // Notify that the slots are full
-      if (slotIndex == slots.length) {
+      if (slotIndex == syncMaxSlot) {
         waitCond.signal();
         slotCond.signal();
       }
@@ -725,8 +735,10 @@ public class WALProcedureStore extends ProcedureStoreBase {
             }
           }
           // Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing
+          syncMaxSlot = runningProcCount;
+          assert syncMaxSlot > 0 : "unexpected syncMaxSlot=" + syncMaxSlot;
           final long syncWaitSt = System.currentTimeMillis();
-          if (slotIndex != slots.length) {
+          if (slotIndex != syncMaxSlot) {
             slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS);
           }
 
@@ -734,7 +746,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
           final long syncWaitMs = currentTs - syncWaitSt;
           final float rollSec = getMillisFromLastRoll() / 1000.0f;
           final float syncedPerSec = totalSyncedToStore / rollSec;
-          if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < slots.length)) {
+          if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < syncMaxSlot)) {
             LOG.trace(String.format("Sync wait %s, slotIndex=%s , totalSynced=%s (%s/sec)",
                       StringUtils.humanTimeDiff(syncWaitMs), slotIndex,
                       StringUtils.humanSize(totalSyncedToStore),
@@ -813,29 +825,33 @@ public class WALProcedureStore extends ProcedureStoreBase {
     return totalSynced;
   }
 
-  protected long syncSlots(FSDataOutputStream stream, ByteSlot[] slots, int offset, int count)
-      throws IOException {
+  protected long syncSlots(final FSDataOutputStream stream, final ByteSlot[] slots,
+      final int offset, final int count) throws IOException {
     long totalSynced = 0;
     for (int i = 0; i < count; ++i) {
-      ByteSlot data = slots[offset + i];
+      final ByteSlot data = slots[offset + i];
       data.writeTo(stream);
       totalSynced += data.size();
     }
 
-    if (useHsync) {
-      stream.hsync();
-    } else {
-      stream.hflush();
-    }
+    syncStream(stream);
     sendPostSyncSignal();
 
     if (LOG.isTraceEnabled()) {
-      LOG.trace("Sync slots=" + count + '/' + slots.length +
+      LOG.trace("Sync slots=" + count + '/' + syncMaxSlot +
                 ", flushed=" + StringUtils.humanSize(totalSynced));
     }
     return totalSynced;
   }
 
+  protected void syncStream(final FSDataOutputStream stream) throws IOException {
+    if (useHsync) {
+      stream.hsync();
+    } else {
+      stream.hflush();
+    }
+  }
+
   private boolean rollWriterWithRetries() {
     for (int i = 0; i < rollRetries && isRunning(); ++i) {
       if (i > 0) Threads.sleepWithoutInterrupt(waitBeforeRoll * i);

http://git-wip-us.apache.org/repos/asf/hbase/blob/da97569e/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java
index 363574b..641ac8e 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java
@@ -149,7 +149,7 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool {
       // Start worker threads.
       long start = System.currentTimeMillis();
       for (int i = 0; i < numThreads; i++) {
-        futures[i] = executor.submit(this.new Worker(start));
+        futures[i] = executor.submit(new Worker(start));
       }
       boolean failure = false;
       try {
@@ -197,8 +197,8 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool {
    * If procedure store fails to roll log file (throws IOException), all threads quit, and at
    * least one returns value of {@link AbstractHBaseTool#EXIT_FAILURE}.
    */
-  class Worker implements Callable<Integer> {
-    final long start;
+  private final class Worker implements Callable<Integer> {
+    private final long start;
 
     public Worker(long start) {
       this.start = start;
@@ -243,7 +243,7 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool {
     }
   }
 
-  public class NoSyncWalProcedureStore extends WALProcedureStore {
+  private class NoSyncWalProcedureStore extends WALProcedureStore {
     public NoSyncWalProcedureStore(final Configuration conf, final FileSystem fs,
         final Path logDir) {
       super(conf, fs, logDir, new WALProcedureStore.LeaseRecovery() {
@@ -255,13 +255,8 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool {
     }
 
     @Override
-    protected long syncSlots(FSDataOutputStream stream, ByteSlot[] slots, int offset, int count)
-        throws IOException {
-      long totalSynced = 0;
-      for (int i = 0; i < count; ++i) {
-        totalSynced += slots[offset + i].size();
-      }
-      return totalSynced;
+    protected void syncStream(FSDataOutputStream stream) {
+      // no-op
     }
   }