You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bo...@apache.org on 2016/02/18 20:13:53 UTC
[2/2] incubator-geode git commit: GEODE-967: Added xml support for
GatewayEventSubstitutionFilter
GEODE-967: Added xml support for GatewayEventSubstitutionFilter
- added configuration to AsyncEventQueueCreation
- added support to generate xml for GatewayEventSubstitutionFilter for
both GatewaySender and AsyncEventQueue
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/609e2395
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/609e2395
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/609e2395
Branch: refs/heads/develop
Commit: 609e2395d6cdd3e22abffdfe04aab511c4169c91
Parents: e685fd8
Author: Barry Oglesby <bo...@pivotal.io>
Authored: Wed Feb 17 10:10:59 2016 -0800
Committer: Barry Oglesby <bo...@pivotal.io>
Committed: Thu Feb 18 11:13:42 2016 -0800
----------------------------------------------------------------------
.../cache/xmlcache/AsyncEventQueueCreation.java | 1 +
.../cache/xmlcache/CacheXmlGenerator.java | 37 +++-
.../cache/wan/AsyncEventQueueTestBase.java | 217 ++++++++++---------
.../asyncqueue/AsyncEventListenerDUnitTest.java | 38 +++-
.../cache/CacheXml80GatewayDUnitTest.java | 72 +++++-
5 files changed, 240 insertions(+), 125 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/609e2395/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java
index 60afc14..77f9596 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java
@@ -63,6 +63,7 @@ public class AsyncEventQueueCreation implements AsyncEventQueue {
this.asyncEventListener = eventListener;
this.isBucketSorted = senderAttrs.isBucketSorted;
this.isHDFSQueue = senderAttrs.isHDFSQueue;
+ this.gatewayEventSubstitutionFilter = senderAttrs.eventSubstitutionFilter;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/609e2395/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlGenerator.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlGenerator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlGenerator.java
index 47c341c..4ba1409 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlGenerator.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlGenerator.java
@@ -44,6 +44,7 @@ import javax.xml.transform.TransformerFactory;
import javax.xml.transform.sax.SAXSource;
import javax.xml.transform.stream.StreamResult;
+import com.gemstone.gemfire.cache.wan.*;
import org.xml.sax.Attributes;
import org.xml.sax.ContentHandler;
import org.xml.sax.DTDHandler;
@@ -104,10 +105,6 @@ import com.gemstone.gemfire.cache.query.internal.index.PrimaryKeyIndex;
import com.gemstone.gemfire.cache.server.CacheServer;
import com.gemstone.gemfire.cache.server.ServerLoadProbe;
import com.gemstone.gemfire.cache.util.ObjectSizer;
-import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
-import com.gemstone.gemfire.cache.wan.GatewayReceiver;
-import com.gemstone.gemfire.cache.wan.GatewaySender;
-import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
import com.gemstone.gemfire.distributed.Role;
import com.gemstone.gemfire.internal.Assert;
import com.gemstone.gemfire.internal.cache.AbstractRegion;
@@ -1468,6 +1465,12 @@ public class CacheXmlGenerator extends CacheXml implements XMLReader {
generateGatewayEventFilter(gef);
}
+ if (this.version.compareTo(CacheXmlVersion.VERSION_8_0) >= 0) {
+ if (sender.getGatewayEventSubstitutionFilter() != null) {
+ generateGatewayEventSubstitutionFilter(sender.getGatewayEventSubstitutionFilter());
+ }
+ }
+
for (GatewayTransportFilter gsf : sender.getGatewayTransportFilters()) {
generateGatewayTransportFilter(gsf);
}
@@ -1532,7 +1535,13 @@ public class CacheXmlGenerator extends CacheXml implements XMLReader {
generateGatewayEventFilter(eventFilter);
}
}
-
+
+ if (this.version.compareTo(CacheXmlVersion.VERSION_8_0) >= 0) {
+ if (asyncEventQueue.getGatewayEventSubstitutionFilter() != null) {
+ generateGatewayEventSubstitutionFilter(asyncEventQueue.getGatewayEventSubstitutionFilter());
+ }
+ }
+
AsyncEventListener asyncListener = asyncEventQueue.getAsyncEventListener();
if (asyncListener != null) {
generate(ASYNC_EVENT_LISTENER, asyncListener);
@@ -1628,6 +1637,24 @@ public class CacheXmlGenerator extends CacheXml implements XMLReader {
}
handler.endElement("", GATEWAY_TRANSPORT_FILTER, GATEWAY_TRANSPORT_FILTER);
}
+
+ private void generateGatewayEventSubstitutionFilter(GatewayEventSubstitutionFilter filter)
+ throws SAXException {
+
+ handler.startElement("", GATEWAY_EVENT_SUBSTITUTION_FILTER, GATEWAY_EVENT_SUBSTITUTION_FILTER,
+ EMPTY);
+ String className = filter.getClass().getName();
+
+ handler.startElement("", CLASS_NAME, CLASS_NAME, EMPTY);
+ handler.characters(className.toCharArray(), 0, className.length());
+ handler.endElement("", CLASS_NAME, CLASS_NAME);
+ Properties props = null;
+ if (filter instanceof Declarable2) {
+ props = ((Declarable2)filter).getConfig();
+ generate(props, null);
+ }
+ handler.endElement("", GATEWAY_EVENT_SUBSTITUTION_FILTER, GATEWAY_EVENT_SUBSTITUTION_FILTER);
+ }
//
// private void generateGatewayEventListener(GatewayEventListener gef)
// throws SAXException {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/609e2395/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
index ff918b8..e6efcb2 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
@@ -32,6 +32,7 @@ import java.util.Properties;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.Cache;
@@ -61,12 +62,8 @@ import com.gemstone.gemfire.cache.control.RebalanceResults;
import com.gemstone.gemfire.cache.control.ResourceManager;
import com.gemstone.gemfire.cache.persistence.PartitionOfflineException;
import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
-import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
-import com.gemstone.gemfire.cache.wan.GatewayReceiver;
-import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory;
-import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.cache.wan.*;
import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
-import com.gemstone.gemfire.cache.wan.GatewaySenderFactory;
import com.gemstone.gemfire.distributed.Locator;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
@@ -190,13 +187,7 @@ public class AsyncEventQueueTestBase extends DistributedTestCase {
.getName());
try {
AttributesFactory fact = new AttributesFactory();
- if (asyncQueueIds != null) {
- StringTokenizer tokenizer = new StringTokenizer(asyncQueueIds, ",");
- while (tokenizer.hasMoreTokens()) {
- String asyncQueueId = tokenizer.nextToken();
- fact.addAsyncEventQueueId(asyncQueueId);
- }
- }
+ addAsyncEventQueueIds(fact, asyncQueueIds);
fact.setDataPolicy(DataPolicy.REPLICATE);
fact.setOffHeap(offHeap);
RegionFactory regionFactory = cache.createRegionFactory(fact.create());
@@ -212,6 +203,16 @@ public class AsyncEventQueueTestBase extends DistributedTestCase {
String regionName, String asyncQueueIds) {
AttributesFactory fact = new AttributesFactory();
+ addAsyncEventQueueIds(fact, asyncQueueIds);
+ fact.setDataPolicy(DataPolicy.REPLICATE);
+ // set the CacheLoader
+ fact.setCacheLoader(new MyCacheLoader());
+ RegionFactory regionFactory = cache.createRegionFactory(fact.create());
+ Region r = regionFactory.create(regionName);
+ assertNotNull(r);
+ }
+
+ private static void addAsyncEventQueueIds(AttributesFactory fact, String asyncQueueIds) {
if (asyncQueueIds != null) {
StringTokenizer tokenizer = new StringTokenizer(asyncQueueIds, ",");
while (tokenizer.hasMoreTokens()) {
@@ -219,12 +220,6 @@ public class AsyncEventQueueTestBase extends DistributedTestCase {
fact.addAsyncEventQueueId(asyncQueueId);
}
}
- fact.setDataPolicy(DataPolicy.REPLICATE);
- // set the CacheLoader
- fact.setCacheLoader(new MyCacheLoader());
- RegionFactory regionFactory = cache.createRegionFactory(fact.create());
- Region r = regionFactory.create(regionName);
- assertNotNull(r);
}
public static void createReplicatedRegionWithSenderAndAsyncEventQueue(
@@ -258,37 +253,21 @@ public class AsyncEventQueueTestBase extends DistributedTestCase {
boolean isParallel, Integer maxMemory, Integer batchSize,
boolean isConflation, boolean isPersistent, String diskStoreName,
boolean isDiskSynchronous) {
-
- if (diskStoreName != null) {
- File directory = new File(asyncChannelId + "_disk_"
- + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
- directory.mkdir();
- File[] dirs1 = new File[] { directory };
- DiskStoreFactory dsf = cache.createDiskStoreFactory();
- dsf.setDiskDirs(dirs1);
- DiskStore ds = dsf.create(diskStoreName);
- }
+ createDiskStore(asyncChannelId, diskStoreName);
AsyncEventListener asyncEventListener = new MyAsyncEventListener();
- AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
- factory.setBatchSize(batchSize);
- factory.setPersistent(isPersistent);
- factory.setDiskStoreName(diskStoreName);
+ AsyncEventQueueFactory factory = getInitialAsyncEventQueueFactory(isParallel, maxMemory, batchSize, isPersistent, diskStoreName);
factory.setDiskSynchronous(isDiskSynchronous);
factory.setBatchConflationEnabled(isConflation);
- factory.setMaximumQueueMemory(maxMemory);
- factory.setParallel(isParallel);
// set dispatcher threads
factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
+ // Set GatewayEventSubstitutionFilter
AsyncEventQueue asyncChannel = factory.create(asyncChannelId,
asyncEventListener);
}
- public static void createAsyncEventQueueWithListener2(String asyncChannelId,
- boolean isParallel, Integer maxMemory, Integer batchSize,
- boolean isPersistent, String diskStoreName) {
-
+ private static void createDiskStore(String asyncChannelId, String diskStoreName) {
if (diskStoreName != null) {
File directory = new File(asyncChannelId + "_disk_"
+ System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
@@ -298,15 +277,17 @@ public class AsyncEventQueueTestBase extends DistributedTestCase {
dsf.setDiskDirs(dirs1);
DiskStore ds = dsf.create(diskStoreName);
}
+ }
+
+ public static void createAsyncEventQueueWithListener2(String asyncChannelId,
+ boolean isParallel, Integer maxMemory, Integer batchSize,
+ boolean isPersistent, String diskStoreName) {
+
+ createDiskStore(asyncChannelId, diskStoreName);
AsyncEventListener asyncEventListener = new MyAsyncEventListener2();
- AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
- factory.setBatchSize(batchSize);
- factory.setPersistent(isPersistent);
- factory.setDiskStoreName(diskStoreName);
- factory.setMaximumQueueMemory(maxMemory);
- factory.setParallel(isParallel);
+ AsyncEventQueueFactory factory = getInitialAsyncEventQueueFactory(isParallel, maxMemory, batchSize, isPersistent, diskStoreName);
// set dispatcher threads
factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
AsyncEventQueue asyncChannel = factory.create(asyncChannelId,
@@ -317,46 +298,40 @@ public class AsyncEventQueueTestBase extends DistributedTestCase {
boolean isParallel, Integer maxMemory, Integer batchSize,
boolean isConflation, boolean isPersistent, String diskStoreName,
boolean isDiskSynchronous, String asyncListenerClass) throws Exception {
+ createAsyncEventQueue(asyncChannelId, isParallel, maxMemory, batchSize, isConflation, isPersistent,
+ diskStoreName, isDiskSynchronous, asyncListenerClass, null);
+ }
- if (diskStoreName != null) {
- File directory = new File(asyncChannelId + "_disk_"
- + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
- directory.mkdir();
- File[] dirs1 = new File[] { directory };
- DiskStoreFactory dsf = cache.createDiskStoreFactory();
- dsf.setDiskDirs(dirs1);
- DiskStore ds = dsf.create(diskStoreName);
+ public static void createAsyncEventQueue(String asyncChannelId,
+ boolean isParallel, Integer maxMemory, Integer batchSize,
+ boolean isConflation, boolean isPersistent, String diskStoreName,
+ boolean isDiskSynchronous, String asyncListenerClass,
+ String substitutionFilterClass) throws Exception {
+
+ createDiskStore(asyncChannelId, diskStoreName);
+
+ AsyncEventQueueFactory factory = getInitialAsyncEventQueueFactory(isParallel, maxMemory, batchSize, isPersistent, diskStoreName);
+ factory.setDiskSynchronous(isDiskSynchronous);
+ factory.setBatchConflationEnabled(isConflation);
+ if (substitutionFilterClass != null) {
+ factory.setGatewayEventSubstitutionListener((GatewayEventSubstitutionFilter)getClass(substitutionFilterClass).newInstance());
}
+ // set dispatcher threads
+ factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
+ AsyncEventQueue asyncChannel = factory.create(asyncChannelId, (AsyncEventListener)getClass(asyncListenerClass).newInstance());
+ }
+ private static Class getClass(String simpleClassName) throws Exception {
String packagePrefix = "com.gemstone.gemfire.internal.cache.wan.";
- String className = packagePrefix + asyncListenerClass;
- AsyncEventListener asyncEventListener = null;
+ String className = packagePrefix + simpleClassName;
+ Class clazz = null;
try {
- Class clazz = Class.forName(className);
- asyncEventListener = (AsyncEventListener)clazz.newInstance();
+ clazz = Class.forName(className);
}
- catch (ClassNotFoundException e) {
+ catch (Exception e) {
throw e;
}
- catch (InstantiationException e) {
- throw e;
- }
- catch (IllegalAccessException e) {
- throw e;
- }
-
- AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
- factory.setBatchSize(batchSize);
- factory.setPersistent(isPersistent);
- factory.setDiskStoreName(diskStoreName);
- factory.setDiskSynchronous(isDiskSynchronous);
- factory.setBatchConflationEnabled(isConflation);
- factory.setMaximumQueueMemory(maxMemory);
- factory.setParallel(isParallel);
- // set dispatcher threads
- factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
- AsyncEventQueue asyncChannel = factory.create(asyncChannelId,
- asyncEventListener);
+ return clazz;
}
public static void createAsyncEventQueueWithCustomListener(
@@ -377,24 +352,11 @@ public class AsyncEventQueueTestBase extends DistributedTestCase {
.getName());
try {
- if (diskStoreName != null) {
- File directory = new File(asyncChannelId + "_disk_"
- + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
- directory.mkdir();
- File[] dirs1 = new File[] { directory };
- DiskStoreFactory dsf = cache.createDiskStoreFactory();
- dsf.setDiskDirs(dirs1);
- DiskStore ds = dsf.create(diskStoreName);
- }
+ createDiskStore(asyncChannelId, diskStoreName);
AsyncEventListener asyncEventListener = new CustomAsyncEventListener();
- AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
- factory.setBatchSize(batchSize);
- factory.setPersistent(isPersistent);
- factory.setDiskStoreName(diskStoreName);
- factory.setMaximumQueueMemory(maxMemory);
- factory.setParallel(isParallel);
+ AsyncEventQueueFactory factory = getInitialAsyncEventQueueFactory(isParallel, maxMemory, batchSize, isPersistent, diskStoreName);
factory.setDispatcherThreads(nDispatchers);
AsyncEventQueue asyncChannel = factory.create(asyncChannelId,
asyncEventListener);
@@ -404,32 +366,29 @@ public class AsyncEventQueueTestBase extends DistributedTestCase {
}
}
+ private static AsyncEventQueueFactory getInitialAsyncEventQueueFactory(boolean isParallel, Integer maxMemory, Integer batchSize,
+ boolean isPersistent, String diskStoreName) {
+ AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
+ factory.setBatchSize(batchSize);
+ factory.setPersistent(isPersistent);
+ factory.setDiskStoreName(diskStoreName);
+ factory.setMaximumQueueMemory(maxMemory);
+ factory.setParallel(isParallel);
+ return factory;
+ }
+
public static void createConcurrentAsyncEventQueue(String asyncChannelId,
boolean isParallel, Integer maxMemory, Integer batchSize,
boolean isConflation, boolean isPersistent, String diskStoreName,
boolean isDiskSynchronous, int dispatcherThreads, OrderPolicy policy) {
- if (diskStoreName != null) {
- File directory = new File(asyncChannelId + "_disk_"
- + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
- directory.mkdir();
- File[] dirs1 = new File[] { directory };
- DiskStoreFactory dsf = cache.createDiskStoreFactory();
- dsf.setDiskDirs(dirs1);
- DiskStore ds = dsf.create(diskStoreName);
- }
+ createDiskStore(asyncChannelId, diskStoreName);
AsyncEventListener asyncEventListener = new MyAsyncEventListener();
- AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
- factory.setBatchSize(batchSize);
- factory.setPersistent(isPersistent);
- factory.setDiskStoreName(diskStoreName);
+ AsyncEventQueueFactory factory = getInitialAsyncEventQueueFactory(isParallel, maxMemory, batchSize, isPersistent, diskStoreName);
factory.setDiskSynchronous(isDiskSynchronous);
factory.setBatchConflationEnabled(isConflation);
- factory.setMaximumQueueMemory(maxMemory);
- factory.setParallel(isParallel);
- factory.setDispatcherThreads(dispatcherThreads);
factory.setOrderPolicy(policy);
AsyncEventQueue asyncChannel = factory.create(asyncChannelId,
asyncEventListener);
@@ -1416,6 +1375,27 @@ public class AsyncEventQueueTestBase extends DistributedTestCase {
}
}
+ public static void verifySubstitutionFilterInvocations(String asyncEventQueueId, int numInvocations) {
+ AsyncEventQueue queue = cache.getAsyncEventQueue(asyncEventQueueId);
+ assertNotNull(queue);
+
+ // Verify the GatewayEventSubstitutionFilter has been invoked the appropriate number of times
+ MyGatewayEventSubstitutionFilter filter = (MyGatewayEventSubstitutionFilter) queue.getGatewayEventSubstitutionFilter();
+ assertNotNull(filter);
+ assertEquals(numInvocations, filter.getNumInvocations());
+
+ // Verify the AsyncEventListener has received the substituted values
+ MyAsyncEventListener listener = (MyAsyncEventListener) queue.getAsyncEventListener();
+ final Map eventsMap = listener.getEventsMap();
+ assertNotNull(eventsMap);
+ assertEquals(numInvocations, eventsMap.size());
+
+ for (Iterator i = eventsMap.entrySet().iterator(); i.hasNext();) {
+ Map.Entry<Integer,String> entry = (Map.Entry<Integer,String>) i.next();
+ assertEquals(MyGatewayEventSubstitutionFilter.SUBSTITUTION_PREFIX + entry.getKey(), entry.getValue());
+ }
+ }
+
public static int getAsyncEventListenerMapSize(String asyncEventQueueId) {
AsyncEventListener theListener = null;
@@ -1630,7 +1610,6 @@ public class AsyncEventQueueTestBase extends DistributedTestCase {
public boolean isOffHeap() {
return false;
}
-
}
class MyAsyncEventListener_CacheLoader implements AsyncEventListener {
@@ -1669,3 +1648,25 @@ class MyCacheLoader implements CacheLoader, Declarable {
}
}
+
+class MyGatewayEventSubstitutionFilter implements GatewayEventSubstitutionFilter, Declarable {
+
+ private AtomicInteger numInvocations = new AtomicInteger();
+
+ protected static final String SUBSTITUTION_PREFIX = "substituted_";
+
+ public Object getSubstituteValue(EntryEvent event) {
+ this.numInvocations.incrementAndGet();
+ return SUBSTITUTION_PREFIX + event.getKey();
+ }
+
+ public void close() {
+ }
+
+ public void init(Properties properties) {
+ }
+
+ protected int getNumInvocations() {
+ return this.numInvocations.get();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/609e2395/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
index 02ed4ef..978f4af 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
@@ -16,11 +16,15 @@
*/
package com.gemstone.gemfire.internal.cache.wan.asyncqueue;
+import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import com.gemstone.gemfire.cache.Declarable;
+import com.gemstone.gemfire.cache.EntryEvent;
+import com.gemstone.gemfire.cache.wan.GatewayEventSubstitutionFilter;
import org.junit.Ignore;
import com.gemstone.gemfire.cache.CacheFactory;
@@ -1068,7 +1072,7 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase {
}
}
}
-
+
public void testParallelAsyncEventQueue() {
Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
"createFirstLocatorWithDSId", new Object[] { 1 });
@@ -1098,7 +1102,7 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase {
vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { getTestMethodName() + "_PR",
256 });
-
+
vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
new Object[] { "ln" });
vm5.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
@@ -1107,7 +1111,7 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase {
new Object[] { "ln" });
vm7.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
new Object[] { "ln" });
-
+
int vm4size = (Integer)vm4.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize",
new Object[] { "ln"});
int vm5size = (Integer)vm5.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize",
@@ -1116,10 +1120,33 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase {
new Object[] { "ln"});
int vm7size = (Integer)vm7.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize",
new Object[] { "ln"});
-
+
assertEquals(vm4size + vm5size + vm6size + vm7size, 256);
}
-
+
+ public void testParallelAsyncEventQueueWithSubstitutionFilter() {
+ Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+ "createFirstLocatorWithDSId", new Object[] { 1 });
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+ true, 100, 100, false, false, null, false, "MyAsyncEventListener", "MyGatewayEventSubstitutionFilter" });
+
+ String regionName = getTestMethodName() + "_PR";
+ vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+ new Object[] { regionName, "ln", isOffHeap() });
+
+ int numPuts = 10;
+ vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { regionName, numPuts });
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+ new Object[] { "ln" });
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "verifySubstitutionFilterInvocations",
+ new Object[] { "ln" , numPuts });
+ }
+
/**
* Verify that the events reaching the AsyncEventListener have correct operation detail.
* (added for defect #50237).
@@ -1918,5 +1945,4 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase {
"getAsyncEventListenerMapSize", new Object[] { "ln" });
assertEquals(vm3size, 1000);
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/609e2395/gemfire-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml80GatewayDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml80GatewayDUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml80GatewayDUnitTest.java
index 9495171..d04e916 100644
--- a/gemfire-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml80GatewayDUnitTest.java
+++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml80GatewayDUnitTest.java
@@ -17,14 +17,13 @@
package com.gemstone.gemfire.cache;
import java.io.IOException;
+import java.util.Properties;
import java.util.Set;
-import com.gemstone.gemfire.cache.wan.GatewayReceiver;
-import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory;
-import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
-import com.gemstone.gemfire.cache30.CacheXmlTestCase;
-import com.gemstone.gemfire.cache30.MyGatewayTransportFilter1;
-import com.gemstone.gemfire.cache30.MyGatewayTransportFilter2;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueueFactory;
+import com.gemstone.gemfire.cache.wan.*;
+import com.gemstone.gemfire.cache30.*;
import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation;
import com.gemstone.gemfire.internal.cache.xmlcache.CacheXml;
@@ -69,9 +68,70 @@ public class CacheXml80GatewayDUnitTest extends CacheXmlTestCase {
}
}
+ public void testAsyncEventQueueWithSubstitutionFilter() {
+ getSystem();
+ CacheCreation cache = new CacheCreation();
+
+ // Create an AsyncEventQueue with GatewayEventSubstitutionFilter.
+ String id = getName();
+ AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
+ factory.setGatewayEventSubstitutionListener(new MyGatewayEventSubstitutionFilter());
+ AsyncEventQueue queue = factory.create(id, new CacheXml70DUnitTest.MyAsyncEventListener());
+
+ // Verify the GatewayEventSubstitutionFilter is set on the AsyncEventQueue.
+ assertNotNull(queue.getGatewayEventSubstitutionFilter());
+
+ testXml(cache);
+ Cache c = getCache();
+ assertNotNull(c);
+
+ // Get the AsyncEventQueue. Verify the GatewayEventSubstitutionFilter is not null.
+ AsyncEventQueue queueOnCache = c.getAsyncEventQueue(id);
+ assertNotNull(queueOnCache);
+ assertNotNull(queueOnCache.getGatewayEventSubstitutionFilter());
+ }
+
+ public void testGatewaySenderWithSubstitutionFilter() {
+ getSystem();
+ CacheCreation cache = new CacheCreation();
+
+ // Create a GatewaySender with GatewayEventSubstitutionFilter.
+ // Don't start the sender to avoid 'Locators must be configured before starting gateway-sender' exception.
+ String id = getName();
+ GatewaySenderFactory factory = cache.createGatewaySenderFactory();
+ factory.setManualStart(true);
+ factory.setGatewayEventSubstitutionFilter(new MyGatewayEventSubstitutionFilter());
+ GatewaySender sender = factory.create(id, 2);
+
+ // Verify the GatewayEventSubstitutionFilter is set on the GatewaySender.
+ assertNotNull(sender.getGatewayEventSubstitutionFilter());
+
+ testXml(cache);
+ Cache c = getCache();
+ assertNotNull(c);
+
+ // Get the GatewaySender. Verify the GatewayEventSubstitutionFilter is not null.
+ GatewaySender senderOnCache = c.getGatewaySender(id);
+ assertNotNull(senderOnCache);
+ assertNotNull(senderOnCache.getGatewayEventSubstitutionFilter());
+ }
+
protected void validateGatewayReceiver(GatewayReceiver receiver1,
GatewayReceiver gatewayReceiver){
CacheXml70GatewayDUnitTest.validateGatewayReceiver(receiver1, gatewayReceiver);
assertEquals(receiver1.isManualStart(), gatewayReceiver.isManualStart());
}
+
+ public static class MyGatewayEventSubstitutionFilter implements GatewayEventSubstitutionFilter, Declarable {
+
+ public Object getSubstituteValue(EntryEvent event) {
+ return event.getKey();
+ }
+
+ public void close() {
+ }
+
+ public void init(Properties properties) {
+ }
+ }
}