You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bo...@apache.org on 2019/06/11 16:55:40 UTC

[geode] branch feature/GEODE-6854 created (now babc650)

This is an automated email from the ASF dual-hosted git repository.

boglesby pushed a change to branch feature/GEODE-6854
in repository https://gitbox.apache.org/repos/asf/geode.git.


      at babc650  GEODE-6854: Skipped events already contained in the batch during conflation

This branch includes the following new commits:

     new babc650  GEODE-6854: Skipped events already contained in the batch during conflation

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[geode] 01/01: GEODE-6854: Skipped events already contained in the batch during conflation

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

boglesby pushed a commit to branch feature/GEODE-6854
in repository https://gitbox.apache.org/repos/asf/geode.git

commit babc6507f3592a7eb62cf7ac0ade0d2e75bee76f
Author: Barry Oglesby <bo...@pivotal.io>
AuthorDate: Tue Jun 11 09:41:19 2019 -0700

    GEODE-6854: Skipped events already contained in the batch during conflation
---
 .../wan/AbstractGatewaySenderEventProcessor.java   | 30 ++++++++--
 .../internal/cache/wan/GatewaySenderEventImpl.java | 43 ++++++++++++++
 ...rallelGatewaySenderEventProcessorJUnitTest.java | 69 +++++++++++++++++++++-
 .../wan/parallel/ParallelGatewaySenderHelper.java  |  8 ++-
 4 files changed, 142 insertions(+), 8 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 87d3021..058a1b7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -772,6 +772,9 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread
     List<GatewaySenderEventImpl> conflatedEvents = null;
     // Conflate the batch if necessary
     if (this.sender.isBatchConflationEnabled() && events.size() > 1) {
+      if (logger.isDebugEnabled()) {
+        logEvents("original", events);
+      }
       Map<ConflationKey, GatewaySenderEventImpl> conflatedEventsMap =
           new LinkedHashMap<ConflationKey, GatewaySenderEventImpl>();
       conflatedEvents = new ArrayList<GatewaySenderEventImpl>();
@@ -783,12 +786,16 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread
           ConflationKey key = new ConflationKey(gsEvent.getRegion().getFullPath(),
               gsEvent.getKeyToConflate(), gsEvent.getOperation());
 
-          // Attempt to remove the key. If the entry is removed, that means a
-          // duplicate key was found. If not, this is a no-op.
-          conflatedEventsMap.remove(key);
+          // Get the entry at that key
+          GatewaySenderEventImpl existingEvent = conflatedEventsMap.get(key);
+          if (!gsEvent.equals(existingEvent)) {
+            // Attempt to remove the key. If the entry is removed, that means a
+            // duplicate key was found. If not, this is a no-op.
+            conflatedEventsMap.remove(key);
 
-          // Add the key to the end of the map.
-          conflatedEventsMap.put(key, gsEvent);
+            // Add the key to the end of the map.
+            conflatedEventsMap.put(key, gsEvent);
+          }
         } else {
           // The event should not be conflated (create or destroy). Add it to
           // the map.
@@ -806,12 +813,25 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread
       // Increment the events conflated from batches statistic
       this.sender.getStatistics()
           .incEventsConflatedFromBatches(events.size() - conflatedEvents.size());
+      if (logger.isDebugEnabled()) {
+        logEvents("conflated", conflatedEvents);
+      }
     } else {
       conflatedEvents = events;
     }
     return conflatedEvents;
   }
 
+  /** Debug-logs the batch size plus each event's short form on its own tab-indented line. */
+  private void logEvents(String message, List<GatewaySenderEventImpl> events) {
+    StringBuilder builder = new StringBuilder("The batch contains the following ")
+        .append(events.size()).append(" ").append(message).append(" events:");
+    for (GatewaySenderEventImpl event : events) {
+      builder.append("\n\t").append(event.toSmallString());
+    }
+    logger.debug(builder);
+  }
+
   private List<GatewaySenderEventImpl> addPDXEvent() throws IOException {
     List<GatewaySenderEventImpl> pdxEventsToBeDispatched = new ArrayList<GatewaySenderEventImpl>();
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
index 8db48b4..1247d03 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
@@ -19,6 +19,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Arrays;
 
 import org.apache.geode.DataSerializer;
 import org.apache.geode.InternalGemFireError;
@@ -44,6 +45,7 @@ import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.Token;
 import org.apache.geode.internal.cache.WrappedCallbackArgument;
 import org.apache.geode.internal.cache.tier.sockets.Message;
+import org.apache.geode.internal.lang.ObjectUtils;
 import org.apache.geode.internal.offheap.OffHeapHelper;
 import org.apache.geode.internal.offheap.ReferenceCountHelper;
 import org.apache.geode.internal.offheap.Releasable;
@@ -774,6 +776,15 @@ public class GatewaySenderEventImpl
     return buffer.toString();
   }
 
