You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bo...@apache.org on 2019/06/11 16:55:40 UTC
[geode] branch feature/GEODE-6854 created (now babc650)
This is an automated email from the ASF dual-hosted git repository.
boglesby pushed a change to branch feature/GEODE-6854
in repository https://gitbox.apache.org/repos/asf/geode.git.
at babc650 GEODE-6854: Skipped events already contained in the batch during conflation
This branch includes the following new commits:
new babc650 GEODE-6854: Skipped events already contained in the batch during conflation
The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
[geode] 01/01: GEODE-6854: Skipped events already contained in the
batch during conflation
Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
boglesby pushed a commit to branch feature/GEODE-6854
in repository https://gitbox.apache.org/repos/asf/geode.git
commit babc6507f3592a7eb62cf7ac0ade0d2e75bee76f
Author: Barry Oglesby <bo...@pivotal.io>
AuthorDate: Tue Jun 11 09:41:19 2019 -0700
GEODE-6854: Skipped events already contained in the batch during conflation
---
.../wan/AbstractGatewaySenderEventProcessor.java | 30 ++++++++--
.../internal/cache/wan/GatewaySenderEventImpl.java | 43 ++++++++++++++
...rallelGatewaySenderEventProcessorJUnitTest.java | 69 +++++++++++++++++++++-
.../wan/parallel/ParallelGatewaySenderHelper.java | 8 ++-
4 files changed, 142 insertions(+), 8 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 87d3021..058a1b7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -772,6 +772,9 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread
List<GatewaySenderEventImpl> conflatedEvents = null;
// Conflate the batch if necessary
if (this.sender.isBatchConflationEnabled() && events.size() > 1) {
+ if (logger.isDebugEnabled()) {
+ logEvents("original", events);
+ }
Map<ConflationKey, GatewaySenderEventImpl> conflatedEventsMap =
new LinkedHashMap<ConflationKey, GatewaySenderEventImpl>();
conflatedEvents = new ArrayList<GatewaySenderEventImpl>();
@@ -783,12 +786,16 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread
ConflationKey key = new ConflationKey(gsEvent.getRegion().getFullPath(),
gsEvent.getKeyToConflate(), gsEvent.getOperation());
- // Attempt to remove the key. If the entry is removed, that means a
- // duplicate key was found. If not, this is a no-op.
- conflatedEventsMap.remove(key);
+ // Get the entry at that key
+ GatewaySenderEventImpl existingEvent = conflatedEventsMap.get(key);
+ if (!gsEvent.equals(existingEvent)) {
+ // Attempt to remove the key. If the entry is removed, that means a
+ // duplicate key was found. If not, this is a no-op.
+ conflatedEventsMap.remove(key);
- // Add the key to the end of the map.
- conflatedEventsMap.put(key, gsEvent);
+ // Add the key to the end of the map.
+ conflatedEventsMap.put(key, gsEvent);
+ }
} else {
// The event should not be conflated (create or destroy). Add it to
// the map.
@@ -806,12 +813,25 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread
// Increment the events conflated from batches statistic
this.sender.getStatistics()
.incEventsConflatedFromBatches(events.size() - conflatedEvents.size());
+ if (logger.isDebugEnabled()) {
+ logEvents("conflated", conflatedEvents);
+ }
} else {
conflatedEvents = events;
}
return conflatedEvents;
}
+ private void logEvents(String message, List<GatewaySenderEventImpl> events) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("The batch contains the following ").append(events.size()).append(" ")
+ .append(message).append(" events:");
+ for (GatewaySenderEventImpl event : events) {
+ builder.append("\t\n").append(event.toSmallString());
+ }
+ logger.debug(builder);
+ }
+
private List<GatewaySenderEventImpl> addPDXEvent() throws IOException {
List<GatewaySenderEventImpl> pdxEventsToBeDispatched = new ArrayList<GatewaySenderEventImpl>();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
index 8db48b4..1247d03 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
@@ -19,6 +19,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Arrays;
import org.apache.geode.DataSerializer;
import org.apache.geode.InternalGemFireError;
@@ -44,6 +45,7 @@ import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.Token;
import org.apache.geode.internal.cache.WrappedCallbackArgument;
import org.apache.geode.internal.cache.tier.sockets.Message;
+import org.apache.geode.internal.lang.ObjectUtils;
import org.apache.geode.internal.offheap.OffHeapHelper;
import org.apache.geode.internal.offheap.ReferenceCountHelper;
import org.apache.geode.internal.offheap.Releasable;
@@ -774,6 +776,15 @@ public class GatewaySenderEventImpl
return buffer.toString();
}
+ public String toSmallString() {
+ StringBuffer buffer = new StringBuffer();
+ buffer.append("GatewaySenderEventImpl[").append("id=").append(this.id).append(";operation=")
+ .append(getOperation()).append(";region=").append(this.regionPath).append(";key=")
+ .append(this.key).append(";shadowKey=").append(this.shadowKey).append(";bucketId=")
+ .append(this.bucketId).append("]");
+ return buffer.toString();
+ }
+
public static boolean isSerializingValue() {
return ((Boolean) isSerializingValue.get()).booleanValue();
}
@@ -1179,6 +1190,38 @@ public class GatewaySenderEventImpl
return this.shadowKey;
}
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+
+ if (obj == null || !(obj instanceof GatewaySenderEventImpl)) {
+ return false;
+ }
+
+ GatewaySenderEventImpl that = (GatewaySenderEventImpl) obj;
+
+ return this.shadowKey.equals(that.shadowKey)
+ && this.id.equals(that.id)
+ && this.bucketId == that.bucketId
+ && this.action == that.action
+ && this.regionPath.equals(that.regionPath)
+ && this.key.equals(that.key)
+ && Arrays.equals(this.value, that.value);
+ }
+
+ public int hashCode() {
+ int hashCode = 17;
+ hashCode = 37 * hashCode + ObjectUtils.hashCode(this.shadowKey);
+ hashCode = 37 * hashCode + ObjectUtils.hashCode(this.id);
+ hashCode = 37 * hashCode + this.bucketId;
+ hashCode = 37 * hashCode + this.action;
+ hashCode = 37 * hashCode + ObjectUtils.hashCode(this.regionPath);
+ hashCode = 37 * hashCode + ObjectUtils.hashCode(this.key);
+ hashCode = 37 * hashCode + (this.value == null ? 0 : Arrays.hashCode(this.value));
+ return hashCode;
+ }
+
@Override
public Version[] getSerializationVersions() {
return new Version[] {Version.GEODE_1_9_0};
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java
index 558df5c..1eb9920 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java
@@ -125,7 +125,7 @@ public class ParallelGatewaySenderEventProcessorJUnitTest {
AbstractGatewaySenderEventProcessor processor =
ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender);
- // Create a batch of non-conflatable events with one duplicate
+ // Create a batch of non-conflatable events with one duplicate (not including the shadowKey)
List<GatewaySenderEventImpl> originalEvents = new ArrayList<>();
LocalRegion lr = mock(LocalRegion.class);
when(lr.getFullPath()).thenReturn("/dataStoreRegion");
@@ -139,7 +139,7 @@ public class ParallelGatewaySenderEventProcessorJUnitTest {
originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.CREATE,
"Object_13964", "Object_13964", 100, 27709));
originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.CREATE,
- "Object_14024", "Object_13964", 101, 27822));
+ "Object_14024", "Object_14024", 101, 27822));
originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.DESTROY,
"Object_13964", null, 102, 27935));
originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.CREATE,
@@ -151,4 +151,69 @@ public class ParallelGatewaySenderEventProcessorJUnitTest {
// Assert no events were conflated incorrectly
assertThat(originalEvents).isEqualTo(conflatedEvents);
}
+
+ @Test
+ public void validateBatchConflationWithBatchContainingDuplicateConflatableAndNonConflatableEvents()
+ throws Exception {
+ // This is a test for GEODE-6854.
+ // A batch containing events like below is conflated incorrectly. The conflation code should
+ // remove the duplicates from this batch, but not change their order.
+ // SenderEventImpl[id=EventID[threadID=104;sequenceID=2;bucketId=89];action=1;operation=UPDATE;region=/dataStoreRegion;key=Object_6079;shadowKey=16587]
+ // SenderEventImpl[id=EventID[threadID=104;sequenceID=3;bucketId=89];action=2;operation=DESTROY;region=/dataStoreRegion;key=Object_6079;shadowKey=16700]
+ // SenderEventImpl[id=EventID[threadID=112;sequenceID=9;bucketId=89];action=1;operation=PUTALL_UPDATE;region=/dataStoreRegion;key=Object_7731;shadowKey=16813]
+ // SenderEventImpl[id=EventID[threadID=112;sequenceID=12;bucketId=89];action=1;operation=PUTALL_UPDATE;region=/dataStoreRegion;key=Object_6591;shadowKey=16926]
+ // SenderEventImpl[id=EventID[threadID=104;sequenceID=3;bucketId=89];action=2;operation=DESTROY;region=/dataStoreRegion;key=Object_6079;shadowKey=16700]
+ // SenderEventImpl[id=EventID[threadID=112;sequenceID=9;bucketId=89];action=1;operation=PUTALL_UPDATE;region=/dataStoreRegion;key=Object_7731;shadowKey=16813]
+
+ // Create a ParallelGatewaySenderEventProcessor
+ AbstractGatewaySenderEventProcessor processor =
+ ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender);
+
+ // Create mock region
+ LocalRegion lr = mock(LocalRegion.class);
+ when(lr.getFullPath()).thenReturn("/dataStoreRegion");
+ InternalCache cache = mock(InternalCache.class);
+ InternalDistributedSystem ids = mock(InternalDistributedSystem.class);
+ when(lr.getCache()).thenReturn(cache);
+ when(cache.getDistributedSystem()).thenReturn(ids);
+
+ // Create a batch of conflatable and non-conflatable events with one duplicate conflatable event
+ // and one duplicate non-conflatable event (including the shadowKey)
+ List<GatewaySenderEventImpl> originalEvents = new ArrayList<>();
+ originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.UPDATE,
+ "Object_6079", "Object_6079", 104, 2, 89, 16587));
+ originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.DESTROY,
+ "Object_6079", null, 104, 3, 89, 16700));
+ originalEvents
+ .add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.PUTALL_UPDATE,
+ "Object_7731", "Object_7731", 112, 9, 89, 16813));
+ originalEvents
+ .add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.PUTALL_UPDATE,
+ "Object_6591", "Object_6591", 112, 12, 89, 16926));
+ originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.DESTROY,
+ "Object_6079", null, 104, 3, 89, 16700));
+ originalEvents
+ .add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.PUTALL_UPDATE,
+ "Object_7731", "Object_7731", 112, 9, 89, 16813));
+ logEvents("original", originalEvents);
+
+ // Conflate the batch of events
+ List<GatewaySenderEventImpl> conflatedEvents = processor.conflate(originalEvents);
+ logEvents("conflated", conflatedEvents);
+ assertThat(conflatedEvents.size()).isEqualTo(4);
+ assertThat(originalEvents.get(0)).isEqualTo(conflatedEvents.get(0));
+ assertThat(originalEvents.get(1)).isEqualTo(conflatedEvents.get(1));
+ assertThat(originalEvents.get(2)).isEqualTo(conflatedEvents.get(2));
+ assertThat(originalEvents.get(3)).isEqualTo(conflatedEvents.get(3));
+ }
+
+ private void logEvents(String message, List<GatewaySenderEventImpl> events) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("The list contains the following ").append(events.size()).append(" ")
+ .append(message).append(" events:");
+ for (GatewaySenderEventImpl event : events) {
+ builder.append("\t\n").append(event.toSmallString());
+ }
+ System.out.println(builder);
+ }
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderHelper.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderHelper.java
index 53529af..e75b9cf 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderHelper.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderHelper.java
@@ -81,9 +81,15 @@ public class ParallelGatewaySenderHelper {
public static GatewaySenderEventImpl createGatewaySenderEvent(LocalRegion lr, Operation operation,
Object key, Object value, long sequenceId, long shadowKey) throws Exception {
+ return createGatewaySenderEvent(lr, operation, key, value, 1l, sequenceId, 0, shadowKey);
+ }
+
+ public static GatewaySenderEventImpl createGatewaySenderEvent(LocalRegion lr, Operation operation,
+ Object key, Object value, long threadId, long sequenceId, int bucketId, long shadowKey)
+ throws Exception {
when(lr.getKeyInfo(key, value, null)).thenReturn(new KeyInfo(key, null, null));
EntryEventImpl eei = EntryEventImpl.create(lr, operation, key, value, null, false, null);
- eei.setEventId(new EventID(new byte[16], 1l, sequenceId));
+ eei.setEventId(new EventID(new byte[16], threadId, sequenceId, bucketId));
GatewaySenderEventImpl gsei =
new GatewaySenderEventImpl(getEnumListenerEvent(operation), eei, null);
gsei.setShadowKey(shadowKey);