You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bo...@apache.org on 2019/08/12 19:15:13 UTC

[geode] 01/01: GEODE-7066: Modified batch conflation to use event id instead of shadow key

This is an automated email from the ASF dual-hosted git repository.

boglesby pushed a commit to branch feature/GEODE-7066
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 9e38830d399caeb169e90df6a42286e8455b9e56
Author: Barry Oglesby <bo...@pivotal.io>
AuthorDate: Fri Aug 9 10:20:37 2019 -0700

    GEODE-7066: Modified batch conflation to use event id instead of shadow key
---
 .../wan/AbstractGatewaySenderEventProcessor.java   |  15 ++-
 ...rallelGatewaySenderEventProcessorJUnitTest.java |  56 +++++++-
 ...SerialGatewaySenderEventProcessorJUnitTest.java | 146 +++++++++++++++++++++
 3 files changed, 209 insertions(+), 8 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 3d5405e..241ce68 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -22,6 +22,7 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
@@ -800,7 +801,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread
           // The event should not be conflated (create or destroy). Add it to
           // the map.
           ConflationKey key = new ConflationKey(gsEvent.getRegion().getFullPath(),
-              gsEvent.getKeyToConflate(), gsEvent.getOperation(), gsEvent.getShadowKey());
+              gsEvent.getKeyToConflate(), gsEvent.getOperation(), gsEvent.getEventId());
           conflatedEventsMap.put(key, gsEvent);
         }
       }
@@ -1389,17 +1390,17 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread
 
     private String regionName;
 
-    private long shadowKey;
+    private EventID eventId;
 
     private ConflationKey(String region, Object key, Operation operation) {
-      this(region, key, operation, -1);
+      this(region, key, operation, null);
     }
 
-    private ConflationKey(String region, Object key, Operation operation, long shadowKey) {
+    private ConflationKey(String region, Object key, Operation operation, EventID eventId) {
       this.key = key;
       this.operation = operation;
       this.regionName = region;
-      this.shadowKey = shadowKey;
+      this.eventId = eventId;
     }
 
     @Override
@@ -1409,7 +1410,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread
       result = prime * result + key.hashCode();
       result = prime * result + operation.hashCode();
       result = prime * result + regionName.hashCode();
-      result = prime * result + Long.hashCode(this.shadowKey);
+      result = prime * result + (eventId == null ? 0 : eventId.hashCode());
       return result;
     }
 
@@ -1434,7 +1435,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread
       if (!this.operation.equals(that.operation)) {
         return false;
       }
-      if (this.shadowKey != that.shadowKey) {
+      if (!Objects.equals(this.eventId, that.eventId)) {
         return false;
       }
       return true;
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java
index 7b0cfb2..994ce95 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java
@@ -62,6 +62,8 @@ public class ParallelGatewaySenderEventProcessorJUnitTest {
   @Test
   public void validateBatchConflationWithBatchContainingDuplicateConflatableEvents()
       throws Exception {
+    // This tests normal batch conflation.
+
     // Create a ParallelGatewaySenderEventProcessor
     AbstractGatewaySenderEventProcessor processor =
         ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender);
@@ -85,7 +87,7 @@ public class ParallelGatewaySenderEventProcessorJUnitTest {
     originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.UPDATE,
         "Object_13964", lastUpdateValue, lastUpdateSequenceId, lastUpdateShadowKey));
     originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.DESTROY,
-        "Object_13964", null, 104, 28274));
+        "Object_13964", null, 105, 28274));
 
     // Conflate the batch of events
     List<GatewaySenderEventImpl> conflatedEvents = processor.conflate(originalEvents);
@@ -197,6 +199,58 @@ public class ParallelGatewaySenderEventProcessorJUnitTest {
     assertThat(originalEvents.get(3)).isEqualTo(conflatedEvents.get(3));
   }
 
