You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bo...@apache.org on 2017/10/24 16:51:05 UTC

[geode] branch develop updated: GEODE-3815: Changed to remove the conflation index only if its for the same event

This is an automated email from the ASF dual-hosted git repository.

boglesby pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 547f0cb  GEODE-3815: Changed to remove the conflation index only if its for the same event
547f0cb is described below

commit 547f0cbaa916e4a155c969493924c244ec647e27
Author: Barry Oglesby <bo...@users.noreply.github.com>
AuthorDate: Tue Oct 24 09:51:02 2017 -0700

    GEODE-3815: Changed to remove the conflation index only if its for the same event
---
 .../geode/internal/cache/BucketRegionQueue.java    |  18 ++--
 .../internal/cache/wan/GatewaySenderStats.java     |   7 ++
 .../geode/internal/cache/wan/WANTestBase.java      |  91 +++--------------
 .../wan/parallel/ParallelWANStatsDUnitTest.java    | 112 ++++++++++++++++-----
 4 files changed, 119 insertions(+), 109 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
index 567f46f..2900a2e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
@@ -337,13 +337,17 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
         Object key = object.getKeyToConflate();
         Map latestIndexesForRegion = (Map) this.indexes.get(rName);
         if (latestIndexesForRegion != null) {
-          // Remove the index.
-          Long index = (Long) latestIndexesForRegion.remove(key);
-          if (index != null) {
-            this.getPartitionedRegion().getParallelGatewaySender().getStatistics()
-                .decConflationIndexesMapSize();
-            if (logger.isDebugEnabled()) {
-              logger.debug("{}: Removed index {} for {}", this, index, object);
+          // Remove the index if appropriate. Verify the qKey is actually the one being referenced
+          // in the index. If it isn't, then another event has been received for the real key. In
+          // that case, don't remove the index since it has already been overwritten.
+          if (latestIndexesForRegion.get(key) == qkey) {
+            Long index = (Long) latestIndexesForRegion.remove(key);
+            if (index != null) {
+              this.getPartitionedRegion().getParallelGatewaySender().getStatistics()
+                  .decConflationIndexesMapSize();
+              if (logger.isDebugEnabled()) {
+                logger.debug("{}: Removed index {} for {}", this, index, object);
+              }
             }
           }
         }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
index 228e7a9..61317c5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
@@ -696,6 +696,13 @@ public class GatewaySenderStats {
   }
 
   /**
+   * Gets the value of the "conflationIndexesMapSize" stat
+   */
+  public int getConflationIndexesMapSize() {
+    return this.stats.getInt(conflationIndexesMapSizeId);
+  }
+
+  /**
    * Returns the current time (ns).
    * 
    * @return the current time (ns)
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index 8f51d71..0b06a60 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -1162,15 +1162,8 @@ public class WANTestBase extends JUnit4DistributedTestCase {
   }
 
   public static List<Integer> getSenderStats(String senderId, final int expectedQueueSize) {
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    AbstractGatewaySender sender = null;
-    for (GatewaySender s : senders) {
-      if (s.getId().equals(senderId)) {
-        sender = (AbstractGatewaySender) s;
-        break;
-      }
-    }
-    final GatewaySenderStats statistics = sender.getStatistics();
+    AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId);
+    GatewaySenderStats statistics = sender.getStatistics();
     if (expectedQueueSize != -1) {
       final RegionQueue regionQueue;
       regionQueue = sender.getQueues().toArray(new RegionQueue[1])[0];
@@ -1189,21 +1182,13 @@ public class WANTestBase extends JUnit4DistributedTestCase {
     stats.add(statistics.getEventsFiltered());
     stats.add(statistics.getEventsNotQueuedConflated());
     stats.add(statistics.getEventsConflatedFromBatches());
+    stats.add(statistics.getConflationIndexesMapSize());
     return stats;
   }
 
   public static void checkQueueStats(String senderId, final int queueSize, final int eventsReceived,
       final int eventsQueued, final int eventsDistributed) {
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    GatewaySender sender = null;
-    for (GatewaySender s : senders) {
-      if (s.getId().equals(senderId)) {
-        sender = s;
-        break;
-      }
-    }
-
-    final GatewaySenderStats statistics = ((AbstractGatewaySender) sender).getStatistics();
+    GatewaySenderStats statistics = getGatewaySenderStats(senderId);
     assertEquals(queueSize, statistics.getEventQueueSize());
     assertEquals(eventsReceived, statistics.getEventsReceived());
     assertEquals(eventsQueued, statistics.getEventsQueued());
@@ -1262,42 +1247,17 @@ public class WANTestBase extends JUnit4DistributedTestCase {
   }
 
   public static void checkEventFilteredStats(String senderId, final int eventsFiltered) {
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    GatewaySender sender = null;
-    for (GatewaySender s : senders) {
-      if (s.getId().equals(senderId)) {
-        sender = s;
-        break;
-      }
-    }
-    final GatewaySenderStats statistics = ((AbstractGatewaySender) sender).getStatistics();
+    GatewaySenderStats statistics = getGatewaySenderStats(senderId);
     assertEquals(eventsFiltered, statistics.getEventsFiltered());
   }
 
   public static void checkConflatedStats(String senderId, final int eventsConflated) {
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    GatewaySender sender = null;
-    for (GatewaySender s : senders) {
-      if (s.getId().equals(senderId)) {
-        sender = s;
-        break;
-      }
-    }
-    final GatewaySenderStats statistics = ((AbstractGatewaySender) sender).getStatistics();
+    GatewaySenderStats statistics = getGatewaySenderStats(senderId);
     assertEquals(eventsConflated, statistics.getEventsNotQueuedConflated());
   }
 
   public static void checkStats_Failover(String senderId, final int eventsReceived) {
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    GatewaySender sender = null;
-    for (GatewaySender s : senders) {
-      if (s.getId().equals(senderId)) {
-        sender = s;
-        break;
-      }
-    }
-    final GatewaySenderStats statistics = ((AbstractGatewaySender) sender).getStatistics();
-
+    GatewaySenderStats statistics = getGatewaySenderStats(senderId);
     assertEquals(eventsReceived, statistics.getEventsReceived());
     assertEquals(eventsReceived,
         (statistics.getEventsQueued() + statistics.getUnprocessedTokensAddedByPrimary()
@@ -1305,50 +1265,31 @@ public class WANTestBase extends JUnit4DistributedTestCase {
   }
 
   public static void checkBatchStats(String senderId, final int batches) {
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    GatewaySender sender = null;
-    for (GatewaySender s : senders) {
-      if (s.getId().equals(senderId)) {
-        sender = s;
-        break;
-      }
-    }
-    final GatewaySenderStats statistics = ((AbstractGatewaySender) sender).getStatistics();
+    GatewaySenderStats statistics = getGatewaySenderStats(senderId);
     assert (statistics.getBatchesDistributed() >= batches);
     assertEquals(0, statistics.getBatchesRedistributed());
   }
 
   public static void checkBatchStats(String senderId, final boolean batchesDistributed,
       final boolean batchesRedistributed) {
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    GatewaySender sender = null;
-    for (GatewaySender s : senders) {
-      if (s.getId().equals(senderId)) {
-        sender = s;
-        break;
-      }
-    }
-    final GatewaySenderStats statistics = ((AbstractGatewaySender) sender).getStatistics();
+    GatewaySenderStats statistics = getGatewaySenderStats(senderId);
     assertEquals(batchesDistributed, (statistics.getBatchesDistributed() > 0));
     assertEquals(batchesRedistributed, (statistics.getBatchesRedistributed() > 0));
   }
 
   public static void checkUnProcessedStats(String senderId, int events) {
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    GatewaySender sender = null;
-    for (GatewaySender s : senders) {
-      if (s.getId().equals(senderId)) {
-        sender = s;
-        break;
-      }
-    }
-    final GatewaySenderStats statistics = ((AbstractGatewaySender) sender).getStatistics();
+    GatewaySenderStats statistics = getGatewaySenderStats(senderId);
     assertEquals(events, (statistics.getUnprocessedEventsAddedBySecondary()
         + statistics.getUnprocessedTokensRemovedBySecondary()));
     assertEquals(events, (statistics.getUnprocessedEventsRemovedByPrimary()
         + statistics.getUnprocessedTokensAddedByPrimary()));
   }
 
+  public static GatewaySenderStats getGatewaySenderStats(String senderId) {
+    GatewaySender sender = cache.getGatewaySender(senderId);
+    return ((AbstractGatewaySender) sender).getStatistics();
+  }
+
   public static void waitForSenderRunningState(String senderId) {
     final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
     try {
@@ -2601,7 +2542,7 @@ public class WANTestBase extends JUnit4DistributedTestCase {
   }
 
   public static void checkQueueSize(String senderId, int numQueueEntries) {
-    Awaitility.await().atMost(10, TimeUnit.SECONDS)
+    Awaitility.await().atMost(30, TimeUnit.SECONDS)
         .until(() -> testQueueSize(senderId, numQueueEntries));
   }
 
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
index 9b69446..2a90308 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
@@ -15,26 +15,22 @@
 package org.apache.geode.internal.cache.wan.parallel;
 
 import org.awaitility.Awaitility;
-import org.junit.Ignore;
 import org.junit.experimental.categories.Category;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
 
-import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
-import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
+import org.apache.geode.cache.Region;
 import org.apache.geode.test.junit.categories.DistributedTest;
 
-import static org.apache.geode.test.dunit.Wait.*;
 import static org.apache.geode.test.dunit.IgnoredException.*;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-import org.junit.experimental.categories.Category;
-
 import org.apache.geode.internal.cache.wan.WANTestBase;
 import org.apache.geode.test.dunit.AsyncInvocation;
 import org.apache.geode.test.dunit.VM;
@@ -424,6 +420,10 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
     createReceiverPR(vm2, 1);
 
     Map keyValues = putKeyValues();
+
+    // Verify the conflation indexes map is empty
+    verifyConflationIndexesSize("ln", 0, vm4, vm5, vm6, vm7);
+
     final Map updateKeyValues = new HashMap();
     for (int i = 0; i < 50; i++) {
       updateKeyValues.put(i, i + "_updated");
@@ -431,12 +431,18 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
 
     vm4.invoke(() -> WANTestBase.putGivenKeyValue(testName, updateKeyValues));
 
+    // Verify the conflation indexes map equals the number of updates
+    verifyConflationIndexesSize("ln", 50, vm4, vm5, vm6, vm7);
+
     vm4.invoke(() -> WANTestBase.checkQueueSize("ln",
         keyValues.size() + updateKeyValues.size() /* creates aren't conflated */ ));
 
     // Do the puts again. Since these are updates, the previous updates will be conflated.
     vm4.invoke(() -> WANTestBase.putGivenKeyValue(testName, updateKeyValues));
 
