You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ja...@apache.org on 2016/03/23 18:49:46 UTC

incubator-geode git commit: GEODE-911: Cleaning up SerialGatewaySender queues when stopping gateway

Repository: incubator-geode
Updated Branches:
  refs/heads/develop 86c89a94d -> 3b7e3ff35


GEODE-911: Cleaning up SerialGatewaySender queues when stopping gateway

The stop() method was setting the EventProcessor to null before cleaning
up the queues.  This led to no queues being cleaned up because the method
getQueues would return null if the eventProcessor was null.

Refactors WANTestBase code, removing/condensing duplicated code


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/3b7e3ff3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/3b7e3ff3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/3b7e3ff3

Branch: refs/heads/develop
Commit: 3b7e3ff357f52da388d2cfd91acf2385f1f7d2dd
Parents: 86c89a9
Author: Jason Huynh <hu...@gmail.com>
Authored: Thu Mar 17 16:24:40 2016 -0700
Committer: Jason Huynh <hu...@gmail.com>
Committed: Wed Mar 23 10:13:48 2016 -0700

----------------------------------------------------------------------
 .../wan/serial/SerialGatewaySenderQueue.java    |   7 +
 .../wan/serial/SerialGatewaySenderImpl.java     |  26 +--
 .../gemfire/internal/cache/wan/WANTestBase.java | 218 ++++++-------------
 ...GatewaySenderOperationsOffHeapDUnitTest.java |   2 +-
 .../SerialGatewaySenderOperationsDUnitTest.java |   7 +-
 5 files changed, 84 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3b7e3ff3/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderQueue.java
index 430409a..1a3ae8e 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderQueue.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderQueue.java
@@ -1046,6 +1046,13 @@ public class SerialGatewaySenderQueue implements RegionQueue {
     }
   }
   
