You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by je...@apache.org on 2019/10/03 20:52:47 UTC
[geode] 03/11: GEODE-7129: Adding XML config for creating AEQ with
paused event processing.
This is an automated email from the ASF dual-hosted git repository.
jensdeppe pushed a commit to branch release/1.9.2
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 50fe3223b49cf90875de919316c99cc9996de9ee
Author: Naburun Nag <na...@cs.wisc.edu>
AuthorDate: Tue Sep 3 14:05:46 2019 -0700
GEODE-7129: Adding XML config for creating AEQ with paused event processing.
* Cache XML fields added
* Cache config fields added.
---
.../AsyncEventQueueValidationsJUnitTest.java | 51 ++++++++++++++++++++++
...entQueueConfiguredFromXmlStartsPaused.cache.xml | 36 +++++++++++++++
...entQueueConfiguredFromXmlStartsPaused.cache.xml | 36 +++++++++++++++
.../internal/cache/xmlcache/CacheCreation.java | 3 ++
.../geode/internal/cache/xmlcache/CacheXml.java | 1 +
.../internal/cache/xmlcache/CacheXmlParser.java | 9 ++++
.../geode.apache.org/schema/cache/cache-1.0.xsd | 1 +
.../geode/cache/configuration/CacheConfig.java | 28 ++++++++++++
8 files changed, 165 insertions(+)
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.java
index 6cd325a..44fb069 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.java
@@ -23,9 +23,11 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import junitparams.JUnitParamsRunner;
import junitparams.Parameters;
+import org.awaitility.core.ConditionTimeoutException;
import org.junit.After;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -40,6 +42,7 @@ import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
import org.apache.geode.cache.wan.GatewayEventFilter;
import org.apache.geode.cache.wan.GatewaySender.OrderPolicy;
import org.apache.geode.internal.cache.wan.AsyncEventQueueConfigurationException;
+import org.apache.geode.internal.cache.wan.MyAsyncEventListener;
import org.apache.geode.internal.cache.wan.MyGatewayEventFilter;
import org.apache.geode.test.junit.categories.AEQTest;
import org.apache.geode.util.test.TestUtil;
@@ -151,9 +154,57 @@ public class AsyncEventQueueValidationsJUnitTest {
.until(() -> filter.getAfterAcknowledgementInvocations() == numPuts);
}
+ @Test
+ @Parameters(method = "getCacheXmlFileBaseNamesForPauseTests")
+ public void whenAsyncQueuesAreStartedInPausedStateShouldNotDispatchEventsTillItIsUnpaused(
+     String cacheXmlFileBaseName) {
+   // Create the cache from the XML resource selected by the parameterized base name
+   String cacheXmlFileName =
+       createTempFileFromResource(getClass(),
+           getClass().getSimpleName() + "." + cacheXmlFileBaseName + ".cache.xml")
+               .getAbsolutePath();
+   cache = new CacheFactory().set(MCAST_PORT, "0").set(CACHE_XML_FILE, cacheXmlFileName).create();
+
+   // Get the AsyncEventQueue under test; its id is the base name
+   AsyncEventQueue aeq = cache.getAsyncEventQueue(cacheXmlFileBaseName);
+
+   // Get region and do puts
+   Region<Integer, Integer> region = cache.getRegion(cacheXmlFileBaseName);
+   int numPuts = 1000;
+   for (int i = 0; i < numPuts; i++) {
+     region.put(i, i);
+   }
+
+   MyAsyncEventListener listener = (MyAsyncEventListener) aeq.getAsyncEventListener();
+
+   // While paused the listener must stay idle: the await is expected to time out, so a
+   // normal return means an event was dispatched and the test has to fail explicitly.
+   try {
+     await().atMost(10, TimeUnit.SECONDS).until(() -> listener.getEventsMap().size() > 0);
+     fail("Events were dispatched to the listener while event processing was paused");
+   } catch (ConditionTimeoutException expected) {
+     // Expected: nothing reached the listener within the wait window
+   }
+
+   // Ensure that the queue is filling up
+   await().atMost(60, TimeUnit.SECONDS).until(() -> ((AsyncEventQueueImpl) aeq).getSender()
+       .getQueues().stream().mapToInt(queue -> queue.size()).sum() == numPuts);
+
+   // Unpause the sender
+   aeq.resumeEventDispatching();
+
+   // Ensure that the listener is invoked for every put after unpause
+   await().atMost(60, TimeUnit.SECONDS).until(() -> listener.getEventsMap().size() == numPuts);
+ }
+
private Object[] getCacheXmlFileBaseNames() {
return $(new Object[] {"testSerialAsyncEventQueueConfiguredFromXmlUsesFilter"},
new Object[] {"testParallelAsyncEventQueueConfiguredFromXmlUsesFilter"});
}
+ // Base names of the cache XML files (serial and parallel queue) used by the paused-start test
+ private Object[] getCacheXmlFileBaseNamesForPauseTests() {
+ return $(new Object[] {"testSerialAsyncEventQueueConfiguredFromXmlStartsPaused"},
+ new Object[] {"testParallelAsyncEventQueueConfiguredFromXmlStartsPaused"});
+ }
}
diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.testParallelAsyncEventQueueConfiguredFromXmlStartsPaused.cache.xml b/geode-core/src/integrationTest/resources/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.testParallelAsyncEventQueueConfiguredFromXmlStartsPaused.cache.xml
new file mode 100644
index 0000000..3bc15ac
--- /dev/null
+++ b/geode-core/src/integrationTest/resources/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.testParallelAsyncEventQueueConfiguredFromXmlStartsPaused.cache.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<cache xmlns="http://geode.apache.org/schema/cache" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" copy-on-read="false" is-server="false" lock-lease="120" lock-timeout="60" search-timeout="300" version="1.0" xsi:schemaLocation="http://geode.apache.org/schema/cache http://geode.apache.org/schema/cache/cache-1.0.xsd">
+
+ <async-event-queue id="testParallelAsyncEventQueueConfiguredFromXmlStartsPaused" parallel="true" pause-event-processing="true">
+ <gateway-event-filter>
+ <class-name>org.apache.geode.internal.cache.wan.MyGatewayEventFilter</class-name>
+ </gateway-event-filter>
+ <async-event-listener>
+ <class-name>org.apache.geode.internal.cache.wan.MyAsyncEventListener</class-name>
+ </async-event-listener>
+ </async-event-queue>
+
+ <region name="testParallelAsyncEventQueueConfiguredFromXmlStartsPaused" refid="PARTITION">
+ <region-attributes async-event-queue-ids="testParallelAsyncEventQueueConfiguredFromXmlStartsPaused"/>
+ </region>
+
+</cache>
+
diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.testSerialAsyncEventQueueConfiguredFromXmlStartsPaused.cache.xml b/geode-core/src/integrationTest/resources/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.testSerialAsyncEventQueueConfiguredFromXmlStartsPaused.cache.xml
new file mode 100644
index 0000000..8cf4d6a
--- /dev/null
+++ b/geode-core/src/integrationTest/resources/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.testSerialAsyncEventQueueConfiguredFromXmlStartsPaused.cache.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<cache xmlns="http://geode.apache.org/schema/cache" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" copy-on-read="false" is-server="false" lock-lease="120" lock-timeout="60" search-timeout="300" version="1.0" xsi:schemaLocation="http://geode.apache.org/schema/cache http://geode.apache.org/schema/cache/cache-1.0.xsd">
+
+ <async-event-queue id="testSerialAsyncEventQueueConfiguredFromXmlStartsPaused" parallel="false" pause-event-processing="true">
+ <gateway-event-filter>
+ <class-name>org.apache.geode.internal.cache.wan.MyGatewayEventFilter</class-name>
+ </gateway-event-filter>
+ <async-event-listener>
+ <class-name>org.apache.geode.internal.cache.wan.MyAsyncEventListener</class-name>
+ </async-event-listener>
+ </async-event-queue>
+
+ <region name="testSerialAsyncEventQueueConfiguredFromXmlStartsPaused" refid="PARTITION">
+ <region-attributes async-event-queue-ids="testSerialAsyncEventQueueConfiguredFromXmlStartsPaused"/>
+ </region>
+
+</cache>
+
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
index 2548f23..2803a97 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
@@ -553,6 +553,9 @@ public class CacheCreation implements InternalCache {
AsyncEventQueueFactoryImpl asyncQueueFactory =
(AsyncEventQueueFactoryImpl) cache.createAsyncEventQueueFactory();
asyncQueueFactory.configureAsyncEventQueue(asyncEventQueueCreation);
+ if (asyncEventQueueCreation.isDispatchingPaused()) {
+ asyncQueueFactory.pauseEventDispatchingToListener();
+ }
AsyncEventQueue asyncEventQueue = cache.getAsyncEventQueue(asyncEventQueueCreation.getId());
if (asyncEventQueue == null) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXml.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXml.java
index f9be910..8cf1ceb 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXml.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXml.java
@@ -750,6 +750,7 @@ public abstract class CacheXml implements EntityResolver2, ErrorHandler {
protected static final String ASYNC_EVENT_LISTENER = "async-event-listener";
public static final String ASYNC_EVENT_QUEUE = "async-event-queue";
+ public static final String PAUSE_EVENT_PROCESSING = "pause-event-processing";
protected static final String ASYNC_EVENT_QUEUE_IDS = "async-event-queue-ids";
protected static final String FORWARD_EXPIRATION_DESTROY = "forward-expiration-destroy";
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlParser.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlParser.java
index 2aeaa6f..03b1987 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlParser.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlParser.java
@@ -2246,6 +2246,12 @@ public class CacheXmlParser extends CacheXml implements ContentHandler {
asyncEventQueueCreation.setBatchSize(Integer.parseInt(batchSize));
}
+ // start in Paused state
+ String paused = atts.getValue(PAUSE_EVENT_PROCESSING);
+ if (paused != null) {
+ asyncEventQueueCreation.setPauseEventDispatching(Boolean.parseBoolean(paused));
+ } // no else block needed as default is set to false.
+
// batch-time-interval
String batchTimeInterval = atts.getValue(BATCH_TIME_INTERVAL);
if (batchTimeInterval == null) {
@@ -2362,6 +2368,9 @@ public class CacheXmlParser extends CacheXml implements ContentHandler {
for (GatewayEventFilter gatewayEventFilter : gatewayEventFilters) {
factory.addGatewayEventFilter(gatewayEventFilter);
}
+ if (asyncEventChannelCreation.isDispatchingPaused()) {
+ factory.pauseEventDispatchingToListener();
+ }
factory.setGatewayEventSubstitutionListener(
asyncEventChannelCreation.getGatewayEventSubstitutionFilter());
AsyncEventQueue asyncEventChannel = factory.create(asyncEventChannelCreation.getId(),
diff --git a/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd b/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd
index 1e0879f..5b47dfa 100755
--- a/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd
+++ b/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd
@@ -250,6 +250,7 @@ declarative caching XML file elements unless indicated otherwise.
</xsd:sequence>
<xsd:attribute name="id" type="xsd:string" use="required" />
<xsd:attribute name="parallel" type="xsd:boolean" use="optional" />
+ <xsd:attribute name="pause-event-processing" type="xsd:boolean" use="optional" />
<xsd:attribute name="batch-size" type="xsd:string" use="optional" />
<xsd:attribute name="batch-time-interval" type="xsd:string" use="optional" />
<xsd:attribute name="enable-batch-conflation" type="xsd:boolean" use="optional" />
diff --git a/geode-management/src/main/java/org/apache/geode/cache/configuration/CacheConfig.java b/geode-management/src/main/java/org/apache/geode/cache/configuration/CacheConfig.java
index de48201..de039f5 100644
--- a/geode-management/src/main/java/org/apache/geode/cache/configuration/CacheConfig.java
+++ b/geode-management/src/main/java/org/apache/geode/cache/configuration/CacheConfig.java
@@ -1142,6 +1142,34 @@ public class CacheConfig {
protected String orderPolicy;
@XmlAttribute(name = "forward-expiration-destroy")
protected Boolean forwardExpirationDestroy;
+ @XmlAttribute(name = "pause-event-processing")
+ protected Boolean pauseEventProcessing;
+
+ /**
+ * Gets the value of the pauseEventProcessing property, which is bound to the
+ * {@code pause-event-processing} XML attribute.
+ *
+ * @return {@code true} if the queue is to be created with the processing of queued events
+ * paused, {@code false} if it is to be created without pausing the processing of queued
+ * events; the field has no initializer, so this may be {@code null} when the value was
+ * never set
+ */
+ public Boolean isPauseEventProcessing() {
+ return pauseEventProcessing;
+ }
+
+ /**
+ * Sets the value of the pauseEventProcessing property, which is bound to the
+ * {@code pause-event-processing} XML attribute.
+ *
+ * @param pauseEventProcessing {@code true} if the queue is to be created with the processing
+ * of queued events paused, {@code false} if it is to be created without pausing the
+ * processing of queued events; the value is stored as given, without a null check
+ */
+
+ public void setPauseEventProcessing(Boolean pauseEventProcessing) {
+ this.pauseEventProcessing = pauseEventProcessing;
+ }
/**
* Gets the value of the gatewayEventFilters property.