You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/12/12 22:36:30 UTC

[27/46] geode git commit: GEODE-2123: Replace static members in ParallelgatewaySenderQueues with instance variables

GEODE-2123: Replace static members in ParallelgatewaySenderQueues with instance variables


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/1fabe490
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/1fabe490
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/1fabe490

Branch: refs/heads/feature/GEODE-1930
Commit: 1fabe49035e53d0986f4d54025c1bcb4ab7d9e04
Parents: d2141c4
Author: Jason Huynh <hu...@gmail.com>
Authored: Mon Nov 21 11:05:26 2016 -0800
Committer: Jason Huynh <hu...@gmail.com>
Committed: Tue Nov 22 10:21:47 2016 -0800

----------------------------------------------------------------------
 .../geode/internal/cache/GemFireCacheImpl.java  |   1 -
 .../parallel/ParallelGatewaySenderQueue.java    | 102 +++++--------------
 2 files changed, 27 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/1fabe490/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index 7b2e7ca..7f5fa32 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -2205,7 +2205,6 @@ public class GemFireCacheImpl
           } catch (CancelException ce) {
           }
         }
-        ParallelGatewaySenderQueue.cleanUpStatics(null);
 
         destroyGatewaySenderLockService();
 

http://git-wip-us.apache.org/repos/asf/geode/blob/1fabe490/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 30773f9..553847d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -98,11 +98,11 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
       new ConcurrentHashMap<String, PartitionedRegion>();
 
   // <PartitionedRegion, Map<Integer, List<Object>>>
-  private static final Map regionToDispatchedKeysMap = new ConcurrentHashMap();
+  private final Map regionToDispatchedKeysMap = new ConcurrentHashMap();
 
-  protected static StoppableReentrantLock buckToDispatchLock;
+  protected final StoppableReentrantLock buckToDispatchLock;
 
-  private static StoppableCondition regionToDispatchedKeysMapEmpty;
+  private final StoppableCondition regionToDispatchedKeysMapEmpty;
 
   protected final StoppableReentrantLock queueEmptyLock;
 
@@ -137,7 +137,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
   // TODO:REF: how to change the message sync interval ? should it be common for serial and parallel
   protected static volatile int messageSyncInterval = DEFAULT_MESSAGE_SYNC_INTERVAL;
   // TODO:REF: name change for thread, as it appears in the log
-  private static BatchRemovalThread removalThread = null;
+  private BatchRemovalThread removalThread = null;
 
   protected BlockingQueue<GatewaySenderEventImpl> peekedEvents =
       new LinkedBlockingQueue<GatewaySenderEventImpl>();
@@ -166,7 +166,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
    * set to the number of processors available to the JVM. There will be one thread pool per
    * ParallelGatewaySender on a node.
    */
-  private static volatile ExecutorService conflationExecutor;
+  private ExecutorService conflationExecutor;
 
   /**
    * This class carries out the actual removal of the previousTailKey from QPR. The class implements
@@ -277,12 +277,8 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
       }
     }
 
-    if (buckToDispatchLock == null) {
-      buckToDispatchLock = new StoppableReentrantLock(sender.getCancelCriterion());
-    }
-    if (regionToDispatchedKeysMapEmpty == null) {
-      regionToDispatchedKeysMapEmpty = buckToDispatchLock.newCondition();
-    }
+    buckToDispatchLock = new StoppableReentrantLock(sender.getCancelCriterion());
+    regionToDispatchedKeysMapEmpty = buckToDispatchLock.newCondition();
 
     queueEmptyLock = new StoppableReentrantLock(sender.getCancelCriterion());
     queueEmptyCondition = queueEmptyLock.newCondition();
@@ -622,22 +618,20 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
    * Initialize the thread pool, setting the number of threads that is equal to the number of
    * processors available to the JVM.
    */
