You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by je...@apache.org on 2019/10/03 20:52:47 UTC

[geode] 03/11: GEODE-7129: Adding XML config for creating AEQ with paused event processing.

This is an automated email from the ASF dual-hosted git repository.

jensdeppe pushed a commit to branch release/1.9.2
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 50fe3223b49cf90875de919316c99cc9996de9ee
Author: Naburun Nag <na...@cs.wisc.edu>
AuthorDate: Tue Sep 3 14:05:46 2019 -0700

    GEODE-7129: Adding XML config for creating AEQ with paused event processing.
    
    	* Cache XML fields added
    	* Cache config fields added.
---
 .../AsyncEventQueueValidationsJUnitTest.java       | 51 ++++++++++++++++++++++
 ...entQueueConfiguredFromXmlStartsPaused.cache.xml | 36 +++++++++++++++
 ...entQueueConfiguredFromXmlStartsPaused.cache.xml | 36 +++++++++++++++
 .../internal/cache/xmlcache/CacheCreation.java     |  3 ++
 .../geode/internal/cache/xmlcache/CacheXml.java    |  1 +
 .../internal/cache/xmlcache/CacheXmlParser.java    |  9 ++++
 .../geode.apache.org/schema/cache/cache-1.0.xsd    |  1 +
 .../geode/cache/configuration/CacheConfig.java     | 28 ++++++++++++
 8 files changed, 165 insertions(+)

diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.java
index 6cd325a..44fb069 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.java
@@ -23,9 +23,11 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import junitparams.JUnitParamsRunner;
 import junitparams.Parameters;
+import org.awaitility.core.ConditionTimeoutException;
 import org.junit.After;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -40,6 +42,7 @@ import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
 import org.apache.geode.cache.wan.GatewayEventFilter;
 import org.apache.geode.cache.wan.GatewaySender.OrderPolicy;
 import org.apache.geode.internal.cache.wan.AsyncEventQueueConfigurationException;
+import org.apache.geode.internal.cache.wan.MyAsyncEventListener;
 import org.apache.geode.internal.cache.wan.MyGatewayEventFilter;
 import org.apache.geode.test.junit.categories.AEQTest;
 import org.apache.geode.util.test.TestUtil;
@@ -151,9 +154,57 @@ public class AsyncEventQueueValidationsJUnitTest {
         .until(() -> filter.getAfterAcknowledgementInvocations() == numPuts);
   }
 
+  // Verifies that an async event queue declared with pause-event-processing="true" queues
+  // events but does not hand them to its listener until resumeEventDispatching() is called.
+  @Test
+  @Parameters(method = "getCacheXmlFileBaseNamesForPauseTests")
+  public void whenAsyncQueuesAreStartedInPausedStateShouldNotDispatchEventsTillItIsUnpaused(
+      String cacheXmlFileBaseName) {
+    // Create cache with xml
+    String cacheXmlFileName =
+        createTempFileFromResource(getClass(),
+            getClass().getSimpleName() + "." + cacheXmlFileBaseName + ".cache.xml")
+                .getAbsolutePath();
+    cache = new CacheFactory().set(MCAST_PORT, "0").set(CACHE_XML_FILE, cacheXmlFileName).create();
+
+    // The xml uses the base name as both the AsyncEventQueue id and the region name
+    AsyncEventQueue aeq = cache.getAsyncEventQueue(cacheXmlFileBaseName);
+    Region region = cache.getRegion(cacheXmlFileBaseName);
+    int numPuts = 1000;
+    for (int i = 0; i < numPuts; i++) {
+      region.put(i, i);
+    }
+
+    MyAsyncEventListener listener = (MyAsyncEventListener) aeq.getAsyncEventListener();
+
+    // While paused the listener must stay idle: timing out is the passing outcome, whereas
+    // await() returning normally means an event was dispatched, so the test has to fail.
+    try {
+      await().atMost(10, TimeUnit.SECONDS).until(() -> listener.getEventsMap().size() > 0);
+      fail("Listener received events while event dispatching was paused");
+    } catch (ConditionTimeoutException expected) {
+      // Expected: no event reached the listener within the wait window
+    }
+
+    // Ensure that the queue is filling up
+    await().atMost(60, TimeUnit.SECONDS).until(() -> ((AsyncEventQueueImpl) aeq).getSender()
+        .getQueues().stream().mapToInt(i -> i.size()).sum() == numPuts);
+
+    // Unpause the sender
+    aeq.resumeEventDispatching();
+
+    // Ensure that the listener is invoked after unpause
+    await().atMost(60, TimeUnit.SECONDS).until(() -> listener.getEventsMap().size() == numPuts);
+  }
+
   private Object[] getCacheXmlFileBaseNames() {
     return $(new Object[] {"testSerialAsyncEventQueueConfiguredFromXmlUsesFilter"},
         new Object[] {"testParallelAsyncEventQueueConfiguredFromXmlUsesFilter"});
   }
 
