You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2016/10/19 02:41:42 UTC
[1/5] hbase git commit: HBASE-16824 Writer.flush() can be called on
already closed streams in WAL roll
Repository: hbase
Updated Branches:
refs/heads/branch-1 66941910b -> 019c7f930
refs/heads/branch-1.1 382f88ae8 -> 4e304b3f9
refs/heads/branch-1.2 bcc74e5ee -> 571814425
refs/heads/branch-1.3 d38310aa4 -> c51722629
refs/heads/master 6c89c6251 -> ef8c65e54
HBASE-16824 Writer.flush() can be called on already closed streams in WAL roll
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ef8c65e5
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ef8c65e5
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ef8c65e5
Branch: refs/heads/master
Commit: ef8c65e54201b37edfb9a8f4f4d24137544b8ec1
Parents: 6c89c62
Author: Enis Soztutar <en...@apache.org>
Authored: Tue Oct 18 18:46:02 2016 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Tue Oct 18 18:46:02 2016 -0700
----------------------------------------------------------------------
.../hadoop/hbase/regionserver/wal/FSHLog.java | 48 +++++++++++++++-----
.../wal/TestLogRollingNoCluster.java | 42 +++++++++++------
2 files changed, 65 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/ef8c65e5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 3e0e829..142ab63 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -30,15 +30,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import com.google.common.annotations.VisibleForTesting;
-import com.lmax.disruptor.BlockingWaitStrategy;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.ExceptionHandler;
-import com.lmax.disruptor.LifecycleAware;
-import com.lmax.disruptor.TimeoutException;
-import com.lmax.disruptor.dsl.Disruptor;
-import com.lmax.disruptor.dsl.ProducerType;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -68,6 +59,15 @@ import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
+import com.google.common.annotations.VisibleForTesting;
+import com.lmax.disruptor.BlockingWaitStrategy;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.ExceptionHandler;
+import com.lmax.disruptor.LifecycleAware;
+import com.lmax.disruptor.TimeoutException;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.dsl.ProducerType;
+
/**
* The default implementation of FSWAL.
*/
@@ -499,6 +499,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
private volatile long sequence;
// Keep around last exception thrown. Clear on successful sync.
private final BlockingQueue<SyncFuture> syncFutures;
+ private volatile SyncFuture takeSyncFuture = null;
/**
* UPDATE!
@@ -546,6 +547,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
if (!syncFuture.done(currentSequence, t)) {
throw new IllegalStateException();
}
+
// This function releases one sync future only.
return 1;
}
@@ -589,13 +591,21 @@ public class FSHLog extends AbstractFSWAL<Writer> {
return sequence;
}
+ boolean areSyncFuturesReleased() {
+ // check whether there is no sync futures offered, and no in-flight sync futures that is being
+ // processed.
+ return syncFutures.size() <= 0
+ && takeSyncFuture == null;
+ }
+
public void run() {
long currentSequence;
while (!isInterrupted()) {
int syncCount = 0;
- SyncFuture takeSyncFuture;
+
try {
while (true) {
+ takeSyncFuture = null;
// We have to process what we 'take' from the queue
takeSyncFuture = this.syncFutures.take();
currentSequence = this.sequence;
@@ -975,11 +985,23 @@ public class FSHLog extends AbstractFSWAL<Writer> {
* @return True if outstanding sync futures still
*/
private boolean isOutstandingSyncs() {
+ // Look at SyncFutures in the EventHandler
for (int i = 0; i < this.syncFuturesCount; i++) {
if (!this.syncFutures[i].isDone()) {
return true;
}
}
+
+ return false;
+ }
+
+ private boolean isOutstandingSyncsFromRunners() {
+ // Look at SyncFutures in the SyncRunners
+ for (SyncRunner syncRunner: syncRunners) {
+ if(syncRunner.isAlive() && !syncRunner.areSyncFuturesReleased()) {
+ return true;
+ }
+ }
return false;
}
@@ -1095,11 +1117,13 @@ public class FSHLog extends AbstractFSWAL<Writer> {
// Wait on outstanding syncers; wait for them to finish syncing (unless we've been
// shutdown or unless our latch has been thrown because we have been aborted or unless
// this WAL is broken and we can't get a sync/append to complete).
- while (!this.shutdown && this.zigzagLatch.isCocked()
+ while ((!this.shutdown && this.zigzagLatch.isCocked()
&& highestSyncedTxid.get() < currentSequence &&
// We could be in here and all syncs are failing or failed. Check for this. Otherwise
// we'll just be stuck here for ever. In other words, ensure there syncs running.
- isOutstandingSyncs()) {
+ isOutstandingSyncs())
+ // Wait for all SyncRunners to finish their work so that we can replace the writer
+ || isOutstandingSyncsFromRunners()) {
synchronized (this.safePointWaiter) {
this.safePointWaiter.wait(0, 1);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ef8c65e5/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
index eda7df7..7412128 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
@@ -18,11 +18,10 @@
package org.apache.hadoop.hbase.regionserver.wal;
import static org.junit.Assert.assertFalse;
-
import java.io.IOException;
import java.util.NavigableMap;
import java.util.TreeMap;
-
+import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -39,6 +38,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
@@ -56,7 +56,18 @@ public class TestLogRollingNoCluster {
withLookingForStuckThread(true).build();
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private final static byte [] EMPTY_1K_ARRAY = new byte[1024];
- private static final int THREAD_COUNT = 100; // Spin up this many threads
+ private static final int NUM_THREADS = 100; // Spin up this many threads
+ private static final int NUM_ENTRIES = 100; // How many entries to write
+
+ /** ProtobufLogWriter that simulates higher latencies in sync() call */
+ public static class HighLatencySyncWriter extends ProtobufLogWriter {
+ @Override
+ public void sync() throws IOException {
+ Threads.sleep(ThreadLocalRandom.current().nextInt(10));
+ super.sync();
+ Threads.sleep(ThreadLocalRandom.current().nextInt(10));
+ }
+ }
/**
* Spin up a bunch of threads and have them all append to a WAL. Roll the
@@ -65,38 +76,42 @@ public class TestLogRollingNoCluster {
* @throws InterruptedException
*/
@Test
- public void testContendedLogRolling() throws IOException, InterruptedException {
- Path dir = TEST_UTIL.getDataTestDir();
+ public void testContendedLogRolling() throws Exception {
+ TEST_UTIL.startMiniDFSCluster(3);
+ Path dir = TEST_UTIL.getDataTestDirOnTestFS();
+
// The implementation needs to know the 'handler' count.
- TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, THREAD_COUNT);
+ TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, NUM_THREADS);
final Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
conf.set(WALFactory.WAL_PROVIDER, "filesystem");
FSUtils.setRootDir(conf, dir);
+ conf.set("hbase.regionserver.hlog.writer.impl", HighLatencySyncWriter.class.getName());
final WALFactory wals = new WALFactory(conf, null, TestLogRollingNoCluster.class.getName());
final WAL wal = wals.getWAL(new byte[]{}, null);
Appender [] appenders = null;
- final int count = THREAD_COUNT;
- appenders = new Appender[count];
+ final int numThreads = NUM_THREADS;
+ appenders = new Appender[numThreads];
try {
- for (int i = 0; i < count; i++) {
+ for (int i = 0; i < numThreads; i++) {
// Have each appending thread write 'count' entries
- appenders[i] = new Appender(wal, i, count);
+ appenders[i] = new Appender(wal, i, NUM_ENTRIES);
}
- for (int i = 0; i < count; i++) {
+ for (int i = 0; i < numThreads; i++) {
appenders[i].start();
}
- for (int i = 0; i < count; i++) {
+ for (int i = 0; i < numThreads; i++) {
//ensure that all threads are joined before closing the wal
appenders[i].join();
}
} finally {
wals.close();
}
- for (int i = 0; i < count; i++) {
+ for (int i = 0; i < numThreads; i++) {
assertFalse(appenders[i].isException());
}
+ TEST_UTIL.shutdownMiniDFSCluster();
}
/**
@@ -149,6 +164,7 @@ public class TestLogRollingNoCluster {
}
final long txid = wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(),
TableName.META_TABLE_NAME, now, mvcc, scopes), edit, true);
+ Threads.sleep(ThreadLocalRandom.current().nextInt(5));
wal.sync(txid);
}
String msg = getName() + " finished";
[3/5] hbase git commit: HBASE-16824 Writer.flush() can be called on
already closed streams in WAL roll
Posted by en...@apache.org.
HBASE-16824 Writer.flush() can be called on already closed streams in WAL roll
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c5172262
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c5172262
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c5172262
Branch: refs/heads/branch-1.3
Commit: c51722629418b8b5e3a6e688219ee7d806f251c7
Parents: d38310a
Author: Enis Soztutar <en...@apache.org>
Authored: Tue Oct 18 18:46:02 2016 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Tue Oct 18 19:16:31 2016 -0700
----------------------------------------------------------------------
.../hadoop/hbase/regionserver/wal/FSHLog.java | 31 ++++++++++++--
.../wal/TestLogRollingNoCluster.java | 43 ++++++++++++++------
2 files changed, 57 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/c5172262/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 097101b..9993d62 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -1131,6 +1131,7 @@ public class FSHLog implements WAL {
private volatile long sequence;
// Keep around last exception thrown. Clear on successful sync.
private final BlockingQueue<SyncFuture> syncFutures;
+ private volatile SyncFuture takeSyncFuture = null;
/**
* UPDATE!
@@ -1220,13 +1221,21 @@ public class FSHLog implements WAL {
return sequence;
}
+ boolean areSyncFuturesReleased() {
+ // check whether there is no sync futures offered, and no in-flight sync futures that is being
+ // processed.
+ return syncFutures.size() <= 0
+ && takeSyncFuture == null;
+ }
+
public void run() {
long currentSequence;
while (!isInterrupted()) {
int syncCount = 0;
- SyncFuture takeSyncFuture;
+
try {
while (true) {
+ takeSyncFuture = null;
// We have to process what we 'take' from the queue
takeSyncFuture = this.syncFutures.take();
currentSequence = this.sequence;
@@ -1737,9 +1746,21 @@ public class FSHLog implements WAL {
* @return True if outstanding sync futures still
*/
private boolean isOutstandingSyncs() {
+ // Look at SyncFutures in the EventHandler
for (int i = 0; i < this.syncFuturesCount; i++) {
if (!this.syncFutures[i].isDone()) return true;
}
+
+ return false;
+ }
+
+ private boolean isOutstandingSyncsFromRunners() {
+ // Look at SyncFutures in the SyncRunners
+ for (SyncRunner syncRunner: syncRunners) {
+ if(syncRunner.isAlive() && !syncRunner.areSyncFuturesReleased()) {
+ return true;
+ }
+ }
return false;
}
@@ -1850,11 +1871,13 @@ public class FSHLog implements WAL {
// Wait on outstanding syncers; wait for them to finish syncing (unless we've been
// shutdown or unless our latch has been thrown because we have been aborted or unless
// this WAL is broken and we can't get a sync/append to complete).
- while (!this.shutdown && this.zigzagLatch.isCocked() &&
- highestSyncedSequence.get() < currentSequence &&
+ while ((!this.shutdown && this.zigzagLatch.isCocked()
+ && highestSyncedSequence.get() < currentSequence &&
// We could be in here and all syncs are failing or failed. Check for this. Otherwise
// we'll just be stuck here for ever. In other words, ensure there syncs running.
- isOutstandingSyncs()) {
+ isOutstandingSyncs())
+ // Wait for all SyncRunners to finish their work so that we can replace the writer
+ || isOutstandingSyncsFromRunners()) {
synchronized (this.safePointWaiter) {
this.safePointWaiter.wait(0, 1);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c5172262/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
index 7ce3615..bca4a7e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
@@ -18,9 +18,9 @@
package org.apache.hadoop.hbase.regionserver.wal;
import static org.junit.Assert.assertFalse;
-
import java.io.IOException;
-
+import java.util.TreeMap;
+import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
@@ -49,7 +50,18 @@ import org.junit.experimental.categories.Category;
public class TestLogRollingNoCluster {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private final static byte [] EMPTY_1K_ARRAY = new byte[1024];
- private static final int THREAD_COUNT = 100; // Spin up this many threads
+ private static final int NUM_THREADS = 100; // Spin up this many threads
+ private static final int NUM_ENTRIES = 100; // How many entries to write
+
+ /** ProtobufLogWriter that simulates higher latencies in sync() call */
+ public static class HighLatencySyncWriter extends ProtobufLogWriter {
+ @Override
+ public void sync() throws IOException {
+ Threads.sleep(ThreadLocalRandom.current().nextInt(10));
+ super.sync();
+ Threads.sleep(ThreadLocalRandom.current().nextInt(10));
+ }
+ }
/**
* Spin up a bunch of threads and have them all append to a WAL. Roll the
@@ -58,37 +70,41 @@ public class TestLogRollingNoCluster {
* @throws InterruptedException
*/
@Test
- public void testContendedLogRolling() throws IOException, InterruptedException {
- Path dir = TEST_UTIL.getDataTestDir();
+ public void testContendedLogRolling() throws Exception {
+ TEST_UTIL.startMiniDFSCluster(3);
+ Path dir = TEST_UTIL.getDataTestDirOnTestFS();
+
// The implementation needs to know the 'handler' count.
- TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, THREAD_COUNT);
+ TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, NUM_THREADS);
final Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
FSUtils.setRootDir(conf, dir);
+ conf.set("hbase.regionserver.hlog.writer.impl", HighLatencySyncWriter.class.getName());
final WALFactory wals = new WALFactory(conf, null, TestLogRollingNoCluster.class.getName());
final WAL wal = wals.getWAL(new byte[]{}, null);
Appender [] appenders = null;
- final int count = THREAD_COUNT;
- appenders = new Appender[count];
+ final int numThreads = NUM_THREADS;
+ appenders = new Appender[numThreads];
try {
- for (int i = 0; i < count; i++) {
+ for (int i = 0; i < numThreads; i++) {
// Have each appending thread write 'count' entries
- appenders[i] = new Appender(wal, i, count);
+ appenders[i] = new Appender(wal, i, NUM_ENTRIES);
}
- for (int i = 0; i < count; i++) {
+ for (int i = 0; i < numThreads; i++) {
appenders[i].start();
}
- for (int i = 0; i < count; i++) {
+ for (int i = 0; i < numThreads; i++) {
//ensure that all threads are joined before closing the wal
appenders[i].join();
}
} finally {
wals.close();
}
- for (int i = 0; i < count; i++) {
+ for (int i = 0; i < numThreads; i++) {
assertFalse(appenders[i].isException());
}
+ TEST_UTIL.shutdownMiniDFSCluster();
}
/**
@@ -137,6 +153,7 @@ public class TestLogRollingNoCluster {
final HTableDescriptor htd = fts.get(TableName.META_TABLE_NAME);
final long txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(),
TableName.META_TABLE_NAME, now, mvcc), edit, true);
+ Threads.sleep(ThreadLocalRandom.current().nextInt(5));
wal.sync(txid);
}
String msg = getName() + " finished";
[4/5] hbase git commit: HBASE-16824 Writer.flush() can be called on
already closed streams in WAL roll
Posted by en...@apache.org.
HBASE-16824 Writer.flush() can be called on already closed streams in WAL roll
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/57181442
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/57181442
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/57181442
Branch: refs/heads/branch-1.2
Commit: 57181442577c36689114334b011a6e72de4ae785
Parents: bcc74e5
Author: Enis Soztutar <en...@apache.org>
Authored: Tue Oct 18 18:46:02 2016 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Tue Oct 18 19:19:12 2016 -0700
----------------------------------------------------------------------
.../hadoop/hbase/regionserver/wal/FSHLog.java | 31 ++++++++++++--
.../wal/TestLogRollingNoCluster.java | 43 ++++++++++++++------
2 files changed, 57 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/57181442/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 79ff1bc..7e3d82b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -1127,6 +1127,7 @@ public class FSHLog implements WAL {
private volatile long sequence;
// Keep around last exception thrown. Clear on successful sync.
private final BlockingQueue<SyncFuture> syncFutures;
+ private volatile SyncFuture takeSyncFuture = null;
/**
* UPDATE!
@@ -1216,13 +1217,21 @@ public class FSHLog implements WAL {
return sequence;
}
+ boolean areSyncFuturesReleased() {
+ // check whether there is no sync futures offered, and no in-flight sync futures that is being
+ // processed.
+ return syncFutures.size() <= 0
+ && takeSyncFuture == null;
+ }
+
public void run() {
long currentSequence;
while (!isInterrupted()) {
int syncCount = 0;
- SyncFuture takeSyncFuture;
+
try {
while (true) {
+ takeSyncFuture = null;
// We have to process what we 'take' from the queue
takeSyncFuture = this.syncFutures.take();
currentSequence = this.sequence;
@@ -1733,9 +1742,21 @@ public class FSHLog implements WAL {
* @return True if outstanding sync futures still
*/
private boolean isOutstandingSyncs() {
+ // Look at SyncFutures in the EventHandler
for (int i = 0; i < this.syncFuturesCount; i++) {
if (!this.syncFutures[i].isDone()) return true;
}
+
+ return false;
+ }
+
+ private boolean isOutstandingSyncsFromRunners() {
+ // Look at SyncFutures in the SyncRunners
+ for (SyncRunner syncRunner: syncRunners) {
+ if(syncRunner.isAlive() && !syncRunner.areSyncFuturesReleased()) {
+ return true;
+ }
+ }
return false;
}
@@ -1846,11 +1867,13 @@ public class FSHLog implements WAL {
// Wait on outstanding syncers; wait for them to finish syncing (unless we've been
// shutdown or unless our latch has been thrown because we have been aborted or unless
// this WAL is broken and we can't get a sync/append to complete).
- while (!this.shutdown && this.zigzagLatch.isCocked() &&
- highestSyncedSequence.get() < currentSequence &&
+ while ((!this.shutdown && this.zigzagLatch.isCocked()
+ && highestSyncedSequence.get() < currentSequence &&
// We could be in here and all syncs are failing or failed. Check for this. Otherwise
// we'll just be stuck here for ever. In other words, ensure there syncs running.
- isOutstandingSyncs()) {
+ isOutstandingSyncs())
+ // Wait for all SyncRunners to finish their work so that we can replace the writer
+ || isOutstandingSyncsFromRunners()) {
synchronized (this.safePointWaiter) {
this.safePointWaiter.wait(0, 1);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/57181442/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
index 1c36552..034ddcd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
@@ -18,9 +18,9 @@
package org.apache.hadoop.hbase.regionserver.wal;
import static org.junit.Assert.assertFalse;
-
import java.io.IOException;
-
+import java.util.TreeMap;
+import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
@@ -49,7 +50,18 @@ import org.junit.experimental.categories.Category;
public class TestLogRollingNoCluster {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private final static byte [] EMPTY_1K_ARRAY = new byte[1024];
- private static final int THREAD_COUNT = 100; // Spin up this many threads
+ private static final int NUM_THREADS = 100; // Spin up this many threads
+ private static final int NUM_ENTRIES = 100; // How many entries to write
+
+ /** ProtobufLogWriter that simulates higher latencies in sync() call */
+ public static class HighLatencySyncWriter extends ProtobufLogWriter {
+ @Override
+ public void sync() throws IOException {
+ Threads.sleep(ThreadLocalRandom.current().nextInt(10));
+ super.sync();
+ Threads.sleep(ThreadLocalRandom.current().nextInt(10));
+ }
+ }
/**
* Spin up a bunch of threads and have them all append to a WAL. Roll the
@@ -58,37 +70,41 @@ public class TestLogRollingNoCluster {
* @throws InterruptedException
*/
@Test
- public void testContendedLogRolling() throws IOException, InterruptedException {
- Path dir = TEST_UTIL.getDataTestDir();
+ public void testContendedLogRolling() throws Exception {
+ TEST_UTIL.startMiniDFSCluster(3);
+ Path dir = TEST_UTIL.getDataTestDirOnTestFS();
+
// The implementation needs to know the 'handler' count.
- TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, THREAD_COUNT);
+ TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, NUM_THREADS);
final Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
FSUtils.setRootDir(conf, dir);
+ conf.set("hbase.regionserver.hlog.writer.impl", HighLatencySyncWriter.class.getName());
final WALFactory wals = new WALFactory(conf, null, TestLogRollingNoCluster.class.getName());
final WAL wal = wals.getWAL(new byte[]{});
Appender [] appenders = null;
- final int count = THREAD_COUNT;
- appenders = new Appender[count];
+ final int numThreads = NUM_THREADS;
+ appenders = new Appender[numThreads];
try {
- for (int i = 0; i < count; i++) {
+ for (int i = 0; i < numThreads; i++) {
// Have each appending thread write 'count' entries
- appenders[i] = new Appender(wal, i, count);
+ appenders[i] = new Appender(wal, i, NUM_ENTRIES);
}
- for (int i = 0; i < count; i++) {
+ for (int i = 0; i < numThreads; i++) {
appenders[i].start();
}
- for (int i = 0; i < count; i++) {
+ for (int i = 0; i < numThreads; i++) {
//ensure that all threads are joined before closing the wal
appenders[i].join();
}
} finally {
wals.close();
}
- for (int i = 0; i < count; i++) {
+ for (int i = 0; i < numThreads; i++) {
assertFalse(appenders[i].isException());
}
+ TEST_UTIL.shutdownMiniDFSCluster();
}
/**
@@ -137,6 +153,7 @@ public class TestLogRollingNoCluster {
final HTableDescriptor htd = fts.get(TableName.META_TABLE_NAME);
final long txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(),
TableName.META_TABLE_NAME, now, mvcc), edit, true);
+ Threads.sleep(ThreadLocalRandom.current().nextInt(5));
wal.sync(txid);
}
String msg = getName() + " finished";
[5/5] hbase git commit: HBASE-16824 Writer.flush() can be called on
already closed streams in WAL roll
Posted by en...@apache.org.
HBASE-16824 Writer.flush() can be called on already closed streams in WAL roll
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4e304b3f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4e304b3f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4e304b3f
Branch: refs/heads/branch-1.1
Commit: 4e304b3f919a9000e15fd66df190ab97e63bc07d
Parents: 382f88a
Author: Enis Soztutar <en...@apache.org>
Authored: Tue Oct 18 18:46:02 2016 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Tue Oct 18 19:41:04 2016 -0700
----------------------------------------------------------------------
.../hadoop/hbase/regionserver/wal/FSHLog.java | 31 ++++++++++++--
.../wal/TestLogRollingNoCluster.java | 44 +++++++++++++-------
2 files changed, 57 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/4e304b3f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 53545ed..76d09c1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -1278,6 +1278,7 @@ public class FSHLog implements WAL {
private volatile long sequence;
// Keep around last exception thrown. Clear on successful sync.
private final BlockingQueue<SyncFuture> syncFutures;
+ private volatile SyncFuture takeSyncFuture = null;
/**
* UPDATE!
@@ -1367,13 +1368,21 @@ public class FSHLog implements WAL {
return sequence;
}
+ boolean areSyncFuturesReleased() {
+ // check whether there is no sync futures offered, and no in-flight sync futures that is being
+ // processed.
+ return syncFutures.size() <= 0
+ && takeSyncFuture == null;
+ }
+
public void run() {
long currentSequence;
while (!isInterrupted()) {
int syncCount = 0;
- SyncFuture takeSyncFuture;
+
try {
while (true) {
+ takeSyncFuture = null;
// We have to process what we 'take' from the queue
takeSyncFuture = this.syncFutures.take();
currentSequence = this.sequence;
@@ -2010,9 +2019,21 @@ public class FSHLog implements WAL {
* @return True if outstanding sync futures still
*/
private boolean isOutstandingSyncs() {
+ // Look at SyncFutures in the EventHandler
for (int i = 0; i < this.syncFuturesCount; i++) {
if (!this.syncFutures[i].isDone()) return true;
}
+
+ return false;
+ }
+
+ private boolean isOutstandingSyncsFromRunners() {
+ // Look at SyncFutures in the SyncRunners
+ for (SyncRunner syncRunner: syncRunners) {
+ if(syncRunner.isAlive() && !syncRunner.areSyncFuturesReleased()) {
+ return true;
+ }
+ }
return false;
}
@@ -2123,11 +2144,13 @@ public class FSHLog implements WAL {
// Wait on outstanding syncers; wait for them to finish syncing (unless we've been
// shutdown or unless our latch has been thrown because we have been aborted or unless
// this WAL is broken and we can't get a sync/append to complete).
- while (!this.shutdown && this.zigzagLatch.isCocked() &&
- highestSyncedSequence.get() < currentSequence &&
+ while ((!this.shutdown && this.zigzagLatch.isCocked()
+ && highestSyncedSequence.get() < currentSequence &&
// We could be in here and all syncs are failing or failed. Check for this. Otherwise
// we'll just be stuck here for ever. In other words, ensure there syncs running.
- isOutstandingSyncs()) {
+ isOutstandingSyncs())
+ // Wait for all SyncRunners to finish their work so that we can replace the writer
+ || isOutstandingSyncsFromRunners()) {
synchronized (this.safePointWaiter) {
this.safePointWaiter.wait(0, 1);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4e304b3f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
index 8727e23..722c218 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
@@ -18,10 +18,10 @@
package org.apache.hadoop.hbase.regionserver.wal;
import static org.junit.Assert.assertFalse;
-
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
-
+import java.util.TreeMap;
+import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
@@ -50,7 +51,18 @@ import org.junit.experimental.categories.Category;
public class TestLogRollingNoCluster {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private final static byte [] EMPTY_1K_ARRAY = new byte[1024];
- private static final int THREAD_COUNT = 100; // Spin up this many threads
+ private static final int NUM_THREADS = 100; // Spin up this many threads
+ private static final int NUM_ENTRIES = 100; // How many entries to write
+
+ /** ProtobufLogWriter that simulates higher latencies in sync() call */
+ public static class HighLatencySyncWriter extends ProtobufLogWriter {
+ @Override
+ public void sync() throws IOException {
+ Threads.sleep(ThreadLocalRandom.current().nextInt(10));
+ super.sync();
+ Threads.sleep(ThreadLocalRandom.current().nextInt(10));
+ }
+ }
/**
* Spin up a bunch of threads and have them all append to a WAL. Roll the
@@ -59,38 +71,41 @@ public class TestLogRollingNoCluster {
* @throws InterruptedException
*/
@Test
- public void testContendedLogRolling() throws IOException, InterruptedException {
- FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
- Path dir = TEST_UTIL.getDataTestDir();
+ public void testContendedLogRolling() throws Exception {
+ TEST_UTIL.startMiniDFSCluster(3);
+ Path dir = TEST_UTIL.getDataTestDirOnTestFS();
+
// The implementation needs to know the 'handler' count.
- TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, THREAD_COUNT);
+ TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, NUM_THREADS);
final Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
FSUtils.setRootDir(conf, dir);
+ conf.set("hbase.regionserver.hlog.writer.impl", HighLatencySyncWriter.class.getName());
final WALFactory wals = new WALFactory(conf, null, TestLogRollingNoCluster.class.getName());
final WAL wal = wals.getWAL(new byte[]{});
Appender [] appenders = null;
- final int count = THREAD_COUNT;
- appenders = new Appender[count];
+ final int numThreads = NUM_THREADS;
+ appenders = new Appender[numThreads];
try {
- for (int i = 0; i < count; i++) {
+ for (int i = 0; i < numThreads; i++) {
// Have each appending thread write 'count' entries
- appenders[i] = new Appender(wal, i, count);
+ appenders[i] = new Appender(wal, i, NUM_ENTRIES);
}
- for (int i = 0; i < count; i++) {
+ for (int i = 0; i < numThreads; i++) {
appenders[i].start();
}
- for (int i = 0; i < count; i++) {
+ for (int i = 0; i < numThreads; i++) {
//ensure that all threads are joined before closing the wal
appenders[i].join();
}
} finally {
wals.close();
}
- for (int i = 0; i < count; i++) {
+ for (int i = 0; i < numThreads; i++) {
assertFalse(appenders[i].isException());
}
+ TEST_UTIL.shutdownMiniDFSCluster();
}
/**
@@ -139,6 +154,7 @@ public class TestLogRollingNoCluster {
final HTableDescriptor htd = fts.get(TableName.META_TABLE_NAME);
final long txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(),
TableName.META_TABLE_NAME, now), edit, sequenceId, true, null);
+ Threads.sleep(ThreadLocalRandom.current().nextInt(5));
wal.sync(txid);
}
String msg = getName() + " finished";
[2/5] hbase git commit: HBASE-16824 Writer.flush() can be called on
already closed streams in WAL roll
Posted by en...@apache.org.
HBASE-16824 Writer.flush() can be called on already closed streams in WAL roll
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/019c7f93
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/019c7f93
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/019c7f93
Branch: refs/heads/branch-1
Commit: 019c7f9303a7242b7c5d6713bed414b180b5c84a
Parents: 6694191
Author: Enis Soztutar <en...@apache.org>
Authored: Tue Oct 18 18:46:02 2016 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Tue Oct 18 19:14:20 2016 -0700
----------------------------------------------------------------------
.../hadoop/hbase/regionserver/wal/FSHLog.java | 31 ++++++++++++--
.../wal/TestLogRollingNoCluster.java | 43 ++++++++++++++------
2 files changed, 57 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/019c7f93/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 11ebfef..a8b0372 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -1138,6 +1138,7 @@ public class FSHLog implements WAL {
private volatile long sequence;
// Keep around last exception thrown. Clear on successful sync.
private final BlockingQueue<SyncFuture> syncFutures;
+ private volatile SyncFuture takeSyncFuture = null;
/**
* UPDATE!
@@ -1227,13 +1228,21 @@ public class FSHLog implements WAL {
return sequence;
}
+ boolean areSyncFuturesReleased() {
+ // check whether there is no sync futures offered, and no in-flight sync futures that is being
+ // processed.
+ return syncFutures.size() <= 0
+ && takeSyncFuture == null;
+ }
+
public void run() {
long currentSequence;
while (!isInterrupted()) {
int syncCount = 0;
- SyncFuture takeSyncFuture;
+
try {
while (true) {
+ takeSyncFuture = null;
// We have to process what we 'take' from the queue
takeSyncFuture = this.syncFutures.take();
currentSequence = this.sequence;
@@ -1744,9 +1753,21 @@ public class FSHLog implements WAL {
* @return True if outstanding sync futures still
*/
private boolean isOutstandingSyncs() {
+ // Look at SyncFutures in the EventHandler
for (int i = 0; i < this.syncFuturesCount; i++) {
if (!this.syncFutures[i].isDone()) return true;
}
+
+ return false;
+ }
+
+ private boolean isOutstandingSyncsFromRunners() {
+ // Look at SyncFutures in the SyncRunners
+ for (SyncRunner syncRunner: syncRunners) {
+ if(syncRunner.isAlive() && !syncRunner.areSyncFuturesReleased()) {
+ return true;
+ }
+ }
return false;
}
@@ -1857,11 +1878,13 @@ public class FSHLog implements WAL {
// Wait on outstanding syncers; wait for them to finish syncing (unless we've been
// shutdown or unless our latch has been thrown because we have been aborted or unless
// this WAL is broken and we can't get a sync/append to complete).
- while (!this.shutdown && this.zigzagLatch.isCocked() &&
- highestSyncedSequence.get() < currentSequence &&
+ while ((!this.shutdown && this.zigzagLatch.isCocked()
+ && highestSyncedSequence.get() < currentSequence &&
// We could be in here and all syncs are failing or failed. Check for this. Otherwise
// we'll just be stuck here for ever. In other words, ensure there syncs running.
- isOutstandingSyncs()) {
+ isOutstandingSyncs())
+ // Wait for all SyncRunners to finish their work so that we can replace the writer
+ || isOutstandingSyncsFromRunners()) {
synchronized (this.safePointWaiter) {
this.safePointWaiter.wait(0, 1);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/019c7f93/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
index 7ce3615..bca4a7e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
@@ -18,9 +18,9 @@
package org.apache.hadoop.hbase.regionserver.wal;
import static org.junit.Assert.assertFalse;
-
import java.io.IOException;
-
+import java.util.TreeMap;
+import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
@@ -49,7 +50,18 @@ import org.junit.experimental.categories.Category;
public class TestLogRollingNoCluster {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private final static byte [] EMPTY_1K_ARRAY = new byte[1024];
- private static final int THREAD_COUNT = 100; // Spin up this many threads
+ private static final int NUM_THREADS = 100; // Spin up this many threads
+ private static final int NUM_ENTRIES = 100; // How many entries to write
+
+ /** ProtobufLogWriter that simulates higher latencies in sync() call */
+ public static class HighLatencySyncWriter extends ProtobufLogWriter {
+ @Override
+ public void sync() throws IOException {
+ Threads.sleep(ThreadLocalRandom.current().nextInt(10));
+ super.sync();
+ Threads.sleep(ThreadLocalRandom.current().nextInt(10));
+ }
+ }
/**
* Spin up a bunch of threads and have them all append to a WAL. Roll the
@@ -58,37 +70,41 @@ public class TestLogRollingNoCluster {
* @throws InterruptedException
*/
@Test
- public void testContendedLogRolling() throws IOException, InterruptedException {
- Path dir = TEST_UTIL.getDataTestDir();
+ public void testContendedLogRolling() throws Exception {
+ TEST_UTIL.startMiniDFSCluster(3);
+ Path dir = TEST_UTIL.getDataTestDirOnTestFS();
+
// The implementation needs to know the 'handler' count.
- TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, THREAD_COUNT);
+ TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, NUM_THREADS);
final Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
FSUtils.setRootDir(conf, dir);
+ conf.set("hbase.regionserver.hlog.writer.impl", HighLatencySyncWriter.class.getName());
final WALFactory wals = new WALFactory(conf, null, TestLogRollingNoCluster.class.getName());
final WAL wal = wals.getWAL(new byte[]{}, null);
Appender [] appenders = null;
- final int count = THREAD_COUNT;
- appenders = new Appender[count];
+ final int numThreads = NUM_THREADS;
+ appenders = new Appender[numThreads];
try {
- for (int i = 0; i < count; i++) {
+ for (int i = 0; i < numThreads; i++) {
// Have each appending thread write 'count' entries
- appenders[i] = new Appender(wal, i, count);
+ appenders[i] = new Appender(wal, i, NUM_ENTRIES);
}
- for (int i = 0; i < count; i++) {
+ for (int i = 0; i < numThreads; i++) {
appenders[i].start();
}
- for (int i = 0; i < count; i++) {
+ for (int i = 0; i < numThreads; i++) {
//ensure that all threads are joined before closing the wal
appenders[i].join();
}
} finally {
wals.close();
}
- for (int i = 0; i < count; i++) {
+ for (int i = 0; i < numThreads; i++) {
assertFalse(appenders[i].isException());
}
+ TEST_UTIL.shutdownMiniDFSCluster();
}
/**
@@ -137,6 +153,7 @@ public class TestLogRollingNoCluster {
final HTableDescriptor htd = fts.get(TableName.META_TABLE_NAME);
final long txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(),
TableName.META_TABLE_NAME, now, mvcc), edit, true);
+ Threads.sleep(ThreadLocalRandom.current().nextInt(5));
wal.sync(txid);
}
String msg = getName() + " finished";