You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2016/11/10 02:58:10 UTC

hbase git commit: HBASE-17053 Remove LogRollerExitedChecker

Repository: hbase
Updated Branches:
  refs/heads/master 8192a6b6e -> e5a288e5c


HBASE-17053 Remove LogRollerExitedChecker


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e5a288e5
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e5a288e5
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e5a288e5

Branch: refs/heads/master
Commit: e5a288e5c04d6f9a1b31549e4e3d979aeee4fd94
Parents: 8192a6b
Author: zhangduo <zh...@apache.org>
Authored: Wed Nov 9 21:07:07 2016 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Thu Nov 10 10:52:49 2016 +0800

----------------------------------------------------------------------
 .../hbase/regionserver/HRegionServer.java       |  51 +--------
 .../hadoop/hbase/regionserver/LogRoller.java    |  16 ++-
 .../hbase/regionserver/wal/AsyncFSWAL.java      | 106 -------------------
 .../hadoop/hbase/regionserver/wal/FSHLog.java   |   4 -
 .../hadoop/hbase/wal/DisabledWALProvider.java   |   4 -
 .../java/org/apache/hadoop/hbase/wal/WAL.java   |   7 --
 .../regionserver/TestFailedAppendAndSync.java   |   2 +-
 .../hbase/regionserver/TestWALLockup.java       |   4 +-
 .../hbase/wal/WALPerformanceEvaluation.java     |   2 +-
 9 files changed, 20 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e5a288e5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 312e8c1..24d8170 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -375,8 +375,6 @@ public class HRegionServer extends HasThread implements
   // WAL roller. log is protected rather than private to avoid
   // eclipse warning when accessed by inner classes
   final LogRoller walRoller;
-  // Lazily initialized if this RegionServer hosts a meta table.
-  final AtomicReference<LogRoller> metawalRoller = new AtomicReference<LogRoller>();
 
   // flag set after we're done setting up server threads
   final AtomicBoolean online = new AtomicBoolean(false);
@@ -1722,34 +1720,6 @@ public class HRegionServer extends HasThread implements
     return new WALFactory(conf, listeners, serverName.toString());
   }
 
-  /**
-   * We initialize the roller for the wal that handles meta lazily
-   * since we don't know if this regionserver will handle it. All calls to
-   * this method return a reference to the that same roller. As newly referenced
-   * meta regions are brought online, they will be offered to the roller for maintenance.
-   * As a part of that registration process, the roller will add itself as a
-   * listener on the wal.
-   */
-  protected LogRoller ensureMetaWALRoller() {
-    // Using a tmp log roller to ensure metaLogRoller is alive once it is not
-    // null
-    LogRoller roller = metawalRoller.get();
-    if (null == roller) {
-      LogRoller tmpLogRoller = new LogRoller(this, this);
-      String n = Thread.currentThread().getName();
-      Threads.setDaemonThreadRunning(tmpLogRoller.getThread(),
-          n + "-MetaLogRoller", uncaughtExceptionHandler);
-      if (metawalRoller.compareAndSet(null, tmpLogRoller)) {
-        roller = tmpLogRoller;
-      } else {
-        // Another thread won starting the roller
-        Threads.shutdown(tmpLogRoller.getThread());
-        roller = metawalRoller.get();
-      }
-    }
-    return roller;
-  }
-
   public MetricsRegionServer getRegionServerMetrics() {
     return this.metricsRegionServer;
   }
@@ -1914,11 +1884,6 @@ public class HRegionServer extends HasThread implements
       stop("One or more threads are no longer alive -- stop");
       return false;
     }
-    final LogRoller metawalRoller = this.metawalRoller.get();
-    if (metawalRoller != null && !metawalRoller.isAlive()) {
-      stop("Meta WAL roller thread is no longer alive -- stop");
-      return false;
-    }
     return true;
   }
 
