You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bo...@apache.org on 2017/09/25 18:00:16 UTC
[geode] branch develop updated: GEODE-3678: Added support to
synchronize parallel gateway sender queues
This is an automated email from the ASF dual-hosted git repository.
boglesby pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new af4012f GEODE-3678: Added support to synchronize parallel gateway sender queues
af4012f is described below
commit af4012f5f934c7e8f21025c528d8ba18e3aad794
Author: Barry Oglesby <bo...@pivotal.io>
AuthorDate: Thu Sep 21 18:25:30 2017 -0700
GEODE-3678: Added support to synchronize parallel gateway sender queues
---
.../asyncqueue/internal/AsyncEventQueueStats.java | 8 +-
.../org/apache/geode/internal/DSFIDFactory.java | 5 +
.../geode/internal/DataSerializableFixedID.java | 2 +
.../geode/internal/cache/GemFireCacheImpl.java | 29 ++
.../internal/cache/InitialImageOperation.java | 20 ++
.../apache/geode/internal/cache/InternalCache.java | 4 +
.../cache/RegionEntrySynchronizationListener.java | 25 ++
.../internal/cache/wan/AbstractGatewaySender.java | 38 +++
.../wan/AbstractGatewaySenderEventProcessor.java | 2 +
.../internal/cache/wan/GatewaySenderEventImpl.java | 2 +-
...waySenderQueueEntrySynchronizationListener.java | 35 +++
...aySenderQueueEntrySynchronizationOperation.java | 314 +++++++++++++++++++++
.../internal/cache/wan/GatewaySenderStats.java | 29 +-
...currentParallelGatewaySenderEventProcessor.java | 8 +
.../ParallelGatewaySenderEventProcessor.java | 22 +-
...oncurrentSerialGatewaySenderEventProcessor.java | 7 +
.../serial/SerialGatewaySenderEventProcessor.java | 6 +
.../internal/cache/xmlcache/CacheCreation.java | 8 +
.../cache/xmlcache/ClientCacheCreation.java | 10 +
.../xmlcache/ParallelGatewaySenderCreation.java | 9 +
.../xmlcache/SerialGatewaySenderCreation.java | 9 +
.../geode/internal/i18n/LocalizedStrings.java | 14 +
...yncEventQueueEntrySynchronizationDUnitTest.java | 183 ++++++++++++
.../cache/wan/AsyncEventQueueTestBase.java | 48 ++--
.../cache/wan/WaitingAsyncEventListener.java | 63 +++++
.../codeAnalysis/sanctionedDataSerializables.txt | 8 +
.../geode/internal/cache/wan/WANTestBase.java | 2 +-
27 files changed, 875 insertions(+), 35 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
index 4c5f937..4eb9d0d 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
@@ -94,7 +94,11 @@ public class AsyncEventQueueStats extends GatewaySenderStats {
f.createIntGauge(LOAD_BALANCES_IN_PROGRESS, "Number of load balances in progress",
"operations"),
f.createLongCounter(LOAD_BALANCE_TIME, "Total time spent load balancing this sender",
- "nanoseconds"),});
+ "nanoseconds"),
+ f.createIntCounter(SYNCHRONIZATION_EVENTS_ENQUEUED,
+ "Number of synchronization events added to the event queue.", "operations"),
+ f.createIntCounter(SYNCHRONIZATION_EVENTS_PROVIDED,
+ "Number of synchronization events provided to other members.", "operations"),});
// Initialize id fields
eventsReceivedId = type.nameToId(EVENTS_RECEIVED);
@@ -123,6 +127,8 @@ public class AsyncEventQueueStats extends GatewaySenderStats {
loadBalancesCompletedId = type.nameToId(LOAD_BALANCES_COMPLETED);
loadBalancesInProgressId = type.nameToId(LOAD_BALANCES_IN_PROGRESS);
loadBalanceTimeId = type.nameToId(LOAD_BALANCE_TIME);
+ synchronizationEventsEnqueuedId = type.nameToId(SYNCHRONIZATION_EVENTS_ENQUEUED);
+ synchronizationEventsProvidedId = type.nameToId(SYNCHRONIZATION_EVENTS_PROVIDED);
}
/**
diff --git a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
index 5b0d86b..31887ef 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
@@ -397,6 +397,7 @@ import org.apache.geode.internal.cache.versions.VMVersionTag;
import org.apache.geode.internal.cache.wan.GatewaySenderAdvisor;
import org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackArgument;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
+import org.apache.geode.internal.cache.wan.GatewaySenderQueueEntrySynchronizationOperation;
import org.apache.geode.internal.cache.wan.parallel.ParallelQueueBatchRemovalMessage;
import org.apache.geode.internal.cache.wan.parallel.ParallelQueueBatchRemovalMessage.BatchRemovalReplyMessage;
import org.apache.geode.internal.cache.wan.parallel.ParallelQueueRemovalMessage;
@@ -926,6 +927,10 @@ public class DSFIDFactory implements DataSerializableFixedID {
registerDSFID(PR_DESTROY_ON_DATA_STORE_MESSAGE, DestroyRegionOnDataStoreMessage.class);
registerDSFID(SHUTDOWN_ALL_GATEWAYHUBS_REQUEST, ShutdownAllGatewayHubsRequest.class);
registerDSFID(BUCKET_COUNT_LOAD_PROBE, BucketCountLoadProbe.class);
+ registerDSFID(GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_MESSAGE,
+ GatewaySenderQueueEntrySynchronizationOperation.GatewaySenderQueueEntrySynchronizationMessage.class);
+ registerDSFID(GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_ENTRY,
+ GatewaySenderQueueEntrySynchronizationOperation.GatewaySenderQueueEntrySynchronizationEntry.class);
}
/**
diff --git a/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java b/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java
index 18f382b..b4fbac7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java
@@ -812,6 +812,8 @@ public interface DataSerializableFixedID extends SerializationVersions {
public static final short DESTROY_LUCENE_INDEX_MESSAGE = 2178;
public static final short LUCENE_PAGE_RESULTS = 2179;
public static final short LUCENE_RESULT_STRUCT = 2180;
+ public static final short GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_MESSAGE = 2181;
+ public static final short GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_ENTRY = 2182;
// NOTE, codes > 65535 will take 4 bytes to serialize
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index abc62f9..c5b5d01 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -80,6 +80,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.Logger;
import org.apache.geode.internal.cache.event.EventTrackerExpiryTask;
+import org.apache.geode.internal.cache.wan.GatewaySenderQueueEntrySynchronizationListener;
import org.apache.geode.internal.security.SecurityServiceFactory;
import org.apache.geode.CancelCriterion;
import org.apache.geode.CancelException;
@@ -589,6 +590,9 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
private final SecurityService securityService;
+ private final Set<RegionEntrySynchronizationListener> synchronizationListeners =
+ new ConcurrentHashSet<>();
+
static {
// this works around jdk bug 6427854, reported in ticket #44434
String propertyName = "sun.nio.ch.bugLevel";
@@ -943,6 +947,8 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
SystemFailure.signalCacheCreate();
this.diskMonitor = new DiskStoreMonitor();
+
+ addRegionEntrySynchronizationListener(new GatewaySenderQueueEntrySynchronizationListener());
} // synchronized
}
@@ -5232,4 +5238,27 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
public CqService getCqService() {
return this.cqService;
}
+
+ public void addRegionEntrySynchronizationListener(RegionEntrySynchronizationListener listener) {
+ this.synchronizationListeners.add(listener);
+ }
+
+ public void removeRegionEntrySynchronizationListener(
+ RegionEntrySynchronizationListener listener) {
+ this.synchronizationListeners.remove(listener);
+ }
+
+ public void invokeRegionEntrySynchronizationListenersAfterSynchronization(
+ InternalDistributedMember sender, LocalRegion region,
+ List<InitialImageOperation.Entry> entriesToSynchronize) {
+ for (RegionEntrySynchronizationListener listener : this.synchronizationListeners) {
+ try {
+ listener.afterSynchronization(sender, region, entriesToSynchronize);
+ } catch (Throwable t) {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.GemFireCacheImpl_CAUGHT_EXCEPTION_SYNCHRONIZING_EVENTS,
+ new Object[] {sender, region.getFullPath(), entriesToSynchronize}), t);
+ }
+ }
+ }
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java
index b48fdc5..38a76de 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java
@@ -784,6 +784,10 @@ public class InitialImageOperation {
keys = new HashSet();
}
final ByteArrayDataInput in = new ByteArrayDataInput();
+ List<Entry> entriesToSynchronize = null;
+ if (this.isSynchronizing) {
+ entriesToSynchronize = new ArrayList<>();
+ }
for (int i = 0; i < entryCount; i++) {
// stream is null-terminated
if (internalDuringApplyDelta != null && !internalDuringApplyDelta.isRunning
@@ -884,6 +888,9 @@ public class InitialImageOperation {
if (record) {
this.entries.initialImagePut(entry.key, lastModified, tmpValue, wasRecovered,
true, tag, sender, this.isSynchronizing);
+ if (this.isSynchronizing) {
+ entriesToSynchronize.add(entry);
+ }
}
} catch (RegionDestroyedException e) {
return false;
@@ -927,6 +934,9 @@ public class InitialImageOperation {
}
this.entries.initialImagePut(entry.key, lastModified, tmpValue, wasRecovered, false,
tag, sender, this.isSynchronizing);
+ if (this.isSynchronizing) {
+ entriesToSynchronize.add(entry);
+ }
} catch (RegionDestroyedException e) {
return false;
} catch (CancelException e) {
@@ -935,6 +945,12 @@ public class InitialImageOperation {
didIIP = true;
}
}
+ if (this.isSynchronizing && !entriesToSynchronize.isEmpty()) {
+ LocalRegion owner = ((AbstractRegionMap) this.entries)._getOwner();
+ LocalRegion region = owner instanceof BucketRegion ? owner.getPartitionedRegion() : owner;
+ owner.getCache().invokeRegionEntrySynchronizationListenersAfterSynchronization(sender,
+ region, entriesToSynchronize);
+ }
if (keys != null) {
if (isDebugEnabled) {
logger.debug("processed these initial image keys: {}", keys);
@@ -2929,6 +2945,10 @@ public class InitialImageOperation {
this.entryBits = EntryBits.setTombstone(this.entryBits, true);
}
+ public Object getKey() {
+ return key;
+ }
+
public VersionTag getVersionTag() {
return versionTag;
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
index 544406f..66bc44a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
@@ -316,4 +316,8 @@ public interface InternalCache extends Cache, Extensible<Cache>, CacheTime {
boolean hasPersistentRegion();
void shutDownAll();
+
+ void invokeRegionEntrySynchronizationListenersAfterSynchronization(
+ InternalDistributedMember sender, LocalRegion region,
+ List<InitialImageOperation.Entry> entriesToSynchronize);
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEntrySynchronizationListener.java b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEntrySynchronizationListener.java
new file mode 100644
index 0000000..aad20c4
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEntrySynchronizationListener.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import java.util.List;
+
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+
+public interface RegionEntrySynchronizationListener {
+
+ void afterSynchronization(InternalDistributedMember sender, LocalRegion region,
+ List<InitialImageOperation.Entry> entriesToSynchronize);
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index 9b3b61f..fbf8c71 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -1350,4 +1350,42 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
this.event.release();
}
}
+
+ protected GatewayQueueEvent getSynchronizationEvent(Object key, long timestamp) {
+ GatewayQueueEvent event = null;
+ for (RegionQueue queue : getQueues()) {
+ Region region = queue.getRegion();
+ for (Iterator i = region.values().iterator(); i.hasNext();) {
+ GatewaySenderEventImpl gsei = (GatewaySenderEventImpl) i.next();
+ if (gsei.getKey().equals(key) && gsei.getVersionTimeStamp() == timestamp) {
+ event = gsei;
+ logger.info(LocalizedMessage.create(
+ LocalizedStrings.AbstractGatewaySender_PROVIDING_SYNCHRONIZATION_EVENT,
+ new Object[] {this, key, timestamp, event}));
+ this.statistics.incSynchronizationEventsProvided();
+ break;
+ }
+ }
+ }
+ return event;
+ }
+
+ protected void putSynchronizationEvent(GatewayQueueEvent event) {
+ if (this.eventProcessor != null) {
+ this.lifeCycleLock.readLock().lock();
+ try {
+ logger.info(LocalizedMessage.create(
+ LocalizedStrings.AbstractGatewaySender_ENQUEUEING_SYNCHRONIZATION_EVENT,
+ new Object[] {this, event}));
+ this.eventProcessor.enqueueEvent(event);
+ this.statistics.incSynchronizationEventsEnqueued();
+ } catch (Throwable t) {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.AbstractGatewaySender_CAUGHT_EXCEPTION_ENQUEUEING_SYNCHRONIZATION_EVENT,
+ new Object[] {this, event}), t);
+ } finally {
+ this.lifeCycleLock.readLock().unlock();
+ }
+ }
+ }
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 0c93755..52df2c5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -1291,6 +1291,8 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
((ParallelGatewaySenderQueue) this.queue).addShadowPartitionedRegionForUserRR(userRegion);
}
+ protected abstract void enqueueEvent(GatewayQueueEvent event);
+
protected class SenderStopperCallable implements Callable<Boolean> {
private final AbstractGatewaySenderEventProcessor p;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
index cacd326..ec89ef4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
@@ -781,7 +781,7 @@ public class GatewaySenderEventImpl
.append(";creationTime=").append(this.creationTime).append(";shadowKey= ")
.append(this.shadowKey).append(";timeStamp=").append(this.versionTimeStamp)
.append(";acked=").append(this.isAcked).append(";dispatched=").append(this.isDispatched)
- .append("]");
+ .append(";bucketId=").append(this.bucketId).append("]");
return buffer.toString();
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationListener.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationListener.java
new file mode 100644
index 0000000..2c898d2
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationListener.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan;
+
+import java.util.List;
+
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InitialImageOperation;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.RegionEntrySynchronizationListener;
+
+public class GatewaySenderQueueEntrySynchronizationListener
+ implements RegionEntrySynchronizationListener {
+
+ @Override
+ public void afterSynchronization(InternalDistributedMember sender, LocalRegion region,
+ List<InitialImageOperation.Entry> entriesToSynchronize) {
+ if (region.getAllGatewaySenderIds().size() > 0) {
+ new GatewaySenderQueueEntrySynchronizationOperation(sender, region, entriesToSynchronize)
+ .synchronizeEntries();
+ }
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperation.java
new file mode 100644
index 0000000..1c398a9
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperation.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.DataSerializer;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.distributed.internal.DM;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.MessageWithReply;
+import org.apache.geode.distributed.internal.PooledDistributionMessage;
+import org.apache.geode.distributed.internal.ReplyException;
+import org.apache.geode.distributed.internal.ReplyMessage;
+import org.apache.geode.distributed.internal.ReplyProcessor21;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.DataSerializableFixedID;
+import org.apache.geode.internal.Version;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InitialImageOperation;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.versions.VersionTag;
+import org.apache.geode.internal.i18n.LocalizedStrings;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.logging.log4j.LocalizedMessage;
+
+public class GatewaySenderQueueEntrySynchronizationOperation {
+
+ private InternalDistributedMember recipient;
+
+ private LocalRegion region;
+
+ private List<GatewaySenderQueueEntrySynchronizationEntry> entriesToSynchronize;
+
+ private static final Logger logger = LogService.getLogger();
+
+ protected GatewaySenderQueueEntrySynchronizationOperation(InternalDistributedMember recipient,
+ LocalRegion region, List<InitialImageOperation.Entry> giiEntriesToSynchronize) {
+ this.recipient = recipient;
+ this.region = region;
+ initializeEntriesToSynchronize(giiEntriesToSynchronize);
+ }
+
+ protected void synchronizeEntries() {
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "{}: Requesting synchronization from member={}; regionPath={}; entriesToSynchronize={}",
+ getClass().getSimpleName(), this.recipient, this.region.getFullPath(),
+ this.entriesToSynchronize);
+ }
+ // Create and send message
+ DM dm = this.region.getDistributionManager();
+ GatewaySenderQueueEntrySynchronizationReplyProcessor processor =
+ new GatewaySenderQueueEntrySynchronizationReplyProcessor(dm, this.recipient, this);
+ GatewaySenderQueueEntrySynchronizationMessage message =
+ new GatewaySenderQueueEntrySynchronizationMessage(this.recipient,
+ processor.getProcessorId(), this);
+ dm.putOutgoing(message);
+
+ // Wait for replies
+ try {
+ processor.waitForReplies();
+ } catch (ReplyException e) {
+ e.handleAsUnexpected();
+ } catch (InterruptedException e) {
+ dm.getCancelCriterion().checkCancelInProgress(e);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ protected GemFireCacheImpl getCache() {
+ return (GemFireCacheImpl) CacheFactory.getAnyInstance();
+ }
+
+ private void initializeEntriesToSynchronize(
+ List<InitialImageOperation.Entry> giiEntriesToSynchronize) {
+ this.entriesToSynchronize = new ArrayList<>();
+ for (InitialImageOperation.Entry entry : giiEntriesToSynchronize) {
+ this.entriesToSynchronize.add(
+ new GatewaySenderQueueEntrySynchronizationEntry(entry.getKey(), entry.getVersionTag()));
+ }
+ }
+
+ public static class GatewaySenderQueueEntrySynchronizationReplyProcessor
+ extends ReplyProcessor21 {
+
+ private GatewaySenderQueueEntrySynchronizationOperation operation;
+
+ public GatewaySenderQueueEntrySynchronizationReplyProcessor(DM dm,
+ InternalDistributedMember recipient,
+ GatewaySenderQueueEntrySynchronizationOperation operation) {
+ super(dm, recipient);
+ this.operation = operation;
+ }
+
+ @Override
+ public void process(DistributionMessage msg) {
+ try {
+ if (msg instanceof ReplyMessage) {
+ ReplyMessage reply = (ReplyMessage) msg;
+ if (reply.getException() == null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "{}: Processing reply from member={}; regionPath={}; key={}; entriesToSynchronize={}",
+ getClass().getSimpleName(), reply.getSender(),
+ this.operation.region.getFullPath(), this.operation.entriesToSynchronize,
+ reply.getReturnValue());
+ }
+ List<Map<String, GatewayQueueEvent>> events =
+ (List<Map<String, GatewayQueueEvent>>) reply.getReturnValue();
+ for (int i = 0; i < events.size(); i++) {
+ Map<String, GatewayQueueEvent> eventsForOneEntry = events.get(i);
+ if (events.isEmpty()) {
+ GatewaySenderQueueEntrySynchronizationEntry entry =
+ this.operation.entriesToSynchronize.get(i);
+ logger.info(LocalizedMessage.create(
+ LocalizedStrings.GatewaySenderQueueEntrySynchronizationReplyProcessor_REPLY_IS_EMPTY,
+ new Object[] {reply.getSender(), this.operation.region.getFullPath(), entry.key,
+ entry.entryVersion}));
+ } else {
+ putSynchronizationEvents(eventsForOneEntry);
+ }
+ }
+ }
+ }
+ } finally {
+ super.process(msg);
+ }
+ }
+
+ private void putSynchronizationEvents(Map<String, GatewayQueueEvent> senderIdsAndEvents) {
+ for (Map.Entry<String, GatewayQueueEvent> senderIdAndEvent : senderIdsAndEvents.entrySet()) {
+ AbstractGatewaySender sender =
+ (AbstractGatewaySender) getCache().getGatewaySender(senderIdAndEvent.getKey());
+ sender.putSynchronizationEvent(senderIdAndEvent.getValue());
+ }
+ }
+
+ private Cache getCache() {
+ return CacheFactory.getAnyInstance();
+ }
+ }
+
+ public static class GatewaySenderQueueEntrySynchronizationMessage
+ extends PooledDistributionMessage implements MessageWithReply {
+
+ private int processorId;
+
+ private String regionPath;
+
+ private List<GatewaySenderQueueEntrySynchronizationEntry> entriesToSynchronize;
+
+ /* For serialization */
+ public GatewaySenderQueueEntrySynchronizationMessage() {}
+
+ protected GatewaySenderQueueEntrySynchronizationMessage(InternalDistributedMember recipient,
+ int processorId, GatewaySenderQueueEntrySynchronizationOperation operation) {
+ super();
+ setRecipient(recipient);
+ this.processorId = processorId;
+ this.regionPath = operation.region.getFullPath();
+ this.entriesToSynchronize = operation.entriesToSynchronize;
+ }
+
+ @Override
+ protected void process(DistributionManager dm) {
+ Object result = null;
+ ReplyException replyException = null;
+ try {
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: Providing synchronization region={}; entriesToSynchronize={}",
+ getClass().getSimpleName(), this.regionPath, this.entriesToSynchronize);
+ }
+ result = getSynchronizationEvents();
+ } catch (Throwable t) {
+ replyException = new ReplyException(t);
+ } finally {
+ ReplyMessage replyMsg = new ReplyMessage();
+ replyMsg.setRecipient(getSender());
+ replyMsg.setProcessorId(this.processorId);
+ if (replyException == null) {
+ replyMsg.setReturnValue(result);
+ } else {
+ replyMsg.setException(replyException);
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: Sending synchronization reply returnValue={}; exception={}",
+ getClass().getSimpleName(), replyMsg.getReturnValue(), replyMsg.getException());
+ }
+ dm.putOutgoing(replyMsg);
+ }
+ }
+
+ private Object getSynchronizationEvents() {
+ List<Map<String, GatewayQueueEvent>> results = new ArrayList<>();
+ // Get the region
+ GemFireCacheImpl gfci = (GemFireCacheImpl) getCache();
+ LocalRegion region = (LocalRegion) gfci.getRegion(this.regionPath);
+
+ // Add the appropriate GatewaySenderEventImpl from each GatewaySender for each entry
+ Set<String> allGatewaySenderIds = region.getAllGatewaySenderIds();
+ for (GatewaySender sender : gfci.getAllGatewaySenders()) {
+ if (allGatewaySenderIds.contains(sender.getId())) {
+ for (GatewaySenderQueueEntrySynchronizationEntry entry : this.entriesToSynchronize) {
+ Map<String, GatewayQueueEvent> resultForOneEntry = new HashMap<>();
+ GatewayQueueEvent event = ((AbstractGatewaySender) sender)
+ .getSynchronizationEvent(entry.key, entry.entryVersion.getVersionTimeStamp());
+ if (event != null) {
+ resultForOneEntry.put(sender.getId(), event);
+ }
+ results.add(resultForOneEntry);
+ }
+ }
+ }
+
+ return results;
+ }
+
+ private Cache getCache() {
+ return CacheFactory.getAnyInstance();
+ }
+
+ @Override
+ public int getDSFID() {
+ return GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_MESSAGE;
+ }
+
+ @Override
+ public void toData(DataOutput out) throws IOException {
+ super.toData(out);
+ out.writeInt(this.processorId);
+ DataSerializer.writeString(this.regionPath, out);
+ DataSerializer.writeArrayList((ArrayList) this.entriesToSynchronize, out);
+ }
+
+ @Override
+ public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+ super.fromData(in);
+ this.processorId = in.readInt();
+ this.regionPath = DataSerializer.readString(in);
+ this.entriesToSynchronize = DataSerializer.readArrayList(in);
+ }
+ }
+
+ public static class GatewaySenderQueueEntrySynchronizationEntry
+ implements DataSerializableFixedID {
+
+ private Object key;
+
+ private VersionTag entryVersion;
+
+ /* For serialization */
+ public GatewaySenderQueueEntrySynchronizationEntry() {}
+
+ public GatewaySenderQueueEntrySynchronizationEntry(Object key, VersionTag entryVersion) {
+ this.key = key;
+ this.entryVersion = entryVersion;
+ }
+
+ @Override
+ public int getDSFID() {
+ return GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_ENTRY;
+ }
+
+ @Override
+ public Version[] getSerializationVersions() {
+ return null;
+ }
+
+ @Override
+ public void toData(DataOutput out) throws IOException {
+ DataSerializer.writeObject(this.key, out);
+ DataSerializer.writeObject(this.entryVersion, out);
+ }
+
+ @Override
+ public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+ this.key = DataSerializer.readObject(in);
+ this.entryVersion = DataSerializer.readObject(in);
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuilder().append(getClass().getSimpleName()).append("[").append("key=")
+ .append(this.key).append("; entryVersion=").append(this.entryVersion).append("]")
+ .toString();
+ }
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
index 83720db..228e7a9 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
@@ -89,6 +89,9 @@ public class GatewaySenderStats {
protected static final String LOAD_BALANCES_IN_PROGRESS = "loadBalancesInProgress";
protected static final String LOAD_BALANCE_TIME = "loadBalanceTime";
+ protected static final String SYNCHRONIZATION_EVENTS_ENQUEUED = "synchronizationEventsEnqueued";
+ protected static final String SYNCHRONIZATION_EVENTS_PROVIDED = "synchronizationEventsProvided";
+
/** Id of the events queued statistic */
protected static int eventsReceivedId;
/** Id of the events queued statistic */
@@ -140,6 +143,10 @@ public class GatewaySenderStats {
protected static int loadBalancesInProgressId;
/** Id of load balance time */
protected static int loadBalanceTimeId;
+ /** Id of synchronization events enqueued */
+ protected static int synchronizationEventsEnqueuedId;
+ /** Id of synchronization events provided */
+ protected static int synchronizationEventsProvidedId;
/**
* Static initializer to create and initialize the <code>StatisticsType</code>
@@ -213,7 +220,11 @@ public class GatewaySenderStats {
f.createIntGauge(LOAD_BALANCES_IN_PROGRESS, "Number of load balances in progress",
"operations"),
f.createLongCounter(LOAD_BALANCE_TIME, "Total time spent load balancing this sender",
- "nanoseconds"),});
+ "nanoseconds"),
+ f.createIntCounter(SYNCHRONIZATION_EVENTS_ENQUEUED,
+ "Number of synchronization events added to the event queue.", "operations"),
+ f.createIntCounter(SYNCHRONIZATION_EVENTS_PROVIDED,
+ "Number of synchronization events provided to other members.", "operations"),});
// Initialize id fields
eventsReceivedId = type.nameToId(EVENTS_RECEIVED);
@@ -243,6 +254,8 @@ public class GatewaySenderStats {
loadBalancesCompletedId = type.nameToId(LOAD_BALANCES_COMPLETED);
loadBalancesInProgressId = type.nameToId(LOAD_BALANCES_IN_PROGRESS);
loadBalanceTimeId = type.nameToId(LOAD_BALANCE_TIME);
+ synchronizationEventsEnqueuedId = type.nameToId(SYNCHRONIZATION_EVENTS_ENQUEUED);
+ synchronizationEventsProvidedId = type.nameToId(SYNCHRONIZATION_EVENTS_PROVIDED);
}
////////////////////// Instance Fields //////////////////////
@@ -741,6 +754,20 @@ public class GatewaySenderStats {
stats.incLong(loadBalanceTimeId, delta);
}
+ /**
+ * Increments the number of synchronization events enqueued.
+ */
+ public void incSynchronizationEventsEnqueued() {
+ this.stats.incInt(synchronizationEventsEnqueuedId, 1);
+ }
+
+ /**
+ * Increments the number of synchronization events provided.
+ */
+ public void incSynchronizationEventsProvided() {
+ this.stats.incInt(synchronizationEventsProvidedId, 1);
+ }
+
public Statistics getStats() {
return stats;
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
index ed6df0b..70a2cbb 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
@@ -34,6 +34,7 @@ import org.apache.geode.InternalGemFireException;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.Region;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EnumListenerEvent;
import org.apache.geode.internal.cache.LocalRegion;
@@ -41,6 +42,7 @@ import org.apache.geode.internal.cache.RegionQueue;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
import org.apache.geode.internal.cache.wan.GatewaySenderException;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
@@ -325,4 +327,10 @@ public class ConcurrentParallelGatewaySenderEventProcessor
public void initializeEventDispatcher() {
// no op for AsyncEventProcessor
}
+
+ @Override
+ protected void enqueueEvent(GatewayQueueEvent event) {
+ int pId = ((GatewaySenderEventImpl) event).getBucketId() % this.nDispatcher;
+ this.processors[pId].enqueueEvent(event);
+ }
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
index e74270f..ff8a231 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
@@ -24,6 +24,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.Region;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
import org.apache.geode.internal.cache.Conflatable;
import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.EntryEventImpl;
@@ -112,15 +113,20 @@ public class ParallelGatewaySenderEventProcessor extends AbstractGatewaySenderEv
}
// TODO: Looks like for PDX region bucket id is set to -1.
- boolean queuedEvent = false;
- try {
- EventID eventID = ((EntryEventImpl) event).getEventId();
+ EventID eventID = ((EntryEventImpl) event).getEventId();
- // while merging 42004, kept substituteValue as it is(it is barry's
- // change 42466). bucketID is merged with eventID.getBucketID
- gatewayQueueEvent = new GatewaySenderEventImpl(operation, event, substituteValue, true,
- eventID.getBucketID());
+ // while merging 42004, kept substituteValue as it is(it is barry's
+ // change 42466). bucketID is merged with eventID.getBucketID
+ gatewayQueueEvent =
+ new GatewaySenderEventImpl(operation, event, substituteValue, true, eventID.getBucketID());
+
+ enqueueEvent(gatewayQueueEvent);
+ }
+ @Override
+ protected void enqueueEvent(GatewayQueueEvent gatewayQueueEvent) {
+ boolean queuedEvent = false;
+ try {
if (getSender().beforeEnqueue(gatewayQueueEvent)) {
long start = getSender().getStatistics().startTime();
try {
@@ -138,7 +144,7 @@ public class ParallelGatewaySenderEventProcessor extends AbstractGatewaySenderEv
} finally {
if (!queuedEvent) {
// it was not queued for some reason
- gatewayQueueEvent.release();
+ ((GatewaySenderEventImpl) gatewayQueueEvent).release();
}
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
index 713713f..2f05d56 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
@@ -37,6 +37,7 @@ import org.apache.geode.InternalGemFireException;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
import org.apache.geode.cache.wan.GatewaySender.OrderPolicy;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EnumListenerEvent;
@@ -386,4 +387,10 @@ public class ConcurrentSerialGatewaySenderEventProcessor
}
+ @Override
+ protected void enqueueEvent(GatewayQueueEvent event) {
+ for (SerialGatewaySenderEventProcessor serialProcessor : this.processors) {
+ serialProcessor.enqueueEvent(event);
+ }
+ }
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
index 150b5ac..2f5da89 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
@@ -37,6 +37,7 @@ import org.apache.geode.cache.CacheListener;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionDestroyedException;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.internal.Assert;
@@ -859,4 +860,9 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven
}
+ @Override
+ protected void enqueueEvent(GatewayQueueEvent event) {
+ // @TODO This API hasn't been implemented yet
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
index 833fcaa..9c80ce1 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
@@ -88,6 +88,7 @@ import org.apache.geode.internal.cache.DiskStoreFactoryImpl;
import org.apache.geode.internal.cache.DiskStoreImpl;
import org.apache.geode.internal.cache.DiskStoreMonitor;
import org.apache.geode.internal.cache.DistributedRegion;
+import org.apache.geode.internal.cache.InitialImageOperation;
import org.apache.geode.internal.cache.event.EventTrackerExpiryTask;
import org.apache.geode.internal.cache.ExpirationScheduler;
import org.apache.geode.internal.cache.FilterProfile;
@@ -2214,4 +2215,11 @@ public class CacheCreation implements InternalCache {
public void shutDownAll() {
throw new UnsupportedOperationException(LocalizedStrings.SHOULDNT_INVOKE.toLocalizedString());
}
+
+ @Override
+ public void invokeRegionEntrySynchronizationListenersAfterSynchronization(
+ InternalDistributedMember sender, LocalRegion region,
+ List<InitialImageOperation.Entry> entriesToSynchronize) {
+ throw new UnsupportedOperationException(LocalizedStrings.SHOULDNT_INVOKE.toLocalizedString());
+ }
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ClientCacheCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ClientCacheCreation.java
index 667ae0c..fec0e7c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ClientCacheCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ClientCacheCreation.java
@@ -17,6 +17,7 @@ package org.apache.geode.internal.cache.xmlcache;
import java.io.File;
import java.net.InetSocketAddress;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
@@ -43,8 +44,11 @@ import org.apache.geode.cache.client.Pool;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache.query.QueryService;
import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InitialImageOperation;
import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.PoolFactoryImpl;
import org.apache.geode.internal.i18n.LocalizedStrings;
@@ -283,4 +287,10 @@ public class ClientCacheCreation extends CacheCreation implements ClientCache {
return Collections.emptySet();
}
+ @Override
+ public void invokeRegionEntrySynchronizationListenersAfterSynchronization(
+ InternalDistributedMember sender, LocalRegion region,
+ List<InitialImageOperation.Entry> entriesToSynchronize) {
+ throw new UnsupportedOperationException(LocalizedStrings.SHOULDNT_INVOKE.toLocalizedString());
+ }
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelGatewaySenderCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelGatewaySenderCreation.java
index 3cd60c7..791c3f5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelGatewaySenderCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelGatewaySenderCreation.java
@@ -17,6 +17,7 @@ package org.apache.geode.internal.cache.xmlcache;
import java.util.List;
import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.distributed.internal.DM;
import org.apache.geode.distributed.internal.DistributionAdvisee;
@@ -90,4 +91,12 @@ public class ParallelGatewaySenderCreation extends AbstractGatewaySender impleme
@Override
protected void setModifiedEventId(EntryEventImpl clonedEvent) {}
+
+ protected GatewayQueueEvent getSynchronizationEvent(Object key, long timestamp) {
+ throw new UnsupportedOperationException();
+ }
+
+ protected void putSynchronizationEvent(GatewayQueueEvent event) {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialGatewaySenderCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialGatewaySenderCreation.java
index 9aff712..9fcf06a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialGatewaySenderCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialGatewaySenderCreation.java
@@ -17,6 +17,7 @@ package org.apache.geode.internal.cache.xmlcache;
import java.util.List;
import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.distributed.internal.DM;
import org.apache.geode.distributed.internal.DistributionAdvisee;
@@ -87,4 +88,12 @@ public class SerialGatewaySenderCreation extends AbstractGatewaySender implement
@Override
protected void setModifiedEventId(EntryEventImpl clonedEvent) {}
+
+ protected GatewayQueueEvent getSynchronizationEvent(Object key, long timestamp) {
+ throw new UnsupportedOperationException();
+ }
+
+ protected void putSynchronizationEvent(GatewayQueueEvent event) {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java b/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
index 3a32db8..fd77c3e 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
@@ -7697,6 +7697,20 @@ public class LocalizedStrings {
public static final StringId LuceneServiceImpl_REGION_0_CANNOT_BE_DESTROYED = new StringId(6660,
"Region {0} cannot be destroyed because it defines Lucene index(es) [{1}]. Destroy all Lucene indexes before destroying the region.");
+ public static final StringId AbstractGatewaySender_CAUGHT_EXCEPTION_ENQUEUEING_SYNCHRONIZATION_EVENT =
+ new StringId(6661,
+ "{0}: Caught the following exception attempting to enqueue synchronization event={1}:");
+ public static final StringId GemFireCacheImpl_CAUGHT_EXCEPTION_SYNCHRONIZING_EVENTS =
+ new StringId(6662,
+ "Caught the following exception attempting to synchronize events from member={0}; regionPath={1}; entriesToSynchronize={2}:");
+ public static final StringId GatewaySenderQueueEntrySynchronizationReplyProcessor_REPLY_IS_EMPTY =
+ new StringId(6663,
+ "Synchronization event reply from member={0}; regionPath={1}; key={2}; entryVersion={3} is empty");
+ public static final StringId AbstractGatewaySender_PROVIDING_SYNCHRONIZATION_EVENT =
+ new StringId(6664, "{0}: Providing synchronization event for key={1}; timestamp={2}: {3}");
+ public static final StringId AbstractGatewaySender_ENQUEUEING_SYNCHRONIZATION_EVENT =
+ new StringId(6665, "{0}: Enqueueing synchronization event: {1}");
+
/** Testing strings, messageId 90000-99999 **/
/**
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueEntrySynchronizationDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueEntrySynchronizationDUnitTest.java
new file mode 100644
index 0000000..ae14c5f
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueEntrySynchronizationDUnitTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan;
+
+import static junitparams.JUnitParamsRunner.$;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.asyncqueue.AsyncEventListener;
+import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionHelper;
+import org.apache.geode.internal.cache.VMCachedDeserializable;
+import org.apache.geode.internal.cache.versions.VMVersionTag;
+import org.apache.geode.internal.cache.versions.VersionSource;
+import org.apache.geode.internal.cache.versions.VersionTag;
+import org.apache.geode.test.dunit.DistributedTestUtils;
+import org.apache.geode.test.junit.categories.DistributedTest;
+
+@Category(DistributedTest.class)
+@RunWith(JUnitParamsRunner.class)
+public class AsyncEventQueueEntrySynchronizationDUnitTest extends AsyncEventQueueTestBase {
+
+ private static Object[] getRegionShortcuts() {
+ return $(new Object[] {RegionShortcut.PARTITION_REDUNDANT, false, true},
+ new Object[] {RegionShortcut.PARTITION_REDUNDANT_PERSISTENT, true, true});
+ }
+
+ @Test
+ @Parameters(method = "getRegionShortcuts")
+ public void testParallelAsyncEventQueueSynchronization(RegionShortcut regionShortcut,
+ boolean isPersistentAeq, boolean isParallelAeq) throws Exception {
+ // Start locator
+ Integer locatorPort = vm0.invoke(() -> createFirstLocatorWithDSId(1));
+
+ // Start three members
+ vm1.invoke(() -> createCache(locatorPort));
+ vm2.invoke(() -> createCache(locatorPort));
+ vm3.invoke(() -> createCache(locatorPort));
+
+ // Create parallel AsyncEventQueue
+ String aeqId = "db";
+ vm1.invoke(() -> createAsyncEventQueue(aeqId, isParallelAeq, 100, 1, false, isPersistentAeq,
+ null, true, new WaitingAsyncEventListener()));
+ vm2.invoke(() -> createAsyncEventQueue(aeqId, isParallelAeq, 100, 1, false, isPersistentAeq,
+ null, true, new WaitingAsyncEventListener()));
+ vm3.invoke(() -> createAsyncEventQueue(aeqId, isParallelAeq, 100, 1, false, isPersistentAeq,
+ null, true, new WaitingAsyncEventListener()));
+
+ // Create PartitionedRegion with redundant-copies=2
+ String regionName = getTestMethodName() + "_PR";
+ vm1.invoke(() -> createPartitionedRegion(regionName, regionShortcut, aeqId, 2));
+ vm2.invoke(() -> createPartitionedRegion(regionName, regionShortcut, aeqId, 2));
+ vm3.invoke(() -> createPartitionedRegion(regionName, regionShortcut, aeqId, 2));
+
+ // Create primary bucket in member 1, secondary buckets in members 2 and 3
+ Object key = "0";
+ vm1.invoke(() -> createBucket(regionName, key));
+ vm2.invoke(() -> createBucket(regionName, key));
+ vm3.invoke(() -> createBucket(regionName, key));
+ vm1.invoke(() -> assertPrimaryBucket(regionName, key));
+
+ // Fake a replication event in member 2
+ InternalDistributedMember idmMember1 = vm1.invoke(() -> getMember());
+ VersionSource vsMember1 = vm1.invoke(() -> getVersionMember(regionName));
+ vm2.invoke(() -> doFakeUpdate(idmMember1, vsMember1, regionName, key));
+
+ // Verify only member 2's queue contains an event
+ vm1.invoke(() -> checkAsyncEventQueueSize(aeqId, 0, true));
+ vm2.invoke(() -> checkAsyncEventQueueSize(aeqId, 1, true));
+ vm3.invoke(() -> checkAsyncEventQueueSize(aeqId, 0, true));
+
+ // Crash member 1
+ DistributedTestUtils.crashDistributedSystem(vm1);
+
+ // Wait for member 3's queue to contain 1 event (which means it has been synchronized)
+ vm3.invoke(() -> waitForAsyncEventQueueSize(aeqId, 1, true));
+
+ // Start processing events (mainly to avoid an InterruptedException suspect string)
+ vm2.invoke(() -> startProcessingAsyncEvents(aeqId));
+ vm3.invoke(() -> startProcessingAsyncEvents(aeqId));
+ }
+
+ private void createPartitionedRegion(String regionName, RegionShortcut regionShortcut,
+ String aeqId, int redundantCopies) {
+ PartitionAttributesFactory paf = new PartitionAttributesFactory();
+ paf.setRedundantCopies(redundantCopies);
+
+ cache.createRegionFactory(regionShortcut).addAsyncEventQueueId(aeqId)
+ .setPartitionAttributes(paf.create()).create(regionName);
+ }
+
+ private void createBucket(String regionName, Object key) throws Exception {
+ PartitionedRegion pr = (PartitionedRegion) cache.getRegion(regionName);
+ int bucketId = PartitionedRegionHelper.getHashKey(pr, null, key, null, null);
+ pr.getRedundancyProvider().createBackupBucketOnMember(bucketId, getMember(), false, false, null,
+ true);
+ }
+
+ private void assertPrimaryBucket(String regionName, Object key) throws Exception {
+ PartitionedRegion pr = (PartitionedRegion) cache.getRegion(regionName);
+ int bucketId = PartitionedRegionHelper.getHashKey(pr, null, key, null, null);
+ assertThat(pr.getRegionAdvisor().isPrimaryForBucket(bucketId)).isTrue();
+ }
+
+ private InternalDistributedMember getMember() {
+ return (InternalDistributedMember) cache.getDistributedSystem().getDistributedMember();
+ }
+
+ private VersionSource getVersionMember(String regionName) {
+ PartitionedRegion pr = (PartitionedRegion) cache.getRegion(regionName);
+ return pr.getVersionMember();
+ }
+
+ private void doFakeUpdate(InternalDistributedMember fromMember, VersionSource versionSource,
+ String regionName, Object key) {
+ // Get the BucketRegion for the regionName and key
+ PartitionedRegion pr = (PartitionedRegion) cache.getRegion(regionName);
+ BucketRegion br = pr.getBucketRegion(key);
+
+ // Create VersionTag
+ long timestamp = System.currentTimeMillis();
+ VersionTag tag = new VMVersionTag();
+ tag.setMemberID(versionSource);
+ tag.setRegionVersion(1);
+ tag.setEntryVersion(1);
+ tag.setVersionTimeStamp(timestamp);
+ tag.setIsRemoteForTesting();
+
+ EntryEventImpl event =
+ EntryEventImpl.create(br, Operation.CREATE, key, true, fromMember, true, false);
+ event.setNewValue(new VMCachedDeserializable("0", 0));
+ event.setTailKey(161l);
+ event.setVersionTag(tag);
+ event.setEventId(new EventID(cache.getDistributedSystem()));
+
+ // Put event into region
+ br.getRegionMap().basicPut(event, timestamp, true, false, null, false, false);
+ }
+
+ private void startProcessingAsyncEvents(String aeqId) {
+ // Get the async event listener
+ WaitingAsyncEventListener listener = getWaitingAsyncEventListener(aeqId);
+
+ // Start processing waiting events
+ listener.startProcessingEvents();
+ }
+
+ private WaitingAsyncEventListener getWaitingAsyncEventListener(String aeqId) {
+ // Get the async event queue
+ AsyncEventQueue aeq = cache.getAsyncEventQueue(aeqId);
+ assertThat(aeq).isNotNull();
+
+ // Get and return the async event listener
+ AsyncEventListener aeqListener = aeq.getAsyncEventListener();
+ assertThat(aeqListener).isInstanceOf(WaitingAsyncEventListener.class);
+ return (WaitingAsyncEventListener) aeqListener;
+ }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
index dc7a218..6ba8c66 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
@@ -38,6 +38,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
import org.apache.geode.internal.cache.control.InternalResourceManager;
import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceObserver;
import org.junit.experimental.categories.Category;
@@ -119,7 +120,7 @@ public class AsyncEventQueueTestBase extends JUnit4DistributedTestCase {
protected static AsyncEventListener eventListener1;
- private static final long MAX_WAIT = 10000;
+ private static final long MAX_WAIT = 60000;
protected static GatewayEventFilter eventFilter;
@@ -477,21 +478,21 @@ public class AsyncEventQueueTestBase extends JUnit4DistributedTestCase {
}
public static void checkAsyncEventQueueSize(String asyncQueueId, int numQueueEntries) {
- AsyncEventQueue theAsyncEventQueue = null;
-
- Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
- for (AsyncEventQueue asyncChannel : asyncEventChannels) {
- if (asyncQueueId.equals(asyncChannel.getId())) {
- theAsyncEventQueue = asyncChannel;
- }
- }
+ checkAsyncEventQueueSize(asyncQueueId, numQueueEntries, false);
+ }
- GatewaySender sender = ((AsyncEventQueueImpl) theAsyncEventQueue).getSender();
+ public static void checkAsyncEventQueueSize(String asyncQueueId, int numQueueEntries,
+ boolean localSize) {
+ AsyncEventQueueImpl aeq = (AsyncEventQueueImpl) cache.getAsyncEventQueue(asyncQueueId);
+ GatewaySender sender = aeq.getSender();
if (sender.isParallel()) {
Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues();
- assertEquals(numQueueEntries,
- queues.toArray(new RegionQueue[queues.size()])[0].getRegion().size());
+ Region queueRegion = queues.toArray(new RegionQueue[queues.size()])[0].getRegion();
+ if (localSize) {
+ queueRegion = PartitionRegionHelper.getLocalData(queueRegion);
+ }
+ assertEquals(numQueueEntries, queueRegion.size());
} else {
Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues();
int size = 0;
@@ -513,19 +514,21 @@ public class AsyncEventQueueTestBase extends JUnit4DistributedTestCase {
*/
public static void waitForAsyncEventQueueSize(String asyncQueueId, final int numQueueEntries)
throws Exception {
- AsyncEventQueue theAsyncEventQueue = null;
-
- Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
- for (AsyncEventQueue asyncChannel : asyncEventChannels) {
- if (asyncQueueId.equals(asyncChannel.getId())) {
- theAsyncEventQueue = asyncChannel;
- }
- }
+ waitForAsyncEventQueueSize(asyncQueueId, numQueueEntries, false);
+ }
- GatewaySender sender = ((AsyncEventQueueImpl) theAsyncEventQueue).getSender();
+ public static void waitForAsyncEventQueueSize(String asyncQueueId, final int numQueueEntries,
+ boolean localSize) throws Exception {
+ AsyncEventQueueImpl aeq = (AsyncEventQueueImpl) cache.getAsyncEventQueue(asyncQueueId);
+ GatewaySender sender = aeq.getSender();
if (sender.isParallel()) {
final Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues();
+ Region queueRegion = queues.toArray(new RegionQueue[queues.size()])[0].getRegion();
+ if (localSize) {
+ queueRegion = PartitionRegionHelper.getLocalData(queueRegion);
+ }
+ final Region fQueueRegion = queueRegion;
Wait.waitForCriterion(new WaitCriterion() {
@@ -534,8 +537,7 @@ public class AsyncEventQueueTestBase extends JUnit4DistributedTestCase {
}
public boolean done() {
- boolean done = numQueueEntries == queues.toArray(new RegionQueue[queues.size()])[0]
- .getRegion().size();
+ boolean done = numQueueEntries == fQueueRegion.size();
return done;
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/WaitingAsyncEventListener.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/WaitingAsyncEventListener.java
new file mode 100644
index 0000000..f616f67
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/WaitingAsyncEventListener.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.geode.cache.asyncqueue.AsyncEvent;
+import org.apache.geode.cache.asyncqueue.AsyncEventListener;
+
+public class WaitingAsyncEventListener implements AsyncEventListener {
+
+ private final CountDownLatch latch = new CountDownLatch(1);
+
+ private final AtomicInteger numberOfEvents = new AtomicInteger();
+
+ public boolean processEvents(List<AsyncEvent> events) {
+ try {
+ waitToStartProcessingEvents();
+ } catch (InterruptedException e) {
+ throw new RuntimeException("WaitingAsyncEventListener processEvents was interrupted");
+ }
+ for (AsyncEvent event : events) {
+ process(event);
+ }
+ return true;
+ }
+
+ private void process(AsyncEvent event) {
+ incrementTotalEvents();
+ }
+
+ private void waitToStartProcessingEvents() throws InterruptedException {
+ this.latch.await();
+ }
+
+ public void startProcessingEvents() {
+ this.latch.countDown();
+ }
+
+ private int incrementTotalEvents() {
+ return this.numberOfEvents.incrementAndGet();
+ }
+
+ public int getTotalEvents() {
+ return this.numberOfEvents.get();
+ }
+
+ public void close() {}
+}
diff --git a/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index 1820393..c0cf5ee 100644
--- a/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++ b/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -1992,6 +1992,14 @@ org/apache/geode/internal/cache/wan/GatewaySenderEventImpl,2
fromData,183,2bb9007301003d1c10119f00032a04b5002b2a2bb900740100b500282a2bb900740100b500291c1011a200232bc1007599001c2bb80076b20077a60012bb0078592bc00075b20079b7007a4c2a2bb8007bc0007cb5002a2a2bb8007db500102a2bb9007e0100b5002e2a2bb6007f2a2bb80080b500302a2bb8007bc00020b500212a2bb900810100b500132a2bb900820100b500172a2bb900740100b500092a2bb900820100b80004b500052a2bb900820100b5001bb1
toData,133,2ab600272b1011b9006802002b2ab40028b9006902002b2ab40029b9006902002ab4002a2bb8006a2ab400102bb8006b2b2ab4002eb9006c02002a2bb6006d2ab6002f2bb8006e2ab400212bb8006a2b2ab40013b9006f02002b2ab40017b9007003002b2ab40009b9006902002b2ab40005b60071b9007003002b2ab60072b900700300b1
+org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperation$GatewaySenderQueueEntrySynchronizationEntry,2
+fromData,20,2a2bb80006b500022a2bb80006c00007b50001b1
+toData,17,2ab400022bb800052ab400012bb80005b1
+
+org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperation$GatewaySenderQueueEntrySynchronizationMessage,2
+fromData,32,2a2bb7003d2a2bb9003e0100b500032a2bb8003fb500062a2bb80040b50008b1
+toData,35,2a2bb700392b2ab40003b9003a02002ab400062bb8003b2ab40008c0001e2bb8003cb1
+
org/apache/geode/internal/cache/wan/parallel/ParallelQueueBatchRemovalMessage,2
fromData,17,2a2bb7003b2a2bb8003cc0003db50004b1
toData,14,2a2bb700392ab400042bb8003ab1
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index 58d017c..8f51d71 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -3230,7 +3230,7 @@ public class WANTestBase extends JUnit4DistributedTestCase {
Set<BucketRegion> buckets = ((PartitionedRegion) regionQueue.getRegion()).getDataStore()
.getAllLocalPrimaryBucketRegions();
for (final BucketRegion bucket : buckets) {
- Awaitility.await().atMost(180, TimeUnit.SECONDS).until(() -> {
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> {
assertEquals("Expected bucket entries for bucket: " + bucket.getId()
+ " is: 0 but actual entries: " + bucket.keySet().size() + " This bucket isPrimary: "
+ bucket.getBucketAdvisor().isPrimary() + " KEYSET: " + bucket.keySet(), 0,
--
To stop receiving notification emails like this one, please contact
['"commits@geode.apache.org" <co...@geode.apache.org>'].