You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by qi...@apache.org on 2015/08/06 19:11:24 UTC

[13/19] incubator-geode git commit: GEODE-155: - Fixes intermittent failure in RegionWithHDFS*DUnitTest about incorrect number of hoplog files getting created. - Caused due to an earlier test not cleaning up static variables in ParallelGatewaySenderQueue

GEODE-155:
- Fixes intermittent failure in RegionWithHDFS*DUnitTest about incorrect number of hoplog files getting created.
- Caused due to an earlier test not cleaning up static variables in ParallelGatewaySenderQueue.
- These are indeed cleaned up during GemFireCacheImpl.close() if there are any GatewaySenders running in the system.
- But if a region with gateway-senders associated with it, is destroyed first before a cache.close(), then the gateway senders are only stopped,
    and removed from allGatewaySenders list. But the static variables are not cleaned up.
- Later, during GemFireCacheImpl.close(), as the allGatewaySenders list is empty, it skips cleaning the static variables.
- As a fix, invoking a static clean-up method of ParallelGatewaySenderQueue explicitly during cache.close().
- Retained the non-static ParallelGatewaySenderQueue.cleanUp(), if required in future development for performing instance-specific clean-up.
- Minor formatting changes in the unit tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/cb735a20
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/cb735a20
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/cb735a20

Branch: refs/heads/feature/GEODE-137
Commit: cb735a2045f124ee7c5c6ba84dbcafe4383008d7
Parents: 71f00b5
Author: ashetkar <as...@pivotal.io>
Authored: Tue Aug 4 14:06:51 2015 +0530
Committer: Qihong Chen <qc...@pivotal.io>
Committed: Thu Aug 6 10:07:48 2015 -0700

----------------------------------------------------------------------
 .../internal/cache/GemFireCacheImpl.java        |  1 +
 .../parallel/ParallelGatewaySenderQueue.java    | 41 +++++++++++++-------
 .../internal/RegionWithHDFSBasicDUnitTest.java  | 22 +++++------
 .../RegionWithHDFSOffHeapBasicDUnitTest.java    | 10 +++--
 .../hdfs/internal/RegionWithHDFSTestBase.java   |  6 +--
 5 files changed, 49 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cb735a20/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
index 5487000..f5be144 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
@@ -2018,6 +2018,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
               advisor.close();
             }
           }
+          ParallelGatewaySenderQueue.cleanUpStatics(null);
         } catch (CancelException ce) {
 
         }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cb735a20/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index f4f9528..9141905 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -633,15 +633,20 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
    * Wait a while for existing tasks to terminate. If the existing tasks still don't 
    * complete, cancel them by calling shutdownNow. 
    */
