You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2011/07/19 07:15:05 UTC
svn commit: r1148170 - in /hbase/branches/0.90: CHANGES.txt
src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
Author: tedyu
Date: Tue Jul 19 05:15:03 2011
New Revision: 1148170
URL: http://svn.apache.org/viewvc?rev=1148170&view=rev
Log:
HBASE-4095 Hlog may not be rolled in a long time if checkLowReplication's request of LogRoll is blocked
Modified:
hbase/branches/0.90/CHANGES.txt
hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
Modified: hbase/branches/0.90/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/CHANGES.txt?rev=1148170&r1=1148169&r2=1148170&view=diff
==============================================================================
--- hbase/branches/0.90/CHANGES.txt (original)
+++ hbase/branches/0.90/CHANGES.txt Tue Jul 19 05:15:03 2011
@@ -71,6 +71,8 @@ Release 0.90.4 - Unreleased
HBASE-4052 Enabling a table after master switch does not allow table scan,
throwing NotServingRegionException (ramkrishna via Ted Yu)
HBASE-4112 Creating table may throw NullPointerException (Jinchao via Ted Yu)
+ HBASE-4095 Hlog may not be rolled in a long time if checkLowReplication's request
+ of LogRoll is blocked (Jieshan via Ted Yu)
IMPROVEMENT
HBASE-3882 hbase-config.sh needs to be updated so it can auto-detects the
Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1148170&r1=1148169&r2=1148170&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Tue Jul 19 05:15:03 2011
@@ -39,6 +39,7 @@ import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
@@ -199,7 +200,11 @@ public class HLog implements Syncable {
// This lock prevents starting a log roll during a cache flush.
// synchronized is insufficient because a cache flush spans two method calls.
private final Lock cacheFlushLock = new ReentrantLock();
-
+
+ // Maximum time, in milliseconds, that the log roller waits to acquire cacheFlushLock.
+ // If the lock cannot be acquired within this time, the current log roll is skipped.
+ private final long cacheFlushLockWaitTime;
+
// We synchronize on updateLock to prevent updates and to prevent a log roll
// during an update
// locked during appends
@@ -345,6 +350,8 @@ public class HLog implements Syncable {
this.logrollsize = (long)(this.blocksize * multi);
this.optionalFlushInterval =
conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000);
+ this.cacheFlushLockWaitTime =
+ conf.getLong("hbase.regionserver.cacheFlushLock.waittime", 5000);
if (failIfLogDirExists && fs.exists(dir)) {
throw new IOException("Target HLog directory already exists: " + dir);
}
@@ -469,66 +476,87 @@ public class HLog implements Syncable {
return null;
}
byte [][] regionsToFlush = null;
- this.cacheFlushLock.lock();
try {
- if (closed) {
- return regionsToFlush;
- }
- // Do all the preparation outside of the updateLock to block
- // as less as possible the incoming writes
- long currentFilenum = this.filenum;
- this.filenum = System.currentTimeMillis();
- Path newPath = computeFilename();
- HLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
- int nextInitialReplication = fs.getFileStatus(newPath).getReplication();
- // Can we get at the dfsclient outputstream? If an instance of
- // SFLW, it'll have done the necessary reflection to get at the
- // protected field name.
- OutputStream nextHdfsOut = null;
- if (nextWriter instanceof SequenceFileLogWriter) {
- nextHdfsOut =
- ((SequenceFileLogWriter)nextWriter).getDFSCOutputStream();
- }
- // Tell our listeners that a new log was created
- if (!this.listeners.isEmpty()) {
- for (WALObserver i : this.listeners) {
- i.logRolled(newPath);
- }
- }
-
- synchronized (updateLock) {
- // Clean up current writer.
- Path oldFile = cleanupCurrentWriter(currentFilenum);
- this.writer = nextWriter;
- this.initialReplication = nextInitialReplication;
- this.hdfs_out = nextHdfsOut;
-
- LOG.info((oldFile != null?
- "Roll " + FSUtils.getPath(oldFile) + ", entries=" +
- this.numEntries.get() +
- ", filesize=" +
- this.fs.getFileStatus(oldFile).getLen() + ". ": "") +
- "New hlog " + FSUtils.getPath(newPath));
- this.numEntries.set(0);
- this.logRollRequested = false;
- }
- // Can we delete any of the old log files?
- if (this.outputfiles.size() > 0) {
- if (this.lastSeqWritten.isEmpty()) {
- LOG.debug("Last sequenceid written is empty. Deleting all old hlogs");
- // If so, then no new writes have come in since all regions were
- // flushed (and removed from the lastSeqWritten map). Means can
- // remove all but currently open log file.
- for (Map.Entry<Long, Path> e : this.outputfiles.entrySet()) {
- archiveLogFile(e.getValue(), e.getKey());
+ if (this.cacheFlushLock.tryLock(this.cacheFlushLockWaitTime,
+ TimeUnit.MILLISECONDS)) {
+ try {
+ if (closed) {
+ return regionsToFlush;
+ }
+ this.logRollRequested = true;
+ // Do all the preparation outside of the updateLock to block
+ // as less as possible the incoming writes
+ long currentFilenum = this.filenum;
+ this.filenum = System.currentTimeMillis();
+ Path newPath = computeFilename();
+ HLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
+
+ // This call returns the expected, not the actual, number of replicas of the HLog file
+ int nextExpectReplicas = fs.getFileStatus(newPath).getReplication();
+
+ // Get the current number of replicas of the HLog file
+ int nextActualReplicas = -1;
+ try
+ {
+ nextActualReplicas = getLogReplication();
+ } catch (Exception e) {
+ LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +
+ " still proceeding ahead...");
+ }
+ // Can we get at the dfsclient outputstream? If an instance of
+ // SFLW, it'll have done the necessary reflection to get at the
+ // protected field name.
+ OutputStream nextHdfsOut = null;
+ if (nextWriter instanceof SequenceFileLogWriter) {
+ nextHdfsOut =
+ ((SequenceFileLogWriter)nextWriter).getDFSCOutputStream();
+ }
+ // Tell our listeners that a new log was created
+ if (!this.listeners.isEmpty()) {
+ for (WALObserver i : this.listeners) {
+ i.logRolled(newPath);
+ }
}
- this.outputfiles.clear();
- } else {
- regionsToFlush = cleanOldLogs();
+
+ synchronized (updateLock) {
+ // Clean up current writer.
+ Path oldFile = cleanupCurrentWriter(currentFilenum);
+ this.writer = nextWriter;
+ this.initialReplication = nextActualReplicas == -1 ?
+ nextExpectReplicas : nextActualReplicas;
+ this.hdfs_out = nextHdfsOut;
+
+ LOG.info((oldFile != null?
+ "Roll " + FSUtils.getPath(oldFile) + ", entries=" +
+ this.numEntries.get() +
+ ", filesize=" +
+ this.fs.getFileStatus(oldFile).getLen() + ". ": "") +
+ "New hlog " + FSUtils.getPath(newPath));
+ this.numEntries.set(0);
+ this.logRollRequested = false;
+ }
+ // Can we delete any of the old log files?
+ if (this.outputfiles.size() > 0) {
+ if (this.lastSeqWritten.isEmpty()) {
+ LOG.debug("Last sequenceid written is empty. Deleting all old hlogs");
+ // If so, then no new writes have come in since all regions were
+ // flushed (and removed from the lastSeqWritten map). Means can
+ // remove all but currently open log file.
+ for (Map.Entry<Long, Path> e : this.outputfiles.entrySet()) {
+ archiveLogFile(e.getValue(), e.getKey());
+ }
+ this.outputfiles.clear();
+ } else {
+ regionsToFlush = cleanOldLogs();
+ }
+ }
+ } finally {
+ this.cacheFlushLock.unlock();
}
}
- } finally {
- this.cacheFlushLock.unlock();
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted rollWriter", e);
+ Thread.currentThread().interrupt();
}
return regionsToFlush;
}
@@ -1009,7 +1037,6 @@ public class HLog implements Syncable {
this.initialReplication + " replicas. " +
" Requesting close of hlog.");
requestLogRoll();
- logRollRequested = true;
}
} catch (Exception e) {
LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +