You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ec...@apache.org on 2014/08/01 00:07:51 UTC
[15/50] [abbrv] git commit: [HBASE-11283] Roll HLog if sync time is anomalous
[HBASE-11283] Roll HLog if sync time is anomalous
Summary:
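When WAL syncs become slow, this schedules a log roll: HLog counts syncs that take longer than one second and requests a roll once the count reaches hbase.hlog.slow.sync.before.roll (default 5).
This diff also rate-limits roll requests: LogRoller ignores a request that arrives within hbase.regionserver.logroll.request.period (default 60 seconds) of the last accepted one.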
Test Plan: Added testSlowRoll to TestHLog
Reviewers: leon, elenapr, aaiyer, manukranthk
Reviewed By: manukranthk
Subscribers: daviddeng, aleksandr, hbase-prodeng@, hbase-eng@
Differential Revision: https://phabricator.fb.com/D1359586
Tasks: 4422839
git-svn-id: svn+ssh://tubbs/svnhive/hadoop/branches/titan/VENDOR.hbase/hbase-trunk@42799 e7acf4d4-3532-417f-9e73-7a9ae25a1f51
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ae31cc53
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ae31cc53
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ae31cc53
Branch: refs/heads/0.89-fb
Commit: ae31cc53050bdf656bf16f094c0b066eb5a0fc0e
Parents: b848685
Author: elliott <el...@e7acf4d4-3532-417f-9e73-7a9ae25a1f51>
Authored: Wed Jun 11 17:18:51 2014 +0000
Committer: Elliott Clark <el...@fb.com>
Committed: Thu Jul 31 14:44:22 2014 -0700
----------------------------------------------------------------------
.../hadoop/hbase/regionserver/LogRoller.java | 22 +++++-
.../hadoop/hbase/regionserver/wal/HLog.java | 20 ++++--
.../hadoop/hbase/regionserver/wal/TestHLog.java | 71 +++++++++++++++++---
3 files changed, 96 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/ae31cc53/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
index 357f648..1eb0944 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;
import java.io.IOException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.hbase.util.HasThread;
@@ -42,14 +43,25 @@ import org.apache.hadoop.hbase.util.HasThread;
*/
public class LogRoller extends HasThread implements LogRollListener {
static final Log LOG = LogFactory.getLog(LogRoller.class);
+
+ /**
+ * The minimum time in between requested log rolls.
+ * In MS.
+ */
+ public static final String LOG_ROLL_REQUEST_PERIOD_KEY =
+ "hbase.regionserver.logroll.request.period";
+ public static final long LOG_ROLL_REQUEST_PERIOD_DEFAULT = TimeUnit.SECONDS.toMillis(60);
+
private final ReentrantLock rollLock = new ReentrantLock();
private final AtomicBoolean rollLog = new AtomicBoolean(false);
private final HRegionServer server;
private volatile long lastrolltime = System.currentTimeMillis();
+ private volatile long lastrollRequestTime = System.currentTimeMillis();
// Period to roll log.
private final long rollperiod;
private final int hlogIndexID;
private final String logRollerName;
+ private final long timeBetweenRequest;
/** @param server */
public LogRoller(final HRegionServer server, int hlogIndexID) {
@@ -57,6 +69,8 @@ public class LogRoller extends HasThread implements LogRollListener {
this.server = server;
this.rollperiod =
this.server.conf.getLong("hbase.regionserver.logroll.period", 3600000);
+ this.timeBetweenRequest = this.server.conf.getLong(LOG_ROLL_REQUEST_PERIOD_KEY,
+ LOG_ROLL_REQUEST_PERIOD_DEFAULT);
this.hlogIndexID = hlogIndexID;
this.logRollerName = "HLogRoller-" + hlogIndexID + " ";
}
@@ -152,9 +166,13 @@ public class LogRoller extends HasThread implements LogRollListener {
}
public void logRollRequested() {
+ long currentTime = System.currentTimeMillis();
synchronized (rollLog) {
- rollLog.set(true);
- rollLog.notifyAll();
+ if ((currentTime - lastrollRequestTime) > timeBetweenRequest) {
+ lastrollRequestTime = currentTime;
+ rollLog.set(true);
+ rollLog.notifyAll();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ae31cc53/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
index db775d9..044fef4 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -158,6 +158,7 @@ public class HLog implements Syncable {
/** We include all timestamps by default */
public static final long DEFAULT_LATEST_TS_TO_INCLUDE = Long.MAX_VALUE;
+ public static final long SECOND_IN_NS = TimeUnit.SECONDS.toNanos(1);
// If enabled, old logs will be archived into hourly sub-directories instead of
// server address sub-directories.
@@ -173,13 +174,15 @@ public class HLog implements Syncable {
private final String prefix;
private final AtomicLong unflushedEntries = new AtomicLong(0);
private final Path oldLogDir;
+ private final int slowBeforeRoll;
+ private final AtomicLong slowSyncs = new AtomicLong(0);
private volatile long syncTillHere = 0;
private final List<LogActionsListener> actionListeners =
Collections.synchronizedList(new ArrayList<LogActionsListener>());
- private static Class<? extends Writer> logWriterClass;
- private static Class<? extends Reader> logReaderClass;
+ static Class<? extends Writer> logWriterClass;
+ static Class<? extends Reader> logReaderClass;
private OutputStream hdfs_out; // OutputStream associated with the current SequenceFile.writer
private int initialReplication; // initial replication factor of SequenceFile.writer
@@ -458,6 +461,8 @@ public class HLog implements Syncable {
this.optionalFlushInterval =
conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000);
+ this.slowBeforeRoll = conf.getInt("hbase.hlog.slow.sync.before.roll", 5);
+
if (!fs.exists(oldLogDir)) {
fs.mkdirs(oldLogDir);
}
@@ -606,6 +611,7 @@ public class HLog implements Syncable {
newWriter.append(new HLog.Entry(key, edit));
syncWriter(newWriter);
}
+ this.slowSyncs.set(0);
} catch (IOException ioe) {
// If we fail to create a new writer, let us clean up the file.
// Do not worry if the delete fails.
@@ -1091,11 +1097,17 @@ public class HLog implements Syncable {
long end = System.nanoTime();
long syncTime = end - start;
gsyncTime.inc(syncTime);
- if (syncTime > 1000000000) {
+ if (syncTime > SECOND_IN_NS) {
LOG.warn(String.format(
- "%s took %d ms appending an edit to hlog; editcount=%d, len~=%s",
+ "%s took %d ns appending an edit to hlog; editcount=%d, len~=%s",
Thread.currentThread().getName(), syncTime, this.numEntries.get(),
StringUtils.humanReadableInt(len)));
+ slowSyncs.incrementAndGet();
+ }
+
+ if (slowSyncs.get() >= slowBeforeRoll) {
+ this.requestLogRoll();
+ this.slowSyncs.set(0);
}
// Update the per-request profiling data
http://git-wip-us.apache.org/repos/asf/hbase/blob/ae31cc53/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
index 7916599..b7d644e 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
@@ -28,6 +28,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -47,6 +48,7 @@ import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.UnstableTests;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ExceptionUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hdfs.DFSClient;
@@ -99,7 +101,10 @@ public class TestHLog {
@After
public void tearDown() throws Exception {
+ HLog.logReaderClass = null;
+ HLog.logWriterClass = null;
}
+
@BeforeClass
public static void setUpBeforeClass() throws Exception {
// Make block sizes small.
@@ -118,19 +123,16 @@ public class TestHLog {
TEST_UTIL.getConfiguration().setInt(
"dfs.client.block.recovery.retries", 1);
TEST_UTIL.getConfiguration().setBoolean(HConstants.HLOG_FORMAT_BACKWARD_COMPATIBILITY, false);
- TEST_UTIL.startMiniCluster(3);
+ TEST_UTIL.startMiniDFSCluster(3);
conf = TEST_UTIL.getConfiguration();
cluster = TEST_UTIL.getDFSCluster();
fs = cluster.getFileSystem();
- hbaseDir = new Path(TEST_UTIL.getConfiguration().get("hbase.rootdir"));
+ hbaseDir = new Path(fs.getWorkingDirectory(), "hbase");
+ conf.set(HConstants.HBASE_DIR, hbaseDir.toString());
oldLogDir = new Path(hbaseDir, ".oldlogs");
- dir = new Path(hbaseDir, getName());
- }
- private static String getName() {
- // TODO Auto-generated method stub
- return "TestHLog";
+ dir = new Path(hbaseDir, TestHLog.class.getSimpleName());
}
/**
@@ -141,7 +143,7 @@ public class TestHLog {
@Test
public void testSplit() throws IOException {
- final byte [] tableName = Bytes.toBytes(getName());
+ final byte [] tableName = Bytes.toBytes(TestHLog.class.getSimpleName());
final byte [] rowName = tableName;
Path logdir = new Path(dir, HConstants.HREGION_LOGDIR_NAME);
HLog log = new HLog(fs, logdir, oldLogDir, conf, null);
@@ -191,9 +193,9 @@ public class TestHLog {
@Category(UnstableTests.class)
@Test
public void testSync() throws Exception {
- byte [] bytes = Bytes.toBytes(getName());
+ byte [] bytes = Bytes.toBytes(TestHLog.class.getSimpleName());
// First verify that using streams all works.
- Path p = new Path(dir, getName() + ".fsdos");
+ Path p = new Path(dir, TestHLog.class.getSimpleName() + ".fsdos");
FSDataOutputStream out = fs.create(p);
out.write(bytes);
out.sync();
@@ -337,7 +339,7 @@ public class TestHLog {
// 3. HDFS-142 (on restart, maintain pendingCreates)
@Test
public void testAppendClose() throws Exception {
- byte [] tableName = Bytes.toBytes(getName());
+ byte [] tableName = Bytes.toBytes(TestHLog.class.getSimpleName());
HRegionInfo regioninfo = new HRegionInfo(new HTableDescriptor(tableName),
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false);
Path subdir = new Path(dir, "hlogdir");
@@ -503,6 +505,40 @@ public class TestHLog {
}
}
+ @Test
+ public void testSlowRoll() throws Exception {
+ // Make a copy of the conf so that things don't pollute other tests
+ Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+ c.setClass("hbase.regionserver.hlog.writer.impl",
+ SlowSequenceFileLogWriter.class,
+ HLog.Writer.class);
+
+ int syncs = conf.getInt("hbase.hlog.slow.sync.before.roll", 5) * 2;
+
+ final AtomicInteger rollsRequests = new AtomicInteger(0);
+ HLog log = new HLog(fs, dir, oldLogDir, c, new LogRollListener() {
+ @Override public void logRollRequested() {
+ rollsRequests.incrementAndGet();
+ }
+ });
+
+ byte[] table = Bytes.toBytes("table");
+ byte[] cf = Bytes.toBytes("cf");
+ byte[] qual = Bytes.toBytes("qual");
+ byte[] val = Bytes.toBytes("val");
+
+ HRegionInfo info = new HRegionInfo(new HTableDescriptor(table),
+ null,null, false);
+
+ for (int i = 0; i < syncs; i++) {
+ WALEdit kvs = new WALEdit();
+ kvs.add(new KeyValue(Bytes.toBytes(i), cf, qual, HConstants.LATEST_TIMESTAMP, val));
+ log.append(info, table, kvs, System.currentTimeMillis());
+ log.sync(true);
+ }
+ assertEquals("Should request 2 rolls", 2, rollsRequests.get());
+ }
+
/**
* @throws IOException
*/
@@ -615,6 +651,19 @@ public class TestHLog {
}
}
+
+ public static class SlowSequenceFileLogWriter extends SequenceFileLogWriter implements HLog.Writer {
+ @Override
+ public void sync() throws IOException {
+ try {
+ Thread.sleep(1100);
+ } catch (InterruptedException e) {
+ ExceptionUtils.toIOException(e);
+ }
+ super.sync();
+ }
+ }
+
static class DumbLogEntriesVisitor implements LogEntryVisitor {
int increments = 0;