You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2016/10/19 02:41:46 UTC
[5/5] hbase git commit: HBASE-16824 Writer.flush() can be called on
already closed streams in WAL roll
HBASE-16824 Writer.flush() can be called on already closed streams in WAL roll
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4e304b3f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4e304b3f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4e304b3f
Branch: refs/heads/branch-1.1
Commit: 4e304b3f919a9000e15fd66df190ab97e63bc07d
Parents: 382f88a
Author: Enis Soztutar <en...@apache.org>
Authored: Tue Oct 18 18:46:02 2016 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Tue Oct 18 19:41:04 2016 -0700
----------------------------------------------------------------------
.../hadoop/hbase/regionserver/wal/FSHLog.java | 31 ++++++++++++--
.../wal/TestLogRollingNoCluster.java | 44 +++++++++++++-------
2 files changed, 57 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/4e304b3f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 53545ed..76d09c1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -1278,6 +1278,7 @@ public class FSHLog implements WAL {
private volatile long sequence;
// Keep around last exception thrown. Clear on successful sync.
private final BlockingQueue<SyncFuture> syncFutures;
+ private volatile SyncFuture takeSyncFuture = null;
/**
* UPDATE!
@@ -1367,13 +1368,21 @@ public class FSHLog implements WAL {
return sequence;
}
+ boolean areSyncFuturesReleased() {
+ // check whether there is no sync futures offered, and no in-flight sync futures that is being
+ // processed.
+ return syncFutures.size() <= 0
+ && takeSyncFuture == null;
+ }
+
public void run() {
long currentSequence;
while (!isInterrupted()) {
int syncCount = 0;
- SyncFuture takeSyncFuture;
+
try {
while (true) {
+ takeSyncFuture = null;
// We have to process what we 'take' from the queue
takeSyncFuture = this.syncFutures.take();
currentSequence = this.sequence;
@@ -2010,9 +2019,21 @@ public class FSHLog implements WAL {
* @return True if outstanding sync futures still
*/
private boolean isOutstandingSyncs() {
+ // Look at SyncFutures in the EventHandler
for (int i = 0; i < this.syncFuturesCount; i++) {
if (!this.syncFutures[i].isDone()) return true;
}
+
+ return false;
+ }
+
+ private boolean isOutstandingSyncsFromRunners() {
+ // Look at SyncFutures in the SyncRunners
+ for (SyncRunner syncRunner: syncRunners) {
+ if(syncRunner.isAlive() && !syncRunner.areSyncFuturesReleased()) {
+ return true;
+ }
+ }
return false;
}
@@ -2123,11 +2144,13 @@ public class FSHLog implements WAL {
// Wait on outstanding syncers; wait for them to finish syncing (unless we've been
// shutdown or unless our latch has been thrown because we have been aborted or unless
// this WAL is broken and we can't get a sync/append to complete).
- while (!this.shutdown && this.zigzagLatch.isCocked() &&
- highestSyncedSequence.get() < currentSequence &&
+ while ((!this.shutdown && this.zigzagLatch.isCocked()
+ && highestSyncedSequence.get() < currentSequence &&
// We could be in here and all syncs are failing or failed. Check for this. Otherwise
// we'll just be stuck here for ever. In other words, ensure there syncs running.
- isOutstandingSyncs()) {
+ isOutstandingSyncs())
+ // Wait for all SyncRunners to finish their work so that we can replace the writer
+ || isOutstandingSyncsFromRunners()) {
synchronized (this.safePointWaiter) {
this.safePointWaiter.wait(0, 1);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4e304b3f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
index 8727e23..722c218 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
@@ -18,10 +18,10 @@
package org.apache.hadoop.hbase.regionserver.wal;
import static org.junit.Assert.assertFalse;
-
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
-
+import java.util.TreeMap;
+import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
@@ -50,7 +51,18 @@ import org.junit.experimental.categories.Category;
public class TestLogRollingNoCluster {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private final static byte [] EMPTY_1K_ARRAY = new byte[1024];
- private static final int THREAD_COUNT = 100; // Spin up this many threads
+ private static final int NUM_THREADS = 100; // Spin up this many threads
+ private static final int NUM_ENTRIES = 100; // How many entries to write
+
+ /** ProtobufLogWriter that simulates higher latencies in sync() call */
+ public static class HighLatencySyncWriter extends ProtobufLogWriter {
+ @Override
+ public void sync() throws IOException {
+ Threads.sleep(ThreadLocalRandom.current().nextInt(10));
+ super.sync();
+ Threads.sleep(ThreadLocalRandom.current().nextInt(10));
+ }
+ }
/**
* Spin up a bunch of threads and have them all append to a WAL. Roll the
@@ -59,38 +71,41 @@ public class TestLogRollingNoCluster {
* @throws InterruptedException
*/
@Test
- public void testContendedLogRolling() throws IOException, InterruptedException {
- FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
- Path dir = TEST_UTIL.getDataTestDir();
+ public void testContendedLogRolling() throws Exception {
+ TEST_UTIL.startMiniDFSCluster(3);
+ Path dir = TEST_UTIL.getDataTestDirOnTestFS();
+
// The implementation needs to know the 'handler' count.
- TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, THREAD_COUNT);
+ TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, NUM_THREADS);
final Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
FSUtils.setRootDir(conf, dir);
+ conf.set("hbase.regionserver.hlog.writer.impl", HighLatencySyncWriter.class.getName());
final WALFactory wals = new WALFactory(conf, null, TestLogRollingNoCluster.class.getName());
final WAL wal = wals.getWAL(new byte[]{});
Appender [] appenders = null;
- final int count = THREAD_COUNT;
- appenders = new Appender[count];
+ final int numThreads = NUM_THREADS;
+ appenders = new Appender[numThreads];
try {
- for (int i = 0; i < count; i++) {
+ for (int i = 0; i < numThreads; i++) {
// Have each appending thread write 'count' entries
- appenders[i] = new Appender(wal, i, count);
+ appenders[i] = new Appender(wal, i, NUM_ENTRIES);
}
- for (int i = 0; i < count; i++) {
+ for (int i = 0; i < numThreads; i++) {
appenders[i].start();
}
- for (int i = 0; i < count; i++) {
+ for (int i = 0; i < numThreads; i++) {
//ensure that all threads are joined before closing the wal
appenders[i].join();
}
} finally {
wals.close();
}
- for (int i = 0; i < count; i++) {
+ for (int i = 0; i < numThreads; i++) {
assertFalse(appenders[i].isException());
}
+ TEST_UTIL.shutdownMiniDFSCluster();
}
/**
@@ -139,6 +154,7 @@ public class TestLogRollingNoCluster {
final HTableDescriptor htd = fts.get(TableName.META_TABLE_NAME);
final long txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(),
TableName.META_TABLE_NAME, now), edit, sequenceId, true, null);
+ Threads.sleep(ThreadLocalRandom.current().nextInt(5));
wal.sync(txid);
}
String msg = getName() + " finished";