You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/12/12 22:36:30 UTC
[27/46] geode git commit: GEODE-2123: Replace static members in
ParallelgatewaySenderQueues with instance variables
GEODE-2123: Replace static members in ParallelgatewaySenderQueues with instance variables
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/1fabe490
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/1fabe490
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/1fabe490
Branch: refs/heads/feature/GEODE-1930
Commit: 1fabe49035e53d0986f4d54025c1bcb4ab7d9e04
Parents: d2141c4
Author: Jason Huynh <hu...@gmail.com>
Authored: Mon Nov 21 11:05:26 2016 -0800
Committer: Jason Huynh <hu...@gmail.com>
Committed: Tue Nov 22 10:21:47 2016 -0800
----------------------------------------------------------------------
.../geode/internal/cache/GemFireCacheImpl.java | 1 -
.../parallel/ParallelGatewaySenderQueue.java | 102 +++++--------------
2 files changed, 27 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/1fabe490/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index 7b2e7ca..7f5fa32 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -2205,7 +2205,6 @@ public class GemFireCacheImpl
} catch (CancelException ce) {
}
}
- ParallelGatewaySenderQueue.cleanUpStatics(null);
destroyGatewaySenderLockService();
http://git-wip-us.apache.org/repos/asf/geode/blob/1fabe490/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 30773f9..553847d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -98,11 +98,11 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
new ConcurrentHashMap<String, PartitionedRegion>();
// <PartitionedRegion, Map<Integer, List<Object>>>
- private static final Map regionToDispatchedKeysMap = new ConcurrentHashMap();
+ private final Map regionToDispatchedKeysMap = new ConcurrentHashMap();
- protected static StoppableReentrantLock buckToDispatchLock;
+ protected final StoppableReentrantLock buckToDispatchLock;
- private static StoppableCondition regionToDispatchedKeysMapEmpty;
+ private final StoppableCondition regionToDispatchedKeysMapEmpty;
protected final StoppableReentrantLock queueEmptyLock;
@@ -137,7 +137,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
// TODO:REF: how to change the message sync interval ? should it be common for serial and parallel
protected static volatile int messageSyncInterval = DEFAULT_MESSAGE_SYNC_INTERVAL;
// TODO:REF: name change for thread, as it appears in the log
- private static BatchRemovalThread removalThread = null;
+ private BatchRemovalThread removalThread = null;
protected BlockingQueue<GatewaySenderEventImpl> peekedEvents =
new LinkedBlockingQueue<GatewaySenderEventImpl>();
@@ -166,7 +166,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
* set to the number of processors available to the JVM. There will be one thread pool per
* ParallelGatewaySender on a node.
*/
- private static volatile ExecutorService conflationExecutor;
+ private ExecutorService conflationExecutor;
/**
* This class carries out the actual removal of the previousTailKey from QPR. The class implements
@@ -277,12 +277,8 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
}
}
- if (buckToDispatchLock == null) {
- buckToDispatchLock = new StoppableReentrantLock(sender.getCancelCriterion());
- }
- if (regionToDispatchedKeysMapEmpty == null) {
- regionToDispatchedKeysMapEmpty = buckToDispatchLock.newCondition();
- }
+ buckToDispatchLock = new StoppableReentrantLock(sender.getCancelCriterion());
+ regionToDispatchedKeysMapEmpty = buckToDispatchLock.newCondition();
queueEmptyLock = new StoppableReentrantLock(sender.getCancelCriterion());
queueEmptyCondition = queueEmptyLock.newCondition();
@@ -622,22 +618,20 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
* Initialize the thread pool, setting the number of threads that is equal to the number of
* processors available to the JVM.
*/
- private static void initializeConflationThreadPool() {
- if (conflationExecutor == null) {
- final LoggingThreadGroup loggingThreadGroup =
- LoggingThreadGroup.createThreadGroup("WAN Queue Conflation Logger Group", logger);
-
- final ThreadFactory threadFactory = new ThreadFactory() {
- public Thread newThread(final Runnable task) {
- final Thread thread = new Thread(loggingThreadGroup, task, "WAN Queue Conflation Thread");
- thread.setDaemon(true);
- return thread;
- }
- };
+ private void initializeConflationThreadPool() {
+ final LoggingThreadGroup loggingThreadGroup =
+ LoggingThreadGroup.createThreadGroup("WAN Queue Conflation Logger Group", logger);
+
+ final ThreadFactory threadFactory = new ThreadFactory() {
+ public Thread newThread(final Runnable task) {
+ final Thread thread = new Thread(loggingThreadGroup, task, "WAN Queue Conflation Thread");
+ thread.setDaemon(true);
+ return thread;
+ }
+ };
- conflationExecutor =
- Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), threadFactory);
- }
+ conflationExecutor =
+ Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), threadFactory);
}
/**
@@ -645,7 +639,10 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
* submitted tasks. Wait a while for existing tasks to terminate. If the existing tasks still
* don't complete, cancel them by calling shutdownNow.
*/
- private static void cleanupConflationThreadPool(AbstractGatewaySender sender) {
+ private void cleanupConflationThreadPool(AbstractGatewaySender sender) {
+ if (conflationExecutor == null) {
+ return;
+ }
conflationExecutor.shutdown();// Disable new tasks from being submitted
try {
@@ -1582,26 +1579,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
* Note that this cleanup doesn't clean the data held by the queue.
*/
public void cleanUp() {
- cleanUpStatics(this.sender);
- }
-
- /**
- * @param sender can be null.
- */
- public static void cleanUpStatics(AbstractGatewaySender sender) {
- buckToDispatchLock = null;
- regionToDispatchedKeysMapEmpty = null;
regionToDispatchedKeysMap.clear();
- synchronized (ParallelGatewaySenderQueue.class) {
- if (removalThread != null) {
- removalThread.shutdown();
- removalThread = null;
- }
- }
- if (conflationExecutor != null) {
- cleanupConflationThreadPool(sender);
- conflationExecutor = null;
- }
+ removalThread.shutdown();
+ cleanupConflationThreadPool(this.sender);
}
@Override
@@ -1639,7 +1619,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
}
// TODO:REF: Name for this class should be appropriate?
- private static class BatchRemovalThread extends Thread {
+ private class BatchRemovalThread extends Thread {
/**
* boolean to make a shutdown request
*/
@@ -1731,34 +1711,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
}
// Get all the data-stores wherever userPRs are present
Set<InternalDistributedMember> recipients = getAllRecipients(cache, temp);
- cache.getDistributionManager().removeMembersWithSameOrNewerVersion(recipients,
- Version.GFE_80);
- if (!recipients.isEmpty()) {
- for (Map.Entry<String, Map<Integer, List>> mapEntry : temp.entrySet()) {
- Set<InternalDistributedMember> tempOldVersionMembers = recipients;
- PartitionedRegion prQ = (PartitionedRegion) cache.getRegion(mapEntry.getKey());
- Set<InternalDistributedMember> memberForPRQ =
- prQ.getRegionAdvisor().adviseDataStore();
- memberForPRQ.retainAll(tempOldVersionMembers);
- ParallelQueueBatchRemovalResponse response =
- ParallelQueueBatchRemovalMessage.send(memberForPRQ, prQ, mapEntry.getValue());
- try {
- response.waitForResponse();
- } catch (ForceReattemptException e) {
- // put temp back again in the map
- for (Integer bId : mapEntry.getValue().keySet()) {
- this.parallelQueue.addRemovedEvents(prQ, bId, mapEntry.getValue().get(bId));
- }
- if (logger.isDebugEnabled()) {
- logger.debug(
- "ParallelQueueBatchRemovalMessage got ForceReattemptException. Will continue.");
- }
- }
- }
- }
- recipients = getAllRecipients(cache, temp);
- cache.getDistributionManager().retainMembersWithSameOrNewerVersion(recipients,
- Version.GFE_80);
if (!recipients.isEmpty()) {
ParallelQueueRemovalMessage pqrm = new ParallelQueueRemovalMessage(temp);
pqrm.setRecipients(recipients);