You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2018/05/12 01:04:56 UTC
[geode] branch develop updated: GEODE-5087: send a
BatchDestroyOperation for each dropped event at serial primary sender
(#1924)
This is an automated email from the ASF dual-hosted git repository.
zhouxj pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 5c37893 GEODE-5087: send a BatchDestroyOperation for each dropped event at serial primary sender (#1924)
5c37893 is described below
commit 5c378931c672d695f168d2aca0848664cb4c2f2f
Author: Xiaojian Zhou <ge...@users.noreply.github.com>
AuthorDate: Fri May 11 18:04:51 2018 -0700
GEODE-5087: send a BatchDestroyOperation for each dropped event at serial primary sender (#1924)
---
.../internal/ParallelAsyncEventQueueImpl.java | 2 +-
.../internal/SerialAsyncEventQueueImpl.java | 2 +-
.../internal/cache/wan/AbstractGatewaySender.java | 68 +++++++++++++---------
.../wan/AbstractGatewaySenderEventProcessor.java | 33 +----------
...currentParallelGatewaySenderEventProcessor.java | 32 ++++++++++
.../ParallelGatewaySenderEventProcessor.java | 6 ++
.../cache/wan/serial/BatchDestroyOperation.java | 28 ++++++++-
...oncurrentSerialGatewaySenderEventProcessor.java | 64 ++++++++++++--------
.../serial/SerialGatewaySenderEventProcessor.java | 47 +++++++++++++--
.../xmlcache/ParallelAsyncEventQueueCreation.java | 2 +-
.../xmlcache/ParallelGatewaySenderCreation.java | 2 +-
.../xmlcache/SerialAsyncEventQueueCreation.java | 2 +-
.../xmlcache/SerialGatewaySenderCreation.java | 2 +-
.../wan/parallel/ParallelGatewaySenderImpl.java | 2 +-
.../cache/wan/serial/SerialGatewaySenderImpl.java | 6 +-
.../SerialGatewaySenderOperationsDUnitTest.java | 2 -
16 files changed, 199 insertions(+), 101 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
index 538b65a..8e2e4e4 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
@@ -168,7 +168,7 @@ public class ParallelAsyncEventQueueImpl extends AbstractGatewaySender {
}
@Override
- protected void setModifiedEventId(EntryEventImpl clonedEvent) {
+ public void setModifiedEventId(EntryEventImpl clonedEvent) {
int bucketId = -1;
// merged from 42004
if (clonedEvent.getRegion() instanceof DistributedRegion) {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
index 9e0239d..400126d 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
@@ -225,7 +225,7 @@ public class SerialAsyncEventQueueImpl extends AbstractGatewaySender {
* internal.cache.EntryEventImpl)
*/
@Override
- protected void setModifiedEventId(EntryEventImpl clonedEvent) {
+ public void setModifiedEventId(EntryEventImpl clonedEvent) {
EventID originalEventId = clonedEvent.getEventId();
long originalThreadId = originalEventId.getThreadID();
long newThreadId = originalThreadId;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index 123534a..149fa48 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -173,6 +173,9 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
protected volatile ConcurrentLinkedQueue<TmpQueueEvent> tmpQueuedEvents =
new ConcurrentLinkedQueue<>();
+
+ protected volatile ConcurrentLinkedQueue<EntryEventImpl> tmpDroppedEvents =
+ new ConcurrentLinkedQueue<>();
/**
* The number of seconds to wait before stopping the GatewaySender. Default is 0 seconds.
*/
@@ -836,40 +839,43 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
final boolean isDebugEnabled = logger.isDebugEnabled();
- // If this gateway is not running, return
- if (!isRunning()) {
- if (isDebugEnabled) {
- logger.debug("Returning back without putting into the gateway sender queue:" + event);
- }
- if (this.eventProcessor != null) {
- this.eventProcessor.registerEventDroppedInPrimaryQueue(event);
- }
- return;
- }
-
- final GatewaySenderStats stats = getStatistics();
- stats.incEventsReceived();
-
- if (!checkForDistribution(event, stats)) {
- stats.incEventsNotQueued();
- return;
- }
-
- // this filter is defined by Asif which exist in old wan too. new wan has
- // other GatewaEventFilter. Do we need to get rid of this filter. Cheetah is
- // not considering this filter
- if (!this.filter.enqueueEvent(event)) {
- stats.incEventsFiltered();
- return;
- }
// released by this method or transfers ownership to TmpQueueEvent
@Released
EntryEventImpl clonedEvent = new EntryEventImpl(event, false);
boolean freeClonedEvent = true;
try {
- Region region = event.getRegion();
+ // If this gateway is not running, return
+ if (!isRunning()) {
+ if (this.isPrimary()) {
+ tmpDroppedEvents.add(clonedEvent);
+ if (isDebugEnabled) {
+ logger.debug("add to tmpDroppedEvents for evnet {}", clonedEvent);
+ }
+ }
+ if (isDebugEnabled) {
+ logger.debug("Returning back without putting into the gateway sender queue:" + event);
+ }
+ return;
+ }
+
+ final GatewaySenderStats stats = getStatistics();
+ stats.incEventsReceived();
+
+ if (!checkForDistribution(event, stats)) {
+ stats.incEventsNotQueued();
+ return;
+ }
+ // this filter is defined by Asif which exist in old wan too. new wan has
+ // other GatewaEventFilter. Do we need to get rid of this filter. Cheetah is
+ // not considering this filter
+ if (!this.filter.enqueueEvent(event)) {
+ stats.incEventsFiltered();
+ return;
+ }
+
+ // start to distribute
setModifiedEventId(clonedEvent);
Object callbackArg = clonedEvent.getRawCallbackArgument();
@@ -1016,6 +1022,12 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
*/
public void enqueueTempEvents() {
if (this.eventProcessor != null) {// Fix for defect #47308
+ // process tmpDroppedEvents
+ EntryEventImpl droppedEvent = null;
+ while ((droppedEvent = tmpDroppedEvents.poll()) != null) {
+ this.eventProcessor.registerEventDroppedInPrimaryQueue(droppedEvent);
+ }
+
TmpQueueEvent nextEvent = null;
final GatewaySenderStats stats = getStatistics();
try {
@@ -1216,7 +1228,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
return region;
}
- protected abstract void setModifiedEventId(EntryEventImpl clonedEvent);
+ public abstract void setModifiedEventId(EntryEventImpl clonedEvent);
public static class DefaultGatewayEventFilter
implements org.apache.geode.internal.cache.GatewayEventFilter {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 2ce06c6..89fa586 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -33,7 +33,6 @@ import org.apache.geode.GemFireException;
import org.apache.geode.SystemFailure;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.EntryEvent;
-import org.apache.geode.cache.EntryOperation;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionDestroyedException;
@@ -50,7 +49,6 @@ import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.PartitionedRegion;
-import org.apache.geode.internal.cache.PartitionedRegionHelper;
import org.apache.geode.internal.cache.RegionQueue;
import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
@@ -279,36 +277,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
return this.queue.size();
}
- public void registerEventDroppedInPrimaryQueue(EntryEventImpl event) {
- if (queue == null) {
- return;
- }
- if (this.queue instanceof ConcurrentParallelGatewaySenderQueue) {
- ConcurrentParallelGatewaySenderQueue cpgsq = (ConcurrentParallelGatewaySenderQueue) queue;
- PartitionedRegion prQ = cpgsq.getRegion(event.getRegion().getFullPath());
- if (prQ == null) {
- if (logger.isDebugEnabled()) {
- logger.debug("shadow partitioned region " + event.getRegion().getFullPath()
- + " is not created yet.");
- }
- return;
- }
- int bucketId = PartitionedRegionHelper.getHashKey((EntryOperation) event);
- long shadowKey = event.getTailKey();
-
- ParallelGatewaySenderQueue pgsq =
- (ParallelGatewaySenderQueue) cpgsq.getQueueByBucket(bucketId);
- boolean isPrimary = prQ.getRegionAdvisor().getBucketAdvisor(bucketId).isPrimary();
- if (isPrimary) {
- pgsq.sendQueueRemovalMesssageForDroppedEvent(prQ, bucketId, shadowKey);
- this.sender.getStatistics().incEventsDroppedDueToPrimarySenderNotRunning();
- if (logger.isDebugEnabled()) {
- logger.debug("register dropped event for primary queue. BucketId is " + bucketId
- + ", shadowKey is " + shadowKey + ", prQ is " + prQ.getFullPath());
- }
- }
- }
- }
+ protected abstract void registerEventDroppedInPrimaryQueue(EntryEventImpl droppedEvent);
/**
* @return the sender
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
index 54b7034..6b8cce1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
@@ -32,12 +32,15 @@ import org.apache.geode.GemFireException;
import org.apache.geode.InternalGemFireException;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.EntryOperation;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.wan.GatewayQueueEvent;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EnumListenerEvent;
import org.apache.geode.internal.cache.InternalRegion;
import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionHelper;
import org.apache.geode.internal.cache.RegionQueue;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
@@ -138,6 +141,35 @@ public class ConcurrentParallelGatewaySenderEventProcessor
}
@Override
+ protected void registerEventDroppedInPrimaryQueue(EntryEventImpl droppedEvent) {
+ if (queue == null) {
+ return;
+ }
+ ConcurrentParallelGatewaySenderQueue cpgsq = (ConcurrentParallelGatewaySenderQueue) queue;
+ PartitionedRegion prQ = cpgsq.getRegion(droppedEvent.getRegion().getFullPath());
+ if (prQ == null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("shadow partitioned region " + droppedEvent.getRegion().getFullPath()
+ + " is not created yet.");
+ }
+ return;
+ }
+ int bucketId = PartitionedRegionHelper.getHashKey((EntryOperation) droppedEvent);
+ long shadowKey = droppedEvent.getTailKey();
+
+ ParallelGatewaySenderQueue pgsq = (ParallelGatewaySenderQueue) cpgsq.getQueueByBucket(bucketId);
+ boolean isPrimary = prQ.getRegionAdvisor().getBucketAdvisor(bucketId).isPrimary();
+ if (isPrimary) {
+ pgsq.sendQueueRemovalMesssageForDroppedEvent(prQ, bucketId, shadowKey);
+ this.sender.getStatistics().incEventsDroppedDueToPrimarySenderNotRunning();
+ if (logger.isDebugEnabled()) {
+ logger.debug("register dropped event for primary queue. BucketId is " + bucketId
+ + ", shadowKey is " + shadowKey + ", prQ is " + prQ.getFullPath());
+ }
+ }
+ }
+
+ @Override
public void run() {
final boolean isDebugEnabled = logger.isDebugEnabled();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
index 5715a35..77811c8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
@@ -149,6 +149,12 @@ public class ParallelGatewaySenderEventProcessor extends AbstractGatewaySenderEv
}
}
+ @Override
+ protected void registerEventDroppedInPrimaryQueue(EntryEventImpl droppedEvent) {
+ logger.info("ParallelGatewaySenderEventProcessor should not process dropped event {}",
+ droppedEvent);
+ }
+
public void clear(PartitionedRegion pr, int bucketId) {
((ParallelGatewaySenderQueue) this.queue).clear(pr, bucketId);
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/BatchDestroyOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/BatchDestroyOperation.java
index debb005..d9dde9d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/BatchDestroyOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/BatchDestroyOperation.java
@@ -102,7 +102,7 @@ public class BatchDestroyOperation extends DistributedCacheOperation {
}
// Optimized way
- for (long k = (Long) this.key; k <= this.tailKey; k++) {
+ for (long k = (Long) this.key; k <= this.tailKey && this.tailKey != -1; k++) {
try {
for (GatewayEventFilter filter : rgn.getSerialGatewaySender()
.getGatewayEventFilters()) {
@@ -124,6 +124,32 @@ public class BatchDestroyOperation extends DistributedCacheOperation {
}
}
}
+
+ // destroy dropped event from unprocessedKeys
+ if (this.tailKey == -1) {
+ SerialGatewaySenderEventProcessor ep = null;
+ int index = ((Long) this.key).intValue();
+ if (index == -1) {
+ // this is SerialGatewaySenderEventProcessor
+ ep = (SerialGatewaySenderEventProcessor) rgn.getSerialGatewaySender()
+ .getEventProcessor();
+ } else {
+ ConcurrentSerialGatewaySenderEventProcessor csgep =
+ (ConcurrentSerialGatewaySenderEventProcessor) rgn.getSerialGatewaySender()
+ .getEventProcessor();
+ ep = csgep.processors.get(index);
+ }
+ if (ep != null) {
+ // if sender is being shutdown, the ep could be null
+ boolean removed = ep.basicHandlePrimaryDestroy(ev.getEventId());
+ if (removed) {
+ if (isDebugEnabled) {
+ logger.debug("Removed a dropped event {} from unprocessedEvents.",
+ (EntryEventImpl) event);
+ }
+ }
+ }
+ }
this.appliedOperation = true;
} catch (CacheWriterException e) {
throw new Error(
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
index ec01fd9..8ec6ce1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
@@ -109,6 +109,35 @@ public class ConcurrentSerialGatewaySenderEventProcessor
}
+ public void setModifiedEventId(EntryEventImpl clonedEvent, int index) {
+ EventID originalEventId = clonedEvent.getEventId();
+ if (logger.isDebugEnabled()) {
+ logger.debug("The original EventId is {}", originalEventId);
+ }
+ // PARALLEL_THREAD_BUFFER * (index +1) + originalEventId.getThreadID();
+ // generating threadId by the algorithm explained above used to clash with
+ // fakeThreadId generated by putAll
+ // below is new way to generate threadId so that it doesn't clash with
+ // any.
+ long newThreadId =
+ ThreadIdentifier.createFakeThreadIDForParallelGateway(index, originalEventId.getThreadID(),
+ 0 /*
+ * gateway sender event id index has already been applied in
+ * SerialGatewaySenderImpl.setModifiedEventId
+ */);
+ EventID newEventId = new EventID(originalEventId.getMembershipID(), newThreadId,
+ originalEventId.getSequenceID());
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "{}: Generated event id for event with key={}, index={}, original event id={}, threadId={}, new event id={}, newThreadId={}"
+ + ":index=" + this.sender.getEventIdIndex(),
+ this, clonedEvent.getKey(), index, originalEventId,
+ ThreadIdentifier.toDisplayString(originalEventId.getThreadID()), newEventId,
+ ThreadIdentifier.toDisplayString(newThreadId));
+ }
+ clonedEvent.setEventId(newEventId);
+ }
+
public void enqueueEvent(EnumListenerEvent operation, EntryEvent event, Object substituteValue,
int index) throws CacheException, IOException {
// Get the appropriate gateway
@@ -121,30 +150,7 @@ public class ConcurrentSerialGatewaySenderEventProcessor
@Released
EntryEventImpl clonedEvent = new EntryEventImpl((EntryEventImpl) event);
try {
- EventID originalEventId = clonedEvent.getEventId();
- if (logger.isDebugEnabled()) {
- logger.debug("The original EventId is {}", originalEventId);
- }
- // PARALLEL_THREAD_BUFFER * (index +1) + originalEventId.getThreadID();
- // generating threadId by the algorithm explained above used to clash with
- // fakeThreadId generated by putAll
- // below is new way to generate threadId so that it doesn't clash with
- // any.
- long newThreadId = ThreadIdentifier.createFakeThreadIDForParallelGateway(index,
- originalEventId.getThreadID(),
- 0 /*
- * gateway sender event id index has already been applied in
- * SerialGatewaySenderImpl.setModifiedEventId
- */);
- EventID newEventId = new EventID(originalEventId.getMembershipID(), newThreadId,
- originalEventId.getSequenceID());
- if (logger.isDebugEnabled()) {
- logger.debug(
- "{}: Generated event id for event with key={}, index={}, original event id={}, threadId={}, new event id={}, newThreadId={}",
- this, event.getKey(), index, originalEventId, originalEventId.getThreadID(),
- newEventId, newThreadId);
- }
- clonedEvent.setEventId(newEventId);
+ setModifiedEventId(clonedEvent, index);
serialProcessor.enqueueEvent(operation, clonedEvent, substituteValue);
} finally {
clonedEvent.release();
@@ -375,6 +381,16 @@ public class ConcurrentSerialGatewaySenderEventProcessor
}
@Override
+ protected void registerEventDroppedInPrimaryQueue(EntryEventImpl droppedEvent) {
+ this.getSender().setModifiedEventId(droppedEvent);
+ // modified event again for concurrent SGSEP
+ int index = Math.abs(getHashCode(((EntryEventImpl) droppedEvent)) % this.processors.size());
+ setModifiedEventId(droppedEvent, index);
+
+ this.processors.get(index).sendBatchDestroyOperationForDroppedEvent(droppedEvent, index);
+ }
+
+ @Override
protected void enqueueEvent(GatewayQueueEvent event) {
for (SerialGatewaySenderEventProcessor serialProcessor : this.processors) {
serialProcessor.enqueueEvent(event);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
index 3fa4d6a..39609c7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
@@ -45,6 +45,7 @@ import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EnumListenerEvent;
import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender.EventWrapper;
import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
@@ -610,7 +611,7 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven
}
my_executor.execute(new Runnable() {
public void run() {
- basicHandlePrimaryDestroy(gatewayEvent);
+ basicHandlePrimaryDestroy(gatewayEvent.getEventId());
}
});
}
@@ -620,23 +621,25 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven
* Just remove the event from the unprocessed events map if it is present. This method added to
* fix bug 37603
*/
- protected void basicHandlePrimaryDestroy(final GatewaySenderEventImpl gatewayEvent) {
+ protected boolean basicHandlePrimaryDestroy(final EventID eventId) {
if (this.sender.isPrimary()) {
// no need to do anything if we have become the primary
- return;
+ return false;
}
GatewaySenderStats statistics = this.sender.getStatistics();
// Get the event from the map
synchronized (unprocessedEventsLock) {
if (this.unprocessedEvents == null)
- return;
+ return false;
// now we can safely use the unprocessedEvents field
- EventWrapper ew = this.unprocessedEvents.remove(gatewayEvent.getEventId());
+ EventWrapper ew = this.unprocessedEvents.remove(eventId);
if (ew != null) {
ew.event.release();
statistics.incUnprocessedEventsRemovedByPrimary();
+ return true;
}
}
+ return false;
}
protected void basicHandlePrimaryEvent(final GatewaySenderEventImpl gatewayEvent) {
@@ -865,4 +868,38 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven
// @TODO This API hasn't been implemented yet
throw new UnsupportedOperationException();
}
+
+ public void sendBatchDestroyOperationForDroppedEvent(EntryEventImpl dropEvent, int index) {
+ EntryEventImpl destroyEvent =
+ EntryEventImpl.create((LocalRegion) this.queue.getRegion(), Operation.DESTROY, (long) index,
+ null/* newValue */, null, false, sender.getCache().getMyId());
+ destroyEvent.setEventId(dropEvent.getEventId());
+ destroyEvent.disallowOffHeapValues();
+ destroyEvent.setTailKey(-1L);
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "SerialGatewaySenderEventProcessor sends BatchDestroyOperation to secondary for event {}",
+ destroyEvent);
+ }
+
+ try {
+ BatchDestroyOperation op = new BatchDestroyOperation(destroyEvent);
+ op.distribute();
+ if (logger.isDebugEnabled()) {
+ logger.debug("BatchRemovalThread completed destroy of dropped event {}", dropEvent);
+ }
+ } catch (Exception ignore) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Exception in sending dropped event could be ignored in order not to interrupt sender starting",
+ ignore);
+ }
+ }
+ }
+
+ @Override
+ protected void registerEventDroppedInPrimaryQueue(EntryEventImpl droppedEvent) {
+ this.getSender().setModifiedEventId(droppedEvent);
+ sendBatchDestroyOperationForDroppedEvent(droppedEvent, -1);
+ }
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelAsyncEventQueueCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelAsyncEventQueueCreation.java
index 6f8efa8..4686b67 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelAsyncEventQueueCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelAsyncEventQueueCreation.java
@@ -87,5 +87,5 @@ public class ParallelAsyncEventQueueCreation extends AbstractGatewaySender
}
@Override
- protected void setModifiedEventId(EntryEventImpl clonedEvent) {}
+ public void setModifiedEventId(EntryEventImpl clonedEvent) {}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelGatewaySenderCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelGatewaySenderCreation.java
index 5b025b5..257ee75 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelGatewaySenderCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelGatewaySenderCreation.java
@@ -90,7 +90,7 @@ public class ParallelGatewaySenderCreation extends AbstractGatewaySender impleme
}
@Override
- protected void setModifiedEventId(EntryEventImpl clonedEvent) {}
+ public void setModifiedEventId(EntryEventImpl clonedEvent) {}
protected GatewayQueueEvent getSynchronizationEvent(Object key, long timestamp) {
throw new UnsupportedOperationException();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialAsyncEventQueueCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialAsyncEventQueueCreation.java
index ce71c54..cd06661 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialAsyncEventQueueCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialAsyncEventQueueCreation.java
@@ -86,5 +86,5 @@ public class SerialAsyncEventQueueCreation extends AbstractGatewaySender impleme
}
@Override
- protected void setModifiedEventId(EntryEventImpl clonedEvent) {}
+ public void setModifiedEventId(EntryEventImpl clonedEvent) {}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialGatewaySenderCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialGatewaySenderCreation.java
index 80c04de..b0766ff 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialGatewaySenderCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialGatewaySenderCreation.java
@@ -87,7 +87,7 @@ public class SerialGatewaySenderCreation extends AbstractGatewaySender implement
}
@Override
- protected void setModifiedEventId(EntryEventImpl clonedEvent) {}
+ public void setModifiedEventId(EntryEventImpl clonedEvent) {}
protected GatewayQueueEvent getSynchronizationEvent(Object key, long timestamp) {
throw new UnsupportedOperationException();
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
index d023704..f565426 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
@@ -167,7 +167,7 @@ public class ParallelGatewaySenderImpl extends AbstractRemoteGatewaySender {
}
@Override
- protected void setModifiedEventId(EntryEventImpl clonedEvent) {
+ public void setModifiedEventId(EntryEventImpl clonedEvent) {
int bucketId = -1;
// merged from 42004
if (clonedEvent.getRegion() instanceof DistributedRegion) {
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
index d964253..ecca896 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
@@ -211,7 +211,7 @@ public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender {
}
@Override
- protected void setModifiedEventId(EntryEventImpl clonedEvent) {
+ public void setModifiedEventId(EntryEventImpl clonedEvent) {
EventID originalEventId = clonedEvent.getEventId();
long originalThreadId = originalEventId.getThreadID();
long newThreadId = originalThreadId;
@@ -226,7 +226,9 @@ public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender {
if (logger.isDebugEnabled()) {
logger.debug(
"{}: Generated event id for event with key={}, original event id={}, originalThreadId={}, new event id={}, newThreadId={}",
- this, clonedEvent.getKey(), originalEventId, originalThreadId, newEventId, newThreadId);
+ this, clonedEvent.getKey(), originalEventId,
+ ThreadIdentifier.toDisplayString(originalThreadId), newEventId,
+ ThreadIdentifier.toDisplayString(newThreadId));
}
clonedEvent.setEventId(newEventId);
}
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
index caa357e..4993f24 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
@@ -50,7 +50,6 @@ import org.apache.geode.test.dunit.RMIException;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.junit.categories.DistributedTest;
-import org.apache.geode.test.junit.categories.FlakyTest;
import org.apache.geode.test.junit.categories.WanTest;
@Category({DistributedTest.class, WanTest.class})
@@ -266,7 +265,6 @@ public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase {
vm5.invoke(() -> WANTestBase.validateQueueSizeStat("ln", 0));
}
- @Category({FlakyTest.class, WanTest.class}) // GEODE-5056
@Test
public void testRestartSerialGatewaySendersWhilePutting() throws Throwable {
Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
--
To stop receiving notification emails like this one, please contact
zhouxj@apache.org.