You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2016/11/10 02:58:10 UTC
hbase git commit: HBASE-17053 Remove LogRollerExitedChecker
Repository: hbase
Updated Branches:
refs/heads/master 8192a6b6e -> e5a288e5c
HBASE-17053 Remove LogRollerExitedChecker
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e5a288e5
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e5a288e5
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e5a288e5
Branch: refs/heads/master
Commit: e5a288e5c04d6f9a1b31549e4e3d979aeee4fd94
Parents: 8192a6b
Author: zhangduo <zh...@apache.org>
Authored: Wed Nov 9 21:07:07 2016 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Thu Nov 10 10:52:49 2016 +0800
----------------------------------------------------------------------
.../hbase/regionserver/HRegionServer.java | 51 +--------
.../hadoop/hbase/regionserver/LogRoller.java | 16 ++-
.../hbase/regionserver/wal/AsyncFSWAL.java | 106 -------------------
.../hadoop/hbase/regionserver/wal/FSHLog.java | 4 -
.../hadoop/hbase/wal/DisabledWALProvider.java | 4 -
.../java/org/apache/hadoop/hbase/wal/WAL.java | 7 --
.../regionserver/TestFailedAppendAndSync.java | 2 +-
.../hbase/regionserver/TestWALLockup.java | 4 +-
.../hbase/wal/WALPerformanceEvaluation.java | 2 +-
9 files changed, 20 insertions(+), 176 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/e5a288e5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 312e8c1..24d8170 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -375,8 +375,6 @@ public class HRegionServer extends HasThread implements
// WAL roller. log is protected rather than private to avoid
// eclipse warning when accessed by inner classes
final LogRoller walRoller;
- // Lazily initialized if this RegionServer hosts a meta table.
- final AtomicReference<LogRoller> metawalRoller = new AtomicReference<LogRoller>();
// flag set after we're done setting up server threads
final AtomicBoolean online = new AtomicBoolean(false);
@@ -1722,34 +1720,6 @@ public class HRegionServer extends HasThread implements
return new WALFactory(conf, listeners, serverName.toString());
}
- /**
- * We initialize the roller for the wal that handles meta lazily
- * since we don't know if this regionserver will handle it. All calls to
- * this method return a reference to that same roller. As newly referenced
- * meta regions are brought online, they will be offered to the roller for maintenance.
- * As a part of that registration process, the roller will add itself as a
- * listener on the wal.
- */
- protected LogRoller ensureMetaWALRoller() {
- // Using a tmp log roller to ensure metaLogRoller is alive once it is not
- // null
- LogRoller roller = metawalRoller.get();
- if (null == roller) {
- LogRoller tmpLogRoller = new LogRoller(this, this);
- String n = Thread.currentThread().getName();
- Threads.setDaemonThreadRunning(tmpLogRoller.getThread(),
- n + "-MetaLogRoller", uncaughtExceptionHandler);
- if (metawalRoller.compareAndSet(null, tmpLogRoller)) {
- roller = tmpLogRoller;
- } else {
- // Another thread won starting the roller
- Threads.shutdown(tmpLogRoller.getThread());
- roller = metawalRoller.get();
- }
- }
- return roller;
- }
-
public MetricsRegionServer getRegionServerMetrics() {
return this.metricsRegionServer;
}
@@ -1914,11 +1884,6 @@ public class HRegionServer extends HasThread implements
stop("One or more threads are no longer alive -- stop");
return false;
}
- final LogRoller metawalRoller = this.metawalRoller.get();
- if (metawalRoller != null && !metawalRoller.isAlive()) {
- stop("Meta WAL roller thread is no longer alive -- stop");
- return false;
- }
return true;
}
@@ -1932,11 +1897,9 @@ public class HRegionServer extends HasThread implements
@Override
public WAL getWAL(HRegionInfo regionInfo) throws IOException {
WAL wal;
- LogRoller roller = walRoller;
- //_ROOT_ and hbase:meta regions have separate WAL.
- if (regionInfo != null && regionInfo.isMetaTable() &&
- regionInfo.getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) {
- roller = ensureMetaWALRoller();
+ // _ROOT_ and hbase:meta regions have separate WAL.
+ if (regionInfo != null && regionInfo.isMetaTable()
+ && regionInfo.getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) {
wal = walFactory.getMetaWAL(regionInfo.getEncodedNameAsBytes());
} else if (regionInfo == null) {
wal = walFactory.getWAL(UNSPECIFIED_REGION, null);
@@ -1944,7 +1907,7 @@ public class HRegionServer extends HasThread implements
byte[] namespace = regionInfo.getTable().getNamespace();
wal = walFactory.getWAL(regionInfo.getEncodedNameAsBytes(), namespace);
}
- roller.addWAL(wal);
+ walRoller.addWAL(wal);
return wal;
}
@@ -2330,11 +2293,7 @@ public class HRegionServer extends HasThread implements
this.spanReceiverHost.closeReceivers();
}
if (this.walRoller != null) {
- Threads.shutdown(this.walRoller.getThread());
- }
- final LogRoller metawalRoller = this.metawalRoller.get();
- if (metawalRoller != null) {
- Threads.shutdown(metawalRoller.getThread());
+ this.walRoller.close();
}
if (this.compactSplitThread != null) {
this.compactSplitThread.join();
http://git-wip-us.apache.org/repos/asf/hbase/blob/e5a288e5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
index 9a2bb34..24f0d1a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
@@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import java.io.Closeable;
import java.io.IOException;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
@@ -49,7 +50,7 @@ import com.google.common.annotations.VisibleForTesting;
*/
@InterfaceAudience.Private
@VisibleForTesting
-public class LogRoller extends HasThread {
+public class LogRoller extends HasThread implements Closeable {
private static final Log LOG = LogFactory.getLog(LogRoller.class);
private final ReentrantLock rollLock = new ReentrantLock();
private final AtomicBoolean rollLog = new AtomicBoolean(false);
@@ -62,6 +63,8 @@ public class LogRoller extends HasThread {
private final long rollperiod;
private final int threadWakeFrequency;
+ private volatile boolean running = true;
+
public void addWAL(final WAL wal) {
if (null == walNeedsRoll.putIfAbsent(wal, Boolean.FALSE)) {
wal.registerWALActionsListener(new WALActionsListener.Base() {
@@ -110,7 +113,7 @@ public class LogRoller extends HasThread {
@Override
public void run() {
- while (!server.isStopped()) {
+ while (running) {
long now = System.currentTimeMillis();
boolean periodic = false;
if (!rollLog.get()) {
@@ -167,9 +170,6 @@ public class LogRoller extends HasThread {
}
}
}
- for (WAL wal : walNeedsRoll.keySet()) {
- wal.logRollerExited();
- }
LOG.info("LogRoller exiting.");
}
@@ -208,4 +208,10 @@ public class LogRoller extends HasThread {
}
return true;
}
+
+ @Override
+ public void close() {
+ running = false;
+ interrupt();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/e5a288e5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index d842f1b..78a3e8a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -25,7 +25,6 @@ import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.Sequencer;
import io.netty.channel.EventLoop;
-import io.netty.util.concurrent.ScheduledFuture;
import io.netty.util.concurrent.SingleThreadEventExecutor;
import java.io.IOException;
@@ -40,7 +39,6 @@ import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@@ -143,10 +141,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
public static final String ASYNC_WAL_CREATE_MAX_RETRIES = "hbase.wal.async.create.retries";
public static final int DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES = 10;
- public static final String ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS =
- "hbase.wal.async.logroller.exited.check.interval.ms";
- public static final long DEFAULT_ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS = 1000;
-
private final EventLoop eventLoop;
private final Lock consumeLock = new ReentrantLock();
@@ -176,8 +170,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
private final int createMaxRetries;
- private final long logRollerExitedCheckIntervalMs;
-
private final ExecutorService closeExecutor = Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());
@@ -196,85 +188,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
// file length when we issue last sync request on the writer
private long fileLengthAtLastSync;
- private volatile boolean logRollerExited;
-
- private final class LogRollerExitedChecker implements Runnable {
-
- private boolean cancelled;
-
- private ScheduledFuture<?> future;
-
- public synchronized void setFuture(ScheduledFuture<?> future) {
- this.future = future;
- }
-
- // See the comments in syncFailed why we need to do this.
- private void cleanup() {
- unackedAppends.clear();
- toWriteAppends.forEach(entry -> {
- try {
- entry.stampRegionSequenceId();
- } catch (IOException e) {
- throw new AssertionError("should not happen", e);
- }
- });
- toWriteAppends.clear();
- IOException error = new IOException("sync failed but log roller exited");
- for (SyncFuture sync; (sync = syncFutures.peek()) != null;) {
- sync.done(sync.getTxid(), error);
- syncFutures.remove();
- }
- long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
- for (long cursorBound =
- waitingConsumePayloads.getCursor(); nextCursor <= cursorBound; nextCursor++) {
- if (!waitingConsumePayloads.isPublished(nextCursor)) {
- break;
- }
- RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);
- switch (truck.type()) {
- case APPEND:
- try {
- truck.unloadAppend().stampRegionSequenceId();
- } catch (IOException e) {
- throw new AssertionError("should not happen", e);
- }
- break;
- case SYNC:
- SyncFuture sync = truck.unloadSync();
- sync.done(sync.getTxid(), error);
- break;
- default:
- LOG.warn("RingBufferTruck with unexpected type: " + truck.type());
- break;
- }
- waitingConsumePayloadsGatingSequence.set(nextCursor);
- }
- }
-
- @Override
- public void run() {
- if (!logRollerExited) {
- return;
- }
- // rollWriter is called in the log roller thread, and logRollerExited will be set just before
- // the log roller exits. So here we can confirm that no one could cancel us if the 'cancelled'
- // check passed. So it is safe to release the lock after checking the 'cancelled' flag.
- synchronized (this) {
- if (cancelled) {
- return;
- }
- }
- cleanup();
- }
-
- public synchronized void cancel() {
- future.cancel(false);
- cancelled = true;
- }
- }
-
- private LogRollerExitedChecker logRollerExitedChecker;
-
public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
String prefix, String suffix, EventLoop eventLoop)
@@ -312,8 +225,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE);
createMaxRetries =
conf.getInt(ASYNC_WAL_CREATE_MAX_RETRIES, DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES);
- logRollerExitedCheckIntervalMs = conf.getLong(ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS,
- DEFAULT_ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS);
rollWriter();
}
@@ -357,14 +268,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
if (writerBroken) {
return;
}
- // schedule a periodical task to check if log roller is exited. Otherwise the sync
- // request may be blocked forever since we are still waiting for a new writer to write the
- // pending data and sync it...
- logRollerExitedChecker = new LogRollerExitedChecker();
- // we are currently in the EventLoop thread, so it is safe to set the future after
- // schedule it since the task can not be executed before we release the thread.
- logRollerExitedChecker.setFuture(eventLoop.scheduleAtFixedRate(logRollerExitedChecker,
- logRollerExitedCheckIntervalMs, logRollerExitedCheckIntervalMs, TimeUnit.MILLISECONDS));
writerBroken = true;
if (waitingRoll) {
readyForRolling = true;
@@ -708,11 +611,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
}
@Override
- public void logRollerExited() {
- logRollerExited = true;
- }
-
- @Override
protected AsyncWriter createWriterInstance(Path path) throws IOException {
boolean overwrite = false;
for (int retry = 0;; retry++) {
@@ -779,10 +677,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
try {
consumerScheduled.set(true);
writerBroken = waitingRoll = false;
- if (logRollerExitedChecker != null) {
- logRollerExitedChecker.cancel();
- logRollerExitedChecker = null;
- }
eventLoop.execute(consumer);
} finally {
consumeLock.unlock();
http://git-wip-us.apache.org/repos/asf/hbase/blob/e5a288e5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 426e3b1..b4f0a29 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -795,10 +795,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
}
}
- @Override
- public void logRollerExited() {
- }
-
@VisibleForTesting
boolean isLowReplicationRollEnabled() {
return lowReplicationRollEnabled;
http://git-wip-us.apache.org/repos/asf/hbase/blob/e5a288e5/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index 0725c4e..7f10d7d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -228,10 +228,6 @@ class DisabledWALProvider implements WALProvider {
public String toString() {
return "WAL disabled.";
}
-
- @Override
- public void logRollerExited() {
- }
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/e5a288e5/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index 13ab85e..a9c9fe7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -212,13 +212,6 @@ public interface WAL extends Closeable {
String toString();
/**
- * In some WAL implementations, we will write WAL entries to a new file if sync failed, which
- * means the failure recovery depends on the log roller. So here we tell the WAL that the log
- * roller has already exited so the WAL could give up recovery.
- */
- void logRollerExited();
-
- /**
* When outside clients need to consume persisted WALs, they rely on a provided
* Reader.
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/e5a288e5/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
index e9ff8ec..ad88cfe 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
@@ -245,7 +245,7 @@ public class TestFailedAppendAndSync {
} finally {
// To stop logRoller, its server has to say it is stopped.
Mockito.when(server.isStopped()).thenReturn(true);
- if (logRoller != null) logRoller.interrupt();
+ if (logRoller != null) logRoller.close();
if (region != null) {
try {
region.close(true);
http://git-wip-us.apache.org/repos/asf/hbase/blob/e5a288e5/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
index 63fbb69..31f9a42 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
@@ -285,7 +285,7 @@ public class TestWALLockup {
} finally {
// To stop logRoller, its server has to say it is stopped.
Mockito.when(server.isStopped()).thenReturn(true);
- if (logRoller != null) logRoller.interrupt();
+ if (logRoller != null) logRoller.close();
try {
if (region != null) region.close();
if (dodgyWAL != null) dodgyWAL.close();
@@ -469,7 +469,7 @@ public class TestWALLockup {
assertTrue(server.isAborted());
} finally {
if (logRoller != null) {
- logRoller.interrupt();
+ logRoller.close();
}
try {
if (region != null) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/e5a288e5/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java
index c7ea5f2..1b513b2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java
@@ -382,7 +382,7 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
}
if (null != roller) {
LOG.info("shutting down log roller.");
- Threads.shutdown(roller.getThread());
+ roller.close();
}
wals.shutdown();
// Remove the root dir for this test region