+  public boolean isRemovalThreadAlive() {
+    if (this.removalThread != null) {
+      return this.removalThread.isAlive();
+    }
+    return false;
+  }
+  
   @Override
   public void close() {
     Region r = getRegion();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3b7e3ff3/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderImpl.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderImpl.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderImpl.java
index 8f0070f..bf9fc56 100644
--- a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderImpl.java
+++ b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderImpl.java
@@ -15,17 +15,15 @@
  * limitations under the License.
  */
 package com.gemstone.gemfire.internal.cache.wan.serial;
+import java.util.Set;
+
 import org.apache.logging.log4j.Logger;
 
 import com.gemstone.gemfire.cache.Cache;
 import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
-import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
-import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueStats;
-import com.gemstone.gemfire.cache.wan.GatewaySender;
 import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
 import com.gemstone.gemfire.distributed.DistributedLockService;
 import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile;
-import com.gemstone.gemfire.distributed.internal.DistributionAdvisor;
 import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
 import com.gemstone.gemfire.distributed.internal.ResourceEvent;
 import com.gemstone.gemfire.internal.cache.EntryEventImpl;
@@ -34,15 +32,11 @@ import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
 import com.gemstone.gemfire.internal.cache.RegionQueue;
 import com.gemstone.gemfire.internal.cache.UpdateAttributesProcessor;
 import com.gemstone.gemfire.internal.cache.ha.ThreadIdentifier;
-import com.gemstone.gemfire.internal.cache.wan.AbstractRemoteGatewaySender;
 import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
+import com.gemstone.gemfire.internal.cache.wan.AbstractRemoteGatewaySender;
 import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAdvisor.GatewaySenderProfile;
-import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAdvisor;
 import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAttributes;
 import com.gemstone.gemfire.internal.cache.wan.GatewaySenderConfigurationException;
-import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
-import com.gemstone.gemfire.internal.cache.wan.GatewaySenderStats;
-import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
@@ -141,8 +135,7 @@ public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender {
       if (ev != null && !ev.isStopped()) {
         ev.stopProcessing();
       }
-      this.eventProcessor = null;
-
+      
       // Stop the proxy (after the dispatcher, so the socket is still
       // alive until after the dispatcher has stopped)
       stompProxyDead();
@@ -152,11 +145,12 @@ public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender {
         listener.close();
       }
       logger.info(LocalizedMessage.create(LocalizedStrings.GatewayImpl_STOPPED__0, this));
-      
+
       clearTempEventsAfterSenderStopped();
     } finally {
       this.getLifeCycleLock().writeLock().unlock();
     }
+   
     if (this.isPrimary()) {
       try {
         DistributedLockService
@@ -165,11 +159,13 @@ public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender {
         // service not found... ignore
       }
     }
-    if (getQueues() != null && !getQueues().isEmpty()) {
-      for (RegionQueue q : getQueues()) {
+    Set<RegionQueue> queues = getQueues();
+    if (queues != null && !queues.isEmpty()) {
+      for (RegionQueue q : queues) {
         ((SerialGatewaySenderQueue)q).cleanUp();
       }
     }
+   
     this.setIsPrimary(false);
     new UpdateAttributesProcessor(this).distribute(false);
     Thread lockObtainingThread = getSenderAdvisor().getLockObtainingThread();
@@ -190,6 +186,8 @@ public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender {
     InternalDistributedSystem system = (InternalDistributedSystem) this.cache
         .getDistributedSystem();
     system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_STOP, this);
+    
+    this.eventProcessor = null;
   }
   
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3b7e3ff3/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
index 2f44fce..bfcb910 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
@@ -112,6 +112,8 @@ import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewa
 import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
 import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderEventProcessor;
 import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
+import com.gemstone.gemfire.internal.cache.wan.serial.ConcurrentSerialGatewaySenderEventProcessor;
+import com.gemstone.gemfire.internal.cache.wan.serial.SerialGatewaySenderQueue;
 import com.gemstone.gemfire.pdx.SimpleClass;
 import com.gemstone.gemfire.pdx.SimpleClass1;
 import com.gemstone.gemfire.test.dunit.AsyncInvocation;
@@ -2221,7 +2223,7 @@ public class WANTestBase extends DistributedTestCase{
       exln.remove();
     }
   }
-
+  
   public static void stopSender(String senderId) {
     final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
     IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
@@ -2235,7 +2237,21 @@ public class WANTestBase extends DistributedTestCase{
           break;
         }
       }
+      AbstractGatewaySenderEventProcessor eventProcessor = null;
+      if (sender instanceof AbstractGatewaySender) {
+        eventProcessor = ((AbstractGatewaySender) sender).getEventProcessor();
+      }
       sender.stop();
+      
+      Set<RegionQueue> queues = null;
+      if (eventProcessor instanceof ConcurrentSerialGatewaySenderEventProcessor) {
+        queues = ((ConcurrentSerialGatewaySenderEventProcessor)eventProcessor).getQueues();
+        for (RegionQueue queue: queues) {
+          if (queue instanceof SerialGatewaySenderQueue) {
+            assertFalse(((SerialGatewaySenderQueue) queue).isRemovalThreadAlive());
+          }
+        }
+      }
     }
     finally {
       exp.remove();
@@ -2261,6 +2277,35 @@ public class WANTestBase extends DistributedTestCase{
     }
   }
   
+  public static GatewaySenderFactory configureGateway(DiskStoreFactory dsf, File[] dirs1, String dsName, int remoteDsId,
+      boolean isParallel, Integer maxMemory,
+      Integer batchSize, boolean isConflation, boolean isPersistent,
+      GatewayEventFilter filter, boolean isManualStart, int numDispatchers, OrderPolicy policy) {
+    
+    InternalGatewaySenderFactory gateway = (InternalGatewaySenderFactory)cache.createGatewaySenderFactory();
+    gateway.setParallel(isParallel);
+    gateway.setMaximumQueueMemory(maxMemory);
+    gateway.setBatchSize(batchSize);
+    gateway.setBatchConflationEnabled(isConflation);
+    gateway.setManualStart(isManualStart);
+    gateway.setDispatcherThreads(numDispatchers);
+    gateway.setOrderPolicy(policy);
+    ((InternalGatewaySenderFactory) gateway).setLocatorDiscoveryCallback(new MyLocatorCallback());
+    if (filter != null) {
+      eventFilter = filter;
+      gateway.addGatewayEventFilter(filter);
+    }
+    if (isPersistent) {
+      gateway.setPersistenceEnabled(true);
+      gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName)
+          .getName());
+    } else {
+      DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
+      gateway.setDiskStoreName(store.getName());
+    }
+    return gateway;
+  }
+  
   public static void createSender(String dsName, int remoteDsId,
       boolean isParallel, Integer maxMemory,
       Integer batchSize, boolean isConflation, boolean isPersistent,
@@ -2272,55 +2317,9 @@ public class WANTestBase extends DistributedTestCase{
       persistentDirectory.mkdir();
       DiskStoreFactory dsf = cache.createDiskStoreFactory();
       File[] dirs1 = new File[] { persistentDirectory };
-      if (isParallel) {
-        InternalGatewaySenderFactory gateway = (InternalGatewaySenderFactory)cache.createGatewaySenderFactory();
-        gateway.setParallel(true);
-        gateway.setMaximumQueueMemory(maxMemory);
-        gateway.setBatchSize(batchSize);
-        gateway.setManualStart(isManualStart);
-        //set dispatcher threads
-        gateway.setDispatcherThreads(numDispatcherThreadsForTheRun);
-        ((InternalGatewaySenderFactory) gateway)
-            .setLocatorDiscoveryCallback(new MyLocatorCallback());
-        if (filter != null) {
-          eventFilter = filter;
-          gateway.addGatewayEventFilter(filter);
-        }
-        if (isPersistent) {
-          gateway.setPersistenceEnabled(true);
-          gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName)
-              .getName());
-        } else {
-          DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
-          gateway.setDiskStoreName(store.getName());
-        }
-        gateway.setBatchConflationEnabled(isConflation);
-        gateway.create(dsName, remoteDsId);
+      GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName, remoteDsId, isParallel, maxMemory, batchSize, isConflation, isPersistent, filter, isManualStart, numDispatcherThreadsForTheRun, GatewaySender.DEFAULT_ORDER_POLICY);
+      gateway.create(dsName, remoteDsId);
 
