You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bo...@apache.org on 2019/06/11 16:55:41 UTC
[geode] 01/01: GEODE-6854: Skipped events already contained in the
batch during conflation
This is an automated email from the ASF dual-hosted git repository.
boglesby pushed a commit to branch feature/GEODE-6854
in repository https://gitbox.apache.org/repos/asf/geode.git
commit babc6507f3592a7eb62cf7ac0ade0d2e75bee76f
Author: Barry Oglesby <bo...@pivotal.io>
AuthorDate: Tue Jun 11 09:41:19 2019 -0700
GEODE-6854: Skipped events already contained in the batch during conflation
---
.../wan/AbstractGatewaySenderEventProcessor.java | 30 ++++++++--
.../internal/cache/wan/GatewaySenderEventImpl.java | 43 ++++++++++++++
...rallelGatewaySenderEventProcessorJUnitTest.java | 69 +++++++++++++++++++++-
.../wan/parallel/ParallelGatewaySenderHelper.java | 8 ++-
4 files changed, 142 insertions(+), 8 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 87d3021..058a1b7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -772,6 +772,9 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread
List<GatewaySenderEventImpl> conflatedEvents = null;
// Conflate the batch if necessary
if (this.sender.isBatchConflationEnabled() && events.size() > 1) {
+ if (logger.isDebugEnabled()) {
+ logEvents("original", events);
+ }
Map<ConflationKey, GatewaySenderEventImpl> conflatedEventsMap =
new LinkedHashMap<ConflationKey, GatewaySenderEventImpl>();
conflatedEvents = new ArrayList<GatewaySenderEventImpl>();
@@ -783,12 +786,16 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread
ConflationKey key = new ConflationKey(gsEvent.getRegion().getFullPath(),
gsEvent.getKeyToConflate(), gsEvent.getOperation());
- // Attempt to remove the key. If the entry is removed, that means a
- // duplicate key was found. If not, this is a no-op.
- conflatedEventsMap.remove(key);
+ // Get the entry at that key
+ GatewaySenderEventImpl existingEvent = conflatedEventsMap.get(key);
+ if (!gsEvent.equals(existingEvent)) {
+ // Attempt to remove the key. If the entry is removed, that means a
+ // duplicate key was found. If not, this is a no-op.
+ conflatedEventsMap.remove(key);
- // Add the key to the end of the map.
- conflatedEventsMap.put(key, gsEvent);
+ // Add the key to the end of the map.
+ conflatedEventsMap.put(key, gsEvent);
+ }
} else {
// The event should not be conflated (create or destroy). Add it to
// the map.
@@ -806,12 +813,25 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread
// Increment the events conflated from batches statistic
this.sender.getStatistics()
.incEventsConflatedFromBatches(events.size() - conflatedEvents.size());
+ if (logger.isDebugEnabled()) {
+ logEvents("conflated", conflatedEvents);
+ }
} else {
conflatedEvents = events;
}
return conflatedEvents;
}
+ /**
+  * Writes a compact listing of the events in a batch to the debug log.
+  *
+  * @param message qualifier inserted into the header line (the callers in this class pass
+  *        "original" and "conflated")
+  * @param events the batch whose events are listed, one {@code toSmallString()} per event
+  */
+ private void logEvents(String message, List<GatewaySenderEventImpl> events) {
+   StringBuilder builder = new StringBuilder();
+   builder.append("The batch contains the following ").append(events.size()).append(" ")
+       .append(message).append(" events:");
+   for (GatewaySenderEventImpl event : events) {
+     // Newline first, then the tab, so that every event starts on its own indented line
+     // instead of leaving a trailing tab at the end of the previous line.
+     builder.append("\n\t").append(event.toSmallString());
+   }
+   logger.debug(builder);
+ }
+
private List<GatewaySenderEventImpl> addPDXEvent() throws IOException {
List<GatewaySenderEventImpl> pdxEventsToBeDispatched = new ArrayList<GatewaySenderEventImpl>();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
index 8db48b4..1247d03 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
@@ -19,6 +19,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Arrays;
import org.apache.geode.DataSerializer;
import org.apache.geode.InternalGemFireError;
@@ -44,6 +45,7 @@ import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.Token;
import org.apache.geode.internal.cache.WrappedCallbackArgument;
import org.apache.geode.internal.cache.tier.sockets.Message;
+import org.apache.geode.internal.lang.ObjectUtils;
import org.apache.geode.internal.offheap.OffHeapHelper;
import org.apache.geode.internal.offheap.ReferenceCountHelper;
import org.apache.geode.internal.offheap.Releasable;
@@ -774,6 +776,15 @@ public class GatewaySenderEventImpl
return buffer.toString();
}
+ /**
+  * Returns a compact description of this event that contains only its id, operation, region
+  * path, key, shadow key and bucket id.
+  *
+  * @return the abbreviated string form of this event
+  */
+ public String toSmallString() {
+   // The builder never leaves this method, so the unsynchronized StringBuilder is sufficient.
+   StringBuilder builder = new StringBuilder();
+   builder.append("GatewaySenderEventImpl[").append("id=").append(this.id).append(";operation=")
+       .append(getOperation()).append(";region=").append(this.regionPath).append(";key=")
+       .append(this.key).append(";shadowKey=").append(this.shadowKey).append(";bucketId=")
+       .append(this.bucketId).append("]");
+   return builder.toString();
+ }
+
public static boolean isSerializingValue() {
return ((Boolean) isSerializingValue.get()).booleanValue();
}
@@ -1179,6 +1190,38 @@ public class GatewaySenderEventImpl
return this.shadowKey;
}
+ /**
+  * Compares this event with another object. Two events are equal when their shadow key, id,
+  * bucket id, action, region path, key and value all match.
+  *
+  * <p>
+  * The reference-typed fields are compared null-safely so that this method does not throw for
+  * an event whose fields are unset; {@code hashCode()} already guards the same fields against
+  * null.
+  *
+  * @param obj the object to compare with; may be null
+  * @return true if {@code obj} is a GatewaySenderEventImpl with matching fields
+  */
+ @Override
+ public boolean equals(Object obj) {
+   if (this == obj) {
+     return true;
+   }
+
+   // instanceof evaluates to false for null, so no separate null check is required.
+   if (!(obj instanceof GatewaySenderEventImpl)) {
+     return false;
+   }
+
+   GatewaySenderEventImpl that = (GatewaySenderEventImpl) obj;
+
+   return java.util.Objects.equals(this.shadowKey, that.shadowKey)
+       && java.util.Objects.equals(this.id, that.id)
+       && this.bucketId == that.bucketId
+       && this.action == that.action
+       && java.util.Objects.equals(this.regionPath, that.regionPath)
+       && java.util.Objects.equals(this.key, that.key)
+       && Arrays.equals(this.value, that.value);
+ }
+
+ /**
+  * Computes a hash code from the shadow key, id, bucket id, action, region path, key and
+  * value, i.e. the same fields that {@code equals(Object)} compares.
+  *
+  * @return the hash code of this event
+  */
+ @Override
+ public int hashCode() {
+   int hashCode = 17;
+   hashCode = 37 * hashCode + ObjectUtils.hashCode(this.shadowKey);
+   hashCode = 37 * hashCode + ObjectUtils.hashCode(this.id);
+   hashCode = 37 * hashCode + this.bucketId;
+   hashCode = 37 * hashCode + this.action;
+   hashCode = 37 * hashCode + ObjectUtils.hashCode(this.regionPath);
+   hashCode = 37 * hashCode + ObjectUtils.hashCode(this.key);
+   // Arrays.hashCode returns 0 for a null array, so no explicit null guard is needed.
+   hashCode = 37 * hashCode + Arrays.hashCode(this.value);
+   return hashCode;
+ }
+
@Override
public Version[] getSerializationVersions() {
return new Version[] {Version.GEODE_1_9_0};
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java
index 558df5c..1eb9920 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java
@@ -125,7 +125,7 @@ public class ParallelGatewaySenderEventProcessorJUnitTest {
AbstractGatewaySenderEventProcessor processor =
ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender);
- // Create a batch of non-conflatable events with one duplicate
+ // Create a batch of non-conflatable events with one duplicate (not including the shadowKey)
List<GatewaySenderEventImpl> originalEvents = new ArrayList<>();
LocalRegion lr = mock(LocalRegion.class);
when(lr.getFullPath()).thenReturn("/dataStoreRegion");
@@ -139,7 +139,7 @@ public class ParallelGatewaySenderEventProcessorJUnitTest {
originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.CREATE,
"Object_13964", "Object_13964", 100, 27709));
originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.CREATE,
- "Object_14024", "Object_13964", 101, 27822));
+ "Object_14024", "Object_14024", 101, 27822));
originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.DESTROY,
"Object_13964", null, 102, 27935));
originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.CREATE,
@@ -151,4 +151,69 @@ public class ParallelGatewaySenderEventProcessorJUnitTest {
// Assert no events were conflated incorrectly
assertThat(originalEvents).isEqualTo(conflatedEvents);
}
+
+ @Test
+ public void validateBatchConflationWithBatchContainingDuplicateConflatableAndNonConflatableEvents()
+     throws Exception {
+   // This is a test for GEODE-6854.
+   // A batch containing events like below is conflated incorrectly. The conflation code should
+   // remove the duplicates from this batch, but not change their order.
+   // SenderEventImpl[id=EventID[threadID=104;sequenceID=2;bucketId=89];action=1;operation=UPDATE;region=/dataStoreRegion;key=Object_6079;shadowKey=16587]
+   // SenderEventImpl[id=EventID[threadID=104;sequenceID=3;bucketId=89];action=2;operation=DESTROY;region=/dataStoreRegion;key=Object_6079;shadowKey=16700]
+   // SenderEventImpl[id=EventID[threadID=112;sequenceID=9;bucketId=89];action=1;operation=PUTALL_UPDATE;region=/dataStoreRegion;key=Object_7731;shadowKey=16813]
+   // SenderEventImpl[id=EventID[threadID=112;sequenceID=12;bucketId=89];action=1;operation=PUTALL_UPDATE;region=/dataStoreRegion;key=Object_6591;shadowKey=16926]
+   // SenderEventImpl[id=EventID[threadID=104;sequenceID=3;bucketId=89];action=2;operation=DESTROY;region=/dataStoreRegion;key=Object_6079;shadowKey=16700]
+   // SenderEventImpl[id=EventID[threadID=112;sequenceID=9;bucketId=89];action=1;operation=PUTALL_UPDATE;region=/dataStoreRegion;key=Object_7731;shadowKey=16813]
+
+   // Create a ParallelGatewaySenderEventProcessor
+   AbstractGatewaySenderEventProcessor processor =
+       ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender);
+
+   // Create mock region
+   LocalRegion lr = mock(LocalRegion.class);
+   when(lr.getFullPath()).thenReturn("/dataStoreRegion");
+   InternalCache cache = mock(InternalCache.class);
+   InternalDistributedSystem ids = mock(InternalDistributedSystem.class);
+   when(lr.getCache()).thenReturn(cache);
+   when(cache.getDistributedSystem()).thenReturn(ids);
+
+   // Create a batch of conflatable and non-conflatable events with one duplicate conflatable event
+   // and one duplicate non-conflatable event (including the shadowKey)
+   List<GatewaySenderEventImpl> originalEvents = new ArrayList<>();
+   originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.UPDATE,
+       "Object_6079", "Object_6079", 104, 2, 89, 16587));
+   originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.DESTROY,
+       "Object_6079", null, 104, 3, 89, 16700));
+   originalEvents
+       .add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.PUTALL_UPDATE,
+           "Object_7731", "Object_7731", 112, 9, 89, 16813));
+   originalEvents
+       .add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.PUTALL_UPDATE,
+           "Object_6591", "Object_6591", 112, 12, 89, 16926));
+   originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.DESTROY,
+       "Object_6079", null, 104, 3, 89, 16700));
+   originalEvents
+       .add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.PUTALL_UPDATE,
+           "Object_7731", "Object_7731", 112, 9, 89, 16813));
+   logEvents("original", originalEvents);
+
+   // Conflate the batch of events
+   List<GatewaySenderEventImpl> conflatedEvents = processor.conflate(originalEvents);
+   logEvents("conflated", conflatedEvents);
+
+   // The conflated batch is the value under test, so it is the "actual" side of each assertion;
+   // the first four original events, in their original order, are the expectation.
+   assertThat(conflatedEvents).hasSize(4);
+   assertThat(conflatedEvents.get(0)).isEqualTo(originalEvents.get(0));
+   assertThat(conflatedEvents.get(1)).isEqualTo(originalEvents.get(1));
+   assertThat(conflatedEvents.get(2)).isEqualTo(originalEvents.get(2));
+   assertThat(conflatedEvents.get(3)).isEqualTo(originalEvents.get(3));
+ }
+
+ /**
+  * Prints a compact listing of the given events to standard output.
+  *
+  * @param message qualifier inserted into the header line (the test passes "original" and
+  *        "conflated")
+  * @param events the events to list, one {@code toSmallString()} per event
+  */
+ private void logEvents(String message, List<GatewaySenderEventImpl> events) {
+   StringBuilder builder = new StringBuilder();
+   builder.append("The list contains the following ").append(events.size()).append(" ")
+       .append(message).append(" events:");
+   for (GatewaySenderEventImpl event : events) {
+     // Newline first, then the tab, so that every event starts on its own indented line.
+     builder.append("\n\t").append(event.toSmallString());
+   }
+   System.out.println(builder);
+ }
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderHelper.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderHelper.java
index 53529af..e75b9cf 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderHelper.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderHelper.java
@@ -81,9 +81,15 @@ public class ParallelGatewaySenderHelper {
+ /**
+  * Creates a gateway sender event with thread id 1 and bucket id 0 by delegating to the
+  * overload that takes the thread id and bucket id explicitly.
+  */
public static GatewaySenderEventImpl createGatewaySenderEvent(LocalRegion lr, Operation operation,
Object key, Object value, long sequenceId, long shadowKey) throws Exception {
+   // Upper-case L suffix: a lower-case 'l' is easily misread as the digit 1 (i.e. "11").
+   return createGatewaySenderEvent(lr, operation, key, value, 1L, sequenceId, 0, shadowKey);
+ }
+
+ public static GatewaySenderEventImpl createGatewaySenderEvent(LocalRegion lr, Operation operation,
+ Object key, Object value, long threadId, long sequenceId, int bucketId, long shadowKey)
+ throws Exception {
when(lr.getKeyInfo(key, value, null)).thenReturn(new KeyInfo(key, null, null));
EntryEventImpl eei = EntryEventImpl.create(lr, operation, key, value, null, false, null);
- eei.setEventId(new EventID(new byte[16], 1l, sequenceId));
+ eei.setEventId(new EventID(new byte[16], threadId, sequenceId, bucketId));
GatewaySenderEventImpl gsei =
new GatewaySenderEventImpl(getEnumListenerEvent(operation), eei, null);
gsei.setShadowKey(shadowKey);