+  private Object[] getCacheXmlFileBaseNamesForPauseTests() {
+    return $(new Object[] {"testSerialAsyncEventQueueConfiguredFromXmlStartsPaused"},
+        new Object[] {"testParallelAsyncEventQueueConfiguredFromXmlStartsPaused"});
+  }
+
 }
diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.testParallelAsyncEventQueueConfiguredFromXmlStartsPaused.cache.xml b/geode-core/src/integrationTest/resources/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.testParallelAsyncEventQueueConfiguredFromXmlStartsPaused.cache.xml
new file mode 100644
index 0000000..3bc15ac
--- /dev/null
+++ b/geode-core/src/integrationTest/resources/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.testParallelAsyncEventQueueConfiguredFromXmlStartsPaused.cache.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<cache xmlns="http://geode.apache.org/schema/cache" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" copy-on-read="false" is-server="false" lock-lease="120" lock-timeout="60" search-timeout="300" version="1.0" xsi:schemaLocation="http://geode.apache.org/schema/cache http://geode.apache.org/schema/cache/cache-1.0.xsd">
+
+  <async-event-queue id="testParallelAsyncEventQueueConfiguredFromXmlStartsPaused" parallel="true" pause-event-processing="true">
+    <gateway-event-filter>
+      <class-name>org.apache.geode.internal.cache.wan.MyGatewayEventFilter</class-name>
+    </gateway-event-filter>
+    <async-event-listener>
+      <class-name>org.apache.geode.internal.cache.wan.MyAsyncEventListener</class-name>
+    </async-event-listener>
+  </async-event-queue>
+
+  <region name="testParallelAsyncEventQueueConfiguredFromXmlStartsPaused" refid="PARTITION">
+    <region-attributes async-event-queue-ids="testParallelAsyncEventQueueConfiguredFromXmlStartsPaused"/>
+  </region>
+
+</cache>
+
diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.testSerialAsyncEventQueueConfiguredFromXmlStartsPaused.cache.xml b/geode-core/src/integrationTest/resources/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.testSerialAsyncEventQueueConfiguredFromXmlStartsPaused.cache.xml
new file mode 100644
index 0000000..8cf4d6a
--- /dev/null
+++ b/geode-core/src/integrationTest/resources/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.testSerialAsyncEventQueueConfiguredFromXmlStartsPaused.cache.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<cache xmlns="http://geode.apache.org/schema/cache" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" copy-on-read="false" is-server="false" lock-lease="120" lock-timeout="60" search-timeout="300" version="1.0" xsi:schemaLocation="http://geode.apache.org/schema/cache http://geode.apache.org/schema/cache/cache-1.0.xsd">
+
+  <async-event-queue id="testSerialAsyncEventQueueConfiguredFromXmlStartsPaused" parallel="false" pause-event-processing="true">
+    <gateway-event-filter>
+      <class-name>org.apache.geode.internal.cache.wan.MyGatewayEventFilter</class-name>
+    </gateway-event-filter>
+    <async-event-listener>
+      <class-name>org.apache.geode.internal.cache.wan.MyAsyncEventListener</class-name>
+    </async-event-listener>
+  </async-event-queue>
+
+  <region name="testSerialAsyncEventQueueConfiguredFromXmlStartsPaused" refid="PARTITION">
+    <region-attributes async-event-queue-ids="testSerialAsyncEventQueueConfiguredFromXmlStartsPaused"/>
+  </region>
+
+</cache>
+
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
index 2548f23..2803a97 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
@@ -553,6 +553,9 @@ public class CacheCreation implements InternalCache {
       AsyncEventQueueFactoryImpl asyncQueueFactory =
           (AsyncEventQueueFactoryImpl) cache.createAsyncEventQueueFactory();
       asyncQueueFactory.configureAsyncEventQueue(asyncEventQueueCreation);
+      if (asyncEventQueueCreation.isDispatchingPaused()) {
+        asyncQueueFactory.pauseEventDispatchingToListener();
+      }
 
       AsyncEventQueue asyncEventQueue = cache.getAsyncEventQueue(asyncEventQueueCreation.getId());
       if (asyncEventQueue == null) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXml.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXml.java
index f9be910..8cf1ceb 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXml.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXml.java
@@ -750,6 +750,7 @@ public abstract class CacheXml implements EntityResolver2, ErrorHandler {
 
   protected static final String ASYNC_EVENT_LISTENER = "async-event-listener";
   public static final String ASYNC_EVENT_QUEUE = "async-event-queue";
+  public static final String PAUSE_EVENT_PROCESSING = "pause-event-processing";
   protected static final String ASYNC_EVENT_QUEUE_IDS = "async-event-queue-ids";
   protected static final String FORWARD_EXPIRATION_DESTROY = "forward-expiration-destroy";
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlParser.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlParser.java
index 2aeaa6f..03b1987 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlParser.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlParser.java
@@ -2246,6 +2246,12 @@ public class CacheXmlParser extends CacheXml implements ContentHandler {
       asyncEventQueueCreation.setBatchSize(Integer.parseInt(batchSize));
     }
 
+    // start in Paused state
+    String paused = atts.getValue(PAUSE_EVENT_PROCESSING);
+    if (paused != null) {
+      asyncEventQueueCreation.setPauseEventDispatching(Boolean.parseBoolean(paused));
+    } // no else block needed as default is set to false.
+
     // batch-time-interval
     String batchTimeInterval = atts.getValue(BATCH_TIME_INTERVAL);
     if (batchTimeInterval == null) {
@@ -2362,6 +2368,9 @@ public class CacheXmlParser extends CacheXml implements ContentHandler {
     for (GatewayEventFilter gatewayEventFilter : gatewayEventFilters) {
       factory.addGatewayEventFilter(gatewayEventFilter);
     }
+    if (asyncEventChannelCreation.isDispatchingPaused()) {
+      factory.pauseEventDispatchingToListener();
+    }
     factory.setGatewayEventSubstitutionListener(
         asyncEventChannelCreation.getGatewayEventSubstitutionFilter());
     AsyncEventQueue asyncEventChannel = factory.create(asyncEventChannelCreation.getId(),
diff --git a/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd b/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd
index 1e0879f..5b47dfa 100755
--- a/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd
+++ b/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd
@@ -250,6 +250,7 @@ declarative caching XML file elements unless indicated otherwise.
             </xsd:sequence>
             <xsd:attribute name="id" type="xsd:string" use="required" />
             <xsd:attribute name="parallel" type="xsd:boolean" use="optional" />
+            <xsd:attribute name="pause-event-processing" type="xsd:boolean" use="optional" />
             <xsd:attribute name="batch-size" type="xsd:string" use="optional" />
             <xsd:attribute name="batch-time-interval" type="xsd:string" use="optional" />
             <xsd:attribute name="enable-batch-conflation" type="xsd:boolean" use="optional" />
diff --git a/geode-management/src/main/java/org/apache/geode/cache/configuration/CacheConfig.java b/geode-management/src/main/java/org/apache/geode/cache/configuration/CacheConfig.java
index de48201..de039f5 100644
--- a/geode-management/src/main/java/org/apache/geode/cache/configuration/CacheConfig.java
+++ b/geode-management/src/main/java/org/apache/geode/cache/configuration/CacheConfig.java
@@ -1142,6 +1142,34 @@ public class CacheConfig {
     protected String orderPolicy;
     @XmlAttribute(name = "forward-expiration-destroy")
     protected Boolean forwardExpirationDestroy;
+    @XmlAttribute(name = "pause-event-processing")
+    protected Boolean pauseEventProcessing;
+
+    /**
+     * Gets the value of the {@code pause-event-processing} attribute, which controls whether
+     * the queue is created with the processing of its queued events paused.
+     *
+     * @return {@link Boolean#TRUE} if the queue will be created with event processing paused;
+     *         {@link Boolean#FALSE} if it will be created without pausing; the field's current
+     *         value is returned as-is, so this is {@code null} when the attribute has not
+     *         been set
+     */
+    public Boolean isPauseEventProcessing() {
+      return pauseEventProcessing;
+    }
+
+    /**
+     * Sets the value of the {@code pause-event-processing} attribute, which controls whether
+     * the queue is created with the processing of its queued events paused.
+     *
+     * @param pauseEventProcessing {@code true} if the queue is to be created with the
+     *        processing of its queued events paused; {@code false} if it is to be created
+     *        without pausing the processing of its queued events
+     */
+
+    public void setPauseEventProcessing(Boolean pauseEventProcessing) {
+      this.pauseEventProcessing = pauseEventProcessing;
+    }
 
     /**
      * Gets the value of the gatewayEventFilters property.