You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by je...@apache.org on 2016/04/01 22:07:56 UTC

[16/18] incubator-geode git commit: GEODE-1062: Refactor of WANTestBase

GEODE-1062: Refactor of WANTestBase

* Removed the set/unset helpers and accessors for removeFromQueueOnException
* Refactored to separate cache creation from receiver creation
* Reduced create cache, receiver and start sender calls into single lines
* Reordered tests that failed due to starting receiver before region creation


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/65d7a6f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/65d7a6f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/65d7a6f1

Branch: refs/heads/feature/GEODE-17-2
Commit: 65d7a6f1d3853921f83c0754d330e40b9dec62e1
Parents: 7c777b1
Author: Jason Huynh <hu...@gmail.com>
Authored: Mon Mar 28 17:15:24 2016 -0700
Committer: Jason Huynh <hu...@gmail.com>
Committed: Fri Apr 1 10:08:07 2016 -0700

----------------------------------------------------------------------
 .../cache/wan/AbstractGatewaySender.java        |  14 -
 .../AbstractGatewaySenderEventProcessor.java    |   6 +-
 .../GatewaySenderEventCallbackDispatcher.java   |   5 +-
 .../cache/wan/GatewaySenderEventDispatcher.java |   2 +-
 .../cache/wan/AsyncEventQueueTestBase.java      |  28 -
 .../client/internal/GatewaySenderBatchOp.java   |  17 +-
 .../cache/client/internal/SenderProxy.java      |   4 +-
 .../wan/GatewaySenderEventRemoteDispatcher.java |  45 +-
 .../gemfire/internal/cache/wan/WANTestBase.java | 866 ++++++++++---------
 ...oncurrentParallelGatewaySenderDUnitTest.java | 190 ++--
 ...allelGatewaySenderOperation_1_DUnitTest.java | 168 ++--
 ...allelGatewaySenderOperation_2_DUnitTest.java |   9 +
 .../ConcurrentWANPropogation_1_DUnitTest.java   | 129 +--
 .../ConcurrentWANPropogation_2_DUnitTest.java   | 118 +--
 .../cache/wan/disttx/DistTXWANDUnitTest.java    |  38 +-
 .../CommonParallelGatewaySenderDUnitTest.java   |  51 +-
 ...wWANConcurrencyCheckForDestroyDUnitTest.java |  11 +-
 .../cache/wan/misc/PDXNewWanDUnitTest.java      |  87 +-
 ...dRegion_ParallelWANPersistenceDUnitTest.java |  84 +-
 ...dRegion_ParallelWANPropogationDUnitTest.java | 116 +--
 .../cache/wan/misc/WANSSLDUnitTest.java         |   1 +
 .../cache/wan/misc/WanValidationsDUnitTest.java | 214 ++---
 ...arallelGatewaySenderOperationsDUnitTest.java |  47 +-
 ...llelGatewaySenderQueueOverflowDUnitTest.java |  58 +-
 .../ParallelWANConflationDUnitTest.java         |  22 +-
 ...ersistenceEnabledGatewaySenderDUnitTest.java | 368 +++-----
 ...llelWANPropagationClientServerDUnitTest.java |   7 +-
 ...lelWANPropagationConcurrentOpsDUnitTest.java |  43 +-
 .../ParallelWANPropagationDUnitTest.java        | 352 +++-----
 ...ParallelWANPropagationLoopBackDUnitTest.java |  57 +-
 .../wan/parallel/ParallelWANStatsDUnitTest.java |  69 +-
 ...rialGatewaySenderEventListenerDUnitTest.java |  34 +-
 .../SerialGatewaySenderOperationsDUnitTest.java |  71 +-
 .../SerialGatewaySenderQueueDUnitTest.java      |  18 +-
 ...ersistenceEnabledGatewaySenderDUnitTest.java |  84 +-
 .../SerialWANPropagationLoopBackDUnitTest.java  |  22 +-
 .../serial/SerialWANPropogationDUnitTest.java   | 323 +++----
 ...NPropogation_PartitionedRegionDUnitTest.java |  86 +-
 .../SerialWANPropogationsFeatureDUnitTest.java  |  85 +-
 .../wan/serial/SerialWANStatsDUnitTest.java     |  81 +-
 .../management/WANManagementDUnitTest.java      |  18 +-
 .../pulse/TestRemoteClusterDUnitTest.java       |   4 +-
 42 files changed, 1521 insertions(+), 2531 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65d7a6f1/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
