You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ag...@apache.org on 2016/05/03 23:46:14 UTC
[60/60] [abbrv] incubator-geode git commit: GEODE-1209: Added new
attribute to forward eviction/expiration to AEQ.
GEODE-1209: Added new attribute to forward eviction/expiration to AEQ.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/18d52357
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/18d52357
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/18d52357
Branch: refs/heads/feature/GEODE-1209
Commit: 18d52357d015db836c184d4d3df3c13925b80d02
Parents: 4a6c779
Author: Anil <ag...@pivotal.io>
Authored: Tue May 3 13:52:18 2016 -0700
Committer: Anil <ag...@pivotal.io>
Committed: Tue May 3 14:42:43 2016 -0700
----------------------------------------------------------------------
.../cache/asyncqueue/AsyncEventQueue.java | 9 +
.../asyncqueue/AsyncEventQueueFactory.java | 8 +
.../internal/AsyncEventQueueFactoryImpl.java | 8 +-
.../internal/AsyncEventQueueImpl.java | 9 +-
.../gemfire/cache/wan/GatewaySender.java | 2 +
.../gemfire/internal/cache/LocalRegion.java | 54 +--
.../cache/wan/AbstractGatewaySender.java | 111 +++---
.../cache/wan/GatewaySenderAttributes.java | 7 +
.../cache/xmlcache/AsyncEventQueueCreation.java | 11 +
.../internal/cache/xmlcache/CacheXml.java | 1 +
.../cache/xmlcache/CacheXmlGenerator.java | 7 +
.../internal/cache/xmlcache/CacheXmlParser.java | 7 +
.../geode.apache.org/schema/cache/cache-1.0.xsd | 1 +
...ventQueueEvictionAndExpirationJUnitTest.java | 362 +++++++++++++++++++
14 files changed, 514 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/18d52357/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueue.java
index a2b8b0f..c2d04a1 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueue.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueue.java
@@ -147,4 +147,13 @@ public interface AsyncEventQueue {
* <code>AsyncEventQueue</code>
*/
public GatewayEventSubstitutionFilter getGatewayEventSubstitutionFilter();
+
+ /**
+ * Indicates whether eviction and expiration events/operations are ignored (not passed)
+ * to the <code>AsyncEventListener</code>.
+ *
+ * @return boolean True if eviction and expiration operations are ignored.
+ */
+ public boolean isIgnoreEvictionAndExpiration();
+
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/18d52357/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueueFactory.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueueFactory.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueueFactory.java
index 3e30b38..fccb81e 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueueFactory.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueueFactory.java
@@ -170,7 +170,15 @@ public interface AsyncEventQueueFactory {
public AsyncEventQueueFactory setGatewayEventSubstitutionListener(
GatewayEventSubstitutionFilter filter);
+ /**
+ * Sets whether eviction and expiration events are ignored (not passed to the listener).
+ *
+ * @param ignore
+ * boolean to indicate whether to ignore eviction and expiration events.
+ */
+ public AsyncEventQueueFactory setIgnoreEvictionAndExpiration(boolean ignore);
+
/**
* Creates the <code>AsyncEventQueue</code>. It accepts Id of AsyncEventQueue
* and instance of AsyncEventListener. Multiple queues can be created using
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/18d52357/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java
index 312e880..1ec3ba0 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java
@@ -277,7 +277,7 @@ public class AsyncEventQueueFactoryImpl implements AsyncEventQueueFactory {
this.attrs.eventFilters = asyncQueueCreation.getGatewayEventFilters();
this.attrs.eventSubstitutionFilter = asyncQueueCreation.getGatewayEventSubstitutionFilter();
this.attrs.isForInternalUse = true;
-
+ this.attrs.ignoreEvictionAndExpiration = asyncQueueCreation.isIgnoreEvictionAndExpiration();
}
public AsyncEventQueueFactory setParallel(boolean isParallel) {
@@ -292,4 +292,10 @@ public class AsyncEventQueueFactoryImpl implements AsyncEventQueueFactory {
this.attrs.isMetaQueue = isMetaQueue;
return this;
}
+
+ @Override
+ public AsyncEventQueueFactory setIgnoreEvictionAndExpiration(boolean ignore) {
+ this.attrs.ignoreEvictionAndExpiration = ignore;
+ return this;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/18d52357/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java
index 6b3eb4a..5a0b370 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java
@@ -36,7 +36,7 @@ public class AsyncEventQueueImpl implements AsyncEventQueue {
private GatewaySender sender = null;
private AsyncEventListener asyncEventListener = null;
-
+
public static final String ASYNC_EVENT_QUEUE_PREFIX = "AsyncEventQueue_";
public AsyncEventQueueImpl(GatewaySender sender, AsyncEventListener eventListener) {
@@ -200,6 +200,9 @@ public class AsyncEventQueueImpl implements AsyncEventQueue {
public boolean isBucketSorted() {
// TODO Auto-generated method stub
return false;
- }
-
+ }
+
+ public boolean isIgnoreEvictionAndExpiration() {
+ return ((AbstractGatewaySender)this.sender).isIgnoreEvictionAndExpiration();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/18d52357/geode-core/src/main/java/com/gemstone/gemfire/cache/wan/GatewaySender.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/wan/GatewaySender.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/wan/GatewaySender.java
index c5b5d3a..b0ad410 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/wan/GatewaySender.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/wan/GatewaySender.java
@@ -96,6 +96,8 @@ public interface GatewaySender {
public static final int DEFAULT_DISPATCHER_THREADS = 5;
+ public static final boolean DEFAULT_IGNORE_EVICTION_EXPIRATION = true;
+
public static final OrderPolicy DEFAULT_ORDER_POLICY = OrderPolicy.KEY;
/**
* The default maximum amount of memory (MB) to allow in the queue before
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/18d52357/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
index 3ad294c..ac3a728 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
@@ -1148,7 +1148,8 @@ public class LocalRegion extends AbstractRegion
public boolean generateEventID()
{
return !(isUsedForPartitionedRegionAdmin()
- || isUsedForPartitionedRegionBucket() );
+ || (isUsedForPartitionedRegionBucket() && !(((BucketRegion)this)
+ .getPartitionedRegion().getAsyncEventQueueIds().size() > 0)));
}
public final Object destroy(Object key, Object aCallbackArgument)
@@ -6641,10 +6642,14 @@ public class LocalRegion extends AbstractRegion
protected void notifyGatewaySender(EnumListenerEvent operation,
EntryEventImpl event) {
- if (event.isConcurrencyConflict()) { // usually concurrent cache modification problem
+ if (this.isInternalRegion() || isPdxTypesRegion() ||
+ event.isConcurrencyConflict() /* usually concurrent cache modification problem */) {
return;
}
+
+ // Trace at debug level only: this runs for every event that reaches the gateway senders.
+ if (logger.isDebugEnabled()) {
+ logger.debug("Notifying GW senders: {}", event);
+ }
+
// Return if the inhibit all notifications flag is set
if (event.inhibitAllNotifications()){
if(logger.isDebugEnabled()) {
@@ -6653,34 +6658,31 @@ public class LocalRegion extends AbstractRegion
return;
}
- if (!event.getOperation().isLocal()) {
- Set<String> allGatewaySenderIds = null;
- checkSameSenderIdsAvailableOnAllNodes();
- if (event.getOperation() == Operation.UPDATE_VERSION_STAMP) {
- allGatewaySenderIds = getGatewaySenderIds();
- } else {
- allGatewaySenderIds = getAllGatewaySenderIds();
- }
-
- List<Integer> allRemoteDSIds = getRemoteDsIds(allGatewaySenderIds);
+
+ Set<String> allGatewaySenderIds = null;
+ checkSameSenderIdsAvailableOnAllNodes();
+ if (event.getOperation() == Operation.UPDATE_VERSION_STAMP) {
+ allGatewaySenderIds = getGatewaySenderIds();
+ } else {
+ allGatewaySenderIds = getAllGatewaySenderIds();
+ }
- if (allRemoteDSIds != null) {
- for (GatewaySender sender : getCache().getAllGatewaySenders()) {
- if (!isPdxTypesRegion()) {
- if (allGatewaySenderIds.contains(sender.getId())) {
- //TODO: This is a BUG. Why return and not continue?
- if((!this.getDataPolicy().withStorage()) && sender.isParallel()){
- return;
- }
- if(logger.isDebugEnabled()) {
- logger.debug("Notifying the GatewaySender : {}", sender.getId());
- }
- ((AbstractGatewaySender)sender).distribute(operation, event,
- allRemoteDSIds);
- }
+ List<Integer> allRemoteDSIds = getRemoteDsIds(allGatewaySenderIds);
+ if (allRemoteDSIds != null) {
+ for (GatewaySender sender : getCache().getAllGatewaySenders()) {
+ if (allGatewaySenderIds.contains(sender.getId())) {
+ //TODO: This is a BUG. Why return and not continue?
+ if((!this.getDataPolicy().withStorage()) && sender.isParallel()){
+ return;
+ }
+ if(logger.isDebugEnabled()) {
+ logger.debug("Notifying the GatewaySender : {}", sender.getId());
}
+ ((AbstractGatewaySender)sender).distribute(operation, event,
+ allRemoteDSIds);
}
}
+
// if (shouldNotifyGatewaySender()) {
// // Get All WAN site DSID's to be sent to each WAN site so that they
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/18d52357/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
index fe09d03..30d1fd2 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
@@ -34,6 +34,7 @@ import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionAttributes;
import com.gemstone.gemfire.cache.RegionDestroyedException;
@@ -137,6 +138,8 @@ public abstract class AbstractGatewaySender implements GatewaySender,
protected List<AsyncEventListener> listeners;
+ protected boolean ignoreEvictionAndExpiration;
+
protected GatewayEventSubstitutionFilter substitutionFilter;
protected LocatorDiscoveryCallback locatorDiscoveryCallback;
@@ -269,55 +272,7 @@ public abstract class AbstractGatewaySender implements GatewaySender,
initializeEventIdIndex();
}
this.isBucketSorted = attrs.isBucketSorted();
- }
-
- public void createSender(Cache cache, GatewaySenderAttributes attrs){
- this.cache = cache;
- this.id = attrs.getId();
- this.socketBufferSize = attrs.getSocketBufferSize();
- this.socketReadTimeout = attrs.getSocketReadTimeout();
- this.queueMemory = attrs.getMaximumQueueMemory();
- this.batchSize = attrs.getBatchSize();
- this.batchTimeInterval = attrs.getBatchTimeInterval();
- this.isConflation = attrs.isBatchConflationEnabled();
- this.isPersistence = attrs.isPersistenceEnabled();
- this.alertThreshold = attrs.getAlertThreshold();
- this.manualStart = attrs.isManualStart();
- this.isParallel = attrs.isParallel();
- this.isForInternalUse = attrs.isForInternalUse();
- this.diskStoreName = attrs.getDiskStoreName();
- this.remoteDSId = attrs.getRemoteDSId();
- this.eventFilters = attrs.getGatewayEventFilters();
- this.transFilters = attrs.getGatewayTransportFilters();
- this.listeners = attrs.getAsyncEventListeners();
- this.substitutionFilter = attrs.getGatewayEventSubstitutionFilter();
- this.locatorDiscoveryCallback = attrs.getGatewayLocatoDiscoveryCallback();
- this.isDiskSynchronous = attrs.isDiskSynchronous();
- this.policy = attrs.getOrderPolicy();
- this.dispatcherThreads = attrs.getDispatcherThreads();
- this.parallelismForReplicatedRegion = attrs.getParallelismForReplicatedRegion();
- //divide the maximumQueueMemory of sender equally using number of dispatcher threads.
- //if dispatcherThreads is 1 then maxMemoryPerDispatcherQueue will be same as maximumQueueMemory of sender
- this.maxMemoryPerDispatcherQueue = this.queueMemory / this.dispatcherThreads;
- this.myDSId = InternalDistributedSystem.getAnyInstance().getDistributionManager().getDistributedSystemId();
- this.serialNumber = DistributionAdvisor.createSerialNumber();
- if (!(this.cache instanceof CacheCreation)) {
- this.stopper = new Stopper(cache.getCancelCriterion());
- this.senderAdvisor = GatewaySenderAdvisor.createGatewaySenderAdvisor(this);
- if (!this.isForInternalUse()) {
- this.statistics = new AsyncEventQueueStats(cache.getDistributedSystem(),
- id);
- }
- else {// this sender lies underneath the AsyncEventQueue. Need to have
- // AsyncEventQueueStats
- this.statistics = new AsyncEventQueueStats(
- cache.getDistributedSystem(), AsyncEventQueueImpl
- .getAsyncEventQueueIdFromSenderId(id));
- }
- initializeEventIdIndex();
- }
- this.isBucketSorted = attrs.isBucketSorted();
-
+ this.ignoreEvictionAndExpiration = attrs.isIgnoreEvictionAndExpiration();
}
public GatewaySenderAdvisor getSenderAdvisor() {
@@ -392,6 +347,10 @@ public abstract class AbstractGatewaySender implements GatewaySender,
return !this.listeners.isEmpty();
}
+ public boolean isIgnoreEvictionAndExpiration() {
+ return this.ignoreEvictionAndExpiration;
+ }
+
public boolean isManualStart() {
return this.manualStart;
}
@@ -839,20 +798,64 @@ public abstract class AbstractGatewaySender implements GatewaySender,
return this.eventProcessor;
}
+ /**
+ * Checks whether this sender may distribute (queue) the given event.
+ * <p>
+ * Events from regions whose data policy is {@link DataPolicy#NORMAL} are never
+ * distributed. Local operations and expirations are distributed only when this
+ * sender is an AsyncEventQueue that is configured not to ignore eviction and
+ * expiration; all other events are allowed.
+ *
+ * @param event the entry event being considered for distribution
+ * @param stats the sender statistics; not used by this check at present
+ * @return boolean True if the event is allowed.
+ */
+ private boolean checkForDistribution(EntryEventImpl event, GatewaySenderStats stats) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("isIgnoreEvictionAndExpiration: {}", isIgnoreEvictionAndExpiration());
+ }
+ if (event.getRegion().getDataPolicy().equals(DataPolicy.NORMAL)) {
+ return false;
+ }
+
+ // Eviction and expirations are not passed to WAN.
+ // Eviction and Expiration are passed to AEQ based on its configuration.
+ if (event.getOperation().isLocal() || event.getOperation().isExpiration()) {
+ // Allowed only for an AEQ that is configured to forward eviction/expiration events.
+ return this.isAsyncEventQueue() && !this.isIgnoreEvictionAndExpiration();
+ }
+
+ return true;
+ }
+
+
public void distribute(EnumListenerEvent operation, EntryEventImpl event,
List<Integer> allRemoteDSIds) {
+
final boolean isDebugEnabled = logger.isDebugEnabled();
+ // If this gateway is not running, return
+ if (!isRunning()) {
+ if (isDebugEnabled) {
+ logger.debug("Returning back without putting into the gateway sender queue");
+ }
+ return;
+ }
+
final GatewaySenderStats stats = getStatistics();
stats.incEventsReceived();
- // If the event is local (see bug 35831) or an expiration ignore it.
- //removed the check of isLocal as in notifyGAtewayHub this has been taken care
- if (/*event.getOperation().isLocal() || */event.getOperation().isExpiration()
- || event.getRegion().getDataPolicy().equals(DataPolicy.NORMAL)) {
+
+ if (!checkForDistribution(event, stats)) {
getStatistics().incEventsNotQueued();
return;
}
+
// this filter is defined by Asif which exist in old wan too. new wan has
// other GatewaEventFilter. Do we need to get rid of this filter. Cheetah is
// not cinsidering this filter
@@ -941,6 +944,7 @@ public abstract class AbstractGatewaySender implements GatewaySender,
}
try {
// If this gateway is not running, return
+ // The sender may have stopped, after we have checked the status in the beginning.
if (!isRunning()) {
if (isDebugEnabled) {
logger.debug("Returning back without putting into the gateway sender queue");
@@ -988,6 +992,7 @@ public abstract class AbstractGatewaySender implements GatewaySender,
}
}
+
/**
* During sender is getting started, if there are any cache operation on queue then that event will be stored in temp queue.
* Once sender is started, these event from tmp queue will be added to sender queue.
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/18d52357/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java
index 1cef940..163943f 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java
@@ -83,6 +83,8 @@ public class GatewaySenderAttributes {
public boolean isMetaQueue = GatewaySenderAttributes.DEFAULT_IS_META_QUEUE;
+ public boolean ignoreEvictionAndExpiration = GatewaySender.DEFAULT_IGNORE_EVICTION_EXPIRATION;
+
public int getSocketBufferSize() {
return this.socketBufferSize;
}
@@ -192,4 +194,9 @@ public class GatewaySenderAttributes {
public boolean isMetaQueue() {
return this.isMetaQueue;
}
+
+ public boolean isIgnoreEvictionAndExpiration() {
+ return this.ignoreEvictionAndExpiration;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/18d52357/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java
index 0015665..4c2943e 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java
@@ -43,6 +43,7 @@ public class AsyncEventQueueCreation implements AsyncEventQueue {
private boolean isBucketSorted = false;
private int dispatcherThreads = 1;
private OrderPolicy orderPolicy = OrderPolicy.KEY;
+ private boolean ignoreEvictionAndExpiration = true;
public AsyncEventQueueCreation() {
}
@@ -62,6 +63,7 @@ public class AsyncEventQueueCreation implements AsyncEventQueue {
this.asyncEventListener = eventListener;
this.isBucketSorted = senderAttrs.isBucketSorted;
this.gatewayEventSubstitutionFilter = senderAttrs.eventSubstitutionFilter;
+ this.ignoreEvictionAndExpiration = senderAttrs.ignoreEvictionAndExpiration;
}
@Override
@@ -211,4 +213,13 @@ public class AsyncEventQueueCreation implements AsyncEventQueue {
public void setBucketSorted(boolean isBucketSorted) {
this.isBucketSorted = isBucketSorted;
}
+
+ public void setIgnoreEvictionAndExpiration(boolean ignore) {
+ this.ignoreEvictionAndExpiration = ignore;
+ }
+
+ @Override
+ public boolean isIgnoreEvictionAndExpiration() {
+ return this.ignoreEvictionAndExpiration;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/18d52357/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXml.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXml.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXml.java
index aa7d49a..c3eccd2 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXml.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXml.java
@@ -762,6 +762,7 @@ public abstract class CacheXml implements EntityResolver2, ErrorHandler {
protected static final String ASYNC_EVENT_LISTENER = "async-event-listener";
public static final String ASYNC_EVENT_QUEUE = "async-event-queue";
protected static final String ASYNC_EVENT_QUEUE_IDS = "async-event-queue-ids";
+ protected static final String IGNORE_EVICTION_AND_EXPIRATION = "ignore-eviction-expiration";
/** The name of the <code>compressor</code> attribute */
protected static final String COMPRESSOR = "compressor";
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/18d52357/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlGenerator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlGenerator.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlGenerator.java
index ea3c975..a4101ba 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlGenerator.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlGenerator.java
@@ -1521,6 +1521,13 @@ public class CacheXmlGenerator extends CacheXml implements XMLReader {
atts.addAttribute("", "", ORDER_POLICY, "", String.valueOf(asyncEventQueue
.getOrderPolicy()));
}
+ // eviction and expiration events
+ // Not gated on the value being true: a non-default "false" must be written out too.
+ if (generateDefaults() || asyncEventQueue.isIgnoreEvictionAndExpiration() != GatewaySender.DEFAULT_IGNORE_EVICTION_EXPIRATION) {
+ atts.addAttribute("", "", IGNORE_EVICTION_AND_EXPIRATION, "", String.valueOf(asyncEventQueue
+ .isIgnoreEvictionAndExpiration()));
+ }
+
// disk-synchronous
if (generateDefaults() || asyncEventQueue.isDiskSynchronous() != GatewaySender.DEFAULT_DISK_SYNCHRONOUS)
atts.addAttribute("", "", DISK_SYNCHRONOUS, "", String.valueOf(asyncEventQueue
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/18d52357/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlParser.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlParser.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlParser.java
index f344938..aec2dc3 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlParser.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlParser.java
@@ -2313,6 +2313,12 @@ public class CacheXmlParser extends CacheXml implements ContentHandler {
}
}
+ // whether to ignore (not forward) eviction and expiration events.
+ String ignoreEvictionExpiration = atts.getValue(IGNORE_EVICTION_AND_EXPIRATION);
+ if (ignoreEvictionExpiration != null) {
+ asyncEventQueueCreation.setIgnoreEvictionAndExpiration(Boolean.parseBoolean(ignoreEvictionExpiration));
+ }
+
stack.push(asyncEventQueueCreation);
}
@@ -2346,6 +2352,7 @@ public class CacheXmlParser extends CacheXml implements ContentHandler {
factory.setMaximumQueueMemory(asyncEventChannelCreation.getMaximumQueueMemory());
factory.setDispatcherThreads(asyncEventChannelCreation.getDispatcherThreads());
factory.setOrderPolicy(asyncEventChannelCreation.getOrderPolicy());
+ factory.setIgnoreEvictionAndExpiration(asyncEventChannelCreation.isIgnoreEvictionAndExpiration());
List<GatewayEventFilter> gatewayEventFilters = asyncEventChannelCreation.getGatewayEventFilters();
for (GatewayEventFilter gatewayEventFilter : gatewayEventFilters) {
factory.addGatewayEventFilter(gatewayEventFilter);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/18d52357/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd
----------------------------------------------------------------------
diff --git a/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd b/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd
index cc6d189..688ff1f 100755
--- a/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd
+++ b/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd
@@ -254,6 +254,7 @@ declarative caching XML file elements unless indicated otherwise.
<xsd:attribute name="disk-synchronous" type="xsd:boolean" use="optional" />
<xsd:attribute name="dispatcher-threads" type="xsd:string" use="optional" />
<xsd:attribute name="order-policy" type="xsd:string" use="optional" />
+ <xsd:attribute default="true" name="ignore-eviction-expiration" type="xsd:boolean" use="optional" />
</xsd:complexType>
</xsd:element>
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/18d52357/geode-core/src/test/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueueEvictionAndExpirationJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueueEvictionAndExpirationJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueueEvictionAndExpirationJUnitTest.java
new file mode 100644
index 0000000..533592c
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueueEvictionAndExpirationJUnitTest.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.asyncqueue;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import com.gemstone.gemfire.CancelCriterion;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.EvictionAction;
+import com.gemstone.gemfire.cache.EvictionAttributes;
+import com.gemstone.gemfire.cache.ExpirationAction;
+import com.gemstone.gemfire.cache.ExpirationAttributes;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionExistsException;
+import com.gemstone.gemfire.cache.RegionFactory;
+import com.gemstone.gemfire.cache.RegionShortcut;
+import com.gemstone.gemfire.cache.TimeoutException;
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.PartitionedRegionDataStore;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+import com.jayway.awaitility.Awaitility;
+
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Verifies the {@code ignoreEvictionAndExpiration} attribute of {@link AsyncEventQueue}: its
+ * default value, its setter, and which eviction/expiration operations of a replicated or
+ * partitioned region reach the queue's listener for either value of the attribute.
+ */
+@Category(UnitTest.class)
+public class AsyncEventQueueEvictionAndExpirationJUnitTest {
+
+  private AsyncEventQueue aeq;
+  private Cache cache;
+
+  @Rule
+  public TestName name = new TestName();
+
+  /** Picks up an existing cache if one is available, otherwise creates one with mcast-port 0. */
+  @Before
+  public void getCache() {
+    try {
+      cache = CacheFactory.getAnyInstance();
+    } catch (Exception ignored) {
+      // No cache could be obtained (presumably none exists yet); a new one is created below.
+    }
+    if (null == cache) {
+      cache = new CacheFactory().set("mcast-port", "0").create();
+    }
+  }
+
+  /** Closes the cache used by the test, if it is still open. */
+  @After
+  public void destroyCache() {
+    if (cache != null && !cache.isClosed()) {
+      cache.close();
+      cache = null;
+    }
+  }
+
+  @Test
+  public void isIgnoreEvictionAndExpirationAttributeTrueByDefault() {
+    AsyncEventListener al = mock(AsyncEventListener.class);
+    aeq = cache.createAsyncEventQueueFactory().create("aeq", al);
+    // Test for default value of isIgnoreEvictionAndExpiration setting.
+    assertTrue(aeq.isIgnoreEvictionAndExpiration());
+  }
+
+  @Test
+  public void canSetFalseForIgnoreEvictionAndExpiration() {
+    AsyncEventListener al = mock(AsyncEventListener.class);
+    aeq = cache.createAsyncEventQueueFactory().setIgnoreEvictionAndExpiration(false).create("aeq", al);
+    // The explicitly configured value must be reported instead of the default.
+    assertFalse(aeq.isIgnoreEvictionAndExpiration());
+  }
+
+  @Test
+  public void evictionDestroyOpEventsNotPropogatedByDefault() {
+    // For Replicated Region with eviction-destroy op.
+    // Number of expected events 2. Two for create and none for eviction destroy.
+    createPopulateAndVerifyEvents(false /*isPR */, true /* ignoreEvictionExpiration */,
+        2 /* expectedEvents */ , true /* eviction */, false /* evictionOverflow */,
+        false /* expirationDestroy */, false /* expirationInvalidate */,
+        false /* checkForDestroyOp */, false /* checkForInvalidateOp */);
+  }
+
+  @Test
+  public void evictionDestroyOpEventsNotPropogatedByDefaultForPR() {
+    // For PR with eviction-destroy op.
+    // Number of expected events 2. Two for create and none for eviction destroy.
+    createPopulateAndVerifyEvents(true /*isPR */, true /* ignoreEvictionExpiration */,
+        2 /* expectedEvents */ , true /* eviction */, false /* evictionOverflow */,
+        false /* expirationDestroy */, false /* expirationInvalidate */,
+        false /* checkForDestroyOp */, false /* checkForInvalidateOp */);
+  }
+
+  @Test
+  public void expirationDestroyOpEventsNotPropogatedByDefault() {
+    // For Replicated Region with expiration-destroy op.
+    // Number of expected events 2. Two for create and none for expiration destroy.
+    createPopulateAndVerifyEvents(false /*isPR */, true /* ignoreEvictionExpiration */,
+        2 /* expectedEvents */ , false /* eviction */, false /* evictionOverflow */,
+        true /* expirationDestroy */, false /* expirationInvalidate */,
+        false /* checkForDestroyOp */, false /* checkForInvalidateOp */);
+  }
+
+  @Test
+  public void expirationDestroyOpEventsNotPropogatedByDefaultForPR() {
+    // For PR with expiration-destroy op.
+    // Number of expected events 2. Two for create and none for expiration destroy.
+    createPopulateAndVerifyEvents(true /*isPR */, true /* ignoreEvictionExpiration */,
+        2 /* expectedEvents */ , false /* eviction */, false /* evictionOverflow */,
+        true /* expirationDestroy */, false /* expirationInvalidate */,
+        false /* checkForDestroyOp */, false /* checkForInvalidateOp */);
+  }
+
+  @Test
+  public void expirationInvalidOpEventsNotPropogatedByDefault() {
+    // For Replicated Region with expiration-invalidate op.
+    // Number of expected events 2. Two for create and none for expiration invalidate.
+    createPopulateAndVerifyEvents(false /*isPR */, true /* ignoreEvictionExpiration */,
+        2 /* expectedEvents */ , false /* eviction */, false /* evictionOverflow */,
+        false /* expirationDestroy */, true /* expirationInvalidate */,
+        false /* checkForDestroyOp */, false /* checkForInvalidateOp */);
+  }
+
+  @Test
+  public void expirationInvalidOpEventsNotPropogatedByDefaultForPR() {
+    // For PR with expiration-invalidate op.
+    // Number of expected events 2. Two for create and none for expiration invalidate.
+    createPopulateAndVerifyEvents(true /*isPR */, true /* ignoreEvictionExpiration */,
+        2 /* expectedEvents */ , false /* eviction */, false /* evictionOverflow */,
+        false /* expirationDestroy */, true /* expirationInvalidate */,
+        false /* checkForDestroyOp */, false /* checkForInvalidateOp */);
+  }
+
+  @Test
+  public void evictionPropogatedUsingIgnoreEvictionAndExpirationAttribute() {
+    // For Replicated Region with eviction-destroy op.
+    // Number of expected events 3. Two for create and one for eviction destroy.
+    createPopulateAndVerifyEvents(false /*isPR */, false /* ignoreEvictionExpiration */,
+        3 /* expectedEvents */ , true /* eviction */, false /* evictionOverflow */,
+        false /* expirationDestroy */, false /* expirationInvalidate */,
+        true /* checkForDestroyOp */, false /* checkForInvalidateOp */);
+  }
+
+  @Test
+  public void evictionPropogatedUsingIgnoreEvictionAndExpirationAttributeForPR() {
+    // For PR with eviction-destroy op.
+    // Number of expected events 3. Two for create and one for eviction destroy.
+    createPopulateAndVerifyEvents(true /*isPR */, false /* ignoreEvictionExpiration */,
+        3 /* expectedEvents */ , true /* eviction */, false /* evictionOverflow */,
+        false /* expirationDestroy */, false /* expirationInvalidate */,
+        true /* checkForDestroyOp */, false /* checkForInvalidateOp */);
+  }
+
+  @Test
+  public void overflowNotPropogatedUsingIgnoreEvictionAndExpirationAttribute() {
+    // For Replicated Region with eviction-overflow op.
+    // Number of expected events 2. Two for create and none for eviction overflow.
+    createPopulateAndVerifyEvents(false /*isPR */, false /* ignoreEvictionExpiration */,
+        2 /* expectedEvents */ , false /* eviction */, true /* evictionOverflow */,
+        false /* expirationDestroy */, false /* expirationInvalidate */,
+        false /* checkForDestroyOp */, false /* checkForInvalidateOp */);
+  }
+
+  @Test
+  public void overflowNotPropogatedUsingIgnoreEvictionAndExpirationAttributeForPR() {
+    // For PR with eviction-overflow op.
+    // Number of expected events 2. Two for create and none for eviction overflow.
+    createPopulateAndVerifyEvents(true /*isPR */, false /* ignoreEvictionExpiration */,
+        2 /* expectedEvents */ , false /* eviction */, true /* evictionOverflow */,
+        false /* expirationDestroy */, false /* expirationInvalidate */,
+        false /* checkForDestroyOp */, false /* checkForInvalidateOp */);
+  }
+
+  @Test
+  public void expirationDestroyPropogatedUsingIgnoreEvictionAndExpirationAttribute() {
+    // For Replicated Region with expiration-destroy op.
+    // Number of expected events 4. Two for create and two for expiration destroy.
+    createPopulateAndVerifyEvents(false /*isPR */, false /* ignoreEvictionExpiration */,
+        4 /* expectedEvents */ , false /* eviction */, false /* evictionOverflow */,
+        true /* expirationDestroy */, false /* expirationInvalidate */,
+        true /* checkForDestroyOp */, false /* checkForInvalidateOp */);
+  }
+
+  @Test
+  public void expirationDestroyPropogatedUsingIgnoreEvictionAndExpirationAttributeForPR() {
+    // For PR with expiration-destroy op.
+    // Number of expected events 4. Two for create and two for expiration destroy.
+    createPopulateAndVerifyEvents(true /*isPR */, false /* ignoreEvictionExpiration */,
+        4 /* expectedEvents */ , false /* eviction */, false /* evictionOverflow */,
+        true /* expirationDestroy */, false /* expirationInvalidate */,
+        true /* checkForDestroyOp */, false /* checkForInvalidateOp */);
+  }
+
+  @Test
+  public void expirationInvalidateNotPropogatedUsingIgnoreEvictionAndExpirationAttribute() {
+    // For Replicated Region with expiration-invalidate op.
+    // Currently invalidate event callbacks are not made to GateWay sender.
+    // Invalidates are not sent to AEQ.
+    // Number of expected events 2. None for expiration invalidate.
+    createPopulateAndVerifyEvents(false /*isPR */, false /* ignoreEvictionExpiration */,
+        2 /* expectedEvents */ , false /* eviction */, false /* evictionOverflow */,
+        false /* expirationDestroy */, true /* expirationInvalidate */,
+        false /* checkForDestroyOp */, false /* checkForInvalidateOp */);
+  }
+
+  @Test
+  public void expirationInvalidateNotPropogatedUsingIgnoreEvictionAndExpirationAttributeForPR() {
+    // For PR with expiration-invalidate op.
+    // Currently invalidate event callbacks are not made to GateWay sender.
+    // Invalidates are not sent to AEQ.
+    // Number of expected events 2. None for expiration invalidate.
+    createPopulateAndVerifyEvents(true /*isPR */, false /* ignoreEvictionExpiration */,
+        2 /* expectedEvents */ , false /* eviction */, false /* evictionOverflow */,
+        false /* expirationDestroy */, true /* expirationInvalidate */,
+        false /* checkForDestroyOp */, false /* checkForInvalidateOp */);
+  }
+
+  /**
+   * Creates an AEQ (named after the running test) and a region attached to it, puts two entries,
+   * waits for exactly {@code expectedEvents} listener events, optionally checks that a destroy or
+   * invalidate operation is among them, and finally destroys the region.
+   * <p>
+   * NOTE(review): the wait ends as soon as the count matches, so the "not propagated" scenarios
+   * pass once the two create events arrive; a later unwanted event would presumably go unnoticed.
+   */
+  private void createPopulateAndVerifyEvents(boolean isPR, boolean ignoreEvictionExpiration, int expectedEvents, boolean eviction, boolean evictionOverflow,
+      boolean expirationDestroy, boolean expirationInvalidate, boolean checkForDestroyOp, boolean checkForInvalidateOp) {
+    String aeqId = name.getMethodName();
+    // Collects AEQ events for validation. Synchronized because the listener may add events from
+    // another thread while this thread polls the size below.
+    List<AsyncEvent> events = Collections.synchronizedList(new ArrayList<AsyncEvent>());
+
+    // Create AEQ
+    createAsyncEventQueue(aeqId, ignoreEvictionExpiration, events);
+
+    // Create region with eviction/expiration
+    Region r = createRegion("ReplicatedRegionForAEQ", isPR, aeqId, eviction, evictionOverflow, expirationDestroy, expirationInvalidate);
+
+    // Populate region with two entries.
+    r.put("Key-1", "Value-1");
+    r.put("Key-2", "Value-2");
+
+    // Wait until the listener has received the expected number of events.
+    Awaitility.await().atMost(100, TimeUnit.SECONDS).until(() -> events.size() == expectedEvents);
+
+    // Check for the expected operation on a snapshot, as further events may still arrive.
+    List<AsyncEvent> received = new ArrayList<AsyncEvent>(events);
+    if (checkForDestroyOp) {
+      assertTrue("Destroy event not arrived", checkForOperation(received, false, true));
+    }
+    if (checkForInvalidateOp) {
+      assertTrue("Invalidate event not arrived", checkForOperation(received, true, false));
+    }
+
+    // Test complete. Destroy region.
+    r.destroyRegion();
+  }
+
+  /** Returns true if {@code events} holds an invalidate (if requested) or destroy (if requested). */
+  private boolean checkForOperation(List<AsyncEvent> events, boolean invalidate, boolean destroy) {
+    for (AsyncEvent e : events) {
+      if (invalidate && e.getOperation().isInvalidate()) {
+        return true;
+      }
+      if (destroy && e.getOperation().isDestroy()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /** Creates a serial AEQ (batch size 1, batch interval 1) whose listener fills {@code storeEvents}. */
+  private void createAsyncEventQueue(String id, boolean ignoreEvictionExpiration, List<AsyncEvent> storeEvents) {
+    AsyncEventListener al = this.createAsyncListener(storeEvents);
+    aeq = cache.createAsyncEventQueueFactory().setParallel(false)
+        .setIgnoreEvictionAndExpiration(ignoreEvictionExpiration)
+        .setBatchSize(1).setBatchTimeInterval(1).create(id, al);
+  }
+
+  /** Creates a PARTITION or REPLICATE region attached to the AEQ with the requested eviction/TTL. */
+  private Region createRegion(String name, boolean isPR, String aeqId, boolean evictionDestroy,
+      boolean evictionOverflow, boolean expirationDestroy, boolean expirationInvalidate) {
+    RegionFactory rf;
+    if (isPR) {
+      rf = cache.createRegionFactory(RegionShortcut.PARTITION);
+    } else {
+      rf = cache.createRegionFactory(RegionShortcut.REPLICATE);
+    }
+    // Set AsyncQueue.
+    rf.addAsyncEventQueueId(aeqId);
+    if (evictionDestroy) {
+      rf.setEvictionAttributes(EvictionAttributes.createLIFOEntryAttributes(1, EvictionAction.LOCAL_DESTROY));
+    }
+    if (evictionOverflow) {
+      rf.setEvictionAttributes(EvictionAttributes.createLIFOEntryAttributes(1, EvictionAction.OVERFLOW_TO_DISK));
+    }
+    if (expirationDestroy) {
+      rf.setEntryTimeToLive(new ExpirationAttributes(1, ExpirationAction.DESTROY));
+    }
+    if (expirationInvalidate) {
+      rf.setEntryTimeToLive(new ExpirationAttributes(1, ExpirationAction.INVALIDATE));
+    }
+    return rf.create(name);
+  }
+
+  /** Returns a listener that appends every event it is handed to {@code list} and reports success. */
+  private AsyncEventListener createAsyncListener(List<AsyncEvent> list) {
+    return new AsyncEventListener() {
+      @Override
+      public void close() {
+        // Nothing to release.
+      }
+
+      @Override
+      public boolean processEvents(List<AsyncEvent> arg0) {
+        list.addAll(arg0);
+        return true;
+      }
+    };
+  }
+}