+    // Verify the conflation indexes map still equals the number of updates
+    verifyConflationIndexesSize("ln", 50, vm4, vm5, vm6, vm7);
+
     vm4.invoke(() -> WANTestBase.checkQueueSize("ln",
         keyValues.size() + updateKeyValues.size() /* creates aren't conflated */ ));
 
@@ -458,28 +464,63 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
 
     vm4.invoke(() -> WANTestBase.checkQueueSize("ln", 0));
 
-    ArrayList<Integer> v4List =
-        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    ArrayList<Integer> v5List =
-        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    ArrayList<Integer> v6List =
-        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    ArrayList<Integer> v7List =
-        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-
-    assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)); // queue size
-    assertEquals(200, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); // eventsReceived
-    assertEquals(200, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); // events
-                                                                                      // queued
-    assertEquals(150, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); // events
-                                                                                      // distributed
-    assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10); // batches
-                                                                                     // distributed
-    assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)); // batches
-                                                                                    // redistributed
-    assertEquals(50, v4List.get(7) + v5List.get(7) + v6List.get(7) + v7List.get(7)); // events
-                                                                                     // conflated
+    List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+    // Verify final stats
+    // 0 -> eventQueueSize
+    // 1 -> eventsReceived
+    // 2 -> eventsQueued
+    // 3 -> eventsDistributed
+    // 4 -> batchesDistributed
+    // 5 -> batchesRedistributed
+    // 7 -> eventsNotQueuedConflated
+    // 9 -> conflationIndexesMapSize
+    assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0));
+    assertEquals(200, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1));
+    assertEquals(200, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2));
+    assertEquals(150, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3));
+    assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10);
+    assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5));
+    assertEquals(50, v4List.get(7) + v5List.get(7) + v6List.get(7) + v7List.get(7));
+    assertEquals(0, v4List.get(9) + v5List.get(9) + v6List.get(9) + v7List.get(9));
+  }
 
