You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@distributedlog.apache.org by si...@apache.org on 2016/08/16 19:02:11 UTC
incubator-distributedlog git commit: DL-21: Fix DL flaky test cases
Repository: incubator-distributedlog
Updated Branches:
refs/heads/master 643963ce7 -> a5dd5adce
DL-21: Fix DL flaky test cases
DL-21: Fixed a few DL flaky test cases.
Author: Yiming Zang <yz...@twitter.com>
Reviewers: Sijie Guo <si...@apache.org>
Closes #5 from yzang/yzang/fix_flaky_test
Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/a5dd5adc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/a5dd5adc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/a5dd5adc
Branch: refs/heads/master
Commit: a5dd5adce87bee8bbce9faefc25f29c5b8b4d7df
Parents: 643963c
Author: Yiming Zang <yz...@twitter.com>
Authored: Tue Aug 16 12:02:06 2016 -0700
Committer: Sijie Guo <si...@twitter.com>
Committed: Tue Aug 16 12:02:06 2016 -0700
----------------------------------------------------------------------
.../distributedlog/BKDistributedLogManager.java | 2 ++
.../distributedlog/TestAsyncReaderWriter.java | 5 ++-
.../distributedlog/TestNonBlockingReads.java | 38 ++++++++++++++------
.../config/TestConfigurationSubscription.java | 16 ++++++++-
...TestDynamicConfigurationFeatureProvider.java | 16 +++++++++
.../service/TestDistributedLogService.java | 10 +++---
6 files changed, 71 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a5dd5adc/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
index fd8ec2d..9c19381 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
@@ -969,6 +969,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
false,
dynConf.getDeserializeRecordSetOnReads(),
statsLogger);
+ pendingReaders.add(reader);
return Future.value(reader);
}
@@ -1095,6 +1096,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
true,
dynConf.getDeserializeRecordSetOnReads(),
statsLogger);
+ pendingReaders.add(asyncReader);
return new BKSyncLogReaderDLSN(conf, asyncReader, scheduler, fromTxnId);
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a5dd5adc/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
index a6a89ba..06cf079 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
@@ -885,8 +885,11 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
}
}
+ /**
+ * Flaky test fixed: readers need to be added to the pendingReaders
+ * @throws Exception
+ */
@Test(timeout = 300000)
- @DistributedLogAnnotations.FlakyTest
public void testSimpleAsyncReadWriteSimulateErrors() throws Exception {
String name = runtime.getMethodName();
DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a5dd5adc/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReads.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReads.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReads.java
index 90a33e8..58863c4 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReads.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReads.java
@@ -17,6 +17,7 @@
*/
package com.twitter.distributedlog;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -33,13 +34,12 @@ import static org.junit.Assert.*;
public class TestNonBlockingReads extends TestDistributedLogBase {
static final Logger LOG = LoggerFactory.getLogger(TestNonBlockingReads.class);
- // TODO: investigate why long poll read makes test flaky
static {
conf.setOutputBufferSize(0);
conf.setImmediateFlushEnabled(true);
}
- @Test(timeout = 60000)
+ @Test(timeout = 100000)
public void testNonBlockingRead() throws Exception {
String name = "distrlog-non-blocking-reader";
final DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
@@ -49,9 +49,10 @@ public class TestNonBlockingReads extends TestDistributedLogBase {
confLocal.setReaderIdleWarnThresholdMillis(100);
final DistributedLogManager dlm = createNewDLM(confLocal, name);
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+ ScheduledFuture writerClosedFuture = null;
try {
final Thread currentThread = Thread.currentThread();
- executor.schedule(
+ writerClosedFuture = executor.schedule(
new Runnable() {
@Override
public void run() {
@@ -67,12 +68,16 @@ public class TestNonBlockingReads extends TestDistributedLogBase {
readNonBlocking(dlm, false);
assertFalse(currentThread.isInterrupted());
} finally {
+ if (writerClosedFuture != null){
+ // ensure writer.closeAndComplete is done before we close dlm
+ writerClosedFuture.get();
+ }
executor.shutdown();
dlm.close();
}
}
- @Test(timeout = 60000)
+ @Test(timeout = 100000)
public void testNonBlockingReadRecovery() throws Exception {
String name = "distrlog-non-blocking-reader-recovery";
final DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
@@ -81,9 +86,10 @@ public class TestNonBlockingReads extends TestDistributedLogBase {
confLocal.setReadAheadMaxRecords(10);
final DistributedLogManager dlm = createNewDLM(confLocal, name);
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+ ScheduledFuture writerClosedFuture = null;
try {
final Thread currentThread = Thread.currentThread();
- executor.schedule(
+ writerClosedFuture = executor.schedule(
new Runnable() {
@Override
public void run() {
@@ -100,12 +106,16 @@ public class TestNonBlockingReads extends TestDistributedLogBase {
readNonBlocking(dlm, false);
assertFalse(currentThread.isInterrupted());
} finally {
+ if (writerClosedFuture != null){
+ // ensure writer.closeAndComplete is done before we close dlm
+ writerClosedFuture.get();
+ }
executor.shutdown();
dlm.close();
}
}
- @Test(timeout = 60000)
+ @Test(timeout = 100000)
public void testNonBlockingReadIdleError() throws Exception {
String name = "distrlog-non-blocking-reader-error";
final DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
@@ -116,10 +126,10 @@ public class TestNonBlockingReads extends TestDistributedLogBase {
confLocal.setReaderIdleErrorThresholdMillis(100);
final DistributedLogManager dlm = createNewDLM(confLocal, name);
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
-
+ ScheduledFuture writerClosedFuture = null;
try {
final Thread currentThread = Thread.currentThread();
- executor.schedule(
+ writerClosedFuture = executor.schedule(
new Runnable() {
@Override
public void run() {
@@ -141,6 +151,10 @@ public class TestNonBlockingReads extends TestDistributedLogBase {
assertTrue(exceptionEncountered);
assertFalse(currentThread.isInterrupted());
} finally {
+ if (writerClosedFuture != null){
+ // ensure writer.closeAndComplete is done before we close dlm
+ writerClosedFuture.get();
+ }
executor.shutdown();
dlm.close();
}
@@ -157,10 +171,10 @@ public class TestNonBlockingReads extends TestDistributedLogBase {
confLocal.setReaderIdleErrorThresholdMillis(30000);
final DistributedLogManager dlm = createNewDLM(confLocal, name);
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
-
+ ScheduledFuture writerClosedFuture = null;
try {
final Thread currentThread = Thread.currentThread();
- executor.schedule(
+ writerClosedFuture = executor.schedule(
new Runnable() {
@Override
public void run() {
@@ -183,6 +197,10 @@ public class TestNonBlockingReads extends TestDistributedLogBase {
assertFalse(exceptionEncountered);
assertFalse(currentThread.isInterrupted());
} finally {
+ if (writerClosedFuture != null){
+ // ensure writer.closeAndComplete is done before we close dlm
+ writerClosedFuture.get();
+ }
executor.shutdown();
dlm.close();
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a5dd5adc/distributedlog-core/src/test/java/com/twitter/distributedlog/config/TestConfigurationSubscription.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/config/TestConfigurationSubscription.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/config/TestConfigurationSubscription.java
index 278bf29..24733a4 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/config/TestConfigurationSubscription.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/config/TestConfigurationSubscription.java
@@ -42,6 +42,17 @@ import static org.junit.Assert.*;
public class TestConfigurationSubscription {
static final Logger LOG = LoggerFactory.getLogger(TestConfigurationSubscription.class);
+ /**
+ * Give FileChangedReloadingStrategy some time to start reloading
+ * Make sure now!=lastChecked
+ * {@link org.apache.commons.configuration.reloading.FileChangedReloadingStrategy#reloadingRequired()}
+ */
+ private void ensureConfigReloaded() throws InterruptedException {
+ // sleep 1 ms so that System.currentTimeMillis() differs from
+ // lastChecked (the time at which FileChangedReloadingStrategy was constructed)
+ Thread.sleep(1);
+ }
+
@Test(timeout = 60000)
public void testReloadConfiguration() throws Exception {
PropertiesWriter writer = new PropertiesWriter();
@@ -63,7 +74,8 @@ public class TestConfigurationSubscription {
// add
writer.setProperty("prop1", "1");
writer.save();
-
+ // ensure the file change reloading event can be triggered
+ ensureConfigReloaded();
// reload the config
confSub.reload();
assertNotNull(confHolder.get());
@@ -85,6 +97,8 @@ public class TestConfigurationSubscription {
// add
writer.setProperty("prop1", "1");
writer.save();
+ // ensure the file change reloading event can be triggered
+ ensureConfigReloaded();
mockScheduler.tick(100, TimeUnit.MILLISECONDS);
assertEquals("1", conf.getProperty("prop1"));
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a5dd5adc/distributedlog-core/src/test/java/com/twitter/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java
index 35fac65..46c1880 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java
@@ -30,6 +30,19 @@ import static org.junit.Assert.*;
*/
public class TestDynamicConfigurationFeatureProvider {
+ /**
+ * Make sure config is reloaded
+ *
+ * Give FileChangedReloadingStrategy some time to allow reloading
+ * Make sure now!=lastChecked
+ * {@link org.apache.commons.configuration.reloading.FileChangedReloadingStrategy#reloadingRequired()}
+ */
+ private void ensureConfigReloaded() throws InterruptedException {
+ // sleep 1 ms so that System.currentTimeMillis() differs from
+ // lastChecked (the time at which FileChangedReloadingStrategy was constructed)
+ Thread.sleep(1);
+ }
+
@Test(timeout = 60000)
public void testLoadFeaturesFromBase() throws Exception {
PropertiesWriter writer = new PropertiesWriter();
@@ -43,6 +56,7 @@ public class TestDynamicConfigurationFeatureProvider {
DynamicConfigurationFeatureProvider provider =
new DynamicConfigurationFeatureProvider("", conf, NullStatsLogger.INSTANCE);
provider.start();
+ ensureConfigReloaded();
Feature feature1 = provider.getFeature("feature_1");
assertTrue(feature1.isAvailable());
@@ -79,6 +93,7 @@ public class TestDynamicConfigurationFeatureProvider {
DynamicConfigurationFeatureProvider provider =
new DynamicConfigurationFeatureProvider("", conf, NullStatsLogger.INSTANCE);
provider.start();
+ ensureConfigReloaded();
Feature feature1 = provider.getFeature("feature_1");
assertTrue(feature1.isAvailable());
@@ -118,6 +133,7 @@ public class TestDynamicConfigurationFeatureProvider {
DynamicConfigurationFeatureProvider provider =
new DynamicConfigurationFeatureProvider("", conf, NullStatsLogger.INSTANCE);
provider.start();
+ ensureConfigReloaded();
Feature feature1 = provider.getFeature("feature_1");
assertTrue(feature1.isAvailable());
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a5dd5adc/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
index 61fb808..ed456b9 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
@@ -742,12 +742,14 @@ public class TestDistributedLogService extends TestDistributedLogBase {
StatusCode.WRITE_EXCEPTION == response.getHeader().getCode() ||
StatusCode.WRITE_CANCELLED_EXCEPTION == response.getHeader().getCode());
}
- assertTrue("There should be no streams in the cache",
- streamManager.getCachedStreams().isEmpty());
+ // acquired streams should all have been removed after we close them
assertTrue("There should be no streams in the acquired cache",
- streamManager.getAcquiredStreams().isEmpty());
-
+ streamManager.getAcquiredStreams().isEmpty());
localService.shutdown();
+ // cached streams are not removed immediately after the streams are closed,
+ // but they should be removed after we shut down the service
+ assertTrue("There should be no streams in the cache after shutting down the service",
+ streamManager.getCachedStreams().isEmpty());
}
@Test(timeout = 60000)