+  /** Returns a compact description (id, operation, region, key, shadowKey, bucketId). */
+  public String toSmallString() {
+    StringBuilder builder = new StringBuilder("GatewaySenderEventImpl[");
+    builder.append("id=").append(this.id).append(";operation=").append(getOperation());
+    builder.append(";region=").append(this.regionPath).append(";key=").append(this.key);
+    builder.append(";shadowKey=").append(this.shadowKey).append(";bucketId=").append(this.bucketId);
+    return builder.append("]").toString();
+  }
+
   public static boolean isSerializingValue() {
     return ((Boolean) isSerializingValue.get()).booleanValue();
   }
@@ -1179,6 +1190,38 @@ public class GatewaySenderEventImpl
     return this.shadowKey;
   }
 
+  /** Field-by-field equality that tolerates null fields instead of throwing. */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    // instanceof is false for null, so no separate null check is needed.
+    if (!(obj instanceof GatewaySenderEventImpl)) {
+      return false;
+    }
+    GatewaySenderEventImpl that = (GatewaySenderEventImpl) obj;
+    // Fully qualified because this file's visible imports do not include java.util.Objects.
+    return java.util.Objects.equals(this.shadowKey, that.shadowKey)
+        && java.util.Objects.equals(this.id, that.id)
+        && this.bucketId == that.bucketId
+        && this.action == that.action
+        && java.util.Objects.equals(this.regionPath, that.regionPath)
+        && java.util.Objects.equals(this.key, that.key)
+        && Arrays.equals(this.value, that.value);
+  }
+
+  /** Hash over the same fields equals() compares (17/37 multiply-and-add). */
+  @Override
+  public int hashCode() {
+    int result = 17;
+    result = 37 * result + ObjectUtils.hashCode(this.shadowKey);
+    result = 37 * result + ObjectUtils.hashCode(this.id);
+    result = 37 * (37 * result + this.bucketId) + this.action;
+    result = 37 * result + ObjectUtils.hashCode(this.regionPath);
+    result = 37 * result + ObjectUtils.hashCode(this.key);
+    return 37 * result + Arrays.hashCode(this.value); // Arrays.hashCode(null) == 0
+  }
+
   @Override
   public Version[] getSerializationVersions() {
     return new Version[] {Version.GEODE_1_9_0};
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java
index 558df5c..1eb9920 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java
@@ -125,7 +125,7 @@ public class ParallelGatewaySenderEventProcessorJUnitTest {
     AbstractGatewaySenderEventProcessor processor =
         ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender);
 
-    // Create a batch of non-conflatable events with one duplicate
+    // Create a batch of non-conflatable events with one duplicate (not including the shadowKey)
     List<GatewaySenderEventImpl> originalEvents = new ArrayList<>();
     LocalRegion lr = mock(LocalRegion.class);
     when(lr.getFullPath()).thenReturn("/dataStoreRegion");
@@ -139,7 +139,7 @@ public class ParallelGatewaySenderEventProcessorJUnitTest {
     originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.CREATE,
         "Object_13964", "Object_13964", 100, 27709));
     originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.CREATE,
-        "Object_14024", "Object_13964", 101, 27822));
+        "Object_14024", "Object_14024", 101, 27822));
     originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.DESTROY,
         "Object_13964", null, 102, 27935));
     originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.CREATE,
@@ -151,4 +151,69 @@ public class ParallelGatewaySenderEventProcessorJUnitTest {
     // Assert no events were conflated incorrectly
     assertThat(originalEvents).isEqualTo(conflatedEvents);
   }
