You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ag...@apache.org on 2016/05/03 23:46:14 UTC

[60/60] [abbrv] incubator-geode git commit: GEODE-1209: Added new attribute to forward eviction/expiration to AEQ.

GEODE-1209: Added new attribute to forward eviction/expiration to AEQ.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/18d52357
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/18d52357
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/18d52357

Branch: refs/heads/feature/GEODE-1209
Commit: 18d52357d015db836c184d4d3df3c13925b80d02
Parents: 4a6c779
Author: Anil <ag...@pivotal.io>
Authored: Tue May 3 13:52:18 2016 -0700
Committer: Anil <ag...@pivotal.io>
Committed: Tue May 3 14:42:43 2016 -0700

----------------------------------------------------------------------
 .../cache/asyncqueue/AsyncEventQueue.java       |   9 +
 .../asyncqueue/AsyncEventQueueFactory.java      |   8 +
 .../internal/AsyncEventQueueFactoryImpl.java    |   8 +-
 .../internal/AsyncEventQueueImpl.java           |   9 +-
 .../gemfire/cache/wan/GatewaySender.java        |   2 +
 .../gemfire/internal/cache/LocalRegion.java     |  54 +--
 .../cache/wan/AbstractGatewaySender.java        | 111 +++---
 .../cache/wan/GatewaySenderAttributes.java      |   7 +
 .../cache/xmlcache/AsyncEventQueueCreation.java |  11 +
 .../internal/cache/xmlcache/CacheXml.java       |   1 +
 .../cache/xmlcache/CacheXmlGenerator.java       |   7 +
 .../internal/cache/xmlcache/CacheXmlParser.java |   7 +
 .../geode.apache.org/schema/cache/cache-1.0.xsd |   1 +
 ...ventQueueEvictionAndExpirationJUnitTest.java | 362 +++++++++++++++++++
 14 files changed, 514 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/18d52357/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueue.java
