You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@distributedlog.apache.org by si...@apache.org on 2016/08/16 19:02:11 UTC

incubator-distributedlog git commit: DL-21: Fix DL flaky test cases

Repository: incubator-distributedlog
Updated Branches:
  refs/heads/master 643963ce7 -> a5dd5adce


DL-21: Fix DL flaky test cases

DL-21: Fixed a few DL flaky test cases.

Author: Yiming Zang <yz...@twitter.com>

Reviewers: Sijie Guo <si...@apache.org>

Closes #5 from yzang/yzang/fix_flaky_test


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/a5dd5adc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/a5dd5adc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/a5dd5adc

Branch: refs/heads/master
Commit: a5dd5adce87bee8bbce9faefc25f29c5b8b4d7df
Parents: 643963c
Author: Yiming Zang <yz...@twitter.com>
Authored: Tue Aug 16 12:02:06 2016 -0700
Committer: Sijie Guo <si...@twitter.com>
Committed: Tue Aug 16 12:02:06 2016 -0700

----------------------------------------------------------------------
 .../distributedlog/BKDistributedLogManager.java |  2 ++
 .../distributedlog/TestAsyncReaderWriter.java   |  5 ++-
 .../distributedlog/TestNonBlockingReads.java    | 38 ++++++++++++++------
 .../config/TestConfigurationSubscription.java   | 16 ++++++++-
 ...TestDynamicConfigurationFeatureProvider.java | 16 +++++++++
 .../service/TestDistributedLogService.java      | 10 +++---
 6 files changed, 71 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a5dd5adc/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
index fd8ec2d..9c19381 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
@@ -969,6 +969,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
                 false,
                 dynConf.getDeserializeRecordSetOnReads(),
                 statsLogger);
+        pendingReaders.add(reader);
         return Future.value(reader);
     }
 
@@ -1095,6 +1096,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
                 true,
                 dynConf.getDeserializeRecordSetOnReads(),
                 statsLogger);
+        pendingReaders.add(asyncReader);
         return new BKSyncLogReaderDLSN(conf, asyncReader, scheduler, fromTxnId);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a5dd5adc/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
index a6a89ba..06cf079 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
@@ -885,8 +885,11 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         }
     }
 
+    /**
+     * Flaky test fixed: readers need to be added to the pendingReaders
+     * @throws Exception
+     */
     @Test(timeout = 300000)