-      } else {
-        GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
-        gateway.setMaximumQueueMemory(maxMemory);
-        gateway.setBatchSize(batchSize);
-        gateway.setManualStart(isManualStart);
-        //set dispatcher threads
-        gateway.setDispatcherThreads(numDispatcherThreadsForTheRun);
-        ((InternalGatewaySenderFactory) gateway)
-            .setLocatorDiscoveryCallback(new MyLocatorCallback());
-        if (filter != null) {
-          eventFilter = filter;
-          gateway.addGatewayEventFilter(filter);
-        }
-        gateway.setBatchConflationEnabled(isConflation);
-        if (isPersistent) {
-          gateway.setPersistenceEnabled(true);
-          gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName)
-              .getName());
-        } else {
-          DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
-          gateway.setDiskStoreName(store.getName());
-        }
-        gateway.create(dsName, remoteDsId);
-      }
     } finally {
       exln.remove();
     }
@@ -2329,66 +2328,20 @@ public class WANTestBase extends DistributedTestCase{
   public static void createSenderWithMultipleDispatchers(String dsName, int remoteDsId,
 	boolean isParallel, Integer maxMemory,
 	Integer batchSize, boolean isConflation, boolean isPersistent,
-	GatewayEventFilter filter, boolean isManulaStart, int numDispatchers, OrderPolicy orderPolicy) {
+	GatewayEventFilter filter, boolean isManualStart, int numDispatchers, OrderPolicy orderPolicy) {
 	  final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
-	  try {
-		File persistentDirectory = new File(dsName + "_disk_"
-			+ System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
-		persistentDirectory.mkdir();
-		DiskStoreFactory dsf = cache.createDiskStoreFactory();
-		File[] dirs1 = new File[] { persistentDirectory };
-		if (isParallel) {
-		  GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
-	      gateway.setParallel(true);
-	      gateway.setMaximumQueueMemory(maxMemory);
-	      gateway.setBatchSize(batchSize);
-	      gateway.setManualStart(isManulaStart);
-	      ((InternalGatewaySenderFactory) gateway)
-	      .setLocatorDiscoveryCallback(new MyLocatorCallback());
-	      if (filter != null) {
-	    	eventFilter = filter;
-	        gateway.addGatewayEventFilter(filter);
-	      }
-	      if (isPersistent) {
-	    	gateway.setPersistenceEnabled(true);
-	        gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName)
-	        	.getName());
-	      } else {
-	    	DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
-	    	gateway.setDiskStoreName(store.getName());
-	      }
-	      gateway.setBatchConflationEnabled(isConflation);
-	      gateway.setDispatcherThreads(numDispatchers);
-	      gateway.setOrderPolicy(orderPolicy);
-	      gateway.create(dsName, remoteDsId);
-
-		} else {
-		  GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
-		  gateway.setMaximumQueueMemory(maxMemory);
-		  gateway.setBatchSize(batchSize);
-		  gateway.setManualStart(isManulaStart);
-		  ((InternalGatewaySenderFactory) gateway)
-		  .setLocatorDiscoveryCallback(new MyLocatorCallback());
-		  if (filter != null) {
-			eventFilter = filter;
-			gateway.addGatewayEventFilter(filter);
-		  }
-		  gateway.setBatchConflationEnabled(isConflation);
-		  if (isPersistent) {
-			gateway.setPersistenceEnabled(true);
-			gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName)
-				.getName());
-		  } else {
-			DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
-			gateway.setDiskStoreName(store.getName());
-		  }
-		  gateway.setDispatcherThreads(numDispatchers);
-		  gateway.setOrderPolicy(orderPolicy);
-		  gateway.create(dsName, remoteDsId);
-		}
-	  } finally {
-		exln.remove();
-	  }
+    try {
+      File persistentDirectory = new File(dsName + "_disk_" + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
+      persistentDirectory.mkdir();
+      DiskStoreFactory dsf = cache.createDiskStoreFactory();
+      File[] dirs1 = new File[] { persistentDirectory };
+      GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName, remoteDsId, isParallel, maxMemory, batchSize, isConflation, isPersistent, filter,
+          isManualStart, numDispatchers, orderPolicy);
+      gateway.create(dsName, remoteDsId);
+
+    } finally {
+      exln.remove();
+    }
   }
   
   public static void createSenderWithoutDiskStore(String dsName, int remoteDsId,
@@ -2417,53 +2370,8 @@ public class WANTestBase extends DistributedTestCase{
     persistentDirectory.mkdir();
     DiskStoreFactory dsf = cache.createDiskStoreFactory();
     File[] dirs1 = new File[] { persistentDirectory };
-
-    if (isParallel) {
-      GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
-      gateway.setParallel(true);
-      gateway.setMaximumQueueMemory(maxMemory);
-      gateway.setBatchSize(batchSize);
-      gateway.setManualStart(isManualStart);
-      ((InternalGatewaySenderFactory) gateway)
-          .setLocatorDiscoveryCallback(new MyLocatorCallback());
-      if (filter != null) {
-        gateway.addGatewayEventFilter(filter);
-      }
-      if (isPersistent) {
-        gateway.setPersistenceEnabled(true);
-        gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName)
-            .getName());
-      } else {
-        DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
-        gateway.setDiskStoreName(store.getName());
-      }
-      gateway.setBatchConflationEnabled(isConflation);
-      gateway.setDispatcherThreads(concurrencyLevel);
-      gateway.setOrderPolicy(policy);
-      gateway.create(dsName, remoteDsId);
-    } else {
-      GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
-      gateway.setMaximumQueueMemory(maxMemory);
-      gateway.setBatchSize(batchSize);
-      gateway.setManualStart(isManualStart);
-      ((InternalGatewaySenderFactory) gateway)
-          .setLocatorDiscoveryCallback(new MyLocatorCallback());
-      if (filter != null) {
-        gateway.addGatewayEventFilter(filter);
-      }
-      gateway.setBatchConflationEnabled(isConflation);
-      if (isPersistent) {
-        gateway.setPersistenceEnabled(true);
-        gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName)
-            .getName());
-      } else {
-        DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
-        gateway.setDiskStoreName(store.getName());
-      }
-      gateway.setDispatcherThreads(concurrencyLevel);
-      gateway.setOrderPolicy(policy);
-      gateway.create(dsName, remoteDsId);
-    }
+    GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName, remoteDsId, isParallel, maxMemory, batchSize, isConflation, isPersistent, filter, isManualStart, concurrencyLevel, policy);
+    gateway.create(dsName, remoteDsId);
   }
   
 //  public static void createSender_PDX(String dsName, int remoteDsId,

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3b7e3ff3/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentSerialGatewaySenderOperationsOffHeapDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentSerialGatewaySenderOperationsOffHeapDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentSerialGatewaySenderOperationsOffHeapDUnitTest.java
index e24f593..172a4f2 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentSerialGatewaySenderOperationsOffHeapDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentSerialGatewaySenderOperationsOffHeapDUnitTest.java
@@ -28,5 +28,5 @@ public class ConcurrentSerialGatewaySenderOperationsOffHeapDUnitTest extends
   public boolean isOffHeap() {
     return true;
   }