+  @Test
+  public void testConflationWithSameEntryPuts() throws Exception {
+    // Start locators
+    Integer lnPort = vm0.invoke(() -> createFirstLocatorWithDSId(1));
+    Integer nyPort = vm2.invoke(() -> createFirstRemoteLocator(2, lnPort));
+
+    // Configure sending site member
+    String senderId = "ny";
+    String regionName = this.testName + "_PR";
+    vm1.invoke(() -> createCache(lnPort));
+    vm1.invoke(() -> createSender(senderId, 2, true, 100, 10, true, true, null, false));
+    vm1.invoke(() -> createPartitionedRegion(regionName, senderId, 0, 10, isOffHeap()));
+
+    // Do puts of the same key
+    int numIterations = 100;
+    vm1.invoke(() -> putSameEntry(regionName, numIterations));
+
+    // Verify queue size (no need to wait)
+    vm1.invoke(() -> testQueueSize(senderId, 2));
+
+    // Verify the conflation indexes size stat
+    verifyConflationIndexesSize(senderId, 1, vm1);
+
+    // Configure receiving site member
+    vm3.invoke(() -> createCache(nyPort));
+    vm3.invoke(() -> createReceiver());
+    vm3.invoke(() -> createPartitionedRegion(regionName, null, 0, 10, isOffHeap()));
+
+    // Wait for queue to drain
+    vm1.invoke(() -> checkQueueSize(senderId, 0));
+
+    // Verify the conflation indexes size stat
+    verifyConflationIndexesSize(senderId, 0, vm1);
   }
 
   protected Map putKeyValues() {
@@ -540,4 +581,21 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
     vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true));
     vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true));
   }
+
+  private void verifyConflationIndexesSize(String senderId, int expectedSize, VM... vms) {
+    int actualSize = 0;
+    for (VM vm : vms) {
+      List<Integer> stats = vm.invoke(() -> WANTestBase.getSenderStats(senderId, -1));
+      actualSize += stats.get(9);
+    }
+    assertEquals(expectedSize, actualSize);
+  }
+
+  private void putSameEntry(String regionName, int numIterations) {
+    // This does one create and numInterations-1 updates
+    Region region = cache.getRegion(regionName);
+    for (int i = 0; i < numIterations; i++) {
+      region.put(0, i);
+    }
+  }
 }

-- 
To stop receiving notification emails like this one, please contact
['"commits@geode.apache.org" <co...@geode.apache.org>'].