You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by go...@apache.org on 2018/05/14 23:55:58 UTC
[geode] 03/07: GEODE-5056: when dropped events are found at the primary
sender, send a QueueRemovalMessage for them (#1794)
This is an automated email from the ASF dual-hosted git repository.
gosullivan pushed a commit to branch support/9.5
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 6b54ae977abcf575513d21c608c6ce0e36014e32
Author: Xiaojian Zhou <ge...@users.noreply.github.com>
AuthorDate: Fri Apr 13 10:09:22 2018 -0700
GEODE-5056: when dropped events are found at the primary sender, send a
QueueRemovalMessage for them (#1794)
(cherry picked from commit f7bb77c89a3d19673e8929275fc6c407a4b382bd)
---
.../cache/wan/AbstractGatewaySenderEventProcessor.java | 2 +-
.../cache/wan/parallel/ParallelGatewaySenderQueue.java | 17 ++++++++++++++++-
.../serial/SerialGatewaySenderOperationsDUnitTest.java | 2 ++
3 files changed, 19 insertions(+), 2 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index eea7480..34d511c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -304,7 +304,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
(ParallelGatewaySenderQueue) cpgsq.getQueueByBucket(bucketId);
boolean isPrimary = prQ.getRegionAdvisor().getBucketAdvisor(bucketId).isPrimary();
if (isPrimary) {
- pgsq.addRemovedEvent(prQ, bucketId, shadowKey);
+ pgsq.sendQueueRemovalMesssageForDroppedEvent(prQ, bucketId, shadowKey);
this.sender.getStatistics().incEventsNotQueuedAtYetRunningPrimarySender();
if (logger.isDebugEnabled()) {
logger.debug("register dropped event for primary queue. BucketId is " + bucketId
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 89880fc..cdb33ab 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -1112,7 +1112,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
// This method may need synchronization in case it is used by
// ConcurrentParallelGatewaySender
- public void addRemovedEvent(PartitionedRegion prQ, int bucketId, Object key) {
+ protected void addRemovedEvent(PartitionedRegion prQ, int bucketId, Object key) {
StoppableReentrantLock lock = buckToDispatchLock;
if (lock != null) {
lock.lock();
@@ -1133,6 +1133,21 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
}
}
+ public void sendQueueRemovalMesssageForDroppedEvent(PartitionedRegion prQ, int bucketId,
+ Object key) {
+ final HashMap<String, Map<Integer, List>> temp = new HashMap<String, Map<Integer, List>>();
+ Map bucketIdToDispatchedKeys = new ConcurrentHashMap();
+ temp.put(prQ.getFullPath(), bucketIdToDispatchedKeys);
+ addRemovedEventToMap(bucketIdToDispatchedKeys, bucketId, key);
+ Set<InternalDistributedMember> recipients =
+ removalThread.getAllRecipients(sender.getCache(), temp);
+ if (!recipients.isEmpty()) {
+ ParallelQueueRemovalMessage pqrm = new ParallelQueueRemovalMessage(temp);
+ pqrm.setRecipients(recipients);
+ sender.getCache().getInternalDistributedSystem().getDistributionManager().putOutgoing(pqrm);
+ }
+ }
+
private void addRemovedEventToMap(Map bucketIdToDispatchedKeys, int bucketId, Object key) {
List dispatchedKeys = (List) bucketIdToDispatchedKeys.get(bucketId);
if (dispatchedKeys == null) {
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
index 8df5650..434fa62 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
@@ -50,6 +50,7 @@ import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.categories.FlakyTest;
import org.apache.geode.test.junit.categories.WanTest;
/**
@@ -268,6 +269,7 @@ public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase {
vm5.invoke(() -> WANTestBase.validateQueueSizeStat("ln", 0));
}
+ @Category({FlakyTest.class, WanTest.class}) // GEODE-5056
@Test
public void testRestartSerialGatewaySendersWhilePutting() throws Throwable {
Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
--
To stop receiving notification emails like this one, please contact
gosullivan@apache.org.