You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by je...@apache.org on 2016/04/01 22:07:56 UTC
[16/18] incubator-geode git commit: GEODE-1062: Refactor of WANTestBase
GEODE-1062: Refactor of WANTestBase
* Removed the set/unset helpers and accessors for removeFromQueueOnException
* Refactored to separate cache creation from receiver creation
* Reduced create cache, receiver and start sender calls into single lines
* Reordered tests that failed due to starting receiver before region creation
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/65d7a6f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/65d7a6f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/65d7a6f1
Branch: refs/heads/feature/GEODE-17-2
Commit: 65d7a6f1d3853921f83c0754d330e40b9dec62e1
Parents: 7c777b1
Author: Jason Huynh <hu...@gmail.com>
Authored: Mon Mar 28 17:15:24 2016 -0700
Committer: Jason Huynh <hu...@gmail.com>
Committed: Fri Apr 1 10:08:07 2016 -0700
----------------------------------------------------------------------
.../cache/wan/AbstractGatewaySender.java | 14 -
.../AbstractGatewaySenderEventProcessor.java | 6 +-
.../GatewaySenderEventCallbackDispatcher.java | 5 +-
.../cache/wan/GatewaySenderEventDispatcher.java | 2 +-
.../cache/wan/AsyncEventQueueTestBase.java | 28 -
.../client/internal/GatewaySenderBatchOp.java | 17 +-
.../cache/client/internal/SenderProxy.java | 4 +-
.../wan/GatewaySenderEventRemoteDispatcher.java | 45 +-
.../gemfire/internal/cache/wan/WANTestBase.java | 866 ++++++++++---------
...oncurrentParallelGatewaySenderDUnitTest.java | 190 ++--
...allelGatewaySenderOperation_1_DUnitTest.java | 168 ++--
...allelGatewaySenderOperation_2_DUnitTest.java | 9 +
.../ConcurrentWANPropogation_1_DUnitTest.java | 129 +--
.../ConcurrentWANPropogation_2_DUnitTest.java | 118 +--
.../cache/wan/disttx/DistTXWANDUnitTest.java | 38 +-
.../CommonParallelGatewaySenderDUnitTest.java | 51 +-
...wWANConcurrencyCheckForDestroyDUnitTest.java | 11 +-
.../cache/wan/misc/PDXNewWanDUnitTest.java | 87 +-
...dRegion_ParallelWANPersistenceDUnitTest.java | 84 +-
...dRegion_ParallelWANPropogationDUnitTest.java | 116 +--
.../cache/wan/misc/WANSSLDUnitTest.java | 1 +
.../cache/wan/misc/WanValidationsDUnitTest.java | 214 ++---
...arallelGatewaySenderOperationsDUnitTest.java | 47 +-
...llelGatewaySenderQueueOverflowDUnitTest.java | 58 +-
.../ParallelWANConflationDUnitTest.java | 22 +-
...ersistenceEnabledGatewaySenderDUnitTest.java | 368 +++-----
...llelWANPropagationClientServerDUnitTest.java | 7 +-
...lelWANPropagationConcurrentOpsDUnitTest.java | 43 +-
.../ParallelWANPropagationDUnitTest.java | 352 +++-----
...ParallelWANPropagationLoopBackDUnitTest.java | 57 +-
.../wan/parallel/ParallelWANStatsDUnitTest.java | 69 +-
...rialGatewaySenderEventListenerDUnitTest.java | 34 +-
.../SerialGatewaySenderOperationsDUnitTest.java | 71 +-
.../SerialGatewaySenderQueueDUnitTest.java | 18 +-
...ersistenceEnabledGatewaySenderDUnitTest.java | 84 +-
.../SerialWANPropagationLoopBackDUnitTest.java | 22 +-
.../serial/SerialWANPropogationDUnitTest.java | 323 +++----
...NPropogation_PartitionedRegionDUnitTest.java | 86 +-
.../SerialWANPropogationsFeatureDUnitTest.java | 85 +-
.../wan/serial/SerialWANStatsDUnitTest.java | 81 +-
.../management/WANManagementDUnitTest.java | 18 +-
.../pulse/TestRemoteClusterDUnitTest.java | 4 +-
42 files changed, 1521 insertions(+), 2531 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65d7a6f1/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
index 24181c6..4f3488b 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
@@ -690,20 +690,6 @@ public abstract class AbstractGatewaySender implements GatewaySender,
public int getMyDSId() {
return this.myDSId;
}
-
- /**
- * @param removeFromQueueOnException the removeFromQueueOnException to set
- */
- public void setRemoveFromQueueOnException(boolean removeFromQueueOnException) {
- this.removeFromQueueOnException = removeFromQueueOnException;
- }
-
- /**
- * @return the removeFromQueueOnException
- */
- public boolean isRemoveFromQueueOnException() {
- return removeFromQueueOnException;
- }
public CancelCriterion getStopper() {
return this.stopper;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65d7a6f1/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 6bd062c..9cde6dd 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -629,8 +629,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
logBatchFine("During normal processing, dispatching the following ", conflatedEventsToBeDispatched);
}
- boolean success = this.dispatcher.dispatchBatch(conflatedEventsToBeDispatched,
- sender.isRemoveFromQueueOnException(), false);
+ boolean success = this.dispatcher.dispatchBatch(conflatedEventsToBeDispatched, false);
if (success) {
if (isDebugEnabled) {
logger.debug("During normal processing, successfully dispatched {} events (batch #{})",
@@ -673,8 +672,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
} else {
handleUnSuccessfulBatchDispatch(events);
if (!resetLastPeekedEvents) {
- while (!this.dispatcher.dispatchBatch(conflatedEventsToBeDispatched,
- sender.isRemoveFromQueueOnException(), true)) {
+ while (!this.dispatcher.dispatchBatch(conflatedEventsToBeDispatched, true)) {
if (isDebugEnabled) {
logger.debug("During normal processing, unsuccessfully dispatched {} events (batch #{})",
conflatedEventsToBeDispatched.size(), getBatchId());
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65d7a6f1/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java
index 38912f3..549c22b 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java
@@ -72,13 +72,10 @@ public class GatewaySenderEventCallbackDispatcher implements GatewaySenderEventD
*
* @param events
* The <code>List</code> of events to send
- * @param removeFromQueueOnException
- * Unused.
*
* @return whether the batch of messages was successfully processed
*/
- public boolean dispatchBatch(List events,
- boolean removeFromQueueOnException, boolean isRetry) {
+ public boolean dispatchBatch(List events, boolean isRetry) {
GatewaySenderStats statistics = this.eventProcessor.sender.getStatistics();
boolean success = false;
try {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65d7a6f1/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventDispatcher.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventDispatcher.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventDispatcher.java
index 0315eb8..57f180c 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventDispatcher.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventDispatcher.java
@@ -23,7 +23,7 @@ import java.util.List;
*/
public interface GatewaySenderEventDispatcher {
- public boolean dispatchBatch(List events, boolean removeFromQueueOnException, boolean isRetry);
+ public boolean dispatchBatch(List events, boolean isRetry);
public boolean isRemoteDispatcher();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65d7a6f1/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
index a5b6125..c719538 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
@@ -801,34 +801,6 @@ public class AsyncEventQueueTestBase extends DistributedTestCase {
.getUnprocessedTokensAddedByPrimary()));
}
- public static void setRemoveFromQueueOnException(String senderId,
- boolean removeFromQueue) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
- assertNotNull(sender);
- ((AbstractGatewaySender)sender)
- .setRemoveFromQueueOnException(removeFromQueue);
- }
-
- public static void unsetRemoveFromQueueOnException(String senderId) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
- assertNotNull(sender);
- ((AbstractGatewaySender)sender).setRemoveFromQueueOnException(false);
- }
-
public static void waitForSenderToBecomePrimary(String senderId) {
Set<GatewaySender> senders = ((GemFireCacheImpl)cache)
.getAllGatewaySenders();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65d7a6f1/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/GatewaySenderBatchOp.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/GatewaySenderBatchOp.java b/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/GatewaySenderBatchOp.java
index 564591d..73b3fc0 100644
--- a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/GatewaySenderBatchOp.java
+++ b/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/GatewaySenderBatchOp.java
@@ -49,17 +49,17 @@ public class GatewaySenderBatchOp {
* @param pool the pool to use to communicate with the server.
* @param events list of gateway events
* @param batchId the ID of this batch
- * @param removeFromQueueOnException true if the events should be processed even after some exception
*/
- public static void executeOn(Connection con, ExecutablePool pool, List events, int batchId, boolean removeFromQueueOnException, boolean isRetry)
+ public static void executeOn(Connection con, ExecutablePool pool, List events, int batchId, boolean isRetry)
{
AbstractOp op = null;
//System.out.println("Version: "+con.getWanSiteVersion());
+ //Is this check even needed anymore? It looks like we just create the same exact op impl with the same parameters...
if (Version.GFE_651.compareTo(con.getWanSiteVersion()) >= 0) {
- op = new GatewaySenderGFEBatchOpImpl(events, batchId, removeFromQueueOnException, con.getDistributedSystemId(), isRetry);
+ op = new GatewaySenderGFEBatchOpImpl(events, batchId, con.getDistributedSystemId(), isRetry);
} else {
// Default should create a batch of server version (ACCEPTOR.VERSION)
- op = new GatewaySenderGFEBatchOpImpl(events, batchId, removeFromQueueOnException, con.getDistributedSystemId(), isRetry);
+ op = new GatewaySenderGFEBatchOpImpl(events, batchId, con.getDistributedSystemId(), isRetry);
}
pool.executeOn(con, op, true/*timeoutFatal*/);
}
@@ -88,8 +88,9 @@ public class GatewaySenderBatchOp {
/**
* @throws com.gemstone.gemfire.SerializationException if serialization fails
*/
- public GatewaySenderGFEBatchOpImpl(List events, int batchId, boolean removeFromQueueOnException, int dsId, boolean isRetry) {
+ public GatewaySenderGFEBatchOpImpl(List events, int batchId, int dsId, boolean isRetry) {
super(MessageType.GATEWAY_RECEIVER_COMMAND, calcPartCount(events));
+ boolean removeFromQueueOnException = true;
if (isRetry) {
getMessage().setIsRetry();
}
@@ -264,9 +265,9 @@ public class GatewaySenderBatchOp {
if (obj instanceof List) {
List<BatchException70> l = (List<BatchException70>)part0.getObject();
- if (logger.isDebugEnabled()) {
- logger.debug("We got an exception from the GatewayReceiver. MessageType : {} obj :{}", msg.getMessageType(), obj);
- }
+ // if (logger.isDebugEnabled()) {
+ logger.info("We got an exception from the GatewayReceiver. MessageType : {} obj :{}", msg.getMessageType(), obj);
+ //}
// don't throw Exception but set it in the Ack
BatchException70 be = new BatchException70(l);
ack = new GatewayAck(be, l.get(0).getBatchId());
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65d7a6f1/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/SenderProxy.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/SenderProxy.java b/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/SenderProxy.java
index acaa109..d3ddfda 100644
--- a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/SenderProxy.java
+++ b/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/SenderProxy.java
@@ -30,9 +30,9 @@ public class SenderProxy extends ServerProxy{
super(pool);
}
- public void dispatchBatch_NewWAN(Connection con, List events, int batchId, boolean removeFromQueueOnException, boolean isRetry)
+ public void dispatchBatch_NewWAN(Connection con, List events, int batchId, boolean isRetry)
{
- GatewaySenderBatchOp.executeOn(con, this.pool, events, batchId, removeFromQueueOnException, isRetry);
+ GatewaySenderBatchOp.executeOn(con, this.pool, events, batchId, isRetry);
}
public Object receiveAckFromReceiver(Connection con)
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65d7a6f1/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
index 8dcfbb2..83c75f1 100644
--- a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
+++ b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
@@ -87,7 +87,7 @@ public class GatewaySenderEventRemoteDispatcher implements
}
}
- protected GatewayAck readAcknowledgement(int lastBatchIdRead) {
+ protected GatewayAck readAcknowledgement() {
SenderProxy sp = new SenderProxy(this.processor.getSender().getProxy());
GatewayAck ack = null;
Exception ex;
@@ -144,7 +144,7 @@ public class GatewaySenderEventRemoteDispatcher implements
}
@Override
- public boolean dispatchBatch(List events, boolean removeFromQueueOnException, boolean isRetry) {
+ public boolean dispatchBatch(List events, boolean isRetry) {
GatewaySenderStats statistics = this.sender.getStatistics();
boolean success = false;
try {
@@ -216,8 +216,7 @@ public class GatewaySenderEventRemoteDispatcher implements
this.connectionLifeCycleLock.readLock().lock();
try {
if (connection != null) {
- sp.dispatchBatch_NewWAN(connection, events, currentBatchId,
- sender.isRemoveFromQueueOnException(), isRetry);
+ sp.dispatchBatch_NewWAN(connection, events, currentBatchId, isRetry);
if (logger.isDebugEnabled()) {
logger.debug("{} : Dispatched batch (id={}) of {} events, queue size: {} on connection {}",
this.processor.getSender(), currentBatchId, events.size(), this.processor.getQueue().size(), connection);
@@ -580,7 +579,6 @@ public class GatewaySenderEventRemoteDispatcher implements
@Override
public void run() {
- int lastBatchIdRead = -1;
if (logger.isDebugEnabled()) {
logger.debug("AckReaderThread started.. ");
}
@@ -595,11 +593,10 @@ public class GatewaySenderEventRemoteDispatcher implements
if (checkCancelled()) {
break;
}
- GatewayAck ack = readAcknowledgement(lastBatchIdRead);
+ GatewayAck ack = readAcknowledgement();
if (ack != null) {
boolean gotBatchException = ack.getBatchException() != null;
int batchId = ack.getBatchId();
- lastBatchIdRead = batchId;
int numEvents = ack.getNumEvents();
// If the batch is successfully processed, remove it from the
@@ -615,33 +612,9 @@ public class GatewaySenderEventRemoteDispatcher implements
// log batch exceptions and remove all the events if remove from
// exception is true
// do not remove if it is false
- if (sender.isRemoveFromQueueOnException()) {
- // log the batchExceptions
- logBatchExceptions(ack.getBatchException());
- processor.handleSuccessBatchAck(batchId);
- } else {
- // we assume that batch exception will not occur for PDX related
- // events
- List<GatewaySenderEventImpl> pdxEvents = processor
- .getBatchIdToPDXEventsMap().get(
- ack.getBatchException().getBatchId());
- if (pdxEvents != null) {
- for (GatewaySenderEventImpl senderEvent : pdxEvents) {
- senderEvent.isAcked = true;
- }
- }
- // log the batchExceptions
- logBatchExceptions(ack.getBatchException());
- // remove the events that have been processed.
- BatchException70 be = ack.getBatchException();
- List<BatchException70> exceptions = be.getExceptions();
-
- for (int i = 0; i < exceptions.get(0).getIndex(); i++) {
- processor.eventQueueRemove();
- }
- // reset the sender
- processor.handleException();
- }
+ logBatchExceptions(ack.getBatchException());
+ processor.handleSuccessBatchAck(batchId);
+
} // unsuccessful batch
else { // The batch was successful.
if (logger.isDebugEnabled()) {
@@ -652,9 +625,9 @@ public class GatewaySenderEventRemoteDispatcher implements
}
} else {
// If we have received IOException.
- if (logger.isDebugEnabled()) {
+ // if (logger.isDebugEnabled()) {
logger.debug("{}: Received null ack from remote site.", processor.getSender());
- }
+ //}
processor.handleException();
try { // This wait is before trying to getting new connection to
// receive ack. Without this there will be continuous call to