You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bo...@apache.org on 2019/06/11 16:55:41 UTC

[geode] 01/01: GEODE-6854: Skipped events already contained in the batch during conflation

This is an automated email from the ASF dual-hosted git repository.

boglesby pushed a commit to branch feature/GEODE-6854
in repository https://gitbox.apache.org/repos/asf/geode.git

commit babc6507f3592a7eb62cf7ac0ade0d2e75bee76f
Author: Barry Oglesby <bo...@pivotal.io>
AuthorDate: Tue Jun 11 09:41:19 2019 -0700

    GEODE-6854: Skipped events already contained in the batch during conflation
---
 .../wan/AbstractGatewaySenderEventProcessor.java   | 30 ++++++++--
 .../internal/cache/wan/GatewaySenderEventImpl.java | 43 ++++++++++++++
 ...rallelGatewaySenderEventProcessorJUnitTest.java | 69 +++++++++++++++++++++-
 .../wan/parallel/ParallelGatewaySenderHelper.java  |  8 ++-
 4 files changed, 142 insertions(+), 8 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 87d3021..058a1b7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -772,6 +772,9 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread
     List<GatewaySenderEventImpl> conflatedEvents = null;
     // Conflate the batch if necessary
     if (this.sender.isBatchConflationEnabled() && events.size() > 1) {
+      if (logger.isDebugEnabled()) {
+        logEvents("original", events);
+      }
       Map<ConflationKey, GatewaySenderEventImpl> conflatedEventsMap =
           new LinkedHashMap<ConflationKey, GatewaySenderEventImpl>();
       conflatedEvents = new ArrayList<GatewaySenderEventImpl>();
@@ -783,12 +786,16 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread
           ConflationKey key = new ConflationKey(gsEvent.getRegion().getFullPath(),
               gsEvent.getKeyToConflate(), gsEvent.getOperation());
 
-          // Attempt to remove the key. If the entry is removed, that means a
-          // duplicate key was found. If not, this is a no-op.
-          conflatedEventsMap.remove(key);
+          // Get the entry at that key
+          GatewaySenderEventImpl existingEvent = conflatedEventsMap.get(key);
+          if (!gsEvent.equals(existingEvent)) {
+            // Attempt to remove the key. If the entry is removed, that means a
+            // duplicate key was found. If not, this is a no-op.
+            conflatedEventsMap.remove(key);
 
-          // Add the key to the end of the map.
-          conflatedEventsMap.put(key, gsEvent);
+            // Add the key to the end of the map.
+            conflatedEventsMap.put(key, gsEvent);
+          }
         } else {
           // The event should not be conflated (create or destroy). Add it to
           // the map.
@@ -806,12 +813,25 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread
       // Increment the events conflated from batches statistic
       this.sender.getStatistics()
           .incEventsConflatedFromBatches(events.size() - conflatedEvents.size());
+      if (logger.isDebugEnabled()) {
+        logEvents("conflated", conflatedEvents);
+      }
     } else {
       conflatedEvents = events;
     }
     return conflatedEvents;
   }
 
+  // Debug-logs the batch size and a short summary of each event; message labels the batch.
+  private void logEvents(String message, List<GatewaySenderEventImpl> events) {
+    StringBuilder builder = new StringBuilder("The batch contains the following ")
+        .append(events.size()).append(" ").append(message).append(" events:");
+    for (GatewaySenderEventImpl event : events) {
+      builder.append("\n\t").append(event.toSmallString()); // newline first, then indent
+    }
+    logger.debug(builder);
+  }
+
   private List<GatewaySenderEventImpl> addPDXEvent() throws IOException {
     List<GatewaySenderEventImpl> pdxEventsToBeDispatched = new ArrayList<GatewaySenderEventImpl>();
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
index 8db48b4..1247d03 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
@@ -19,6 +19,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Arrays;
 
 import org.apache.geode.DataSerializer;
 import org.apache.geode.InternalGemFireError;
@@ -44,6 +45,7 @@ import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.Token;
 import org.apache.geode.internal.cache.WrappedCallbackArgument;
 import org.apache.geode.internal.cache.tier.sockets.Message;
+import org.apache.geode.internal.lang.ObjectUtils;
 import org.apache.geode.internal.offheap.OffHeapHelper;
 import org.apache.geode.internal.offheap.ReferenceCountHelper;
 import org.apache.geode.internal.offheap.Releasable;
@@ -774,6 +776,15 @@ public class GatewaySenderEventImpl
     return buffer.toString();
   }
 