-
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3b7e3ff3/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
index ea6de11..00778e9 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
@@ -29,6 +29,7 @@ import com.gemstone.gemfire.distributed.Locator;
 import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
 import com.gemstone.gemfire.distributed.internal.InternalLocator;
 import com.gemstone.gemfire.distributed.internal.ServerLocator;
+import com.gemstone.gemfire.internal.cache.RegionQueue;
 import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
 import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
 import com.gemstone.gemfire.internal.cache.wan.GatewaySenderException;
@@ -420,7 +421,6 @@ public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase {
     vm5.invoke(() -> WANTestBase.doPutsFrom( getTestMethodName() + "_RR", 10, 110 ));
 
     validateQueueContents(vm5, "ln", 100);
-    validateQueueClosedVM4();
     vm5.invoke(() -> WANTestBase.stopSender( "ln" ));
     vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" ));
 
@@ -442,11 +442,6 @@ public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase {
         getTestMethodName() + "_RR", 110 ));
     vm4.invoke(() -> WANTestBase.stopSender( "ln" ));
   }
-  
-  private void validateQueueClosedVM4() {
-    // TODO Auto-generated method stub
-    
-  }
 
   private void validateQueueContents(VM vm, String site, int size) {
     vm.invoke(() -> WANTestBase.validateQueueContents( site,