You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bo...@apache.org on 2019/08/12 19:15:13 UTC
[geode] 01/01: GEODE-7066: Modified batch conflation to use event id instead of shadow key
This is an automated email from the ASF dual-hosted git repository.
boglesby pushed a commit to branch feature/GEODE-7066
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 9e38830d399caeb169e90df6a42286e8455b9e56
Author: Barry Oglesby <bo...@pivotal.io>
AuthorDate: Fri Aug 9 10:20:37 2019 -0700
GEODE-7066: Modified batch conflation to use event id instead of shadow key
---
.../wan/AbstractGatewaySenderEventProcessor.java | 15 ++-
...rallelGatewaySenderEventProcessorJUnitTest.java | 56 +++++++-
...SerialGatewaySenderEventProcessorJUnitTest.java | 146 +++++++++++++++++++++
3 files changed, 209 insertions(+), 8 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 3d5405e..241ce68 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -22,6 +22,7 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
@@ -800,7 +801,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread
// The event should not be conflated (create or destroy). Add it to
// the map.
ConflationKey key = new ConflationKey(gsEvent.getRegion().getFullPath(),
- gsEvent.getKeyToConflate(), gsEvent.getOperation(), gsEvent.getShadowKey());
+ gsEvent.getKeyToConflate(), gsEvent.getOperation(), gsEvent.getEventId());
conflatedEventsMap.put(key, gsEvent);
}
}
@@ -1389,17 +1390,17 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread
private String regionName;
- private long shadowKey;
+ private EventID eventId;
private ConflationKey(String region, Object key, Operation operation) {
- this(region, key, operation, -1);
+ this(region, key, operation, null);
}
- private ConflationKey(String region, Object key, Operation operation, long shadowKey) {
+ private ConflationKey(String region, Object key, Operation operation, EventID eventId) {
this.key = key;
this.operation = operation;
this.regionName = region;
- this.shadowKey = shadowKey;
+ this.eventId = eventId;
}
@Override
@@ -1409,7 +1410,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread
result = prime * result + key.hashCode();
result = prime * result + operation.hashCode();
result = prime * result + regionName.hashCode();
- result = prime * result + Long.hashCode(this.shadowKey);
+ result = prime * result + (eventId == null ? 0 : eventId.hashCode());
return result;
}
@@ -1434,7 +1435,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread
if (!this.operation.equals(that.operation)) {
return false;
}
- if (this.shadowKey != that.shadowKey) {
+ if (!Objects.equals(this.eventId, that.eventId)) {
return false;
}
return true;
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java
index 7b0cfb2..994ce95 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java
@@ -62,6 +62,8 @@ public class ParallelGatewaySenderEventProcessorJUnitTest {
@Test
public void validateBatchConflationWithBatchContainingDuplicateConflatableEvents()
throws Exception {
+ // This tests normal batch conflation.
+
// Create a ParallelGatewaySenderEventProcessor
AbstractGatewaySenderEventProcessor processor =
ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender);
@@ -85,7 +87,7 @@ public class ParallelGatewaySenderEventProcessorJUnitTest {
originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.UPDATE,
"Object_13964", lastUpdateValue, lastUpdateSequenceId, lastUpdateShadowKey));
originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.DESTROY,
- "Object_13964", null, 104, 28274));
+ "Object_13964", null, 105, 28274));
// Conflate the batch of events
List<GatewaySenderEventImpl> conflatedEvents = processor.conflate(originalEvents);
@@ -197,6 +199,58 @@ public class ParallelGatewaySenderEventProcessorJUnitTest {
assertThat(originalEvents.get(3)).isEqualTo(conflatedEvents.get(3));
}
+ @Test
+ public void validateBatchConflationWithDuplicateNonConflatableEvents()
+ throws Exception {
+ // Duplicate non-conflatable events should not be conflated.
+ //
+ // Here is an example batch with duplicate create and destroy events on the same key from
+ // different threads:
+ // GatewaySenderEventImpl[id=EventID[id=31bytes;threadID=0x30004|6;sequenceID=6072];operation=CREATE;region=/SESSIONS;key=6079],
+ // GatewaySenderEventImpl[id=EventID[id=31bytes;threadID=0x30004|6;sequenceID=6073];operation=UPDATE;region=/SESSIONS;key=6079],
+ // GatewaySenderEventImpl[id=EventID[id=31bytes;threadID=0x30004|5;sequenceID=6009];operation=CREATE;region=/SESSIONS;key=1736],
+ // GatewaySenderEventImpl[id=EventID[id=31bytes;threadID=0x30004|6;sequenceID=6074];operation=DESTROY;region=/SESSIONS;key=6079],
+ // GatewaySenderEventImpl[id=EventID[id=31bytes;threadID=0x30004|5;sequenceID=6011];operation=DESTROY;region=/SESSIONS;key=1736],
+ // GatewaySenderEventImpl[id=EventID[id=31bytes;threadID=0x30004|6;sequenceID=6087];operation=CREATE;region=/SESSIONS;key=1736],
+ // GatewaySenderEventImpl[id=EventID[id=31bytes;threadID=0x30004|6;sequenceID=6089];operation=DESTROY;region=/SESSIONS;key=1736],
+
+ // Create a ParallelGatewaySenderEventProcessor
+ AbstractGatewaySenderEventProcessor processor =
+ ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender);
+
+ // Create mock region
+ LocalRegion lr = mock(LocalRegion.class);
+ when(lr.getFullPath()).thenReturn("/dataStoreRegion");
+ when(lr.getCache()).thenReturn(this.cache);
+
+ // Create a batch with duplicate non-conflatable (create and destroy) events on the same key
+ // from different threads
+ List<GatewaySenderEventImpl> originalEvents = new ArrayList<>();
+ originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.CREATE,
+ "6079", "6079", 6, 6072, 0, 0));
+ originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.UPDATE,
+ "6079", "6079", 6, 6073, 0, 0));
+ originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.CREATE,
+ "1736", "1736", 5, 6009, 0, 0));
+ originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.DESTROY,
+ "6079", "6079", 6, 6074, 0, 0));
+ originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.DESTROY,
+ "1736", "1736", 5, 6011, 0, 0));
+ originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.CREATE,
+ "1736", "1736", 6, 6087, 0, 0));
+ originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.DESTROY,
+ "1736", "1736", 6, 6089, 0, 0));
+ logEvents("original", originalEvents);
+
+ // Conflate the batch of events
+ List<GatewaySenderEventImpl> conflatedEvents = processor.conflate(originalEvents);
+ logEvents("conflated", conflatedEvents);
+
+ // Assert no conflation occurs
+ assertThat(conflatedEvents.size()).isEqualTo(7);
+ assertThat(originalEvents).isEqualTo(conflatedEvents);
+ }
+
private void logEvents(String message, List<GatewaySenderEventImpl> events) {
StringBuilder builder = new StringBuilder();
builder.append("The list contains the following ").append(events.size()).append(" ")
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessorJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessorJUnitTest.java
index 2376b41..7f7da97 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessorJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessorJUnitTest.java
@@ -25,6 +25,8 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import org.apache.logging.log4j.Logger;
@@ -33,14 +35,19 @@ import org.junit.Test;
import org.springframework.test.util.ReflectionTestUtils;
import org.apache.geode.cache.Operation;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.EnumListenerEvent;
import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.KeyInfo;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.ha.ThreadIdentifier;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
import org.apache.geode.internal.cache.wan.GatewaySenderStats;
import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.test.fake.Fakes;
public class SerialGatewaySenderEventProcessorJUnitTest {
@@ -48,6 +55,8 @@ public class SerialGatewaySenderEventProcessorJUnitTest {
private TestSerialGatewaySenderEventProcessor processor;
+ private GemFireCacheImpl cache;
+
private static final Logger logger = LogService.getLogger();
@Before
@@ -55,6 +64,9 @@ public class SerialGatewaySenderEventProcessorJUnitTest {
this.sender = mock(AbstractGatewaySender.class);
this.processor =
new TestSerialGatewaySenderEventProcessor(this.sender, "ny", null);
+ this.cache = Fakes.cache();
+ InternalDistributedSystem ids = mock(InternalDistributedSystem.class);
+ when(this.cache.getDistributedSystem()).thenReturn(ids);
}
@Test
@@ -224,6 +236,140 @@ public class SerialGatewaySenderEventProcessorJUnitTest {
assertThat(unProcessedTokens).contains("threadID=4;sequenceID=4");
}
+ @Test
+ public void validateBatchConflationWithDuplicateConflatableEvents()
+ throws Exception {
+ // This tests normal batch conflation
+
+ // Create mock region
+ List<GatewaySenderEventImpl> originalEvents = new ArrayList<>();
+ LocalRegion lr = mock(LocalRegion.class);
+ when(lr.getFullPath()).thenReturn("/dataStoreRegion");
+ when(lr.getCache()).thenReturn(this.cache);
+
+ // Configure conflation
+ when(this.sender.isBatchConflationEnabled()).thenReturn(true);
+ when(sender.getStatistics()).thenReturn(mock(GatewaySenderStats.class));
+
+ // Create a batch of conflatable events with duplicate update events
+ Object lastUpdateValue = "Object_13964_5";
+ long lastUpdateSequenceId = 104;
+ originalEvents.add(createGatewaySenderEvent(lr, Operation.CREATE,
+ "Object_13964", "Object_13964_1", 1, 100));
+ originalEvents.add(createGatewaySenderEvent(lr, Operation.UPDATE,
+ "Object_13964", "Object_13964_2", 1, 101));
+ originalEvents.add(createGatewaySenderEvent(lr, Operation.UPDATE,
+ "Object_13964", "Object_13964_3", 1, 102));
+ originalEvents.add(createGatewaySenderEvent(lr, Operation.UPDATE,
+ "Object_13964", "Object_13964_4", 1, 103));
+ originalEvents.add(createGatewaySenderEvent(lr, Operation.UPDATE,
+ "Object_13964", lastUpdateValue, 1, lastUpdateSequenceId));
+ originalEvents.add(createGatewaySenderEvent(lr, Operation.DESTROY,
+ "Object_13964", null, 1, 105));
+
+ // Conflate the batch of events
+ List<GatewaySenderEventImpl> conflatedEvents = processor.conflate(originalEvents);
+
+ // Verify:
+ // - the batch contains 3 events after conflation
+ // - they are CREATE, UPDATE, and DESTROY
+ // - the UPDATE event is the correct one
+ assertThat(conflatedEvents.size()).isEqualTo(3);
+ GatewaySenderEventImpl gsei1 = conflatedEvents.get(0);
+ assertThat(gsei1.getOperation()).isEqualTo(Operation.CREATE);
+ GatewaySenderEventImpl gsei2 = conflatedEvents.get(1);
+ assertThat(gsei2.getOperation()).isEqualTo(Operation.UPDATE);
+ GatewaySenderEventImpl gsei3 = conflatedEvents.get(2);
+ assertThat(gsei3.getOperation()).isEqualTo(Operation.DESTROY);
+ assertThat(gsei2.getDeserializedValue()).isEqualTo(lastUpdateValue);
+ assertThat(gsei2.getEventId().getSequenceID()).isEqualTo(lastUpdateSequenceId);
+ }
+
+ @Test
+ public void validateBatchConflationWithDuplicateNonConflatableEvents()
+ throws Exception {
+ // Duplicate non-conflatable events should not be conflated.
+ //
+ // Here is an example batch with duplicate create and destroy events on the same key from
+ // different threads:
+ // GatewaySenderEventImpl[id=EventID[id=31bytes;threadID=0x30004|6;sequenceID=6072];operation=CREATE;region=/SESSIONS;key=6079],
+ // GatewaySenderEventImpl[id=EventID[id=31bytes;threadID=0x30004|6;sequenceID=6073];operation=UPDATE;region=/SESSIONS;key=6079],
+ // GatewaySenderEventImpl[id=EventID[id=31bytes;threadID=0x30004|5;sequenceID=6009];operation=CREATE;region=/SESSIONS;key=1736],
+ // GatewaySenderEventImpl[id=EventID[id=31bytes;threadID=0x30004|6;sequenceID=6074];operation=DESTROY;region=/SESSIONS;key=6079],
+ // GatewaySenderEventImpl[id=EventID[id=31bytes;threadID=0x30004|5;sequenceID=6011];operation=DESTROY;region=/SESSIONS;key=1736],
+ // GatewaySenderEventImpl[id=EventID[id=31bytes;threadID=0x30004|6;sequenceID=6087];operation=CREATE;region=/SESSIONS;key=1736],
+ // GatewaySenderEventImpl[id=EventID[id=31bytes;threadID=0x30004|6;sequenceID=6089];operation=DESTROY;region=/SESSIONS;key=1736],
+
+ // Create mock region
+ LocalRegion lr = mock(LocalRegion.class);
+ when(lr.getFullPath()).thenReturn("/dataStoreRegion");
+ when(lr.getCache()).thenReturn(this.cache);
+
+ // Configure conflation
+ when(this.sender.isBatchConflationEnabled()).thenReturn(true);
+ when(sender.getStatistics()).thenReturn(mock(GatewaySenderStats.class));
+
+ // Create a batch with duplicate non-conflatable (create and destroy) events on the same key
+ // from different threads
+ List<GatewaySenderEventImpl> originalEvents = new ArrayList<>();
+ originalEvents.add(createGatewaySenderEvent(lr, Operation.CREATE,
+ "6079", "6079", 6, 6072));
+ originalEvents.add(createGatewaySenderEvent(lr, Operation.UPDATE,
+ "6079", "6079", 6, 6073));
+ originalEvents.add(createGatewaySenderEvent(lr, Operation.CREATE,
+ "1736", "1736", 5, 6009));
+ originalEvents.add(createGatewaySenderEvent(lr, Operation.DESTROY,
+ "6079", "6079", 6, 6074));
+ originalEvents.add(createGatewaySenderEvent(lr, Operation.DESTROY,
+ "1736", "1736", 5, 6011));
+ originalEvents.add(createGatewaySenderEvent(lr, Operation.CREATE,
+ "1736", "1736", 6, 6087));
+ originalEvents.add(createGatewaySenderEvent(lr, Operation.DESTROY,
+ "1736", "1736", 6, 6089));
+ logEvents("original", originalEvents);
+
+ // Conflate the batch of events
+ List<GatewaySenderEventImpl> conflatedEvents = processor.conflate(originalEvents);
+ logEvents("conflated", conflatedEvents);
+
+ // Assert no conflation occurs
+ assertThat(conflatedEvents.size()).isEqualTo(7);
+ assertThat(originalEvents).isEqualTo(conflatedEvents);
+ }
+
+ private void logEvents(String message, List<GatewaySenderEventImpl> events) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("The list contains the following ").append(events.size()).append(" ")
+ .append(message).append(" events:");
+ for (GatewaySenderEventImpl event : events) {
+ builder.append("\t\n").append(event.toSmallString());
+ }
+ System.out.println(builder);
+ }
+
+ private GatewaySenderEventImpl createGatewaySenderEvent(LocalRegion lr, Operation operation,
+ Object key, Object value, long threadId, long sequenceId)
+ throws Exception {
+ when(lr.getKeyInfo(key, value, null)).thenReturn(new KeyInfo(key, null, null));
+ EntryEventImpl eei = EntryEventImpl.create(lr, operation, key, value, null, false, null);
+ eei.setEventId(new EventID(new byte[16], threadId, sequenceId));
+ GatewaySenderEventImpl gsei =
+ new GatewaySenderEventImpl(getEnumListenerEvent(operation), eei, null, true);
+ return gsei;
+ }
+
+ private EnumListenerEvent getEnumListenerEvent(Operation operation) {
+ EnumListenerEvent ele = null;
+ if (operation.isCreate()) {
+ ele = EnumListenerEvent.AFTER_CREATE;
+ } else if (operation.isUpdate()) {
+ ele = EnumListenerEvent.AFTER_UPDATE;
+ } else if (operation.isDestroy()) {
+ ele = EnumListenerEvent.AFTER_DESTROY;
+ }
+ return ele;
+ }
+
private EventID handlePrimaryEvent() {
GatewaySenderEventImpl gsei = mock(GatewaySenderEventImpl.class);
EventID id = mock(EventID.class);