+  /** Returns a short debug summary (id, operation, region, key, shadowKey, bucketId). */
+  public String toSmallString() {
+    // Local, single-threaded buffer: StringBuilder avoids StringBuffer's needless locking.
+    return new StringBuilder("GatewaySenderEventImpl[").append("id=").append(this.id)
+        .append(";operation=").append(getOperation()).append(";region=").append(this.regionPath)
+        .append(";key=").append(this.key).append(";shadowKey=").append(this.shadowKey)
+        .append(";bucketId=").append(this.bucketId).append("]").toString();
+  }
+
   public static boolean isSerializingValue() {
     return ((Boolean) isSerializingValue.get()).booleanValue();
   }
@@ -1179,6 +1190,38 @@ public class GatewaySenderEventImpl
     return this.shadowKey;
   }
 
+  /**
+   * Compares events by shadowKey, id, bucketId, action, regionPath, key and value array.
+   * Object fields are compared null-safely, matching the null-tolerant {@link #hashCode()}.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof GatewaySenderEventImpl)) { // instanceof is false for null
+      return false;
+    }
+    GatewaySenderEventImpl that = (GatewaySenderEventImpl) obj;
+    return this.bucketId == that.bucketId && this.action == that.action
+        && java.util.Objects.equals(this.shadowKey, that.shadowKey)
+        && java.util.Objects.equals(this.id, that.id)
+        && java.util.Objects.equals(this.regionPath, that.regionPath)
+        && java.util.Objects.equals(this.key, that.key) && Arrays.equals(this.value, that.value);
+  }
+
+  @Override
+  public int hashCode() {
+    // Hashes the same fields that equals() compares so equal events share a hash code.
+    int hashCode = 37 * 17 + ObjectUtils.hashCode(this.shadowKey);
+    hashCode = 37 * hashCode + ObjectUtils.hashCode(this.id);
+    hashCode = 37 * hashCode + this.bucketId;
+    hashCode = 37 * hashCode + this.action;
+    hashCode = 37 * hashCode + ObjectUtils.hashCode(this.regionPath);
+    hashCode = 37 * hashCode + ObjectUtils.hashCode(this.key);
+    return 37 * hashCode + Arrays.hashCode(this.value); // Arrays.hashCode(null) is 0
+  }
+
   @Override
   public Version[] getSerializationVersions() {
     return new Version[] {Version.GEODE_1_9_0};
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java
index 558df5c..1eb9920 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java
@@ -125,7 +125,7 @@ public class ParallelGatewaySenderEventProcessorJUnitTest {
     AbstractGatewaySenderEventProcessor processor =
         ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender);
 
-    // Create a batch of non-conflatable events with one duplicate
+    // Create a batch of non-conflatable events with one duplicate (not including the shadowKey)
     List<GatewaySenderEventImpl> originalEvents = new ArrayList<>();
     LocalRegion lr = mock(LocalRegion.class);
     when(lr.getFullPath()).thenReturn("/dataStoreRegion");
@@ -139,7 +139,7 @@ public class ParallelGatewaySenderEventProcessorJUnitTest {
     originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.CREATE,
         "Object_13964", "Object_13964", 100, 27709));
     originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.CREATE,
-        "Object_14024", "Object_13964", 101, 27822));
+        "Object_14024", "Object_14024", 101, 27822));
     originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.DESTROY,
         "Object_13964", null, 102, 27935));
     originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.CREATE,
@@ -151,4 +151,69 @@ public class ParallelGatewaySenderEventProcessorJUnitTest {
     // Assert no events were conflated incorrectly
     assertThat(originalEvents).isEqualTo(conflatedEvents);
   }
+
+  @Test
+  public void validateBatchConflationWithBatchContainingDuplicateConflatableAndNonConflatableEvents()
+      throws Exception {
+    // This is a test for GEODE-6854.
+    // A batch containing events like below is conflated incorrectly. The conflation code should
+    // remove the duplicates from this batch, but not change their order.
+    // SenderEventImpl[id=EventID[threadID=104;sequenceID=2;bucketId=89];action=1;operation=UPDATE;region=/dataStoreRegion;key=Object_6079;shadowKey=16587]
+    // SenderEventImpl[id=EventID[threadID=104;sequenceID=3;bucketId=89];action=2;operation=DESTROY;region=/dataStoreRegion;key=Object_6079;shadowKey=16700]
+    // SenderEventImpl[id=EventID[threadID=112;sequenceID=9;bucketId=89];action=1;operation=PUTALL_UPDATE;region=/dataStoreRegion;key=Object_7731;shadowKey=16813]
+    // SenderEventImpl[id=EventID[threadID=112;sequenceID=12;bucketId=89];action=1;operation=PUTALL_UPDATE;region=/dataStoreRegion;key=Object_6591;shadowKey=16926]
+    // SenderEventImpl[id=EventID[threadID=104;sequenceID=3;bucketId=89];action=2;operation=DESTROY;region=/dataStoreRegion;key=Object_6079;shadowKey=16700]
+    // SenderEventImpl[id=EventID[threadID=112;sequenceID=9;bucketId=89];action=1;operation=PUTALL_UPDATE;region=/dataStoreRegion;key=Object_7731;shadowKey=16813]
+
+    // Create a ParallelGatewaySenderEventProcessor
+    AbstractGatewaySenderEventProcessor processor =
+        ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender);
+
+    // Create mock region
+    LocalRegion lr = mock(LocalRegion.class);
+    when(lr.getFullPath()).thenReturn("/dataStoreRegion");
+    InternalCache cache = mock(InternalCache.class);
+    InternalDistributedSystem ids = mock(InternalDistributedSystem.class);
+    when(lr.getCache()).thenReturn(cache);
+    when(cache.getDistributedSystem()).thenReturn(ids);
+
+    // Create a batch of conflatable and non-conflatable events with one duplicate conflatable event
+    // and one duplicate non-conflatable event (including the shadowKey)
+    List<GatewaySenderEventImpl> originalEvents = new ArrayList<>();
+    originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.UPDATE,
+        "Object_6079", "Object_6079", 104, 2, 89, 16587));
+    originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.DESTROY,
+        "Object_6079", null, 104, 3, 89, 16700));
+    originalEvents
+        .add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.PUTALL_UPDATE,
+            "Object_7731", "Object_7731", 112, 9, 89, 16813));
+    originalEvents
+        .add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.PUTALL_UPDATE,
+            "Object_6591", "Object_6591", 112, 12, 89, 16926));
+    originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.DESTROY,
+        "Object_6079", null, 104, 3, 89, 16700));
+    originalEvents
+        .add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.PUTALL_UPDATE,
+            "Object_7731", "Object_7731", 112, 9, 89, 16813));
+    logEvents("original", originalEvents);
+
+    // Conflate the batch of events
+    List<GatewaySenderEventImpl> conflatedEvents = processor.conflate(originalEvents);
+    logEvents("conflated", conflatedEvents);
+    // Events 4 and 5 duplicate events 1 and 2 (same id and shadowKey), so only the first four
+    // events may remain, in their original order. conflatedEvents is the value under test, so
+    // it is the assertThat subject and failure messages report actual/expected correctly.
+    assertThat(conflatedEvents).hasSize(4);
+    assertThat(conflatedEvents).containsExactlyElementsOf(originalEvents.subList(0, 4));
+  }
+
+  // Prints the list size and a short summary of each event to stdout; message labels the list.
+  private void logEvents(String message, List<GatewaySenderEventImpl> events) {
+    StringBuilder builder = new StringBuilder("The list contains the following ")
+        .append(events.size()).append(" ").append(message).append(" events:");
+    for (GatewaySenderEventImpl event : events) {
+      builder.append("\n\t").append(event.toSmallString()); // newline first, then indent
+    }
+    System.out.println(builder);
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderHelper.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderHelper.java
index 53529af..e75b9cf 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderHelper.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderHelper.java
@@ -81,9 +81,15 @@ public class ParallelGatewaySenderHelper {
 
   public static GatewaySenderEventImpl createGatewaySenderEvent(LocalRegion lr, Operation operation,
       Object key, Object value, long sequenceId, long shadowKey) throws Exception {
+    return createGatewaySenderEvent(lr, operation, key, value, 1l, sequenceId, 0, shadowKey);
+  }
+
+  public static GatewaySenderEventImpl createGatewaySenderEvent(LocalRegion lr, Operation operation,
+      Object key, Object value, long threadId, long sequenceId, int bucketId, long shadowKey)
+      throws Exception {
     when(lr.getKeyInfo(key, value, null)).thenReturn(new KeyInfo(key, null, null));
     EntryEventImpl eei = EntryEventImpl.create(lr, operation, key, value, null, false, null);
-    eei.setEventId(new EventID(new byte[16], 1l, sequenceId));
+    eei.setEventId(new EventID(new byte[16], threadId, sequenceId, bucketId));
     GatewaySenderEventImpl gsei =
         new GatewaySenderEventImpl(getEnumListenerEvent(operation), eei, null);
     gsei.setShadowKey(shadowKey);