+
+  @Test
+  public void validateBatchConflationWithBatchContainingDuplicateConflatableAndNonConflatableEvents()
+      throws Exception {
+    // Regression test for GEODE-6854.
+    // A batch like the one below contains exact duplicates (events 5 and 6 repeat events 2 and 3).
+    // Conflation should drop the duplicates without changing the order of the remaining events.
+    // SenderEventImpl[id=EventID[threadID=104;sequenceID=2;bucketId=89];action=1;operation=UPDATE;region=/dataStoreRegion;key=Object_6079;shadowKey=16587]
+    // SenderEventImpl[id=EventID[threadID=104;sequenceID=3;bucketId=89];action=2;operation=DESTROY;region=/dataStoreRegion;key=Object_6079;shadowKey=16700]
+    // SenderEventImpl[id=EventID[threadID=112;sequenceID=9;bucketId=89];action=1;operation=PUTALL_UPDATE;region=/dataStoreRegion;key=Object_7731;shadowKey=16813]
+    // SenderEventImpl[id=EventID[threadID=112;sequenceID=12;bucketId=89];action=1;operation=PUTALL_UPDATE;region=/dataStoreRegion;key=Object_6591;shadowKey=16926]
+    // SenderEventImpl[id=EventID[threadID=104;sequenceID=3;bucketId=89];action=2;operation=DESTROY;region=/dataStoreRegion;key=Object_6079;shadowKey=16700]
+    // SenderEventImpl[id=EventID[threadID=112;sequenceID=9;bucketId=89];action=1;operation=PUTALL_UPDATE;region=/dataStoreRegion;key=Object_7731;shadowKey=16813]
+
+    // Create a ParallelGatewaySenderEventProcessor
+    AbstractGatewaySenderEventProcessor processor =
+        ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender);
+
+    // Create mock region
+    LocalRegion lr = mock(LocalRegion.class);
+    when(lr.getFullPath()).thenReturn("/dataStoreRegion");
+    InternalCache cache = mock(InternalCache.class);
+    InternalDistributedSystem ids = mock(InternalDistributedSystem.class);
+    when(lr.getCache()).thenReturn(cache);
+    when(cache.getDistributedSystem()).thenReturn(ids);
+
+    // Create a batch of conflatable and non-conflatable events with one duplicate conflatable event
+    // and one duplicate non-conflatable event (including the shadowKey)
+    List<GatewaySenderEventImpl> originalEvents = new ArrayList<>();
+    originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.UPDATE,
+        "Object_6079", "Object_6079", 104, 2, 89, 16587));
+    originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.DESTROY,
+        "Object_6079", null, 104, 3, 89, 16700));
+    originalEvents
+        .add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.PUTALL_UPDATE,
+            "Object_7731", "Object_7731", 112, 9, 89, 16813));
+    originalEvents
+        .add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.PUTALL_UPDATE,
+            "Object_6591", "Object_6591", 112, 12, 89, 16926));
+    originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.DESTROY,
+        "Object_6079", null, 104, 3, 89, 16700));
+    originalEvents
+        .add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.PUTALL_UPDATE,
+            "Object_7731", "Object_7731", 112, 9, 89, 16813));
+    logEvents("original", originalEvents);
+
+    // Conflate the batch; the first four events should survive, in their original order
+    List<GatewaySenderEventImpl> conflatedEvents = processor.conflate(originalEvents);
+    logEvents("conflated", conflatedEvents);
+    assertThat(conflatedEvents).hasSize(4);
+    assertThat(conflatedEvents.get(0)).isEqualTo(originalEvents.get(0));
+    assertThat(conflatedEvents.get(1)).isEqualTo(originalEvents.get(1));
+    assertThat(conflatedEvents.get(2)).isEqualTo(originalEvents.get(2));
+    assertThat(conflatedEvents.get(3)).isEqualTo(originalEvents.get(3));
+  }
+
+  /** Prints the list size plus each event's short form on its own tab-indented line. */
+  private void logEvents(String message, List<GatewaySenderEventImpl> events) {
+    StringBuilder builder = new StringBuilder("The list contains the following ")
+        .append(events.size()).append(" ").append(message).append(" events:");
+    for (GatewaySenderEventImpl event : events) {
+      builder.append("\n\t").append(event.toSmallString());
+    }
+    System.out.println(builder);
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderHelper.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderHelper.java
index 53529af..e75b9cf 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderHelper.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderHelper.java
@@ -81,9 +81,15 @@ public class ParallelGatewaySenderHelper {
 
   public static GatewaySenderEventImpl createGatewaySenderEvent(LocalRegion lr, Operation operation,
       Object key, Object value, long sequenceId, long shadowKey) throws Exception {
+    return createGatewaySenderEvent(lr, operation, key, value, 1L, sequenceId, 0, shadowKey);
+  } // delegates with threadId 1 and bucketId 0
+
+  public static GatewaySenderEventImpl createGatewaySenderEvent(LocalRegion lr, Operation operation,
+      Object key, Object value, long threadId, long sequenceId, int bucketId, long shadowKey)
+      throws Exception {
     when(lr.getKeyInfo(key, value, null)).thenReturn(new KeyInfo(key, null, null));
     EntryEventImpl eei = EntryEventImpl.create(lr, operation, key, value, null, false, null);
-    eei.setEventId(new EventID(new byte[16], 1l, sequenceId));
+    eei.setEventId(new EventID(new byte[16], threadId, sequenceId, bucketId));
     GatewaySenderEventImpl gsei =
         new GatewaySenderEventImpl(getEnumListenerEvent(operation), eei, null);
     gsei.setShadowKey(shadowKey);