You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by je...@apache.org on 2019/10/03 20:52:45 UTC

[geode] 01/11: GEODE-7124: Added new API to create AEQ with paused event processing

This is an automated email from the ASF dual-hosted git repository.

jensdeppe pushed a commit to branch release/1.9.2
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 24047c47f7dbe42809ecc77eeb8b2f8ff5e1769c
Author: Naburun Nag <na...@cs.wisc.edu>
AuthorDate: Tue Sep 3 13:07:53 2019 -0700

    GEODE-7124: Added new API to create AEQ with paused event processing
    
             * New API to pause the event processor when AEQ is created
             * Events will still be queued but will not be processed
---
 .../AsyncEventQueueValidationsJUnitTest.java       | 14 ++++++++++
 .../cache/asyncqueue/AsyncEventQueueFactory.java   |  8 ++++++
 .../internal/AsyncEventQueueFactoryImpl.java       | 20 ++++++++++++++
 .../internal/ParallelAsyncEventQueueImpl.java      |  3 +++
 .../internal/SerialAsyncEventQueueImpl.java        |  3 +++
 .../internal/cache/wan/AbstractGatewaySender.java  | 31 ++++++++++++++++++++++
 .../internal/cache/wan/InternalGatewaySender.java  |  2 ++
 .../internal/AsyncEventQueueFactoryImplTest.java   |  9 +++++++
 .../wan/parallel/ParallelGatewaySenderImpl.java    |  3 +++
 .../cache/wan/serial/SerialGatewaySenderImpl.java  |  3 +++
 10 files changed, 96 insertions(+)

diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.java
index 9eadeef..3c10e36 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.java
@@ -35,6 +35,7 @@ import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
 import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory;
+import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
 import org.apache.geode.cache.wan.GatewayEventFilter;
 import org.apache.geode.cache.wan.GatewaySender.OrderPolicy;
 import org.apache.geode.internal.cache.wan.AsyncEventQueueConfigurationException;
@@ -72,6 +73,19 @@ public class AsyncEventQueueValidationsJUnitTest {
   }
 
   @Test
+  @Parameters({"true", "false"})
+  public void whenAEQCreatedInPausedStateThenSenderIsStartedInPausedState(boolean isParallel) {
+    cache = new CacheFactory().set(MCAST_PORT, "0").create();
+    AsyncEventQueueFactory fact = cache.createAsyncEventQueueFactory()
+        .setParallel(isParallel)
+        .pauseEventDispatchingToListener()
+        .setDispatcherThreads(5);
+    AsyncEventQueue aeq =
+        fact.create("aeqID", new org.apache.geode.internal.cache.wan.MyAsyncEventListener());
+    assertTrue(((AsyncEventQueueImpl) aeq).getSender().isPaused());
+  }
+
+  @Test
   public void testConcurrentParallelAsyncEventQueueAttributesOrderPolicyThread() {
     cache = new CacheFactory().set(MCAST_PORT, "0").create();
     try {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/AsyncEventQueueFactory.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/AsyncEventQueueFactory.java
index e0329a2..1a5145e 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/AsyncEventQueueFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/AsyncEventQueueFactory.java
@@ -150,6 +150,12 @@ public interface AsyncEventQueueFactory {
   AsyncEventQueueFactory setForwardExpirationDestroy(boolean forward);
 
   /**
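+   * Configures the queue to be created with event dispatching to the listener paused. Events
+   * are still queued while dispatching is paused, but they are not delivered to the listener.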
+   */
+  AsyncEventQueueFactory pauseEventDispatchingToListener();
+
+  /**
    * Creates the <code>AsyncEventQueue</code>. It accepts Id of AsyncEventQueue and instance of
    * AsyncEventListener. Multiple queues can be created using Same listener instance. So, the
    * instance of <code>AsyncEventListener</code> should be thread safe in that case. The
@@ -162,4 +168,6 @@ public interface AsyncEventQueueFactory {
    *        to use this queue.
    */
   AsyncEventQueue create(String id, AsyncEventListener listener);
+
+
 }
diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java
index 70f0c2c..26f29ee 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java
@@ -18,6 +18,7 @@ import static org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl.get
 
 import org.apache.logging.log4j.Logger;
 
+import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.cache.asyncqueue.AsyncEventListener;
 import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
 import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory;
@@ -46,6 +47,8 @@ public class AsyncEventQueueFactoryImpl implements AsyncEventQueueFactory {
 
   private final InternalCache cache;
 
+  private boolean pauseEventsDispatchingToListener = false;
+
   /**
    * Used internally to pass the attributes from this factory to the real GatewaySender it is
    * creating.
@@ -159,6 +162,9 @@ public class AsyncEventQueueFactoryImpl implements AsyncEventQueueFactory {
     if (cache instanceof CacheCreation) {
       asyncEventQueue =
           new AsyncEventQueueCreation(asyncQueueId, gatewaySenderAttributes, listener);
+      if (pauseEventsDispatchingToListener) {
+        ((AsyncEventQueueCreation) asyncEventQueue).setPauseEventDispatching(true);
+      }
       ((CacheCreation) cache).addAsyncEventQueue(asyncEventQueue);
     } else {
       if (logger.isDebugEnabled()) {
@@ -171,6 +177,9 @@ public class AsyncEventQueueFactoryImpl implements AsyncEventQueueFactory {
       AsyncEventQueueImpl asyncEventQueueImpl = new AsyncEventQueueImpl(sender, listener);
       asyncEventQueue = asyncEventQueueImpl;
       cache.addAsyncEventQueue(asyncEventQueueImpl);
+      if (pauseEventsDispatchingToListener) {
+        sender.setStartEventProcessorInPausedState();
+      }
       if (!gatewaySenderAttributes.isManualStart()) {
         sender.start();
       }
@@ -267,4 +276,15 @@ public class AsyncEventQueueFactoryImpl implements AsyncEventQueueFactory {
     gatewaySenderAttributes.forwardExpirationDestroy = forward;
     return this;
   }
+
+  @Override
+  public AsyncEventQueueFactory pauseEventDispatchingToListener() {
+    pauseEventsDispatchingToListener = true;
+    return this;
+  }
+
+  @VisibleForTesting
+  protected boolean isPauseEventsDispatchingToListener() {
+    return pauseEventsDispatchingToListener;
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
index 222f00b..a6d9799 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
@@ -78,6 +78,9 @@ public class ParallelAsyncEventQueueImpl extends AbstractGatewaySender {
        */
       eventProcessor =
           new ConcurrentParallelGatewaySenderEventProcessor(this, getThreadMonitorObj());
+      if (startEventProcessorInPausedState) {
+        pauseEvenIfProcessorStopped();
+      }
       eventProcessor.start();
       waitForRunningStatus();
 
diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
index 259ae81..478a20a 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
@@ -89,6 +89,9 @@ public class SerialAsyncEventQueueImpl extends AbstractGatewaySender {
         eventProcessor = new SerialGatewaySenderEventProcessor(SerialAsyncEventQueueImpl.this,
             getId(), getThreadMonitorObj());
       }
+      if (startEventProcessorInPausedState) {
+        pauseEvenIfProcessorStopped();
+      }
       eventProcessor.start();
       waitForRunningStatus();
       this.startTime = System.currentTimeMillis();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index 8563e8d..ad7b3a4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -207,6 +207,8 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di
    */
   protected static final String META_DATA_REGION_NAME = "gatewayEventIdIndexMetaData";
 
+  protected boolean startEventProcessorInPausedState = false;
+
   protected int myDSId = DEFAULT_DISTRIBUTED_SYSTEM_ID;
 
   protected int connectionIdleTimeOut = GATEWAY_CONNECTION_IDLE_TIMEOUT;
@@ -790,6 +792,35 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di
     }
   }
 
+  public boolean isStartEventProcessorInPausedState() {
+    return startEventProcessorInPausedState;
+  }
+
+  public void setStartEventProcessorInPausedState() {
+    startEventProcessorInPausedState = true;
+  }
+
+  /**
+   * Pauses dispatching on the event processor even if it has not been started yet.
+   * Does nothing when no event processor has been created.
+   */
+  public void pauseEvenIfProcessorStopped() {
+    if (this.eventProcessor != null) {
+      this.getLifeCycleLock().writeLock().lock();
+      try {
+        this.eventProcessor.pauseDispatching();
+        InternalDistributedSystem system =
+            (InternalDistributedSystem) this.cache.getDistributedSystem();
+        system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_PAUSE, this);
+        logger.info("Paused {}", this);
+
+        enqueueTempEvents();
+      } finally {
+        this.getLifeCycleLock().writeLock().unlock();
+      }
+    }
+  }
+
   @Override
   public void pause() {
     if (this.eventProcessor != null) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/InternalGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/InternalGatewaySender.java
index 783e1d9..ba4f617 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/InternalGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/InternalGatewaySender.java
@@ -42,4 +42,6 @@ public interface InternalGatewaySender extends GatewaySender {
   InternalCache getCache();
 
   void destroy(boolean initiator);
+
+  void setStartEventProcessorInPausedState();
 }
diff --git a/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueFactoryImplTest.java b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueFactoryImplTest.java
index 2ecb459..9535782 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueFactoryImplTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueFactoryImplTest.java
@@ -17,6 +17,7 @@
 package org.apache.geode.cache.asyncqueue.internal;
 
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
 
@@ -43,6 +44,14 @@ public class AsyncEventQueueFactoryImplTest {
 
   }
 
+  @Test
+  public void whenAsyncEventQueueIsStartedInPausedStateThenSenderMustBePaused() {
+    asyncEventQueueFactory = new AsyncEventQueueFactoryImpl(cache);
+    asyncEventQueueFactory.pauseEventDispatchingToListener();
+    assertTrue(
+        ((AsyncEventQueueFactoryImpl) asyncEventQueueFactory).isPauseEventsDispatchingToListener());
+  }
+
   /**
    * Test to verify that AsyncEventQueue can not be created when null listener is passed.
    */
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
index a5c19d2..4f67933 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
@@ -72,6 +72,9 @@ public class ParallelGatewaySenderImpl extends AbstractRemoteGatewaySender {
        */
       eventProcessor =
           new RemoteConcurrentParallelGatewaySenderEventProcessor(this, getThreadMonitorObj());
+      if (isStartEventProcessorInPausedState()) {
+        this.pauseEvenIfProcessorStopped();
+      }
       eventProcessor.start();
       waitForRunningStatus();
 
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
index d5cfe31..548b4cb 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
@@ -83,6 +83,9 @@ public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender {
         eventProcessor = new RemoteSerialGatewaySenderEventProcessor(SerialGatewaySenderImpl.this,
             getId(), getThreadMonitorObj());
       }
+      if (isStartEventProcessorInPausedState()) {
+        this.pauseEvenIfProcessorStopped();
+      }
       eventProcessor.start();
       waitForRunningStatus();
       this.startTime = System.currentTimeMillis();