index a2b8b0f..c2d04a1 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueue.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueue.java
@@ -147,4 +147,13 @@ public interface AsyncEventQueue {
    *         <code>AsyncEventQueue</code>
    */
   public GatewayEventSubstitutionFilter getGatewayEventSubstitutionFilter();
+ 
+  /**
+   * Indicates whether eviction and expiration events/operations are ignored
+   * (not passed) to the <code>AsyncEventListener</code>.
+   * 
+   * @return boolean True if eviction and expiration operations are ignored.
+   */
+  public boolean isIgnoreEvictionAndExpiration();
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/18d52357/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueueFactory.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueueFactory.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueueFactory.java
index 3e30b38..fccb81e 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueueFactory.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueueFactory.java
@@ -170,7 +170,15 @@ public interface AsyncEventQueueFactory {
   public AsyncEventQueueFactory setGatewayEventSubstitutionListener(
       GatewayEventSubstitutionFilter filter);
 
+  /**
+   * Sets whether eviction and expiration events are ignored by the queue.
+   *
+   * @param ignore 
+   *        boolean to indicate whether to ignore eviction and expiration events. 
+   */
+  public AsyncEventQueueFactory setIgnoreEvictionAndExpiration(boolean ignore);
 
+  
   /**
    * Creates the <code>AsyncEventQueue</code>. It accepts Id of AsyncEventQueue
    * and instance of AsyncEventListener. Multiple queues can be created using

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/18d52357/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java
index 312e880..1ec3ba0 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java
@@ -277,7 +277,7 @@ public class AsyncEventQueueFactoryImpl implements AsyncEventQueueFactory {
     this.attrs.eventFilters = asyncQueueCreation.getGatewayEventFilters();
     this.attrs.eventSubstitutionFilter = asyncQueueCreation.getGatewayEventSubstitutionFilter();
     this.attrs.isForInternalUse = true;
-
+    this.attrs.ignoreEvictionAndExpiration = asyncQueueCreation.isIgnoreEvictionAndExpiration();
   }
 
   public AsyncEventQueueFactory setParallel(boolean isParallel) {
@@ -292,4 +292,10 @@ public class AsyncEventQueueFactoryImpl implements AsyncEventQueueFactory {
     this.attrs.isMetaQueue = isMetaQueue;
     return this;
   }
+
+  @Override
+  public AsyncEventQueueFactory setIgnoreEvictionAndExpiration(boolean ignore) {
+    this.attrs.ignoreEvictionAndExpiration = ignore;
+    return this;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/18d52357/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java
index 6b3eb4a..5a0b370 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java
@@ -36,7 +36,7 @@ public class AsyncEventQueueImpl implements AsyncEventQueue {
   private GatewaySender sender = null;
   
   private AsyncEventListener asyncEventListener = null;
-  
+    
   public static final String ASYNC_EVENT_QUEUE_PREFIX = "AsyncEventQueue_";
   
   public AsyncEventQueueImpl(GatewaySender sender, AsyncEventListener eventListener) {
@@ -200,6 +200,9 @@ public class AsyncEventQueueImpl implements AsyncEventQueue {
    public boolean isBucketSorted() {
     // TODO Auto-generated method stub
     return false;
-  }
-  
+  }     
+   
+   public boolean isIgnoreEvictionAndExpiration() {
+     return ((AbstractGatewaySender)this.sender).isIgnoreEvictionAndExpiration();
+   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/18d52357/geode-core/src/main/java/com/gemstone/gemfire/cache/wan/GatewaySender.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/wan/GatewaySender.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/wan/GatewaySender.java
index c5b5d3a..b0ad410 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/wan/GatewaySender.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/wan/GatewaySender.java
@@ -96,6 +96,8 @@ public interface GatewaySender {
 
   public static final int DEFAULT_DISPATCHER_THREADS = 5;
   
+  public static final boolean DEFAULT_IGNORE_EVICTION_EXPIRATION = true;
+  
   public static final OrderPolicy DEFAULT_ORDER_POLICY = OrderPolicy.KEY;
   /**
    * The default maximum amount of memory (MB) to allow in the queue before

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/18d52357/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
index 3ad294c..ac3a728 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
@@ -1148,7 +1148,8 @@ public class LocalRegion extends AbstractRegion
   public boolean generateEventID()
   {     
     return !(isUsedForPartitionedRegionAdmin()
-        || isUsedForPartitionedRegionBucket() );
+        || (isUsedForPartitionedRegionBucket() && !(((BucketRegion)this)
+            .getPartitionedRegion().getAsyncEventQueueIds().size() > 0)));
   }
 
   public final Object destroy(Object key, Object aCallbackArgument)
@@ -6641,10 +6642,14 @@ public class LocalRegion extends AbstractRegion
   protected void notifyGatewaySender(EnumListenerEvent operation,
       EntryEventImpl event) {
     
-    if (event.isConcurrencyConflict()) { // usually concurrent cache modification problem
+    if (this.isInternalRegion() || isPdxTypesRegion() || 
+        event.isConcurrencyConflict() /* usually concurrent cache modification problem */) { 
       return;
     }
+
+    logger.info("### notifying GW senders :" + event);
     
+
     // Return if the inhibit all notifications flag is set
     if (event.inhibitAllNotifications()){
       if(logger.isDebugEnabled()) {
@@ -6653,34 +6658,31 @@ public class LocalRegion extends AbstractRegion
       return;
     }
     
-    if (!event.getOperation().isLocal()) {
-      Set<String> allGatewaySenderIds = null;
-      checkSameSenderIdsAvailableOnAllNodes();
-      if (event.getOperation() == Operation.UPDATE_VERSION_STAMP) {
-        allGatewaySenderIds = getGatewaySenderIds();
-      } else {
-        allGatewaySenderIds = getAllGatewaySenderIds();
-      }
-      
-      List<Integer> allRemoteDSIds = getRemoteDsIds(allGatewaySenderIds);
+    
+    Set<String> allGatewaySenderIds = null;
+    checkSameSenderIdsAvailableOnAllNodes();
+    if (event.getOperation() == Operation.UPDATE_VERSION_STAMP) {
+      allGatewaySenderIds = getGatewaySenderIds();
+    } else {
+      allGatewaySenderIds = getAllGatewaySenderIds();
+    }
 
-      if (allRemoteDSIds != null) {
-        for (GatewaySender sender : getCache().getAllGatewaySenders()) {
-          if (!isPdxTypesRegion()) {
-            if (allGatewaySenderIds.contains(sender.getId())) {
-              //TODO: This is a BUG. Why return and not continue?
-              if((!this.getDataPolicy().withStorage()) && sender.isParallel()){
-                return;
-              }
-              if(logger.isDebugEnabled()) {
-                logger.debug("Notifying the GatewaySender : {}", sender.getId());
-              }
-              ((AbstractGatewaySender)sender).distribute(operation, event,
-                  allRemoteDSIds);
-            }
+    List<Integer> allRemoteDSIds = getRemoteDsIds(allGatewaySenderIds);
+    if (allRemoteDSIds != null) {
+      for (GatewaySender sender : getCache().getAllGatewaySenders()) {
+        if (allGatewaySenderIds.contains(sender.getId())) {
+          //TODO: This is a BUG. Why return and not continue?
+          if((!this.getDataPolicy().withStorage()) && sender.isParallel()){
+            return;
+          }
+          if(logger.isDebugEnabled()) {
+            logger.debug("Notifying the GatewaySender : {}", sender.getId());
           }
+          ((AbstractGatewaySender)sender).distribute(operation, event,
+              allRemoteDSIds);
         }
       }
+
       
 //      if (shouldNotifyGatewaySender()) {
 //        // Get All WAN site DSID's to be sent to each WAN site so that they

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/18d52357/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
index fe09d03..30d1fd2 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
@@ -34,6 +34,7 @@ import com.gemstone.gemfire.cache.AttributesFactory;
 import com.gemstone.gemfire.cache.Cache;
 import com.gemstone.gemfire.cache.CacheException;
 import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.Operation;
 import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.RegionAttributes;
 import com.gemstone.gemfire.cache.RegionDestroyedException;
@@ -137,6 +138,8 @@ public abstract class AbstractGatewaySender implements GatewaySender,
 
   protected List<AsyncEventListener> listeners;
   
+  protected boolean ignoreEvictionAndExpiration;
+  
   protected GatewayEventSubstitutionFilter substitutionFilter;
   
   protected LocatorDiscoveryCallback locatorDiscoveryCallback;
@@ -269,55 +272,7 @@ public abstract class AbstractGatewaySender implements GatewaySender,
       initializeEventIdIndex();
     }
     this.isBucketSorted = attrs.isBucketSorted();
-  }
-  
-  public void createSender(Cache cache, GatewaySenderAttributes attrs){
-    this.cache = cache;
-    this.id = attrs.getId();
-    this.socketBufferSize = attrs.getSocketBufferSize();
-    this.socketReadTimeout = attrs.getSocketReadTimeout();
-    this.queueMemory = attrs.getMaximumQueueMemory();
-    this.batchSize = attrs.getBatchSize();
-    this.batchTimeInterval = attrs.getBatchTimeInterval();
-    this.isConflation = attrs.isBatchConflationEnabled();
-    this.isPersistence = attrs.isPersistenceEnabled();
-    this.alertThreshold = attrs.getAlertThreshold();
-    this.manualStart = attrs.isManualStart();
-    this.isParallel = attrs.isParallel();
-    this.isForInternalUse = attrs.isForInternalUse();
-    this.diskStoreName = attrs.getDiskStoreName();
-    this.remoteDSId = attrs.getRemoteDSId();
-    this.eventFilters = attrs.getGatewayEventFilters();
-    this.transFilters = attrs.getGatewayTransportFilters();
-    this.listeners = attrs.getAsyncEventListeners();
-    this.substitutionFilter = attrs.getGatewayEventSubstitutionFilter();
-    this.locatorDiscoveryCallback = attrs.getGatewayLocatoDiscoveryCallback();
-    this.isDiskSynchronous = attrs.isDiskSynchronous();
-    this.policy = attrs.getOrderPolicy();
-    this.dispatcherThreads = attrs.getDispatcherThreads();
-    this.parallelismForReplicatedRegion = attrs.getParallelismForReplicatedRegion();
-    //divide the maximumQueueMemory of sender equally using number of dispatcher threads.
-    //if dispatcherThreads is 1 then maxMemoryPerDispatcherQueue will be same as maximumQueueMemory of sender
-    this.maxMemoryPerDispatcherQueue = this.queueMemory / this.dispatcherThreads;
-    this.myDSId = InternalDistributedSystem.getAnyInstance().getDistributionManager().getDistributedSystemId();
-    this.serialNumber = DistributionAdvisor.createSerialNumber();
-    if (!(this.cache instanceof CacheCreation)) {
-      this.stopper = new Stopper(cache.getCancelCriterion());
-      this.senderAdvisor = GatewaySenderAdvisor.createGatewaySenderAdvisor(this);
-      if (!this.isForInternalUse()) {
-        this.statistics = new AsyncEventQueueStats(cache.getDistributedSystem(),
-            id);
-      }
-      else {// this sender lies underneath the AsyncEventQueue. Need to have
-            // AsyncEventQueueStats
-        this.statistics = new AsyncEventQueueStats(
-            cache.getDistributedSystem(), AsyncEventQueueImpl
-                .getAsyncEventQueueIdFromSenderId(id));
-      }
-      initializeEventIdIndex();
-    }
-    this.isBucketSorted = attrs.isBucketSorted();
-
+    this.ignoreEvictionAndExpiration = attrs.isIgnoreEvictionAndExpiration();
   }
   
   public GatewaySenderAdvisor getSenderAdvisor() {
@@ -392,6 +347,10 @@ public abstract class AbstractGatewaySender implements GatewaySender,
     return !this.listeners.isEmpty();
   }
   
+  public boolean isIgnoreEvictionAndExpiration() {
+    return this.ignoreEvictionAndExpiration;
+  }
+  
   public boolean isManualStart() {
     return this.manualStart;
   }
@@ -839,20 +798,64 @@ public abstract class AbstractGatewaySender implements GatewaySender,
     return this.eventProcessor;
   }
 
+  /**
+   * Checks whether this event may be distributed by this sender.
+   * @param event the entry event being considered for distribution
+   * @param stats the sender statistics (not used by this check)
+   * @return boolean True if the event is allowed.
+   */
+  private boolean checkForDistribution(EntryEventImpl event, GatewaySenderStats stats) {
+    // Debug-level only: this check runs for every event offered to the sender.
+    if (logger.isDebugEnabled()) {
+      logger.debug("isIgnoreEvictionAndExpiration: " + isIgnoreEvictionAndExpiration());
+    }
+    // Events from regions with the NORMAL data policy are never distributed.
+    if (event.getRegion().getDataPolicy().equals(DataPolicy.NORMAL)) {
+      return false;
+    }
+    
+    // Eviction and expirations are not passed to WAN.
+    // Eviction and Expiration are passed to AEQ based on its configuration.
+    if (event.getOperation().isLocal() || event.getOperation().isExpiration()) {
+      // Only an AEQ configured to forward eviction/expiration events accepts these.
+      return this.isAsyncEventQueue() && !this.isIgnoreEvictionAndExpiration();
+    }
+    
+    return true;
+  }
+  
+  
   public void distribute(EnumListenerEvent operation, EntryEventImpl event,
       List<Integer> allRemoteDSIds) {
+    
     final boolean isDebugEnabled = logger.isDebugEnabled();
     
+    // If this gateway is not running, return
+    if (!isRunning()) {
+      if (isDebugEnabled) {
+        logger.debug("Returning back without putting into the gateway sender queue");
+      }
+      return;
+    }
+    
     final GatewaySenderStats stats = getStatistics();
     stats.incEventsReceived();
-    // If the event is local (see bug 35831) or an expiration ignore it.
-    //removed the check of isLocal as in notifyGAtewayHub this has been taken care
-    if (/*event.getOperation().isLocal() || */event.getOperation().isExpiration()
-        || event.getRegion().getDataPolicy().equals(DataPolicy.NORMAL)) {
+   
+    if (!checkForDistribution(event, stats)) {
       getStatistics().incEventsNotQueued();
       return;
     }
     
+<<<<<<< HEAD
+=======
+    if (getIsHDFSQueue() && event.getOperation().isEviction()) {
+      if (logger.isDebugEnabled())
+        logger.debug("Eviction event not queued: " + event);
+      stats.incEventsNotQueued();
+      return;
+    }
+    
+>>>>>>> GEODE-1209: Added new attribute to forward eviction/expiration to AEQ.
     // this filter is defined by Asif which exist in old wan too. new wan has
     // other GatewaEventFilter. Do we need to get rid of this filter. Cheetah is
     // not cinsidering this filter
@@ -941,6 +944,7 @@ public abstract class AbstractGatewaySender implements GatewaySender,
     }
     try {
       // If this gateway is not running, return
+      // The sender may have stopped, after we have checked the status in the beginning. 
       if (!isRunning()) {
         if (isDebugEnabled) {
           logger.debug("Returning back without putting into the gateway sender queue");
@@ -988,6 +992,7 @@ public abstract class AbstractGatewaySender implements GatewaySender,
     }
   }
   
+
   /**
    * During sender is getting started, if there are any cache operation on queue then that event will be stored in temp queue. 
    * Once sender is started, these event from tmp queue will be added to sender queue.

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/18d52357/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java
index 1cef940..163943f 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java
@@ -83,6 +83,8 @@ public class GatewaySenderAttributes {
   
   public boolean isMetaQueue = GatewaySenderAttributes.DEFAULT_IS_META_QUEUE;
   
+  public boolean ignoreEvictionAndExpiration = GatewaySender.DEFAULT_IGNORE_EVICTION_EXPIRATION;
+  
   public int getSocketBufferSize() {
     return this.socketBufferSize;
   }
@@ -192,4 +194,9 @@ public class GatewaySenderAttributes {
   public boolean isMetaQueue() {
     return this.isMetaQueue;
   }
+  
+  public boolean isIgnoreEvictionAndExpiration() {
+    return this.ignoreEvictionAndExpiration;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/18d52357/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java
index 0015665..4c2943e 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java
@@ -43,6 +43,7 @@ public class AsyncEventQueueCreation implements AsyncEventQueue {
   private boolean isBucketSorted = false;
   private int dispatcherThreads = 1;
   private OrderPolicy orderPolicy = OrderPolicy.KEY;
+  private boolean ignoreEvictionAndExpiration = true;
   
   public AsyncEventQueueCreation() {
   }
@@ -62,6 +63,7 @@ public class AsyncEventQueueCreation implements AsyncEventQueue {
     this.asyncEventListener = eventListener;
     this.isBucketSorted = senderAttrs.isBucketSorted; 
     this.gatewayEventSubstitutionFilter = senderAttrs.eventSubstitutionFilter;
+    this.ignoreEvictionAndExpiration = senderAttrs.ignoreEvictionAndExpiration;
   }
   
   @Override
@@ -211,4 +213,13 @@ public class AsyncEventQueueCreation implements AsyncEventQueue {
   public void setBucketSorted(boolean isBucketSorted) {
     this.isBucketSorted = isBucketSorted;
   }
+
+  public void setIgnoreEvictionAndExpiration(boolean ignore) {
+    this.ignoreEvictionAndExpiration = ignore;
+  }
+  
+  @Override
+  public boolean isIgnoreEvictionAndExpiration() {
+    return this.ignoreEvictionAndExpiration;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/18d52357/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXml.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXml.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXml.java
index aa7d49a..c3eccd2 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXml.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXml.java
@@ -762,6 +762,7 @@ public abstract class CacheXml implements EntityResolver2, ErrorHandler {
   protected static final String ASYNC_EVENT_LISTENER = "async-event-listener";
   public static final String ASYNC_EVENT_QUEUE = "async-event-queue";
   protected static final String ASYNC_EVENT_QUEUE_IDS = "async-event-queue-ids";
+  protected static final String IGNORE_EVICTION_AND_EXPIRATION = "ignore-eviction-expiration";
   
   /** The name of the <code>compressor</code> attribute */
   protected static final String COMPRESSOR = "compressor";

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/18d52357/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlGenerator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlGenerator.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlGenerator.java
index ea3c975..a4101ba 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlGenerator.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlGenerator.java
@@ -1521,6 +1521,13 @@ public class CacheXmlGenerator extends CacheXml implements XMLReader {
         atts.addAttribute("", "", ORDER_POLICY, "", String.valueOf(asyncEventQueue
           .getOrderPolicy()));
       }
+      // eviction and expiration events: written when defaults are requested or when
+      // the value differs from the default, so a non-default "false" is not dropped.
+      if (generateDefaults() || asyncEventQueue.isIgnoreEvictionAndExpiration() != GatewaySender.DEFAULT_IGNORE_EVICTION_EXPIRATION) {
+        atts.addAttribute("", "", IGNORE_EVICTION_AND_EXPIRATION, "", String.valueOf(asyncEventQueue
+          .isIgnoreEvictionAndExpiration()));
+      }
+      
       // disk-synchronous
       if (generateDefaults() || asyncEventQueue.isDiskSynchronous() != GatewaySender.DEFAULT_DISK_SYNCHRONOUS)
       atts.addAttribute("", "", DISK_SYNCHRONOUS, "", String.valueOf(asyncEventQueue

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/18d52357/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlParser.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlParser.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlParser.java
index f344938..aec2dc3 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlParser.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlParser.java
@@ -2313,6 +2313,12 @@ public class CacheXmlParser extends CacheXml implements ContentHandler {
       }
     }
     
+    // whether eviction and expiration events are ignored (not forwarded).
+    String ignoreEvictionExpiration = atts.getValue(IGNORE_EVICTION_AND_EXPIRATION);
+    if (ignoreEvictionExpiration != null) {
+      asyncEventQueueCreation.setIgnoreEvictionAndExpiration(Boolean.parseBoolean(ignoreEvictionExpiration));
+    }
+    
     stack.push(asyncEventQueueCreation);
   }
   
@@ -2346,6 +2352,7 @@ public class CacheXmlParser extends CacheXml implements ContentHandler {
     factory.setMaximumQueueMemory(asyncEventChannelCreation.getMaximumQueueMemory());
     factory.setDispatcherThreads(asyncEventChannelCreation.getDispatcherThreads());
     factory.setOrderPolicy(asyncEventChannelCreation.getOrderPolicy());
+    factory.setIgnoreEvictionAndExpiration(asyncEventChannelCreation.isIgnoreEvictionAndExpiration());
     List<GatewayEventFilter> gatewayEventFilters = asyncEventChannelCreation.getGatewayEventFilters();
     for (GatewayEventFilter gatewayEventFilter : gatewayEventFilters) {
       factory.addGatewayEventFilter(gatewayEventFilter);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/18d52357/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd
----------------------------------------------------------------------
diff --git a/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd b/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd
index cc6d189..688ff1f 100755
--- a/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd
+++ b/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd
@@ -254,6 +254,7 @@ declarative caching XML file elements unless indicated otherwise.
             <xsd:attribute name="disk-synchronous" type="xsd:boolean" use="optional" />
             <xsd:attribute name="dispatcher-threads" type="xsd:string" use="optional" />
             <xsd:attribute name="order-policy" type="xsd:string" use="optional" />
+            <xsd:attribute default="true" name="ignore-eviction-expiration" type="xsd:boolean" use="optional" />
           </xsd:complexType>
         </xsd:element>
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/18d52357/geode-core/src/test/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueueEvictionAndExpirationJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueueEvictionAndExpirationJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueueEvictionAndExpirationJUnitTest.java
new file mode 100644
index 0000000..533592c
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueueEvictionAndExpirationJUnitTest.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.asyncqueue;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import com.gemstone.gemfire.CancelCriterion;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.EvictionAction;
+import com.gemstone.gemfire.cache.EvictionAttributes;
+import com.gemstone.gemfire.cache.ExpirationAction;
+import com.gemstone.gemfire.cache.ExpirationAttributes;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionExistsException;
+import com.gemstone.gemfire.cache.RegionFactory;
+import com.gemstone.gemfire.cache.RegionShortcut;
+import com.gemstone.gemfire.cache.TimeoutException;
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.PartitionedRegionDataStore;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+import com.jayway.awaitility.Awaitility;
+
+import static org.mockito.Mockito.*;
+
+
+@Category(UnitTest.class)
+public class AsyncEventQueueEvictionAndExpirationJUnitTest {
+  
+  private AsyncEventQueue aeq;
+  private Cache cache;
+  
+  @Rule 
+  public TestName name = new TestName();
+  
+  @Before
+  public void getCache() {
+    try {
+       cache = CacheFactory.getAnyInstance();
+    } catch (Exception e) {
+      // Deliberately ignored: when no cache was obtained, the null check below creates one.
+    }
+    if (null == cache) {
+      cache = (GemFireCacheImpl) new CacheFactory().set("mcast-port", "0").create();
+    }
+  }
+
+  @After
+  public void destroyCache() {
+    if (cache != null && !cache.isClosed()) { // only close a cache that exists and is still open
+      cache.close();
+      cache = null;
+    }
+  }
+
+  
+  @Test
+  public void isIgnoreEvictionAndExpirationAttributeTrueByDefault() {
+    AsyncEventListener al = mock(AsyncEventListener.class);
+    aeq = cache.createAsyncEventQueueFactory().create("aeq", al);
+    // The factory was given no explicit value, so the queue must report true.
+    assertTrue(aeq.isIgnoreEvictionAndExpiration());
+  }
+  
+  @Test
+  public void canSetFalseForIgnoreEvictionAndExpiration() {
+    AsyncEventListener al = mock(AsyncEventListener.class);
+    aeq = cache.createAsyncEventQueueFactory().setIgnoreEvictionAndExpiration(false).create("aeq", al);
+    // An explicit setIgnoreEvictionAndExpiration(false) must be reflected by the created queue.
+    assertFalse(aeq.isIgnoreEvictionAndExpiration());
+  }
+  
+  
+  @Test
+  public void evictionDestroyOpEventsNotPropogatedByDefault() {
+    // Replicated region with LOCAL_DESTROY eviction; the AEQ ignores eviction/expiration.
+    // Expected events: 2, one per put and none for the eviction destroy.
+    createPopulateAndVerifyEvents(false /*isPR */, true /* ignoreEvictionExpiration */, 
+        2 /* expectedEvents */ , true /* eviction */, false /* evictionOverflow */, 
+        false /* expirationDestroy */, false /* expirationInvalidate */, 
+        false /* checkForDestroyOp */, false /* checkForInvalidateOp */);
+  }
+
+  @Test
+  public void evictionDestroyOpEventsNotPropogatedByDefaultForPR() {
+    // Partitioned region with LOCAL_DESTROY eviction; the AEQ ignores eviction/expiration.
+    // Expected events: 2, one per put and none for the eviction destroy.
+    createPopulateAndVerifyEvents(true /*isPR */, true /* ignoreEvictionExpiration */, 
+        2 /* expectedEvents */ , true /* eviction */, false /* evictionOverflow */, 
+        false /* expirationDestroy */, false /* expirationInvalidate */, 
+        false /* checkForDestroyOp */, false /* checkForInvalidateOp */);
+  }
+
+  @Test
+  public void expirationDestroyOpEventsNotPropogatedByDefault() {
+    // Replicated region whose entries expire with action DESTROY; the AEQ ignores eviction/expiration.
+    // Expected events: 2, one per put and none for the expiration destroys.
+    createPopulateAndVerifyEvents(false /*isPR */, true /* ignoreEvictionExpiration */, 
+        2 /* expectedEvents */ , false /* eviction */, false /* evictionOverflow */, 
+        true /* expirationDestroy */, false /* expirationInvalidate */, 
+        false /* checkForDestroyOp */, false /* checkForInvalidateOp */);
+  }
+
+  @Test
+  public void expirationDestroyOpEventsNotPropogatedByDefaultForPR() {
+    // Partitioned region whose entries expire with action DESTROY; the AEQ ignores eviction/expiration.
+    // Expected events: 2, one per put and none for the expiration destroys.
+    createPopulateAndVerifyEvents(true /*isPR */, true /* ignoreEvictionExpiration */, 
+        2 /* expectedEvents */ , false /* eviction */, false /* evictionOverflow */, 
+        true /* expirationDestroy */, false /* expirationInvalidate */, 
+        false /* checkForDestroyOp */, false /* checkForInvalidateOp */);
+  }
+
+  @Test
+  public void expirationInvalidOpEventsNotPropogatedByDefault() {
+    // Replicated region whose entries expire with action INVALIDATE; the AEQ ignores eviction/expiration.
+    // Expected events: 2, one per put and none for the expiration invalidates.
+    createPopulateAndVerifyEvents(false /*isPR */, true /* ignoreEvictionExpiration */, 
+        2 /* expectedEvents */ , false /* eviction */, false /* evictionOverflow */, 
+        false /* expirationDestroy */, true /* expirationInvalidate */, 
+        false /* checkForDestroyOp */, false /* checkForInvalidateOp */);
+  }
+
+  @Test
+  public void expirationInvalidOpEventsNotPropogatedByDefaultForPR() {
+    // Partitioned region whose entries expire with action INVALIDATE; the AEQ ignores eviction/expiration.
+    // Expected events: 2, one per put and none for the expiration invalidates.
+    createPopulateAndVerifyEvents(true /*isPR */, true /* ignoreEvictionExpiration */, 
+        2 /* expectedEvents */ , false /* eviction */, false /* evictionOverflow */, 
+        false /* expirationDestroy */, true /* expirationInvalidate */, 
+        false /* checkForDestroyOp */, false /* checkForInvalidateOp */);
+  }
+  
+  @Test
+  public void evictionPropogatedUsingIgnoreEvictionAndExpirationAttribute() {
+    // Replicated region with LOCAL_DESTROY eviction; the AEQ is told not to ignore eviction/expiration.
+    // Expected events: 3, two for the puts and one for the eviction destroy.
+    createPopulateAndVerifyEvents(false /*isPR */, false /* ignoreEvictionExpiration */, 
+        3 /* expectedEvents */ , true /* eviction */, false /* evictionOverflow */, 
+        false /* expirationDestroy */, false /* expirationInvalidate */, 
+        true /* checkForDestroyOp */, false /* checkForInvalidateOp */);
+  }
+
+  @Test
+  public void evictionPropogatedUsingIgnoreEvictionAndExpirationAttributeForPR() {
+    // Partitioned region with LOCAL_DESTROY eviction; the AEQ is told not to ignore eviction/expiration.
+    // Expected events: 3, two for the puts and one for the eviction destroy.
+    createPopulateAndVerifyEvents(true /*isPR */, false /* ignoreEvictionExpiration */, 
+        3 /* expectedEvents */ , true /* eviction */, false /* evictionOverflow */, 
+        false /* expirationDestroy */, false /* expirationInvalidate */, 
+        true /* checkForDestroyOp */, false /* checkForInvalidateOp */);
+  }
+
+  @Test
+  public void overflowNotPropogatedUsingIgnoreEvictionAndExpirationAttribute() {
+    // Replicated region with OVERFLOW_TO_DISK eviction; the AEQ is told not to ignore eviction/expiration.
+    // Expected events: 2, two for the puts and none for the eviction overflow.
+    createPopulateAndVerifyEvents(false /*isPR */, false /* ignoreEvictionExpiration */,  
+        2 /* expectedEvents */ , false /* eviction */, true /* evictionOverflow */, 
+        false /* expirationDestroy */, false /* expirationInvalidate */, 
+        false /* checkForDestroyOp */, false /* checkForInvalidateOp */);
+  }
+
+  @Test
+  public void overflowNotPropogatedUsingIgnoreEvictionAndExpirationAttributeForPR() {
+    // Partitioned region with OVERFLOW_TO_DISK eviction; the AEQ is told not to ignore eviction/expiration.
+    // Expected events: 2, two for the puts and none for the eviction overflow.
+    createPopulateAndVerifyEvents(true /*isPR */, false /* ignoreEvictionExpiration */,  
+        2 /* expectedEvents */ , false /* eviction */, true /* evictionOverflow */, 
+        false /* expirationDestroy */, false /* expirationInvalidate */, 
+        false /* checkForDestroyOp */, false /* checkForInvalidateOp */);
+  }
+
+  @Test
+  public void expirationDestroyPropogatedUsingIgnoreEvictionAndExpirationAttribute() {
+    // Replicated region whose entries expire with action DESTROY; the AEQ is told not to ignore eviction/expiration.
+    // Expected events: 4, two for the puts and two for the expiration destroys.
+    createPopulateAndVerifyEvents(false /*isPR */, false /* ignoreEvictionExpiration */,  
+        4 /* expectedEvents */ , false /* eviction */, false /* evictionOverflow */, 
+        true /* expirationDestroy */, false /* expirationInvalidate */, 
+        true /* checkForDestroyOp */, false /* checkForInvalidateOp */);    
+  }
+
+  @Test
+  public void expirationDestroyPropogatedUsingIgnoreEvictionAndExpirationAttributeForPR() {
+    // Partitioned region whose entries expire with action DESTROY; the AEQ is told not to ignore eviction/expiration.
+    // Expected events: 4, two for the puts and two for the expiration destroys.
+    createPopulateAndVerifyEvents(true /*isPR */, false /* ignoreEvictionExpiration */,  
+        4 /* expectedEvents */ , false /* eviction */, false /* evictionOverflow */, 
+        true /* expirationDestroy */, false /* expirationInvalidate */, 
+        true /* checkForDestroyOp */, false /* checkForInvalidateOp */);    
+  }
+
+  @Test
+  public void expirationInvalidateNotPropogatedUsingIgnoreEvictionAndExpirationAttribute() {
+    // Replicated region whose entries expire with action INVALIDATE.
+    // Currently, invalidate event callbacks are not made to the gateway sender,
+    // so invalidates are not sent to the AEQ even though it is told not to ignore expiration.
+    // Expected events: 2, two for the puts and none for the expiration invalidates.
+    createPopulateAndVerifyEvents(false /*isPR */, false /* ignoreEvictionExpiration */,  
+        2 /* expectedEvents */ , false /* eviction */, false /* evictionOverflow */, 
+        false /* expirationDestroy */, true /* expirationInvalidate */, 
+        false /* checkForDestroyOp */, false /* checkForInvalidateOp */);
+  }
+
+  @Test
+  public void expirationInvalidateNotPropogatedUsingIgnoreEvictionAndExpirationAttributeForPR() {
+    // Partitioned region whose entries expire with action INVALIDATE.
+    // Currently, invalidate event callbacks are not made to the gateway sender,
+    // so invalidates are not sent to the AEQ even though it is told not to ignore expiration.
+    // Expected events: 2, two for the puts and none for the expiration invalidates.
+    createPopulateAndVerifyEvents(true /*isPR */, false /* ignoreEvictionExpiration */,  
+        2 /* expectedEvents */ , false /* eviction */, false /* evictionOverflow */, 
+        false /* expirationDestroy */, true /* expirationInvalidate */, 
+        false /* checkForDestroyOp */, false /* checkForInvalidateOp */);
+  }
+
+  
+  
+  private void createPopulateAndVerifyEvents(boolean isPR, boolean ignoreEvictionExpiration, int expectedEvents, boolean eviction, boolean evictionOverflow, 
+      boolean expirationDestroy, boolean expirationInvalidate, boolean checkForDestroyOp, boolean checkForInvalidateOp) {
+    // Shared driver: create the AEQ and region, put two entries, wait for expectedEvents events, then check the operations seen.
+    // The AEQ id is the name of the currently running test method.
+    String aeqId = name.getMethodName();
+    
+    // Collects every event handed to the AEQ listener, for validation below.
+    List<AsyncEvent> events = new ArrayList<AsyncEvent>();
+    
+    // Create the AEQ whose listener appends into 'events'.
+    createAsyncEventQueue(aeqId, ignoreEvictionExpiration, events);    
+    
+    // Create the region with the requested eviction/expiration; the fixed name is used for partitioned regions as well.
+    Region r = createRegion("ReplicatedRegionForAEQ", isPR, aeqId, eviction, evictionOverflow, expirationDestroy, expirationInvalidate);
+    
+    // Populate the region with two entries.
+    r.put("Key-1", "Value-1");
+    r.put("Key-2", "Value-2");
+    
+    // Wait up to 100 seconds until the listener has collected exactly expectedEvents events.
+    Awaitility.await().atMost(100, TimeUnit.SECONDS).until(() -> {return (events.size() == expectedEvents);});
+    
+    // Optionally require that at least one collected event is a destroy / an invalidate.
+    if (checkForDestroyOp) {
+      assertTrue("Expiration event not arrived", checkForOperation(events, false, true));
+    }
+
+    if (checkForInvalidateOp) {
+      assertTrue("Invalidate event not arrived", checkForOperation(events, true, false));
+    }
+    
+    // Test complete. Destroy region.
+    r.destroyRegion();
+  }
+
+  private boolean checkForOperation(List<AsyncEvent> events, boolean invalidate, boolean destroy) { // true if any event matches a requested operation kind
+    boolean found = false;
+    for (AsyncEvent e : events) {
+      if (invalidate && e.getOperation().isInvalidate()) { // invalidate requested and this event is one
+        found = true;
+        break;
+      }
+      if (destroy && e.getOperation().isDestroy()) { // destroy requested and this event is one
+        found = true;
+        break;
+      }
+    }
+    return found;
+  }
+
+  private void createAsyncEventQueue(String id, boolean ignoreEvictionExpiration, List<AsyncEvent> storeEvents) {
+    AsyncEventListener al = this.createAsyncListener(storeEvents); // listener appends delivered events to storeEvents
+    aeq = cache.createAsyncEventQueueFactory().setParallel(false) // non-parallel queue
+        .setIgnoreEvictionAndExpiration(ignoreEvictionExpiration)
+        .setBatchSize(1).setBatchTimeInterval(1).create(id, al); // batch size 1, batch time interval 1
+  }
+  
+  private Region createRegion(String name, boolean isPR, String aeqId, boolean evictionDestroy, 
+      boolean evictionOverflow, boolean expirationDestroy, boolean expirationInvalidate) {
+    RegionFactory rf; // note: the 'name' parameter shadows the TestName rule field inside this method
+    if (isPR) {
+      rf = cache.createRegionFactory(RegionShortcut.PARTITION);
+    } else {
+      rf = cache.createRegionFactory(RegionShortcut.REPLICATE);
+    }
+    // Attach the async event queue with id aeqId to the region.
+    rf.addAsyncEventQueueId(aeqId);
+    if (evictionDestroy) {
+      rf.setEvictionAttributes(EvictionAttributes.createLIFOEntryAttributes(1, EvictionAction.LOCAL_DESTROY));
+    }
+    if (evictionOverflow) {
+      rf.setEvictionAttributes(EvictionAttributes.createLIFOEntryAttributes(1, EvictionAction.OVERFLOW_TO_DISK)); 
+    }
+    if (expirationDestroy) {
+      rf.setEntryTimeToLive(new ExpirationAttributes(1, ExpirationAction.DESTROY));
+    }
+    if (expirationInvalidate) {
+      rf.setEntryTimeToLive(new ExpirationAttributes(1, ExpirationAction.INVALIDATE));
+    }
+    // Create the region under the given name with the attributes selected above.
+    return rf.create(name);
+  }
+  
+  private AsyncEventListener createAsyncListener(List<AsyncEvent> list) {
+    AsyncEventListener listener = new AsyncEventListener() {
+      private List<AsyncEvent> aeList = list;
+      // aeList aliases the caller's list (no copy). NOTE(review): if events arrive on another thread, access is unsynchronized - confirm.
+      @Override
+      public void close() {
+        // Nothing to release.
+      }
+
+      @Override
+      public boolean processEvents(List<AsyncEvent> arg0) {
+        System.out.println("AEQ Listener.process()");
+        new Exception("Stack trace for AsyncEventQueue").printStackTrace(); // debug output only: the exception is printed, never thrown
+        // Record the delivered batch in the caller's list.
+        aeList.addAll(arg0);
+        System.out.println("AEQ Event :" + arg0);
+        return true;
+      }
+    };
+    return listener;
+  }
+
+
+}