-  private static void initializeConflationThreadPool() {
-    if (conflationExecutor == null) {
-      final LoggingThreadGroup loggingThreadGroup =
-          LoggingThreadGroup.createThreadGroup("WAN Queue Conflation Logger Group", logger);
-
-      final ThreadFactory threadFactory = new ThreadFactory() {
-        public Thread newThread(final Runnable task) {
-          final Thread thread = new Thread(loggingThreadGroup, task, "WAN Queue Conflation Thread");
-          thread.setDaemon(true);
-          return thread;
-        }
-      };
+  private void initializeConflationThreadPool() {
+    final LoggingThreadGroup loggingThreadGroup =
+        LoggingThreadGroup.createThreadGroup("WAN Queue Conflation Logger Group", logger);
+
+    final ThreadFactory threadFactory = new ThreadFactory() {
+      public Thread newThread(final Runnable task) {
+        final Thread thread = new Thread(loggingThreadGroup, task, "WAN Queue Conflation Thread");
+        thread.setDaemon(true);
+        return thread;
+      }
+    };
 
-      conflationExecutor =
-          Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), threadFactory);
-    }
+    conflationExecutor =
+        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), threadFactory);
   }
 
   /**
@@ -645,7 +639,10 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
    * submitted tasks. Wait a while for existing tasks to terminate. If the existing tasks still
    * don't complete, cancel them by calling shutdownNow.
    */
-  private static void cleanupConflationThreadPool(AbstractGatewaySender sender) {
+  private void cleanupConflationThreadPool(AbstractGatewaySender sender) {
+    if (conflationExecutor == null) {
+      return;
+    }
     conflationExecutor.shutdown();// Disable new tasks from being submitted
 
     try {
@@ -1582,26 +1579,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
    * Note that this cleanup doesn't clean the data held by the queue.
    */
   public void cleanUp() {
-    cleanUpStatics(this.sender);
-  }
-
-  /**
-   * @param sender can be null.
-   */
-  public static void cleanUpStatics(AbstractGatewaySender sender) {
-    buckToDispatchLock = null;
-    regionToDispatchedKeysMapEmpty = null;
     regionToDispatchedKeysMap.clear();
-    synchronized (ParallelGatewaySenderQueue.class) {
-      if (removalThread != null) {
-        removalThread.shutdown();
-        removalThread = null;
-      }
-    }
-    if (conflationExecutor != null) {
-      cleanupConflationThreadPool(sender);
-      conflationExecutor = null;
-    }
+    removalThread.shutdown();
+    cleanupConflationThreadPool(this.sender);
   }
 
   @Override
@@ -1639,7 +1619,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
   }
 
   // TODO:REF: Name for this class should be appropriate?
-  private static class BatchRemovalThread extends Thread {
+  private class BatchRemovalThread extends Thread {
     /**
      * boolean to make a shutdown request
      */
@@ -1731,34 +1711,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
             }
             // Get all the data-stores wherever userPRs are present
             Set<InternalDistributedMember> recipients = getAllRecipients(cache, temp);
-            cache.getDistributionManager().removeMembersWithSameOrNewerVersion(recipients,
-                Version.GFE_80);
-            if (!recipients.isEmpty()) {
-              for (Map.Entry<String, Map<Integer, List>> mapEntry : temp.entrySet()) {
-                Set<InternalDistributedMember> tempOldVersionMembers = recipients;
-                PartitionedRegion prQ = (PartitionedRegion) cache.getRegion(mapEntry.getKey());
-                Set<InternalDistributedMember> memberForPRQ =
-                    prQ.getRegionAdvisor().adviseDataStore();
-                memberForPRQ.retainAll(tempOldVersionMembers);
-                ParallelQueueBatchRemovalResponse response =
-                    ParallelQueueBatchRemovalMessage.send(memberForPRQ, prQ, mapEntry.getValue());
-                try {
-                  response.waitForResponse();
-                } catch (ForceReattemptException e) {
-                  // put temp back again in the map
-                  for (Integer bId : mapEntry.getValue().keySet()) {
-                    this.parallelQueue.addRemovedEvents(prQ, bId, mapEntry.getValue().get(bId));
-                  }
-                  if (logger.isDebugEnabled()) {
-                    logger.debug(
-                        "ParallelQueueBatchRemovalMessage got ForceReattemptException. Will continue.");
-                  }
-                }
-              }
-            }
-            recipients = getAllRecipients(cache, temp);
-            cache.getDistributionManager().retainMembersWithSameOrNewerVersion(recipients,
-                Version.GFE_80);
             if (!recipients.isEmpty()) {
               ParallelQueueRemovalMessage pqrm = new ParallelQueueRemovalMessage(temp);
               pqrm.setRecipients(recipients);