-    @DistributedLogAnnotations.FlakyTest
     public void testSimpleAsyncReadWriteSimulateErrors() throws Exception {
         String name = runtime.getMethodName();
         DistributedLogConfiguration confLocal = new DistributedLogConfiguration();

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a5dd5adc/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReads.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReads.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReads.java
index 90a33e8..58863c4 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReads.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReads.java
@@ -17,6 +17,7 @@
  */
 package com.twitter.distributedlog;
 
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
@@ -33,13 +34,12 @@ import static org.junit.Assert.*;
 public class TestNonBlockingReads extends TestDistributedLogBase {
     static final Logger LOG = LoggerFactory.getLogger(TestNonBlockingReads.class);
 
-    // TODO: investigate why long poll read makes test flaky
     static {
         conf.setOutputBufferSize(0);
         conf.setImmediateFlushEnabled(true);
     }
 
-    @Test(timeout = 60000)
+    @Test(timeout = 100000)
     public void testNonBlockingRead() throws Exception {
         String name = "distrlog-non-blocking-reader";
         final DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
@@ -49,9 +49,10 @@ public class TestNonBlockingReads extends TestDistributedLogBase {
         confLocal.setReaderIdleWarnThresholdMillis(100);
         final DistributedLogManager dlm = createNewDLM(confLocal, name);
         ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+        ScheduledFuture writerClosedFuture = null;
         try {
             final Thread currentThread = Thread.currentThread();
-            executor.schedule(
+            writerClosedFuture = executor.schedule(
                     new Runnable() {
                         @Override
                         public void run() {
@@ -67,12 +68,16 @@ public class TestNonBlockingReads extends TestDistributedLogBase {
             readNonBlocking(dlm, false);
             assertFalse(currentThread.isInterrupted());
         } finally {
+            if (writerClosedFuture != null){
+                // ensure writer.closeAndComplete is done before we close dlm
+                writerClosedFuture.get();
+            }
             executor.shutdown();
             dlm.close();
         }
     }
 
-    @Test(timeout = 60000)
+    @Test(timeout = 100000)
     public void testNonBlockingReadRecovery() throws Exception {
         String name = "distrlog-non-blocking-reader-recovery";
         final DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
@@ -81,9 +86,10 @@ public class TestNonBlockingReads extends TestDistributedLogBase {
         confLocal.setReadAheadMaxRecords(10);
         final DistributedLogManager dlm = createNewDLM(confLocal, name);
         ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+        ScheduledFuture writerClosedFuture = null;
         try {
             final Thread currentThread = Thread.currentThread();
-            executor.schedule(
+            writerClosedFuture = executor.schedule(
                     new Runnable() {
                         @Override
                         public void run() {
@@ -100,12 +106,16 @@ public class TestNonBlockingReads extends TestDistributedLogBase {
             readNonBlocking(dlm, false);
             assertFalse(currentThread.isInterrupted());
         } finally {
+            if (writerClosedFuture != null){
+                // ensure writer.closeAndComplete is done before we close dlm
+                writerClosedFuture.get();
+            }
             executor.shutdown();
             dlm.close();
         }
     }
 
-    @Test(timeout = 60000)
+    @Test(timeout = 100000)
     public void testNonBlockingReadIdleError() throws Exception {
         String name = "distrlog-non-blocking-reader-error";
         final DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
@@ -116,10 +126,10 @@ public class TestNonBlockingReads extends TestDistributedLogBase {
         confLocal.setReaderIdleErrorThresholdMillis(100);
         final DistributedLogManager dlm = createNewDLM(confLocal, name);
         ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
-
+        ScheduledFuture writerClosedFuture = null;
         try {
             final Thread currentThread = Thread.currentThread();
-            executor.schedule(
+            writerClosedFuture = executor.schedule(
                     new Runnable() {
                         @Override
                         public void run() {
@@ -141,6 +151,10 @@ public class TestNonBlockingReads extends TestDistributedLogBase {
             assertTrue(exceptionEncountered);
             assertFalse(currentThread.isInterrupted());
         } finally {
+            if (writerClosedFuture != null){
+                // ensure writer.closeAndComplete is done before we close dlm
+                writerClosedFuture.get();
+            }
             executor.shutdown();
             dlm.close();
         }
@@ -157,10 +171,10 @@ public class TestNonBlockingReads extends TestDistributedLogBase {
         confLocal.setReaderIdleErrorThresholdMillis(30000);
         final DistributedLogManager dlm = createNewDLM(confLocal, name);
         ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
-
+        ScheduledFuture writerClosedFuture = null;
         try {
             final Thread currentThread = Thread.currentThread();
-            executor.schedule(
+            writerClosedFuture = executor.schedule(
                     new Runnable() {
                         @Override
                         public void run() {
@@ -183,6 +197,10 @@ public class TestNonBlockingReads extends TestDistributedLogBase {
             assertFalse(exceptionEncountered);
             assertFalse(currentThread.isInterrupted());
         } finally {
+            if (writerClosedFuture != null){
+                // ensure writer.closeAndComplete is done before we close dlm
+                writerClosedFuture.get();
+            }
             executor.shutdown();
             dlm.close();
         }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a5dd5adc/distributedlog-core/src/test/java/com/twitter/distributedlog/config/TestConfigurationSubscription.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/config/TestConfigurationSubscription.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/config/TestConfigurationSubscription.java
index 278bf29..24733a4 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/config/TestConfigurationSubscription.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/config/TestConfigurationSubscription.java
@@ -42,6 +42,17 @@ import static org.junit.Assert.*;
 public class TestConfigurationSubscription {
     static final Logger LOG = LoggerFactory.getLogger(TestConfigurationSubscription.class);
 
+    /**
+     * Give FileChangedReloadingStrategy some time to start reloading
+     * Make sure now!=lastChecked
+     * {@link org.apache.commons.configuration.reloading.FileChangedReloadingStrategy#reloadingRequired()}
+     */
+    private void ensureConfigReloaded() throws InterruptedException {
+        // sleep 1 ms so that System.currentTimeMillis() !=
+        // lastChecked (the time we construct FileChangedReloadingStrategy
+        Thread.sleep(1);
+    }
+
     @Test(timeout = 60000)
     public void testReloadConfiguration() throws Exception {
         PropertiesWriter writer = new PropertiesWriter();
@@ -63,7 +74,8 @@ public class TestConfigurationSubscription {
         // add
         writer.setProperty("prop1", "1");
         writer.save();
-
+        // ensure the file change reloading event can be triggered
+        ensureConfigReloaded();
         // reload the config
         confSub.reload();
         assertNotNull(confHolder.get());
@@ -85,6 +97,8 @@ public class TestConfigurationSubscription {
         // add
         writer.setProperty("prop1", "1");
         writer.save();
+        // ensure the file change reloading event can be triggered
+        ensureConfigReloaded();
         mockScheduler.tick(100, TimeUnit.MILLISECONDS);
         assertEquals("1", conf.getProperty("prop1"));
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a5dd5adc/distributedlog-core/src/test/java/com/twitter/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java
index 35fac65..46c1880 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java
@@ -30,6 +30,19 @@ import static org.junit.Assert.*;
  */
 public class TestDynamicConfigurationFeatureProvider {
 
+    /**
+     * Make sure config is reloaded
+     *
+     * Give FileChangedReloadingStrategy some time to allow reloading
+     * Make sure now!=lastChecked
+     * {@link org.apache.commons.configuration.reloading.FileChangedReloadingStrategy#reloadingRequired()}
+     */
+    private void ensureConfigReloaded() throws InterruptedException {
+        // sleep 1 ms so that System.currentTimeMillis() !=
+        // lastChecked (the time we construct FileChangedReloadingStrategy
+        Thread.sleep(1);
+    }
+
     @Test(timeout = 60000)
     public void testLoadFeaturesFromBase() throws Exception {
         PropertiesWriter writer = new PropertiesWriter();
@@ -43,6 +56,7 @@ public class TestDynamicConfigurationFeatureProvider {
         DynamicConfigurationFeatureProvider provider =
                 new DynamicConfigurationFeatureProvider("", conf, NullStatsLogger.INSTANCE);
         provider.start();
+        ensureConfigReloaded();
 
         Feature feature1 = provider.getFeature("feature_1");
         assertTrue(feature1.isAvailable());
@@ -79,6 +93,7 @@ public class TestDynamicConfigurationFeatureProvider {
         DynamicConfigurationFeatureProvider provider =
                 new DynamicConfigurationFeatureProvider("", conf, NullStatsLogger.INSTANCE);
         provider.start();
+        ensureConfigReloaded();
 
         Feature feature1 = provider.getFeature("feature_1");
         assertTrue(feature1.isAvailable());
@@ -118,6 +133,7 @@ public class TestDynamicConfigurationFeatureProvider {
         DynamicConfigurationFeatureProvider provider =
                 new DynamicConfigurationFeatureProvider("", conf, NullStatsLogger.INSTANCE);
         provider.start();
+        ensureConfigReloaded();
 
         Feature feature1 = provider.getFeature("feature_1");
         assertTrue(feature1.isAvailable());

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a5dd5adc/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
index 61fb808..ed456b9 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
@@ -742,12 +742,14 @@ public class TestDistributedLogService extends TestDistributedLogBase {
                     StatusCode.WRITE_EXCEPTION == response.getHeader().getCode() ||
                     StatusCode.WRITE_CANCELLED_EXCEPTION == response.getHeader().getCode());
         }
-        assertTrue("There should be no streams in the cache",
-                streamManager.getCachedStreams().isEmpty());
+        // acquired streams should all been removed after we close them
         assertTrue("There should be no streams in the acquired cache",
-                streamManager.getAcquiredStreams().isEmpty());
-
+            streamManager.getAcquiredStreams().isEmpty());
         localService.shutdown();
+        // cached streams wouldn't be removed immediately after streams are closed
+        // but they should be removed after we shutdown the service
+        assertTrue("There should be no streams in the cache after shutting down the service",
+            streamManager.getCachedStreams().isEmpty());
     }
 
     @Test(timeout = 60000)