index 24181c6..4f3488b 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
@@ -690,20 +690,6 @@ public abstract class AbstractGatewaySender implements GatewaySender,
   public int getMyDSId() {
     return this.myDSId;
   }
-  
-  /**
-   * @param removeFromQueueOnException the removeFromQueueOnException to set
-   */
-  public void setRemoveFromQueueOnException(boolean removeFromQueueOnException) {
-    this.removeFromQueueOnException = removeFromQueueOnException;
-  }
-
-  /**
-   * @return the removeFromQueueOnException
-   */
-  public boolean isRemoveFromQueueOnException() {
-    return removeFromQueueOnException;
-  }
 
   public CancelCriterion getStopper() {
     return this.stopper;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65d7a6f1/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 6bd062c..9cde6dd 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -629,8 +629,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
             logBatchFine("During normal processing, dispatching the following ", conflatedEventsToBeDispatched);
           }
           
-          boolean success = this.dispatcher.dispatchBatch(conflatedEventsToBeDispatched,
-              sender.isRemoveFromQueueOnException(), false);
+          boolean success = this.dispatcher.dispatchBatch(conflatedEventsToBeDispatched, false);
           if (success) {
             if (isDebugEnabled) {
               logger.debug("During normal processing, successfully dispatched {} events (batch #{})",
@@ -673,8 +672,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
             } else {
               handleUnSuccessfulBatchDispatch(events);
               if (!resetLastPeekedEvents) {
-                while (!this.dispatcher.dispatchBatch(conflatedEventsToBeDispatched,
-                    sender.isRemoveFromQueueOnException(), true)) {
+                while (!this.dispatcher.dispatchBatch(conflatedEventsToBeDispatched, true)) {
                   if (isDebugEnabled) {
                     logger.debug("During normal processing, unsuccessfully dispatched {} events (batch #{})",
                         conflatedEventsToBeDispatched.size(), getBatchId());

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65d7a6f1/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java
index 38912f3..549c22b 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java
@@ -72,13 +72,10 @@ public class GatewaySenderEventCallbackDispatcher implements GatewaySenderEventD
    * 
    * @param events
    *          The <code>List</code> of events to send
-   * @param removeFromQueueOnException
-   *          Unused.
    * 
    * @return whether the batch of messages was successfully processed
    */
-  public boolean dispatchBatch(List events,
-      boolean removeFromQueueOnException, boolean isRetry) {
+  public boolean dispatchBatch(List events, boolean isRetry) {
     GatewaySenderStats statistics = this.eventProcessor.sender.getStatistics();
     boolean success = false;
     try {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65d7a6f1/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventDispatcher.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventDispatcher.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventDispatcher.java
index 0315eb8..57f180c 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventDispatcher.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventDispatcher.java
@@ -23,7 +23,7 @@ import java.util.List;
  */
 public interface GatewaySenderEventDispatcher {
   
-  public boolean dispatchBatch(List events, boolean removeFromQueueOnException, boolean isRetry);
+  public boolean dispatchBatch(List events, boolean isRetry);
   
   public boolean isRemoteDispatcher();
   

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65d7a6f1/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
index a5b6125..c719538 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
@@ -801,34 +801,6 @@ public class AsyncEventQueueTestBase extends DistributedTestCase {
             .getUnprocessedTokensAddedByPrimary()));
   }
 
-  public static void setRemoveFromQueueOnException(String senderId,
-      boolean removeFromQueue) {
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    GatewaySender sender = null;
-    for (GatewaySender s : senders) {
-      if (s.getId().equals(senderId)) {
-        sender = s;
-        break;
-      }
-    }
-    assertNotNull(sender);
-    ((AbstractGatewaySender)sender)
-        .setRemoveFromQueueOnException(removeFromQueue);
-  }
-
-  public static void unsetRemoveFromQueueOnException(String senderId) {
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    GatewaySender sender = null;
-    for (GatewaySender s : senders) {
-      if (s.getId().equals(senderId)) {
-        sender = s;
-        break;
-      }
-    }
-    assertNotNull(sender);
-    ((AbstractGatewaySender)sender).setRemoveFromQueueOnException(false);
-  }
-
   public static void waitForSenderToBecomePrimary(String senderId) {
     Set<GatewaySender> senders = ((GemFireCacheImpl)cache)
         .getAllGatewaySenders();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65d7a6f1/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/GatewaySenderBatchOp.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/GatewaySenderBatchOp.java b/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/GatewaySenderBatchOp.java
index 564591d..73b3fc0 100644
--- a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/GatewaySenderBatchOp.java
+++ b/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/GatewaySenderBatchOp.java
@@ -49,17 +49,17 @@ public class GatewaySenderBatchOp {
    * @param pool the pool to use to communicate with the server.
    * @param events list of gateway events
    * @param batchId the ID of this batch
-   * @param removeFromQueueOnException true if the events should be processed even after some exception
    */
-  public static void executeOn(Connection con, ExecutablePool pool, List events, int batchId, boolean removeFromQueueOnException, boolean isRetry)
+  public static void executeOn(Connection con, ExecutablePool pool, List events, int batchId, boolean isRetry)
   {
     AbstractOp op = null;
     //System.out.println("Version: "+con.getWanSiteVersion());
+    //Is this check even needed anymore?  It looks like we just create the same exact op impl with the same parameters...
     if (Version.GFE_651.compareTo(con.getWanSiteVersion()) >= 0) {
-      op = new GatewaySenderGFEBatchOpImpl(events, batchId, removeFromQueueOnException, con.getDistributedSystemId(), isRetry);
+      op = new GatewaySenderGFEBatchOpImpl(events, batchId, con.getDistributedSystemId(), isRetry);
     } else {
       // Default should create a batch of server version (ACCEPTOR.VERSION)
-      op = new GatewaySenderGFEBatchOpImpl(events, batchId, removeFromQueueOnException, con.getDistributedSystemId(), isRetry);     
+      op = new GatewaySenderGFEBatchOpImpl(events, batchId, con.getDistributedSystemId(), isRetry);
     }
     pool.executeOn(con, op, true/*timeoutFatal*/);
   }
@@ -88,8 +88,9 @@ public class GatewaySenderBatchOp {
     /**
      * @throws com.gemstone.gemfire.SerializationException if serialization fails
      */
-    public GatewaySenderGFEBatchOpImpl(List events, int batchId, boolean removeFromQueueOnException, int dsId, boolean isRetry)  {
+    public GatewaySenderGFEBatchOpImpl(List events, int batchId, int dsId, boolean isRetry)  {
       super(MessageType.GATEWAY_RECEIVER_COMMAND, calcPartCount(events));
+      boolean removeFromQueueOnException = true;
       if (isRetry) {
         getMessage().setIsRetry();
       }
@@ -264,9 +265,9 @@ public class GatewaySenderBatchOp {
           if (obj instanceof List) {
             List<BatchException70> l = (List<BatchException70>)part0.getObject();
 
-            if (logger.isDebugEnabled()) {
-              logger.debug("We got an exception from the GatewayReceiver. MessageType : {} obj :{}", msg.getMessageType(), obj);
-            }
+           // if (logger.isDebugEnabled()) {
+              logger.info("We got an exception from the GatewayReceiver. MessageType : {} obj :{}", msg.getMessageType(), obj);
+            //}
             // don't throw Exception but set it in the Ack
             BatchException70 be = new BatchException70(l);
             ack = new GatewayAck(be, l.get(0).getBatchId());

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65d7a6f1/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/SenderProxy.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/SenderProxy.java b/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/SenderProxy.java
index acaa109..d3ddfda 100644
--- a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/SenderProxy.java
+++ b/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/SenderProxy.java
@@ -30,9 +30,9 @@ public class SenderProxy extends ServerProxy{
     super(pool);
   }
 
-  public void dispatchBatch_NewWAN(Connection con, List events, int batchId, boolean removeFromQueueOnException, boolean isRetry)
+  public void dispatchBatch_NewWAN(Connection con, List events, int batchId, boolean isRetry)
   {
-    GatewaySenderBatchOp.executeOn(con, this.pool, events, batchId, removeFromQueueOnException, isRetry);
+    GatewaySenderBatchOp.executeOn(con, this.pool, events, batchId, isRetry);
   }
   
   public Object receiveAckFromReceiver(Connection con)

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65d7a6f1/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
index 8dcfbb2..83c75f1 100644
--- a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
+++ b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
@@ -87,7 +87,7 @@ public class GatewaySenderEventRemoteDispatcher implements
     }
   }
   
-  protected GatewayAck readAcknowledgement(int lastBatchIdRead) {
+  protected GatewayAck readAcknowledgement() {
     SenderProxy sp = new SenderProxy(this.processor.getSender().getProxy());
     GatewayAck ack = null;
     Exception ex;
@@ -144,7 +144,7 @@ public class GatewaySenderEventRemoteDispatcher implements
   }
   
   @Override
-  public boolean dispatchBatch(List events, boolean removeFromQueueOnException, boolean isRetry) {
+  public boolean dispatchBatch(List events, boolean isRetry) {
     GatewaySenderStats statistics = this.sender.getStatistics();
     boolean success = false;
     try {
@@ -216,8 +216,7 @@ public class GatewaySenderEventRemoteDispatcher implements
       this.connectionLifeCycleLock.readLock().lock();
       try {
         if (connection != null) {
-          sp.dispatchBatch_NewWAN(connection, events, currentBatchId,
-              sender.isRemoveFromQueueOnException(), isRetry);
+          sp.dispatchBatch_NewWAN(connection, events, currentBatchId, isRetry);
           if (logger.isDebugEnabled()) {
             logger.debug("{} : Dispatched batch (id={}) of {} events, queue size: {} on connection {}",
                 this.processor.getSender(), currentBatchId,  events.size(), this.processor.getQueue().size(), connection);
@@ -580,7 +579,6 @@ public class GatewaySenderEventRemoteDispatcher implements
 
     @Override
     public void run() {
-      int lastBatchIdRead = -1;
       if (logger.isDebugEnabled()) {
         logger.debug("AckReaderThread started.. ");
       }
@@ -595,11 +593,10 @@ public class GatewaySenderEventRemoteDispatcher implements
           if (checkCancelled()) {
             break;
           }
-          GatewayAck ack = readAcknowledgement(lastBatchIdRead);
+          GatewayAck ack = readAcknowledgement();
           if (ack != null) {
             boolean gotBatchException = ack.getBatchException() != null;
             int batchId = ack.getBatchId();
-            lastBatchIdRead = batchId;
             int numEvents = ack.getNumEvents();
 
             // If the batch is successfully processed, remove it from the
@@ -615,33 +612,9 @@ public class GatewaySenderEventRemoteDispatcher implements
               // log batch exceptions and remove all the events if remove from
               // exception is true
               // do not remove if it is false
-              if (sender.isRemoveFromQueueOnException()) {
-                // log the batchExceptions
-                logBatchExceptions(ack.getBatchException());
-                  processor.handleSuccessBatchAck(batchId);
-              } else {
-                // we assume that batch exception will not occur for PDX related
-                // events
-                List<GatewaySenderEventImpl> pdxEvents = processor
-                    .getBatchIdToPDXEventsMap().get(
-                        ack.getBatchException().getBatchId());
-                if (pdxEvents != null) {
-                  for (GatewaySenderEventImpl senderEvent : pdxEvents) {
-                    senderEvent.isAcked = true;
-                  }
-                }
-                // log the batchExceptions
-                logBatchExceptions(ack.getBatchException());
-                // remove the events that have been processed.
-                BatchException70 be = ack.getBatchException();
-                List<BatchException70> exceptions = be.getExceptions();
-
-                for (int i = 0; i < exceptions.get(0).getIndex(); i++) {
-                  processor.eventQueueRemove();
-                }
-                // reset the sender
-                processor.handleException();
-              }
+              logBatchExceptions(ack.getBatchException());
+              processor.handleSuccessBatchAck(batchId);
+
             } // unsuccessful batch
             else { // The batch was successful.
               if (logger.isDebugEnabled()) {
@@ -652,9 +625,9 @@ public class GatewaySenderEventRemoteDispatcher implements
             }
           } else {
             // If we have received IOException.
-            if (logger.isDebugEnabled()) {
+           // if (logger.isDebugEnabled()) {
               logger.debug("{}: Received null ack from remote site.", processor.getSender());
-            }
+            //}
             processor.handleException();
             try { // This wait is before trying to getting new connection to
                   // receive ack. Without this there will be continuous call to