-  private void cleanupConflationThreadPool() {
+  private static void cleanupConflationThreadPool(AbstractGatewaySender sender) {
     conflationExecutor.shutdown();// Disable new tasks from being submitted
     
     try {
     if (!conflationExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
       conflationExecutor.shutdownNow(); // Cancel currently executing tasks
       // Wait a while for tasks to respond to being cancelled
-      if (!conflationExecutor.awaitTermination(1, TimeUnit.SECONDS))
-        logger.warn(LocalizedMessage.create(LocalizedStrings.ParallelGatewaySenderQueue_COULD_NOT_TERMINATE_CONFLATION_THREADPOOL, this.sender));
+        if (!conflationExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
+          logger
+              .warn(LocalizedMessage
+                  .create(
+                      LocalizedStrings.ParallelGatewaySenderQueue_COULD_NOT_TERMINATE_CONFLATION_THREADPOOL,
+                      (sender == null ? "all" : sender)));
+        }
     }
     } catch (InterruptedException e) {
       // (Re-)Cancel if current thread also interrupted
@@ -1508,25 +1513,33 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
    * by the queue. Note that this cleanup doesn't clean the data held by the queue.
    */
   public void cleanUp() {
-    if(buckToDispatchLock != null){
-      this.buckToDispatchLock = null;
+    cleanUpStatics(this.sender);
+  }
+
+  /**
+   * @param sender
+   *          can be null.
+   */
+  public static void cleanUpStatics(AbstractGatewaySender sender) {
+    if (buckToDispatchLock != null) {
+      buckToDispatchLock = null;
     }
-    if(regionToDispatchedKeysMapEmpty != null) {
-      this.regionToDispatchedKeysMapEmpty = null;
+    if (regionToDispatchedKeysMapEmpty != null) {
+      regionToDispatchedKeysMapEmpty = null;
     }
-    this.regionToDispatchedKeysMap.clear();
+    regionToDispatchedKeysMap.clear();
     synchronized (ParallelGatewaySenderQueue.class) {
-      if (this.removalThread != null) {
-        this.removalThread.shutdown();
-        this.removalThread = null;
+      if (removalThread != null) {
+        removalThread.shutdown();
+        removalThread = null;
       }
     }
     if (conflationExecutor != null) {
-      cleanupConflationThreadPool();
-      this.conflationExecutor = null;
+      cleanupConflationThreadPool(sender);
+      conflationExecutor = null;
     }
   }
-  
+
   @Override
   public void close() {
     // Because of bug 49060 do not close the regions of a parallel queue

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cb735a20/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSBasicDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSBasicDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSBasicDUnitTest.java
index cf10b24..162e529 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSBasicDUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSBasicDUnitTest.java
@@ -86,7 +86,7 @@ public class RegionWithHDFSBasicDUnitTest extends RegionWithHDFSTestBase {
       final String uniqueName, final int batchInterval,
       final boolean queuePersistent, final boolean writeonly,
       final long timeForRollover, final long maxFileSize) {
-    SerializableCallable createRegion = new SerializableCallable() {
+    SerializableCallable createRegion = new SerializableCallable("Create HDFS region") {
       public Object call() throws Exception {
         AttributesFactory af = new AttributesFactory();
         af.setDataPolicy(DataPolicy.HDFS_PARTITION);
@@ -95,8 +95,8 @@ public class RegionWithHDFSBasicDUnitTest extends RegionWithHDFSTestBase {
         paf.setRedundantCopies(1);
 
         af.setHDFSStoreName(uniqueName);
-
         af.setPartitionAttributes(paf.create());
+
         HDFSStoreFactory hsf = getCache().createHDFSStoreFactory();
         // Going two level up to avoid home directories getting created in
         // VM-specific directory. This avoids failures in those tests where
@@ -860,7 +860,7 @@ public class RegionWithHDFSBasicDUnitTest extends RegionWithHDFSTestBase {
   protected AsyncInvocation doAsyncPuts(VM vm, final String regionName,
       final int start, final int end, final String suffix, final String value)
       throws Exception {
-    return vm.invokeAsync(new SerializableCallable() {
+    return vm.invokeAsync(new SerializableCallable("doAsyncPuts") {
       public Object call() throws Exception {
         Region r = getRootRegion(regionName);
         String v = "V";
@@ -976,8 +976,8 @@ public class RegionWithHDFSBasicDUnitTest extends RegionWithHDFSTestBase {
   }
 
   /**
-   * create server with file rollover time as 2 secs. Insert few entries and
-   * then sleep for 2 sec. A file should be created. Do it again At th end, two
+   * Create server with file rollover time as 5 seconds. Insert few entries and
+   * then sleep for 7 seconds. A file should be created. Do it again. At the end, two
    * files with inserted entries should be created.
    * 
    * @throws Throwable
@@ -991,8 +991,8 @@ public class RegionWithHDFSBasicDUnitTest extends RegionWithHDFSTestBase {
     String homeDir = "./testWOTimeForRollOverParam";
     final String uniqueName = getName();
 
-    createServerRegion(vm0, 1, 1,  500, homeDir, uniqueName, 5, true, false, 4, 1);
-    createServerRegion(vm1, 1, 1,  500, homeDir, uniqueName, 5, true, false, 4, 1);
+    createServerRegion(vm0, 1, 1, 500, homeDir, uniqueName, 5, true, false, 5, 1);
+    createServerRegion(vm1, 1, 1, 500, homeDir, uniqueName, 5, true, false, 5, 1);
 
     AsyncInvocation a1 = doAsyncPuts(vm0, uniqueName, 1, 8, "vm0");
     AsyncInvocation a2 = doAsyncPuts(vm1, uniqueName, 4, 10, "vm1");
@@ -1000,7 +1000,7 @@ public class RegionWithHDFSBasicDUnitTest extends RegionWithHDFSTestBase {
     a1.join();
     a2.join();
 
-    Thread.sleep(8000);
+    Thread.sleep(7000);
 
     a1 = doAsyncPuts(vm0, uniqueName, 10, 18, "vm0");
     a2 = doAsyncPuts(vm1, uniqueName, 14, 20, "vm1");
@@ -1008,13 +1008,13 @@ public class RegionWithHDFSBasicDUnitTest extends RegionWithHDFSTestBase {
     a1.join();
     a2.join();
 
-    Thread.sleep(8000);
+    Thread.sleep(7000);
 
     cacheClose(vm0, false);
     cacheClose(vm1, false);
 
-    AsyncInvocation async1 = createServerRegionAsync(vm0, 1, 1,  500, homeDir, uniqueName, 5, true, false, 4, 1);
-    AsyncInvocation async2 = createServerRegionAsync(vm1, 1, 1,  500, homeDir, uniqueName, 5, true, false, 4, 1);
+    AsyncInvocation async1 = createServerRegionAsync(vm0, 1, 1, 500, homeDir, uniqueName, 5, true, false, 5, 1);
+    AsyncInvocation async2 = createServerRegionAsync(vm1, 1, 1, 500, homeDir, uniqueName, 5, true, false, 5, 1);
     async1.getResult();
     async2.getResult();
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cb735a20/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSOffHeapBasicDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSOffHeapBasicDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSOffHeapBasicDUnitTest.java
index 21e2986..ee517d2 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSOffHeapBasicDUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSOffHeapBasicDUnitTest.java
@@ -45,9 +45,13 @@ public class RegionWithHDFSOffHeapBasicDUnitTest extends
         }
       }
     };
-    checkOrphans.run();
-    invokeInEveryVM(checkOrphans);
-    super.tearDown2();
+    try {
+      checkOrphans.run();
+      invokeInEveryVM(checkOrphans);
+    } finally {
+      // proceed with tearDown2 anyway.
+      super.tearDown2();
+    }
   }
 
    public void testDelta() {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cb735a20/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSTestBase.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSTestBase.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSTestBase.java
index 8ad57c9..92687ed 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSTestBase.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSTestBase.java
@@ -85,8 +85,9 @@ public abstract class RegionWithHDFSTestBase extends CacheTestCase {
   }
 
   public SerializableCallable cleanUpStoresAndDisconnect() throws Exception {
-    SerializableCallable cleanUp = new SerializableCallable() {
+    SerializableCallable cleanUp = new SerializableCallable("cleanUpStoresAndDisconnect") {
       public Object call() throws Exception {
+        disconnectFromDS();
         File file;
         if (homeDir != null) {
           file = new File(homeDir);
@@ -95,7 +96,6 @@ public abstract class RegionWithHDFSTestBase extends CacheTestCase {
         }
         file = new File(tmpDir);
         FileUtil.delete(file);
-        disconnectFromDS();
         return 0;
       }
     };
@@ -576,7 +576,7 @@ public abstract class RegionWithHDFSTestBase extends CacheTestCase {
     return entriesToFileMap;
   }
  protected SerializableCallable validateEmpty(VM vm0, final int numEntries, final String uniqueName) {
-    SerializableCallable validateEmpty = new SerializableCallable("validate") {
+    SerializableCallable validateEmpty = new SerializableCallable("validateEmpty") {
       public Object call() throws Exception {
         Region r = getRootRegion(uniqueName);