+  @Test
+  public void validateBatchConflationWithDuplicateNonConflatableEvents()
+      throws Exception {
+    // Duplicate non-conflatable events should not be conflated.
+    //
+    // Here is an example batch with duplicate create and destroy events on the same key from
+    // different threads:
+    // GatewaySenderEventImpl[id=EventID[id=31bytes;threadID=0x30004|6;sequenceID=6072];operation=CREATE;region=/SESSIONS;key=6079],
+    // GatewaySenderEventImpl[id=EventID[id=31bytes;threadID=0x30004|6;sequenceID=6073];operation=UPDATE;region=/SESSIONS;key=6079],
+    // GatewaySenderEventImpl[id=EventID[id=31bytes;threadID=0x30004|5;sequenceID=6009];operation=CREATE;region=/SESSIONS;key=1736],
+    // GatewaySenderEventImpl[id=EventID[id=31bytes;threadID=0x30004|6;sequenceID=6074];operation=DESTROY;region=/SESSIONS;key=6079],
+    // GatewaySenderEventImpl[id=EventID[id=31bytes;threadID=0x30004|5;sequenceID=6011];operation=DESTROY;region=/SESSIONS;key=1736],
+    // GatewaySenderEventImpl[id=EventID[id=31bytes;threadID=0x30004|6;sequenceID=6087];operation=CREATE;region=/SESSIONS;key=1736],
+    // GatewaySenderEventImpl[id=EventID[id=31bytes;threadID=0x30004|6;sequenceID=6089];operation=DESTROY;region=/SESSIONS;key=1736],
+
+    // Create a ParallelGatewaySenderEventProcessor
+    AbstractGatewaySenderEventProcessor processor =
+        ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender);
+
+    // Create mock region
+    LocalRegion lr = mock(LocalRegion.class);
+    when(lr.getFullPath()).thenReturn("/dataStoreRegion");
+    when(lr.getCache()).thenReturn(this.cache);
+
+    // Create a batch of conflatable events with duplicate create and destroy events on the same key
+    // from different threads
+    List<GatewaySenderEventImpl> originalEvents = new ArrayList<>();
+    originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.CREATE,
+        "6079", "6079", 6, 6072, 0, 0));
+    originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.UPDATE,
+        "6079", "6079", 6, 6073, 0, 0));
+    originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.CREATE,
+        "1736", "1736", 5, 6009, 0, 0));
+    originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.DESTROY,
+        "6079", "6079", 6, 6074, 0, 0));
+    originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.DESTROY,
+        "1736", "1736", 5, 6011, 0, 0));
+    originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.CREATE,
+        "1736", "1736", 6, 6087, 0, 0));
+    originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.DESTROY,
+        "1736", "1736", 6, 6089, 0, 0));
+    logEvents("original", originalEvents);
+
+    // Conflate the batch of events
+    List<GatewaySenderEventImpl> conflatedEvents = processor.conflate(originalEvents);
+    logEvents("conflated", conflatedEvents);
+
+    // Assert no conflation occurs
+    assertThat(conflatedEvents.size()).isEqualTo(7);
+    assertThat(originalEvents).isEqualTo(conflatedEvents);
+  }
+
   private void logEvents(String message, List<GatewaySenderEventImpl> events) {
     StringBuilder builder = new StringBuilder();
     builder.append("The list contains the following ").append(events.size()).append(" ")
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessorJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessorJUnitTest.java
index 2376b41..7f7da97 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessorJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessorJUnitTest.java
@@ -25,6 +25,8 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.logging.log4j.Logger;
@@ -33,14 +35,19 @@ import org.junit.Test;
 import org.springframework.test.util.ReflectionTestUtils;
 
 import org.apache.geode.cache.Operation;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.EnumListenerEvent;
 import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.KeyInfo;
 import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.ha.ThreadIdentifier;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
 import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
 import org.apache.geode.internal.cache.wan.GatewaySenderStats;
 import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.test.fake.Fakes;
 
 public class SerialGatewaySenderEventProcessorJUnitTest {
 
@@ -48,6 +55,8 @@ public class SerialGatewaySenderEventProcessorJUnitTest {
 
   private TestSerialGatewaySenderEventProcessor processor;
 
+  private GemFireCacheImpl cache;
+
   private static final Logger logger = LogService.getLogger();
 
   @Before
@@ -55,6 +64,9 @@ public class SerialGatewaySenderEventProcessorJUnitTest {
     this.sender = mock(AbstractGatewaySender.class);
     this.processor =
         new TestSerialGatewaySenderEventProcessor(this.sender, "ny", null);
+    this.cache = Fakes.cache();
+    InternalDistributedSystem ids = mock(InternalDistributedSystem.class);
+    when(this.cache.getDistributedSystem()).thenReturn(ids);
   }
 
   @Test
@@ -224,6 +236,140 @@ public class SerialGatewaySenderEventProcessorJUnitTest {
     assertThat(unProcessedTokens).contains("threadID=4;sequenceID=4");
   }
 
+  @Test
+  public void validateBatchConflationWithDuplicateConflatableEvents()
+      throws Exception {
+    // This tests normal batch conflation
+
+    // Create mock region
+    List<GatewaySenderEventImpl> originalEvents = new ArrayList<>();
+    LocalRegion lr = mock(LocalRegion.class);
+    when(lr.getFullPath()).thenReturn("/dataStoreRegion");
+    when(lr.getCache()).thenReturn(this.cache);
+
+    // Configure conflation
+    when(this.sender.isBatchConflationEnabled()).thenReturn(true);
+    when(sender.getStatistics()).thenReturn(mock(GatewaySenderStats.class));
+
+    // Create a batch of conflatable events with duplicate update events
+    Object lastUpdateValue = "Object_13964_5";
+    long lastUpdateSequenceId = 104;
+    originalEvents.add(createGatewaySenderEvent(lr, Operation.CREATE,
+        "Object_13964", "Object_13964_1", 1, 100));
+    originalEvents.add(createGatewaySenderEvent(lr, Operation.UPDATE,
+        "Object_13964", "Object_13964_2", 1, 101));
+    originalEvents.add(createGatewaySenderEvent(lr, Operation.UPDATE,
+        "Object_13964", "Object_13964_3", 1, 102));
+    originalEvents.add(createGatewaySenderEvent(lr, Operation.UPDATE,
+        "Object_13964", "Object_13964_4", 1, 103));
+    originalEvents.add(createGatewaySenderEvent(lr, Operation.UPDATE,
+        "Object_13964", lastUpdateValue, 1, lastUpdateSequenceId));
+    originalEvents.add(createGatewaySenderEvent(lr, Operation.DESTROY,
+        "Object_13964", null, 1, 105));
+
+    // Conflate the batch of events
+    List<GatewaySenderEventImpl> conflatedEvents = processor.conflate(originalEvents);
+
+    // Verify:
+    // - the batch contains 3 events after conflation
+    // - they are CREATE, UPDATE, and DESTROY
+    // - the UPDATE event is the correct one
+    assertThat(conflatedEvents.size()).isEqualTo(3);
+    GatewaySenderEventImpl gsei1 = conflatedEvents.get(0);
+    assertThat(gsei1.getOperation()).isEqualTo(Operation.CREATE);
+    GatewaySenderEventImpl gsei2 = conflatedEvents.get(1);
+    assertThat(gsei2.getOperation()).isEqualTo(Operation.UPDATE);
+    GatewaySenderEventImpl gsei3 = conflatedEvents.get(2);
+    assertThat(gsei3.getOperation()).isEqualTo(Operation.DESTROY);
+    assertThat(gsei2.getDeserializedValue()).isEqualTo(lastUpdateValue);
+    assertThat(gsei2.getEventId().getSequenceID()).isEqualTo(lastUpdateSequenceId);
+  }
+
+  @Test
+  public void validateBatchConflationWithDuplicateNonConflatableEvents()
+      throws Exception {
+    // Duplicate non-conflatable events should not be conflated.
+    //
+    // Here is an example batch with duplicate create and destroy events on the same key from
+    // different threads:
+    // GatewaySenderEventImpl[id=EventID[id=31bytes;threadID=0x30004|6;sequenceID=6072];operation=CREATE;region=/SESSIONS;key=6079],
+    // GatewaySenderEventImpl[id=EventID[id=31bytes;threadID=0x30004|6;sequenceID=6073];operation=UPDATE;region=/SESSIONS;key=6079],
+    // GatewaySenderEventImpl[id=EventID[id=31bytes;threadID=0x30004|5;sequenceID=6009];operation=CREATE;region=/SESSIONS;key=1736],
+    // GatewaySenderEventImpl[id=EventID[id=31bytes;threadID=0x30004|6;sequenceID=6074];operation=DESTROY;region=/SESSIONS;key=6079],
+    // GatewaySenderEventImpl[id=EventID[id=31bytes;threadID=0x30004|5;sequenceID=6011];operation=DESTROY;region=/SESSIONS;key=1736],
+    // GatewaySenderEventImpl[id=EventID[id=31bytes;threadID=0x30004|6;sequenceID=6087];operation=CREATE;region=/SESSIONS;key=1736],
+    // GatewaySenderEventImpl[id=EventID[id=31bytes;threadID=0x30004|6;sequenceID=6089];operation=DESTROY;region=/SESSIONS;key=1736],
+
+    // Create mock region
+    LocalRegion lr = mock(LocalRegion.class);
+    when(lr.getFullPath()).thenReturn("/dataStoreRegion");
+    when(lr.getCache()).thenReturn(this.cache);
+
+    // Configure conflation
+    when(this.sender.isBatchConflationEnabled()).thenReturn(true);
+    when(sender.getStatistics()).thenReturn(mock(GatewaySenderStats.class));
+
+    // Create a batch of conflatable events with duplicate create and destroy events on the same key
+    // from different threads
+    List<GatewaySenderEventImpl> originalEvents = new ArrayList<>();
+    originalEvents.add(createGatewaySenderEvent(lr, Operation.CREATE,
+        "6079", "6079", 6, 6072));
+    originalEvents.add(createGatewaySenderEvent(lr, Operation.UPDATE,
+        "6079", "6079", 6, 6073));
+    originalEvents.add(createGatewaySenderEvent(lr, Operation.CREATE,
+        "1736", "1736", 5, 6009));
+    originalEvents.add(createGatewaySenderEvent(lr, Operation.DESTROY,
+        "6079", "6079", 6, 6074));
+    originalEvents.add(createGatewaySenderEvent(lr, Operation.DESTROY,
+        "1736", "1736", 5, 6011));
+    originalEvents.add(createGatewaySenderEvent(lr, Operation.CREATE,
+        "1736", "1736", 6, 6087));
+    originalEvents.add(createGatewaySenderEvent(lr, Operation.DESTROY,
+        "1736", "1736", 6, 6089));
+    logEvents("original", originalEvents);
+
+    // Conflate the batch of event
+    List<GatewaySenderEventImpl> conflatedEvents = processor.conflate(originalEvents);
+    logEvents("conflated", conflatedEvents);
+
+    // Assert no conflation occurs
+    assertThat(conflatedEvents.size()).isEqualTo(7);
+    assertThat(originalEvents).isEqualTo(conflatedEvents);
+  }
+
+  private void logEvents(String message, List<GatewaySenderEventImpl> events) {
+    StringBuilder builder = new StringBuilder();
+    builder.append("The list contains the following ").append(events.size()).append(" ")
+        .append(message).append(" events:");
+    for (GatewaySenderEventImpl event : events) {
+      builder.append("\t\n").append(event.toSmallString());
+    }
+    System.out.println(builder);
+  }
+
+  private GatewaySenderEventImpl createGatewaySenderEvent(LocalRegion lr, Operation operation,
+      Object key, Object value, long threadId, long sequenceId)
+      throws Exception {
+    when(lr.getKeyInfo(key, value, null)).thenReturn(new KeyInfo(key, null, null));
+    EntryEventImpl eei = EntryEventImpl.create(lr, operation, key, value, null, false, null);
+    eei.setEventId(new EventID(new byte[16], threadId, sequenceId));
+    GatewaySenderEventImpl gsei =
+        new GatewaySenderEventImpl(getEnumListenerEvent(operation), eei, null, true);
+    return gsei;
+  }
+
+  private EnumListenerEvent getEnumListenerEvent(Operation operation) {
+    EnumListenerEvent ele = null;
+    if (operation.isCreate()) {
+      ele = EnumListenerEvent.AFTER_CREATE;
+    } else if (operation.isUpdate()) {
+      ele = EnumListenerEvent.AFTER_UPDATE;
+    } else if (operation.isDestroy()) {
+      ele = EnumListenerEvent.AFTER_DESTROY;
+    }
+    return ele;
+  }
+
   private EventID handlePrimaryEvent() {
     GatewaySenderEventImpl gsei = mock(GatewaySenderEventImpl.class);
     EventID id = mock(EventID.class);