You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bo...@apache.org on 2017/10/24 16:51:05 UTC
[geode] branch develop updated: GEODE-3815: Changed to remove the
conflation index only if it's for the same event
This is an automated email from the ASF dual-hosted git repository.
boglesby pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 547f0cb GEODE-3815: Changed to remove the conflation index only if it's for the same event
547f0cb is described below
commit 547f0cbaa916e4a155c969493924c244ec647e27
Author: Barry Oglesby <bo...@users.noreply.github.com>
AuthorDate: Tue Oct 24 09:51:02 2017 -0700
GEODE-3815: Changed to remove the conflation index only if it's for the same event
---
.../geode/internal/cache/BucketRegionQueue.java | 18 ++--
.../internal/cache/wan/GatewaySenderStats.java | 7 ++
.../geode/internal/cache/wan/WANTestBase.java | 91 +++--------------
.../wan/parallel/ParallelWANStatsDUnitTest.java | 112 ++++++++++++++++-----
4 files changed, 119 insertions(+), 109 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
index 567f46f..2900a2e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
@@ -337,13 +337,17 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
Object key = object.getKeyToConflate();
Map latestIndexesForRegion = (Map) this.indexes.get(rName);
if (latestIndexesForRegion != null) {
- // Remove the index.
- Long index = (Long) latestIndexesForRegion.remove(key);
- if (index != null) {
- this.getPartitionedRegion().getParallelGatewaySender().getStatistics()
- .decConflationIndexesMapSize();
- if (logger.isDebugEnabled()) {
- logger.debug("{}: Removed index {} for {}", this, index, object);
+ // Remove the index only if it still refers to qkey (compared by reference). If it does
+ // not, another event has since been received for the same key and has overwritten the
+ // index, so the index must be left in place.
+ if (latestIndexesForRegion.get(key) == qkey) {
+ Long index = (Long) latestIndexesForRegion.remove(key);
+ if (index != null) {
+ this.getPartitionedRegion().getParallelGatewaySender().getStatistics()
+ .decConflationIndexesMapSize();
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: Removed index {} for {}", this, index, object);
+ }
}
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
index 228e7a9..61317c5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
@@ -696,6 +696,13 @@ public class GatewaySenderStats {
}
/**
+ * Gets the value of the "conflationIndexesMapSize" stat
+ */
+ public int getConflationIndexesMapSize() {
+ return this.stats.getInt(conflationIndexesMapSizeId);
+ }
+
+ /**
* Returns the current time (ns).
*
* @return the current time (ns)
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index 8f51d71..0b06a60 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -1162,15 +1162,8 @@ public class WANTestBase extends JUnit4DistributedTestCase {
}
public static List<Integer> getSenderStats(String senderId, final int expectedQueueSize) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- AbstractGatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = (AbstractGatewaySender) s;
- break;
- }
- }
- final GatewaySenderStats statistics = sender.getStatistics();
+ AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId);
+ GatewaySenderStats statistics = sender.getStatistics();
if (expectedQueueSize != -1) {
final RegionQueue regionQueue;
regionQueue = sender.getQueues().toArray(new RegionQueue[1])[0];
@@ -1189,21 +1182,13 @@ public class WANTestBase extends JUnit4DistributedTestCase {
stats.add(statistics.getEventsFiltered());
stats.add(statistics.getEventsNotQueuedConflated());
stats.add(statistics.getEventsConflatedFromBatches());
+ stats.add(statistics.getConflationIndexesMapSize());
return stats;
}
public static void checkQueueStats(String senderId, final int queueSize, final int eventsReceived,
final int eventsQueued, final int eventsDistributed) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
-
- final GatewaySenderStats statistics = ((AbstractGatewaySender) sender).getStatistics();
+ GatewaySenderStats statistics = getGatewaySenderStats(senderId);
assertEquals(queueSize, statistics.getEventQueueSize());
assertEquals(eventsReceived, statistics.getEventsReceived());
assertEquals(eventsQueued, statistics.getEventsQueued());
@@ -1262,42 +1247,17 @@ public class WANTestBase extends JUnit4DistributedTestCase {
}
public static void checkEventFilteredStats(String senderId, final int eventsFiltered) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
- final GatewaySenderStats statistics = ((AbstractGatewaySender) sender).getStatistics();
+ GatewaySenderStats statistics = getGatewaySenderStats(senderId);
assertEquals(eventsFiltered, statistics.getEventsFiltered());
}
public static void checkConflatedStats(String senderId, final int eventsConflated) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
- final GatewaySenderStats statistics = ((AbstractGatewaySender) sender).getStatistics();
+ GatewaySenderStats statistics = getGatewaySenderStats(senderId);
assertEquals(eventsConflated, statistics.getEventsNotQueuedConflated());
}
public static void checkStats_Failover(String senderId, final int eventsReceived) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
- final GatewaySenderStats statistics = ((AbstractGatewaySender) sender).getStatistics();
-
+ GatewaySenderStats statistics = getGatewaySenderStats(senderId);
assertEquals(eventsReceived, statistics.getEventsReceived());
assertEquals(eventsReceived,
(statistics.getEventsQueued() + statistics.getUnprocessedTokensAddedByPrimary()
@@ -1305,50 +1265,31 @@ public class WANTestBase extends JUnit4DistributedTestCase {
}
public static void checkBatchStats(String senderId, final int batches) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
- final GatewaySenderStats statistics = ((AbstractGatewaySender) sender).getStatistics();
+ GatewaySenderStats statistics = getGatewaySenderStats(senderId);
assert (statistics.getBatchesDistributed() >= batches);
assertEquals(0, statistics.getBatchesRedistributed());
}
public static void checkBatchStats(String senderId, final boolean batchesDistributed,
final boolean batchesRedistributed) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
- final GatewaySenderStats statistics = ((AbstractGatewaySender) sender).getStatistics();
+ GatewaySenderStats statistics = getGatewaySenderStats(senderId);
assertEquals(batchesDistributed, (statistics.getBatchesDistributed() > 0));
assertEquals(batchesRedistributed, (statistics.getBatchesRedistributed() > 0));
}
public static void checkUnProcessedStats(String senderId, int events) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
- final GatewaySenderStats statistics = ((AbstractGatewaySender) sender).getStatistics();
+ GatewaySenderStats statistics = getGatewaySenderStats(senderId);
assertEquals(events, (statistics.getUnprocessedEventsAddedBySecondary()
+ statistics.getUnprocessedTokensRemovedBySecondary()));
assertEquals(events, (statistics.getUnprocessedEventsRemovedByPrimary()
+ statistics.getUnprocessedTokensAddedByPrimary()));
}
+ public static GatewaySenderStats getGatewaySenderStats(String senderId) {
+ GatewaySender sender = cache.getGatewaySender(senderId);
+ return ((AbstractGatewaySender) sender).getStatistics();
+ }
+
public static void waitForSenderRunningState(String senderId) {
final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
try {
@@ -2601,7 +2542,7 @@ public class WANTestBase extends JUnit4DistributedTestCase {
}
public static void checkQueueSize(String senderId, int numQueueEntries) {
- Awaitility.await().atMost(10, TimeUnit.SECONDS)
+ Awaitility.await().atMost(30, TimeUnit.SECONDS)
.until(() -> testQueueSize(senderId, numQueueEntries));
}
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
index 9b69446..2a90308 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
@@ -15,26 +15,22 @@
package org.apache.geode.internal.cache.wan.parallel;
import org.awaitility.Awaitility;
-import org.junit.Ignore;
import org.junit.experimental.categories.Category;
import org.junit.Test;
import static org.junit.Assert.*;
-import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
-import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
+import org.apache.geode.cache.Region;
import org.apache.geode.test.junit.categories.DistributedTest;
-import static org.apache.geode.test.dunit.Wait.*;
import static org.apache.geode.test.dunit.IgnoredException.*;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import org.junit.experimental.categories.Category;
-
import org.apache.geode.internal.cache.wan.WANTestBase;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.VM;
@@ -424,6 +420,10 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
createReceiverPR(vm2, 1);
Map keyValues = putKeyValues();
+
+ // Verify the conflation indexes map is empty
+ verifyConflationIndexesSize("ln", 0, vm4, vm5, vm6, vm7);
+
final Map updateKeyValues = new HashMap();
for (int i = 0; i < 50; i++) {
updateKeyValues.put(i, i + "_updated");
@@ -431,12 +431,18 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
vm4.invoke(() -> WANTestBase.putGivenKeyValue(testName, updateKeyValues));
+ // Verify the conflation indexes map equals the number of updates
+ verifyConflationIndexesSize("ln", 50, vm4, vm5, vm6, vm7);
+
vm4.invoke(() -> WANTestBase.checkQueueSize("ln",
keyValues.size() + updateKeyValues.size() /* creates aren't conflated */ ));
// Do the puts again. Since these are updates, the previous updates will be conflated.
vm4.invoke(() -> WANTestBase.putGivenKeyValue(testName, updateKeyValues));
+ // Verify the conflation indexes map still equals the number of updates
+ verifyConflationIndexesSize("ln", 50, vm4, vm5, vm6, vm7);
+
vm4.invoke(() -> WANTestBase.checkQueueSize("ln",
keyValues.size() + updateKeyValues.size() /* creates aren't conflated */ ));
@@ -458,28 +464,63 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
vm4.invoke(() -> WANTestBase.checkQueueSize("ln", 0));
- ArrayList<Integer> v4List =
- (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- ArrayList<Integer> v5List =
- (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- ArrayList<Integer> v6List =
- (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- ArrayList<Integer> v7List =
- (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-
- assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)); // queue size
- assertEquals(200, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); // eventsReceived
- assertEquals(200, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); // events
- // queued
- assertEquals(150, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); // events
- // distributed
- assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10); // batches
- // distributed
- assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)); // batches
- // redistributed
- assertEquals(50, v4List.get(7) + v5List.get(7) + v6List.get(7) + v7List.get(7)); // events
- // conflated
+ List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+ // Verify final stats
+ // 0 -> eventQueueSize
+ // 1 -> eventsReceived
+ // 2 -> eventsQueued
+ // 3 -> eventsDistributed
+ // 4 -> batchesDistributed
+ // 5 -> batchesRedistributed
+ // 7 -> eventsNotQueuedConflated
+ // 9 -> conflationIndexesMapSize
+ assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0));
+ assertEquals(200, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1));
+ assertEquals(200, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2));
+ assertEquals(150, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3));
+ assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10);
+ assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5));
+ assertEquals(50, v4List.get(7) + v5List.get(7) + v6List.get(7) + v7List.get(7));
+ assertEquals(0, v4List.get(9) + v5List.get(9) + v6List.get(9) + v7List.get(9));
+ }
+ @Test
+ public void testConflationWithSameEntryPuts() throws Exception {
+ // Start locators
+ Integer lnPort = vm0.invoke(() -> createFirstLocatorWithDSId(1));
+ Integer nyPort = vm2.invoke(() -> createFirstRemoteLocator(2, lnPort));
+
+ // Configure sending site member
+ String senderId = "ny";
+ String regionName = this.testName + "_PR";
+ vm1.invoke(() -> createCache(lnPort));
+ vm1.invoke(() -> createSender(senderId, 2, true, 100, 10, true, true, null, false));
+ vm1.invoke(() -> createPartitionedRegion(regionName, senderId, 0, 10, isOffHeap()));
+
+ // Do puts of the same key
+ int numIterations = 100;
+ vm1.invoke(() -> putSameEntry(regionName, numIterations));
+
+ // Verify queue size (no need to wait)
+ vm1.invoke(() -> testQueueSize(senderId, 2));
+
+ // Verify the conflation indexes size stat
+ verifyConflationIndexesSize(senderId, 1, vm1);
+
+ // Configure receiving site member
+ vm3.invoke(() -> createCache(nyPort));
+ vm3.invoke(() -> createReceiver());
+ vm3.invoke(() -> createPartitionedRegion(regionName, null, 0, 10, isOffHeap()));
+
+ // Wait for queue to drain
+ vm1.invoke(() -> checkQueueSize(senderId, 0));
+
+ // Verify the conflation indexes size stat
+ verifyConflationIndexesSize(senderId, 0, vm1);
}
protected Map putKeyValues() {
@@ -540,4 +581,21 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true));
vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true));
}
+
+ private void verifyConflationIndexesSize(String senderId, int expectedSize, VM... vms) {
+ int actualSize = 0;
+ for (VM vm : vms) {
+ List<Integer> stats = vm.invoke(() -> WANTestBase.getSenderStats(senderId, -1));
+ actualSize += stats.get(9);
+ }
+ assertEquals(expectedSize, actualSize);
+ }
+
+ private void putSameEntry(String regionName, int numIterations) {
+ // This does one create and numIterations-1 updates
+ Region region = cache.getRegion(regionName);
+ for (int i = 0; i < numIterations; i++) {
+ region.put(0, i);
+ }
+ }
}
--
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <commits@geode.apache.org>.