@@ -1932,11 +1897,9 @@ public class HRegionServer extends HasThread implements
   @Override
   public WAL getWAL(HRegionInfo regionInfo) throws IOException {
     WAL wal;
-    LogRoller roller = walRoller;
-    //_ROOT_ and hbase:meta regions have separate WAL.
-    if (regionInfo != null && regionInfo.isMetaTable() &&
-        regionInfo.getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) {
-      roller = ensureMetaWALRoller();
+    // _ROOT_ and hbase:meta regions have separate WAL.
+    if (regionInfo != null && regionInfo.isMetaTable()
+        && regionInfo.getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) {
       wal = walFactory.getMetaWAL(regionInfo.getEncodedNameAsBytes());
     } else if (regionInfo == null) {
       wal = walFactory.getWAL(UNSPECIFIED_REGION, null);
@@ -1944,7 +1907,7 @@ public class HRegionServer extends HasThread implements
       byte[] namespace = regionInfo.getTable().getNamespace();
       wal = walFactory.getWAL(regionInfo.getEncodedNameAsBytes(), namespace);
     }
-    roller.addWAL(wal);
+    walRoller.addWAL(wal);
     return wal;
   }
 
@@ -2330,11 +2293,7 @@ public class HRegionServer extends HasThread implements
       this.spanReceiverHost.closeReceivers();
     }
     if (this.walRoller != null) {
-      Threads.shutdown(this.walRoller.getThread());
-    }
-    final LogRoller metawalRoller = this.metawalRoller.get();
-    if (metawalRoller != null) {
-      Threads.shutdown(metawalRoller.getThread());
+      this.walRoller.close();
     }
     if (this.compactSplitThread != null) {
       this.compactSplitThread.join();

http://git-wip-us.apache.org/repos/asf/hbase/blob/e5a288e5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
index 9a2bb34..24f0d1a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
@@ -49,7 +50,7 @@ import com.google.common.annotations.VisibleForTesting;
  */
 @InterfaceAudience.Private
 @VisibleForTesting
-public class LogRoller extends HasThread {
+public class LogRoller extends HasThread implements Closeable {
   private static final Log LOG = LogFactory.getLog(LogRoller.class);
   private final ReentrantLock rollLock = new ReentrantLock();
   private final AtomicBoolean rollLog = new AtomicBoolean(false);
@@ -62,6 +63,8 @@ public class LogRoller extends HasThread {
   private final long rollperiod;
   private final int threadWakeFrequency;
 
+  private volatile boolean running = true;
+
   public void addWAL(final WAL wal) {
     if (null == walNeedsRoll.putIfAbsent(wal, Boolean.FALSE)) {
       wal.registerWALActionsListener(new WALActionsListener.Base() {
@@ -110,7 +113,7 @@ public class LogRoller extends HasThread {
 
   @Override
   public void run() {
-    while (!server.isStopped()) {
+    while (running) {
       long now = System.currentTimeMillis();
       boolean periodic = false;
       if (!rollLog.get()) {
@@ -167,9 +170,6 @@ public class LogRoller extends HasThread {
         }
       }
     }
-    for (WAL wal : walNeedsRoll.keySet()) {
-      wal.logRollerExited();
-    }
     LOG.info("LogRoller exiting.");
   }
 
@@ -208,4 +208,10 @@ public class LogRoller extends HasThread {
     }
     return true;
   }
+
+  @Override
+  public void close() {
+    running = false;
+    interrupt();
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/e5a288e5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index d842f1b..78a3e8a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -25,7 +25,6 @@ import com.lmax.disruptor.Sequence;
 import com.lmax.disruptor.Sequencer;
 
 import io.netty.channel.EventLoop;
-import io.netty.util.concurrent.ScheduledFuture;
 import io.netty.util.concurrent.SingleThreadEventExecutor;
 
 import java.io.IOException;
@@ -40,7 +39,6 @@ import java.util.PriorityQueue;
 import java.util.Queue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
@@ -143,10 +141,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
   public static final String ASYNC_WAL_CREATE_MAX_RETRIES = "hbase.wal.async.create.retries";
   public static final int DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES = 10;
 
-  public static final String ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS =
-      "hbase.wal.async.logroller.exited.check.interval.ms";
-  public static final long DEFAULT_ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS = 1000;
-
   private final EventLoop eventLoop;
 
   private final Lock consumeLock = new ReentrantLock();
@@ -176,8 +170,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
 
   private final int createMaxRetries;
 
-  private final long logRollerExitedCheckIntervalMs;
-
   private final ExecutorService closeExecutor = Executors.newCachedThreadPool(
     new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());
 
@@ -196,85 +188,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
   // file length when we issue last sync request on the writer
   private long fileLengthAtLastSync;
 
-  private volatile boolean logRollerExited;
-
-  private final class LogRollerExitedChecker implements Runnable {
-
-    private boolean cancelled;
-
-    private ScheduledFuture<?> future;
-
-    public synchronized void setFuture(ScheduledFuture<?> future) {
-      this.future = future;
-    }
-
-    // See the comments in syncFailed why we need to do this.
-    private void cleanup() {
-      unackedAppends.clear();
-      toWriteAppends.forEach(entry -> {
-        try {
-          entry.stampRegionSequenceId();
-        } catch (IOException e) {
-          throw new AssertionError("should not happen", e);
-        }
-      });
-      toWriteAppends.clear();
-      IOException error = new IOException("sync failed but log roller exited");
-      for (SyncFuture sync; (sync = syncFutures.peek()) != null;) {
-        sync.done(sync.getTxid(), error);
-        syncFutures.remove();
-      }
-      long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
-      for (long cursorBound =
-          waitingConsumePayloads.getCursor(); nextCursor <= cursorBound; nextCursor++) {
-        if (!waitingConsumePayloads.isPublished(nextCursor)) {
-          break;
-        }
-        RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);
-        switch (truck.type()) {
-          case APPEND:
-            try {
-              truck.unloadAppend().stampRegionSequenceId();
-            } catch (IOException e) {
-              throw new AssertionError("should not happen", e);
-            }
-            break;
-          case SYNC:
-            SyncFuture sync = truck.unloadSync();
-            sync.done(sync.getTxid(), error);
-            break;
-          default:
-            LOG.warn("RingBufferTruck with unexpected type: " + truck.type());
-            break;
-        }
-        waitingConsumePayloadsGatingSequence.set(nextCursor);
-      }
-    }
-
-    @Override
-    public void run() {
-      if (!logRollerExited) {
-        return;
-      }
-      // rollWriter is called in the log roller thread, and logRollerExited will be set just before
-      // the log rolled exit. So here we can confirm that no one could cancel us if the 'canceled'
-      // check passed. So it is safe to release the lock after checking 'canceled' flag.
-      synchronized (this) {
-        if (cancelled) {
-          return;
-        }
-      }
-      cleanup();
-    }
-
-    public synchronized void cancel() {
-      future.cancel(false);
-      cancelled = true;
-    }
-  }
-
-  private LogRollerExitedChecker logRollerExitedChecker;
-
   public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
       Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
       String prefix, String suffix, EventLoop eventLoop)
@@ -312,8 +225,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
     batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE);
     createMaxRetries =
         conf.getInt(ASYNC_WAL_CREATE_MAX_RETRIES, DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES);
-    logRollerExitedCheckIntervalMs = conf.getLong(ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS,
-      DEFAULT_ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS);
     rollWriter();
   }
 
@@ -357,14 +268,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
       if (writerBroken) {
         return;
       }
-      // schedule a periodical task to check if log roller is exited. Otherwise the the sync
-      // request maybe blocked forever since we are still waiting for a new writer to write the
-      // pending data and sync it...
-      logRollerExitedChecker = new LogRollerExitedChecker();
-      // we are currently in the EventLoop thread, so it is safe to set the future after
-      // schedule it since the task can not be executed before we release the thread.
-      logRollerExitedChecker.setFuture(eventLoop.scheduleAtFixedRate(logRollerExitedChecker,
-        logRollerExitedCheckIntervalMs, logRollerExitedCheckIntervalMs, TimeUnit.MILLISECONDS));
       writerBroken = true;
       if (waitingRoll) {
         readyForRolling = true;
@@ -708,11 +611,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
   }
 
   @Override
-  public void logRollerExited() {
-    logRollerExited = true;
-  }
-
-  @Override
   protected AsyncWriter createWriterInstance(Path path) throws IOException {
     boolean overwrite = false;
     for (int retry = 0;; retry++) {
@@ -779,10 +677,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
     try {
       consumerScheduled.set(true);
       writerBroken = waitingRoll = false;
-      if (logRollerExitedChecker != null) {
-        logRollerExitedChecker.cancel();
-        logRollerExitedChecker = null;
-      }
       eventLoop.execute(consumer);
     } finally {
       consumeLock.unlock();

http://git-wip-us.apache.org/repos/asf/hbase/blob/e5a288e5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 426e3b1..b4f0a29 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -795,10 +795,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
     }
   }
 
-  @Override
-  public void logRollerExited() {
-  }
-
   @VisibleForTesting
   boolean isLowReplicationRollEnabled() {
     return lowReplicationRollEnabled;

http://git-wip-us.apache.org/repos/asf/hbase/blob/e5a288e5/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index 0725c4e..7f10d7d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -228,10 +228,6 @@ class DisabledWALProvider implements WALProvider {
     public String toString() {
       return "WAL disabled.";
     }
-
-    @Override
-    public void logRollerExited() {
-    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/e5a288e5/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index 13ab85e..a9c9fe7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -212,13 +212,6 @@ public interface WAL extends Closeable {
   String toString();
 
   /**
-   * In some WAL implementation, we will write WAL entries to new file if sync failed, which means,
-   * the fail recovery is depended on log roller. So here we tell the WAL that log roller has
-   * already been exited so the WAL cloud give up recovery.
-   */
-  void logRollerExited();
-
-  /**
    * When outside clients need to consume persisted WALs, they rely on a provided
    * Reader.
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/e5a288e5/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
index e9ff8ec..ad88cfe 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
@@ -245,7 +245,7 @@ public class TestFailedAppendAndSync {
     } finally {
       // To stop logRoller, its server has to say it is stopped.
       Mockito.when(server.isStopped()).thenReturn(true);
-      if (logRoller != null) logRoller.interrupt();
+      if (logRoller != null) logRoller.close();
       if (region != null) {
         try {
           region.close(true);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e5a288e5/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
index 63fbb69..31f9a42 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
@@ -285,7 +285,7 @@ public class TestWALLockup {
     } finally {
       // To stop logRoller, its server has to say it is stopped.
       Mockito.when(server.isStopped()).thenReturn(true);
-      if (logRoller != null) logRoller.interrupt();
+      if (logRoller != null) logRoller.close();
       try {
         if (region != null) region.close();
         if (dodgyWAL != null) dodgyWAL.close();
@@ -469,7 +469,7 @@ public class TestWALLockup {
       assertTrue(server.isAborted());
     } finally {
       if (logRoller != null) {
-        logRoller.interrupt();
+        logRoller.close();
       }
       try {
         if (region != null) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/e5a288e5/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java
index c7ea5f2..1b513b2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java
@@ -382,7 +382,7 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
         }
         if (null != roller) {
           LOG.info("shutting down log roller.");
-          Threads.shutdown(roller.getThread());
+          roller.close();
         }
         wals.shutdown();
         // Remove the root dir for this test region