You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by la...@apache.org on 2012/04/18 22:37:26 UTC
svn commit: r1327672 - in /hbase/branches/0.94/src:
main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
Author: larsh
Date: Wed Apr 18 20:37:25 2012
New Revision: 1327672
URL: http://svn.apache.org/viewvc?rev=1327672&view=rev
Log:
HBASE-5782 Edits can be appended out of seqid order since HBASE-4487
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1327672&r1=1327671&r2=1327672&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Wed Apr 18 20:37:25 2012
@@ -228,6 +228,7 @@ public class HLog implements Syncable {
// during an update
// locked during appends
private final Object updateLock = new Object();
+ private final Object flushLock = new Object();
private final boolean enabled;
@@ -295,7 +296,6 @@ public class HLog implements Syncable {
private static Metric writeSize = new Metric();
// For measuring latency of syncs
private static Metric syncTime = new Metric();
- private static AtomicLong syncBatchSize = new AtomicLong();
//For measuring slow HLog appends
private static AtomicLong slowHLogAppendCount = new AtomicLong();
private static Metric slowHLogAppendTime = new Metric();
@@ -312,10 +312,6 @@ public class HLog implements Syncable {
return syncTime.get();
}
- public static long getSyncBatchSize() {
- return syncBatchSize.getAndSet(0);
- }
-
public static long getSlowAppendCount() {
return slowHLogAppendCount.get();
}
@@ -1257,32 +1253,43 @@ public class HLog implements Syncable {
return;
}
try {
- long doneUpto = this.unflushedEntries.get();
+ long doneUpto;
long now = System.currentTimeMillis();
- // Done in parallel for all writer threads, thanks to HDFS-895
- List<Entry> pending = logSyncerThread.getPendingWrites();
+ // First flush all the pending writes to HDFS. Then
+ // issue the sync to HDFS. If sync is successful, then update
+ // syncedTillHere to indicate that transactions till this
+ // number have been successfully synced.
+ synchronized (flushLock) {
+ if (txid <= this.syncedTillHere) {
+ return;
+ }
+ doneUpto = this.unflushedEntries.get();
+ List<Entry> pending = logSyncerThread.getPendingWrites();
+ try {
+ logSyncerThread.hlogFlush(tempWriter, pending);
+ } catch(IOException io) {
+ synchronized (this.updateLock) {
+ // HBASE-4387, HBASE-5623, retry with updateLock held
+ tempWriter = this.writer;
+ logSyncerThread.hlogFlush(tempWriter, pending);
+ }
+ }
+ }
+ // Another thread might have sync'ed already; avoid double-sync'ing.
+ if (txid <= this.syncedTillHere) {
+ return;
+ }
try {
- // First flush all the pending writes to HDFS. Then
- // issue the sync to HDFS. If sync is successful, then update
- // syncedTillHere to indicate that transactions till this
- // number has been successfully synced.
- logSyncerThread.hlogFlush(tempWriter, pending);
- pending = null;
tempWriter.sync();
- syncBatchSize.addAndGet(doneUpto - this.syncedTillHere);
- this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto);
} catch(IOException io) {
synchronized (this.updateLock) {
// HBASE-4387, HBASE-5623, retry with updateLock held
tempWriter = this.writer;
- logSyncerThread.hlogFlush(tempWriter, pending);
tempWriter.sync();
- syncBatchSize.addAndGet(doneUpto - this.syncedTillHere);
- this.syncedTillHere = doneUpto;
}
}
- // We try to not acquire the updateLock just to update statistics.
- // Make these statistics as AtomicLong.
+ this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto);
+
syncTime.inc(System.currentTimeMillis() - now);
if (!this.logRollRunning) {
checkLowReplication();
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java?rev=1327672&r1=1327671&r2=1327672&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java Wed Apr 18 20:37:25 2012
@@ -51,7 +51,6 @@ import org.apache.hadoop.hdfs.Distribute
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.io.SequenceFile;
import org.apache.log4j.Level;
@@ -137,6 +136,19 @@ public class TestHLog {
}
/**
+ * Test that with three concurrent threads we still write edits in ascending
+ * edit sequence id order.
+ * @throws Exception
+ */
+ @Test
+ public void testMaintainOrderWithConcurrentWrites() throws Exception {
+ // Run the HLogPerformanceEvaluation (HPE) tool with three threads writing 3000 edits each concurrently.
+ // When done, verify that all edits were written and that the order in the
+ // WALs is of ascending edit sequence ids.
+ HLogPerformanceEvaluation.main(new String [] {"-threads", "3", "-verify", "-iterations", "3000"});
+ }
+
+ /**
* Just write multiple logs then split. Before fix for HADOOP-2283, this
* would fail.
* @throws IOException