Posted to commits@hbase.apache.org by st...@apache.org on 2011/08/19 05:59:44 UTC
svn commit: r1159498 - in /hbase/branches/0.90: CHANGES.txt
src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
Author: stack
Date: Fri Aug 19 03:59:44 2011
New Revision: 1159498
URL: http://svn.apache.org/viewvc?rev=1159498&view=rev
Log:
HBASE-4095 Hlog may not be rolled in a long time if checkLowReplication's request of LogRoll is blocked
Modified:
hbase/branches/0.90/CHANGES.txt
hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
Modified: hbase/branches/0.90/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/CHANGES.txt?rev=1159498&r1=1159497&r2=1159498&view=diff
==============================================================================
--- hbase/branches/0.90/CHANGES.txt (original)
+++ hbase/branches/0.90/CHANGES.txt Fri Aug 19 03:59:44 2011
@@ -14,6 +14,8 @@ Release 0.90.5 - Unreleased
Campbell)
HBASE-4161 Incorrect use of listStatus() in HBase region initialization
(Pritam Damania)
+ HBASE-4095 Hlog may not be rolled in a long time if checkLowReplication's
+ request of LogRoll is blocked (Jieshan Bean)
IMPROVEMENT
HBASE-4205 Enhance HTable javadoc (Eric Charles)
Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1159498&r1=1159497&r2=1159498&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Fri Aug 19 03:59:44 2011
@@ -29,11 +29,11 @@ import java.lang.reflect.InvocationTarge
import java.lang.reflect.Method;
import java.net.URLEncoder;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
-import java.util.Arrays;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
@@ -41,7 +41,6 @@ import java.util.concurrent.ConcurrentSk
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Matcher;
@@ -129,7 +128,7 @@ public class HLog implements Syncable {
private final long blocksize;
private final String prefix;
private final Path oldLogDir;
- private boolean logRollRequested;
+ private boolean logRollRunning;
private static Class<? extends Writer> logWriterClass;
@@ -140,7 +139,10 @@ public class HLog implements Syncable {
}
private OutputStream hdfs_out; // OutputStream associated with the current SequenceFile.writer
- private int initialReplication; // initial replication factor of SequenceFile.writer
+
+ // Minimum tolerable replicas; if the actual count drops below this
+ // value, rollWriter will be triggered
+ private int minTolerableReplication;
private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas
final static Object [] NO_ARGS = new Object []{};
@@ -189,9 +191,19 @@ public class HLog implements Syncable {
// The timestamp (in ms) when the log file was created.
private volatile long filenum = -1;
- //number of transactions in the current Hlog.
+ // number of transactions in the current Hlog.
private final AtomicInteger numEntries = new AtomicInteger(0);
+ // If the live datanode count is lower than the default replication value,
+ // rollWriter will be triggered on every sync, so rolls would be requested
+ // repeatedly within a short time. This counter is a workaround to slow
+ // down the roll frequency triggered by checkLowReplication().
+ private volatile int consecutiveLogRolls = 0;
+ private final int lowReplicationRollLimit;
+ // If consecutiveLogRolls is larger than lowReplicationRollLimit,
+ // then disable the rolling in checkLowReplication().
+ // Re-enable it once replication recovers.
+ private volatile boolean lowReplicationRollEnabled = true;
// If > than this size, roll the log. This is typically 0.95 times the size
// of the default Hdfs block size.
private final long logrollsize;
@@ -358,6 +370,11 @@ public class HLog implements Syncable {
}
}
this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
+ this.minTolerableReplication = conf.getInt(
+ "hbase.regionserver.hlog.tolerable.lowreplication",
+ this.fs.getDefaultReplication());
+ this.lowReplicationRollLimit = conf.getInt(
+ "hbase.regionserver.hlog.lowreplication.rolllimit", 5);
this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true);
LOG.info("HLog configuration: blocksize=" +
StringUtils.byteDesc(this.blocksize) +
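
The two new settings are ordinary Configuration integers. As an illustration (the property names are taken from this patch; the surrounding setup is only a sketch, not code from the commit), they could be set programmatically like so:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    Configuration conf = HBaseConfiguration.create();
    // Roll the hlog once live replicas drop below this count
    // (defaults to fs.getDefaultReplication()).
    conf.setInt("hbase.regionserver.hlog.tolerable.lowreplication", 2);
    // After this many consecutive low-replication rolls, disable the
    // LowReplication-Roller until replication recovers (defaults to 5).
    conf.setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3);
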
@@ -470,6 +487,7 @@ public class HLog implements Syncable {
}
byte [][] regionsToFlush = null;
this.cacheFlushLock.lock();
+ this.logRollRunning = true;
try {
if (closed) {
return regionsToFlush;
@@ -480,7 +498,6 @@ public class HLog implements Syncable {
this.filenum = System.currentTimeMillis();
Path newPath = computeFilename();
HLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
- int nextInitialReplication = fs.getFileStatus(newPath).getReplication();
// Can we get at the dfsclient outputstream? If an instance of
// SFLW, it'll have done the necessary reflection to get at the
// protected field name.
@@ -500,7 +517,6 @@ public class HLog implements Syncable {
// Clean up current writer.
Path oldFile = cleanupCurrentWriter(currentFilenum);
this.writer = nextWriter;
- this.initialReplication = nextInitialReplication;
this.hdfs_out = nextHdfsOut;
LOG.info((oldFile != null?
@@ -510,7 +526,6 @@ public class HLog implements Syncable {
this.fs.getFileStatus(oldFile).getLen() + ". ": "") +
"New hlog " + FSUtils.getPath(newPath));
this.numEntries.set(0);
- this.logRollRequested = false;
}
// Can we delete any of the old log files?
if (this.outputfiles.size() > 0) {
@@ -528,6 +543,7 @@ public class HLog implements Syncable {
}
}
} finally {
+ this.logRollRunning = false;
this.cacheFlushLock.unlock();
}
return regionsToFlush;
@@ -982,7 +998,7 @@ public class HLog implements Syncable {
synchronized (this.updateLock) {
syncTime += System.currentTimeMillis() - now;
syncOps++;
- if (!logRollRequested) {
+ if (!logRollRunning) {
checkLowReplication();
if (this.writer.getLength() > this.logrollsize) {
requestLogRoll();
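
The rename from logRollRequested to logRollRunning is the heart of the fix: the old flag was set when a roll was requested and only cleared when the roll completed, so a blocked roll request suppressed every later one. The new flag is true only while rollWriter() actually holds the cache flush lock. A minimal sketch of the resulting lifecycle (names from the patch, surrounding code elided):

    // In rollWriter(): the flag brackets only the roll itself.
    cacheFlushLock.lock();
    logRollRunning = true;
    try {
      // ... swap in the new writer ...
    } finally {
      logRollRunning = false;  // cleared even if the roll fails
      cacheFlushLock.unlock();
    }

    // In sync(): checks are skipped only while a roll is in progress, so a
    // blocked or failed roll request no longer starves future roll requests.
    if (!logRollRunning) {
      checkLowReplication();
      if (writer.getLength() > logrollsize) {
        requestLogRoll();
      }
    }
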
@@ -998,18 +1014,43 @@ public class HLog implements Syncable {
}
private void checkLowReplication() {
- // if the number of replicas in HDFS has fallen below the initial
+ // if the number of replicas in HDFS has fallen below the configured
// value, then roll logs.
try {
int numCurrentReplicas = getLogReplication();
- if (numCurrentReplicas != 0 &&
- numCurrentReplicas < this.initialReplication) {
- LOG.warn("HDFS pipeline error detected. " +
- "Found " + numCurrentReplicas + " replicas but expecting " +
- this.initialReplication + " replicas. " +
- " Requesting close of hlog.");
- requestLogRoll();
- logRollRequested = true;
+ if (numCurrentReplicas != 0
+ && numCurrentReplicas < this.minTolerableReplication) {
+ if (this.lowReplicationRollEnabled) {
+ if (this.consecutiveLogRolls < this.lowReplicationRollLimit) {
+ LOG.warn("HDFS pipeline error detected. " + "Found "
+ + numCurrentReplicas + " replicas but expecting no less than "
+ + this.minTolerableReplication + " replicas. "
+ + " Requesting close of hlog.");
+ requestLogRoll();
+ // If rollWriter is requested, increase consecutiveLogRolls. Once it
+ // is larger than lowReplicationRollLimit, disable the
+ // LowReplication-Roller
+ this.consecutiveLogRolls++;
+ } else {
+ LOG.warn("Too many consecutive RollWriter requests, it's a sign of "
+ + "the total number of live datanodes is lower than the tolerable replicas.");
+ this.consecutiveLogRolls = 0;
+ this.lowReplicationRollEnabled = false;
+ }
+ }
+ } else if (numCurrentReplicas >= this.minTolerableReplication) {
+ if (!this.lowReplicationRollEnabled) {
+ // A new writer's replication is always the default value, so it alone
+ // should not re-enable the LowReplication-Roller. If numEntries is
+ // less than or equal to 1, we consider the writer new.
+ if (this.numEntries.get() <= 1) {
+ return;
+ }
+ // Once the live datanode number and the replicas return to normal,
+ // enable the LowReplication-Roller.
+ this.lowReplicationRollEnabled = true;
+ LOG.info("LowReplication-Roller was enabled.");
+ }
}
} catch (Exception e) {
LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +
@@ -1302,6 +1343,15 @@ public class HLog implements Syncable {
}
/**
+ * Get the LowReplication-Roller status.
+ *
+ * @return true if the LowReplication-Roller is currently enabled
+ */
+ public boolean isLowReplicationRollEnabled() {
+ return lowReplicationRollEnabled;
+ }
+
+ /**
* Get the directory we are making logs in.
*
* @return dir
Modified: hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java?rev=1159498&r1=1159497&r2=1159498&view=diff
==============================================================================
--- hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java (original)
+++ hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java Fri Aug 19 03:59:44 2011
@@ -19,6 +19,8 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
+import static org.junit.Assert.assertTrue;
+
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
@@ -29,23 +31,21 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
-
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@@ -55,8 +55,6 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
-import static org.junit.Assert.assertTrue;
-
/**
* Test log deletion as logs are rolled.
*/
@@ -137,6 +135,10 @@ public class TestLogRolling {
// the namenode might still try to choose the recently-dead datanode
// for a pipeline, so retry creating a new pipeline multiple times
TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30);
+ TEST_UTIL.getConfiguration().setInt(
+ "hbase.regionserver.hlog.tolerable.lowreplication", 2);
+ TEST_UTIL.getConfiguration().setInt(
+ "hbase.regionserver.hlog.lowreplication.rolllimit", 3);
TEST_UTIL.startMiniCluster(2);
cluster = TEST_UTIL.getHBaseCluster();
@@ -224,6 +226,30 @@ public class TestLogRolling {
// continue
}
}
+
+ void batchWriteAndWait(HTable table, int start, boolean expect, int timeout)
+ throws IOException {
+ for (int i = 0; i < 10; i++) {
+ Put put = new Put(Bytes.toBytes("row"
+ + String.format("%1$04d", (start + i))));
+ put.add(HConstants.CATALOG_FAMILY, null, value);
+ table.put(put);
+ }
+ long startTime = System.currentTimeMillis();
+ long remaining = timeout;
+ while (remaining > 0) {
+ if (log.isLowReplicationRollEnabled() == expect) {
+ break;
+ } else {
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ // continue
+ }
+ remaining = timeout - (System.currentTimeMillis() - startTime);
+ }
+ }
+ }
/**
* Give me the HDFS pipeline for this log file
@@ -267,7 +293,7 @@ public class TestLogRolling {
this.server = cluster.getRegionServer(0);
this.log = server.getWAL();
-
+
// Create the test table and open it
String tableName = getName();
HTableDescriptor desc = new HTableDescriptor(tableName);
@@ -320,7 +346,23 @@ public class TestLogRolling {
// write some more log data (this should use a new hdfs_out)
writeData(table, 3);
assertTrue("The log should not roll again.", log.getFilenum() == newFilenum);
- assertTrue("New log file should have the default replication", log
- .getLogReplication() == fs.getDefaultReplication());
+ // kill another datanode in the pipeline, so the replica count will fall
+ // below the configured tolerable value of 2.
+ assertTrue(dfsCluster.stopDataNode(pipeline[1].getName()) != null);
+ Thread.sleep(10000);
+ batchWriteAndWait(table, 4, false, 10000);
+ assertTrue("LowReplication Roller should've been disabled",
+ !log.isLowReplicationRollEnabled());
+ dfsCluster
+ .startDataNodes(TEST_UTIL.getConfiguration(), 1, true, null, null);
+ dfsCluster.waitActive();
+ // Force a writer roll. The new log file will have the default
+ // replication, and the LowReplication-Roller will be re-enabled.
+ log.rollWriter();
+ batchWriteAndWait(table, 14, true, 10000);
+ assertTrue("LowReplication Roller should've been enabled",
+ log.isLowReplicationRollEnabled());
+ assertTrue("New log file should have the default replication",
+ log.getLogReplication() == fs.getDefaultReplication());
}
}
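
Condensed, the new tail of the test drives the feature end to end (this is a summary of the hunk above, not additional test code):

    // Kill a second datanode: replicas fall below the tolerable value (2).
    dfsCluster.stopDataNode(pipeline[1].getName());
    batchWriteAndWait(table, 4, false, 10000);      // rolls hit the limit (3)
    assertTrue(!log.isLowReplicationRollEnabled()); // roller now disabled

    // Restore a datanode and roll by hand; the fresh writer carries the
    // default replication, and healthy writes re-arm the roller.
    dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), 1, true, null, null);
    dfsCluster.waitActive();
    log.rollWriter();
    batchWriteAndWait(table, 14, true, 10000);
    assertTrue(log.isLowReplicationRollEnabled());
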