You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by je...@apache.org on 2019/10/03 20:52:45 UTC
[geode] 01/11: GEODE-7124: Added new API to create AEQ with paused
event processing
This is an automated email from the ASF dual-hosted git repository.
jensdeppe pushed a commit to branch release/1.9.2
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 24047c47f7dbe42809ecc77eeb8b2f8ff5e1769c
Author: Naburun Nag <na...@cs.wisc.edu>
AuthorDate: Tue Sep 3 13:07:53 2019 -0700
GEODE-7124: Added new API to create AEQ with paused event processing
* New API to pause the event processor when AEQ is created
* Events will still be queued but will not be processed
---
.../AsyncEventQueueValidationsJUnitTest.java | 14 ++++++++++
.../cache/asyncqueue/AsyncEventQueueFactory.java | 8 ++++++
.../internal/AsyncEventQueueFactoryImpl.java | 20 ++++++++++++++
.../internal/ParallelAsyncEventQueueImpl.java | 3 +++
.../internal/SerialAsyncEventQueueImpl.java | 3 +++
.../internal/cache/wan/AbstractGatewaySender.java | 31 ++++++++++++++++++++++
.../internal/cache/wan/InternalGatewaySender.java | 2 ++
.../internal/AsyncEventQueueFactoryImplTest.java | 9 +++++++
.../wan/parallel/ParallelGatewaySenderImpl.java | 3 +++
.../cache/wan/serial/SerialGatewaySenderImpl.java | 3 +++
10 files changed, 96 insertions(+)
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.java
index 9eadeef..3c10e36 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.java
@@ -35,6 +35,7 @@ import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory;
+import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
import org.apache.geode.cache.wan.GatewayEventFilter;
import org.apache.geode.cache.wan.GatewaySender.OrderPolicy;
import org.apache.geode.internal.cache.wan.AsyncEventQueueConfigurationException;
@@ -72,6 +73,19 @@ public class AsyncEventQueueValidationsJUnitTest {
}
@Test
+ @Parameters({"true", "false"})
+ public void whenAEQCreatedInPausedStateThenSenderIsStartedInPausedState(boolean isParallel) {
+ cache = new CacheFactory().set(MCAST_PORT, "0").create();
+ AsyncEventQueueFactory fact = cache.createAsyncEventQueueFactory()
+ .setParallel(isParallel)
+ .pauseEventDispatchingToListener()
+ .setDispatcherThreads(5);
+ AsyncEventQueue aeq =
+ fact.create("aeqID", new org.apache.geode.internal.cache.wan.MyAsyncEventListener());
+ assertTrue(((AsyncEventQueueImpl) aeq).getSender().isPaused());
+ }
+
+ @Test
public void testConcurrentParallelAsyncEventQueueAttributesOrderPolicyThread() {
cache = new CacheFactory().set(MCAST_PORT, "0").create();
try {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/AsyncEventQueueFactory.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/AsyncEventQueueFactory.java
index e0329a2..1a5145e 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/AsyncEventQueueFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/AsyncEventQueueFactory.java
@@ -150,6 +150,12 @@ public interface AsyncEventQueueFactory {
AsyncEventQueueFactory setForwardExpirationDestroy(boolean forward);
/**
+ * Configures the queue to be created with the dispatching of queued events to the listener
+ * paused; events are still added to the queue but are not processed.
+ */
+ AsyncEventQueueFactory pauseEventDispatchingToListener();
+
+ /**
* Creates the <code>AsyncEventQueue</code>. It accepts Id of AsyncEventQueue and instance of
* AsyncEventListener. Multiple queues can be created using Same listener instance. So, the
* instance of <code>AsyncEventListener</code> should be thread safe in that case. The
@@ -162,4 +168,6 @@ public interface AsyncEventQueueFactory {
* to use this queue.
*/
AsyncEventQueue create(String id, AsyncEventListener listener);
+
+
}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java
index 70f0c2c..26f29ee 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java
@@ -18,6 +18,7 @@ import static org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl.get
import org.apache.logging.log4j.Logger;
+import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.asyncqueue.AsyncEventListener;
import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory;
@@ -46,6 +47,8 @@ public class AsyncEventQueueFactoryImpl implements AsyncEventQueueFactory {
private final InternalCache cache;
+ private boolean pauseEventsDispatchingToListener = false;
+
/**
* Used internally to pass the attributes from this factory to the real GatewaySender it is
* creating.
@@ -159,6 +162,9 @@ public class AsyncEventQueueFactoryImpl implements AsyncEventQueueFactory {
if (cache instanceof CacheCreation) {
asyncEventQueue =
new AsyncEventQueueCreation(asyncQueueId, gatewaySenderAttributes, listener);
+ if (pauseEventsDispatchingToListener) {
+ ((AsyncEventQueueCreation) asyncEventQueue).setPauseEventDispatching(true);
+ }
((CacheCreation) cache).addAsyncEventQueue(asyncEventQueue);
} else {
if (logger.isDebugEnabled()) {
@@ -171,6 +177,9 @@ public class AsyncEventQueueFactoryImpl implements AsyncEventQueueFactory {
AsyncEventQueueImpl asyncEventQueueImpl = new AsyncEventQueueImpl(sender, listener);
asyncEventQueue = asyncEventQueueImpl;
cache.addAsyncEventQueue(asyncEventQueueImpl);
+ if (pauseEventsDispatchingToListener) {
+ sender.setStartEventProcessorInPausedState();
+ }
if (!gatewaySenderAttributes.isManualStart()) {
sender.start();
}
@@ -267,4 +276,15 @@ public class AsyncEventQueueFactoryImpl implements AsyncEventQueueFactory {
gatewaySenderAttributes.forwardExpirationDestroy = forward;
return this;
}
+
+ @Override
+ public AsyncEventQueueFactory pauseEventDispatchingToListener() {
+ pauseEventsDispatchingToListener = true;
+ return this;
+ }
+
+ @VisibleForTesting
+ protected boolean isPauseEventsDispatchingToListener() {
+ return pauseEventsDispatchingToListener;
+ }
}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
index 222f00b..a6d9799 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
@@ -78,6 +78,9 @@ public class ParallelAsyncEventQueueImpl extends AbstractGatewaySender {
*/
eventProcessor =
new ConcurrentParallelGatewaySenderEventProcessor(this, getThreadMonitorObj());
+ if (startEventProcessorInPausedState) {
+ pauseEvenIfProcessorStopped();
+ }
eventProcessor.start();
waitForRunningStatus();
diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
index 259ae81..478a20a 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
@@ -89,6 +89,9 @@ public class SerialAsyncEventQueueImpl extends AbstractGatewaySender {
eventProcessor = new SerialGatewaySenderEventProcessor(SerialAsyncEventQueueImpl.this,
getId(), getThreadMonitorObj());
}
+ if (startEventProcessorInPausedState) {
+ pauseEvenIfProcessorStopped();
+ }
eventProcessor.start();
waitForRunningStatus();
this.startTime = System.currentTimeMillis();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index 8563e8d..ad7b3a4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -207,6 +207,8 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di
*/
protected static final String META_DATA_REGION_NAME = "gatewayEventIdIndexMetaData";
+ protected boolean startEventProcessorInPausedState = false;
+
protected int myDSId = DEFAULT_DISTRIBUTED_SYSTEM_ID;
protected int connectionIdleTimeOut = GATEWAY_CONNECTION_IDLE_TIMEOUT;
@@ -790,6 +792,35 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di
}
}
+ public boolean isStartEventProcessorInPausedState() {
+ return startEventProcessorInPausedState;
+ }
+
+ public void setStartEventProcessorInPausedState() {
+ startEventProcessorInPausedState = true;
+ }
+
+ /**
+ * Pauses event dispatching even if the event processor has not
+ * yet been started; does nothing if no event processor exists.
+ */
+ public void pauseEvenIfProcessorStopped() {
+ if (this.eventProcessor != null) {
+ this.getLifeCycleLock().writeLock().lock();
+ try {
+ this.eventProcessor.pauseDispatching();
+ InternalDistributedSystem system =
+ (InternalDistributedSystem) this.cache.getDistributedSystem();
+ system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_PAUSE, this);
+ logger.info("Paused {}", this);
+
+ enqueueTempEvents();
+ } finally {
+ this.getLifeCycleLock().writeLock().unlock();
+ }
+ }
+ }
+
@Override
public void pause() {
if (this.eventProcessor != null) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/InternalGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/InternalGatewaySender.java
index 783e1d9..ba4f617 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/InternalGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/InternalGatewaySender.java
@@ -42,4 +42,6 @@ public interface InternalGatewaySender extends GatewaySender {
InternalCache getCache();
void destroy(boolean initiator);
+
+ void setStartEventProcessorInPausedState();
}
diff --git a/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueFactoryImplTest.java b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueFactoryImplTest.java
index 2ecb459..9535782 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueFactoryImplTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueFactoryImplTest.java
@@ -17,6 +17,7 @@
package org.apache.geode.cache.asyncqueue.internal;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
@@ -43,6 +44,14 @@ public class AsyncEventQueueFactoryImplTest {
}
+ @Test
+ public void whenAsyncEventQueueIsStartedInPausedStateThenSenderMustBePaused() {
+ asyncEventQueueFactory = new AsyncEventQueueFactoryImpl(cache);
+ asyncEventQueueFactory.pauseEventDispatchingToListener();
+ assertTrue(
+ ((AsyncEventQueueFactoryImpl) asyncEventQueueFactory).isPauseEventsDispatchingToListener());
+ }
+
/**
* Test to verify that AsyncEventQueue can not be created when null listener is passed.
*/
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
index a5c19d2..4f67933 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
@@ -72,6 +72,9 @@ public class ParallelGatewaySenderImpl extends AbstractRemoteGatewaySender {
*/
eventProcessor =
new RemoteConcurrentParallelGatewaySenderEventProcessor(this, getThreadMonitorObj());
+ if (isStartEventProcessorInPausedState()) {
+ this.pauseEvenIfProcessorStopped();
+ }
eventProcessor.start();
waitForRunningStatus();
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
index d5cfe31..548b4cb 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
@@ -83,6 +83,9 @@ public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender {
eventProcessor = new RemoteSerialGatewaySenderEventProcessor(SerialGatewaySenderImpl.this,
getId(), getThreadMonitorObj());
}
+ if (isStartEventProcessorInPausedState()) {
+ this.pauseEvenIfProcessorStopped();
+ }
eventProcessor.start();
waitForRunningStatus();
this.startTime = System.currentTimeMillis();