You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2019/10/30 06:25:32 UTC

[geode] branch feature/GEODE-7258-2 created (now 50d11e9)

This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a change to branch feature/GEODE-7258-2
in repository https://gitbox.apache.org/repos/asf/geode.git.


      at 50d11e9  fix rebase conflict

This branch includes the following new commits:

     new 98209d6  GEODE-7258: The function retry logic is modified to handle the exception thrown while trying to connect to a server that's shutdown/closed.
     new 50d11e9  fix rebase conflict

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[geode] 02/02: fix rebase conflict

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-7258-2
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 50d11e91b7888197b491e13f1b7e0a38302cabea
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Tue Oct 29 23:24:09 2019 -0700

    fix rebase conflict
---
 .../geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java       | 4 ----
 .../geode/cache/lucene/LuceneSearchWithRollingUpgradeTestBase.java    | 2 +-
 2 files changed, 1 insertion(+), 5 deletions(-)

diff --git a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java
index 5cdeadc..98d39d6 100644
--- a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java
+++ b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java
@@ -27,10 +27,6 @@ import org.junit.runners.Parameterized;
 import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.cache30.CacheSerializableRunnable;
 import org.apache.geode.internal.AvailablePortHelper;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
-import org.apache.geode.internal.serialization.Version;
-import org.apache.geode.logging.internal.log4j.api.LogService;
-import org.apache.geode.test.dunit.DistributedTestUtils;
 import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.NetworkUtils;
 import org.apache.geode.test.dunit.VM;
diff --git a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeTestBase.java b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeTestBase.java
index a996142..6ee300b 100644
--- a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeTestBase.java
+++ b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeTestBase.java
@@ -48,8 +48,8 @@ import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.LocalRegion;
-import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.serialization.Version;
+import org.apache.geode.logging.internal.log4j.api.LogService;
 import org.apache.geode.test.awaitility.GeodeAwaitility;
 import org.apache.geode.test.dunit.DistributedTestUtils;
 import org.apache.geode.test.dunit.Host;


[geode] 01/02: GEODE-7258: The function retry logic is modified to handle the exception thrown while trying to connect to a server that's shutdown/closed.

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-7258-2
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 98209d610917892758661f8ff07456383ab04ac3
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Fri Oct 18 10:27:46 2019 -0700

    GEODE-7258: The function retry logic is modified to handle the exception
    thrown while trying to connect to a server that's shutdown/closed.
    
        Co-authored-by: Anil <ag...@pivotal.io>
        Co-authored-by: Xiaojian Zhou <gz...@pivotal.io>
---
 .../client/internal/ClientMetadataService.java     |   5 +
 .../cache/client/internal/ExecuteFunctionOp.java   |  12 +-
 .../client/internal/ExecuteRegionFunctionOp.java   |  17 +-
 .../internal/ExecuteRegionFunctionSingleHopOp.java |   3 +-
 .../geode/cache/client/internal/PoolImpl.java      |  25 +
 .../cache/client/internal/ServerRegionProxy.java   |   8 +-
 .../client/internal/SingleHopClientExecutor.java   |  19 +-
 .../internal/pooling/ConnectionManagerImpl.java    |   6 +-
 .../internal/ExecuteFunctionOpRetryTest.java       |   5 +-
 .../internal/ExecuteFunctionTestSupport.java       |   8 +-
 .../internal/ExecuteRegionFunctionOpRetryTest.java |   5 +-
 .../ExecuteRegionFunctionSingleHopOpRetryTest.java |   9 +-
 .../geode/cache/client/internal/PoolImplTest.java  | 163 +++++
 .../LuceneSearchWithRollingUpgradeDUnit.java       | 809 +--------------------
 ...=> LuceneSearchWithRollingUpgradeTestBase.java} | 278 +++----
 ...ultAfterTwoLocatorsWithTwoServersAreRolled.java |   4 +-
 ...tAndServersAreRestartedFromCurrentVersion.java} |  68 +-
 ...tResultsAfterClientAndServersAreRolledOver.java |   9 +-
 ...ntAndServersAreRolledOverAllBucketsCreated.java |   7 +-
 19 files changed, 383 insertions(+), 1077 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientMetadataService.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientMetadataService.java
index 35837ac..77cc175 100755
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientMetadataService.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientMetadataService.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CopyOnWriteArraySet;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.SystemFailure;
+import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.EntryOperation;
 import org.apache.geode.cache.FixedPartitionResolver;
@@ -493,6 +494,10 @@ public class ClientMetadataService {
     return bucketId;
   }
 
+  @VisibleForTesting
+  public void scheduleGetPRMetaData(final LocalRegion region, final boolean isRecursive) {
+    scheduleGetPRMetaData((InternalRegion) region, isRecursive);
+  }
 
   public void scheduleGetPRMetaData(final InternalRegion region, final boolean isRecursive) {
     if (this.nonPRs.contains(region.getFullPath())) {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteFunctionOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteFunctionOp.java
index b920343..e16d6cd 100755
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteFunctionOp.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteFunctionOp.java
@@ -24,7 +24,6 @@ import java.util.function.Supplier;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.InternalGemFireError;
-import org.apache.geode.cache.client.PoolFactory;
 import org.apache.geode.cache.client.ServerConnectivityException;
 import org.apache.geode.cache.client.ServerOperationException;
 import org.apache.geode.cache.execute.Function;
@@ -60,6 +59,8 @@ public class ExecuteFunctionOp {
   /** index of ignoreFailedMembers in flags[] */
   public static final int IGNORE_FAILED_MEMBERS_INDEX = 1;
 
+  private static final int MAX_RETRY_INITIAL_VALUE = -1;
+
   private ExecuteFunctionOp() {
     // no instances allowed
   }
@@ -83,8 +84,8 @@ public class ExecuteFunctionOp {
     } else {
 
       boolean reexecute = false;
+      int maxRetryAttempts = MAX_RETRY_INITIAL_VALUE;
 
-      int maxRetryAttempts = pool.getRetryAttempts();
       if (!isHA) {
         maxRetryAttempts = 0;
       }
@@ -107,11 +108,8 @@ public class ExecuteFunctionOp {
 
         } catch (ServerConnectivityException se) {
 
-          if (maxRetryAttempts == PoolFactory.DEFAULT_RETRY_ATTEMPTS) {
-            // If the retryAttempt is set to default(-1). Try it on all servers once.
-            // Calculating number of servers when function is re-executed as it involves
-            // messaging locator.
-            maxRetryAttempts = pool.getConnectionSource().getAllServers().size() - 1;
+          if (maxRetryAttempts == MAX_RETRY_INITIAL_VALUE) {
+            maxRetryAttempts = pool.calculateRetryAttempts(se);
           }
 
           if ((maxRetryAttempts--) < 1) {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOp.java
index f4d7520..c40df1c 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOp.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOp.java
@@ -27,7 +27,6 @@ import org.apache.logging.log4j.Logger;
 import org.apache.geode.InternalGemFireError;
 import org.apache.geode.cache.CacheClosedException;
 import org.apache.geode.cache.client.NoAvailableServersException;
-import org.apache.geode.cache.client.PoolFactory;
 import org.apache.geode.cache.client.ServerConnectivityException;
 import org.apache.geode.cache.client.ServerOperationException;
 import org.apache.geode.cache.client.internal.ExecuteRegionFunctionSingleHopOp.ExecuteRegionFunctionSingleHopOpImpl;
@@ -60,6 +59,8 @@ public class ExecuteRegionFunctionOp {
 
   private static final Logger logger = LogService.getLogger();
 
+  private static final int MAX_RETRY_INITIAL_VALUE = -1;
+
   private ExecuteRegionFunctionOp() {
     // no instances allowed
   }
@@ -67,17 +68,14 @@ public class ExecuteRegionFunctionOp {
   /**
    * Does a execute Function on a server using connections from the given pool to communicate with
    * the server.
-   *
-   * @param pool the pool to use to communicate with the server.
-   * @param resultCollector is used to collect the results from the Server
-   * @param maxRetryAttempts Maximum number of retry attempts
    */
   static void execute(ExecutablePool pool,
       ResultCollector resultCollector,
-      int maxRetryAttempts, boolean isHA,
+      int retryAttempts, boolean isHA,
       ExecuteRegionFunctionOpImpl op, boolean isReexecute,
       Set<String> failedNodes) {
 
+    int maxRetryAttempts = retryAttempts > 0 ? retryAttempts : MAX_RETRY_INITIAL_VALUE;
     if (!isHA) {
       maxRetryAttempts = 0;
     }
@@ -107,11 +105,8 @@ public class ExecuteRegionFunctionOp {
         throw failedException;
       } catch (ServerConnectivityException se) {
 
-        if (maxRetryAttempts == PoolFactory.DEFAULT_RETRY_ATTEMPTS) {
-          // If the retryAttempt is set to default(-1). Try it on all servers once.
-          // Calculating number of servers when function is re-executed as it involves
-          // messaging locator.
-          maxRetryAttempts = ((PoolImpl) pool).getConnectionSource().getAllServers().size() - 1;
+        if (maxRetryAttempts == MAX_RETRY_INITIAL_VALUE) {
+          maxRetryAttempts = ((PoolImpl) pool).calculateRetryAttempts(se);
         }
 
         if ((maxRetryAttempts--) < 1) {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java
index f41a829..2d5abf1 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java
@@ -66,7 +66,6 @@ public class ExecuteRegionFunctionSingleHopOp {
       ServerRegionFunctionExecutor serverRegionExecutor,
       ResultCollector resultCollector,
       Map<ServerLocation, ? extends HashSet> serverToFilterMap,
-      int mRetryAttempts,
       boolean isHA,
       final java.util.function.Function<ServerRegionFunctionExecutor, AbstractOp> regionFunctionSingleHopOpFunction,
       final Supplier<AbstractOp> executeRegionFunctionOpSupplier) {
@@ -87,7 +86,7 @@ public class ExecuteRegionFunctionSingleHopOp {
 
     final int retryAttempts =
         SingleHopClientExecutor.submitAllHA(callableTasks, (LocalRegion) region, isHA,
-            resultCollector, failedNodes, mRetryAttempts, ((PoolImpl) pool));
+            resultCollector, failedNodes, ((PoolImpl) pool));
 
     if (isDebugEnabled) {
       logger.debug(
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java
index fbb7d8a..f77373b 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java
@@ -41,6 +41,7 @@ import org.apache.geode.cache.NoSubscriptionServersAvailableException;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionService;
 import org.apache.geode.cache.client.Pool;
+import org.apache.geode.cache.client.PoolFactory;
 import org.apache.geode.cache.client.ServerConnectivityException;
 import org.apache.geode.cache.client.SubscriptionNotEnabledException;
 import org.apache.geode.cache.client.internal.pooling.ConnectionManager;
@@ -1581,4 +1582,28 @@ public class PoolImpl implements InternalPool {
   public int getSubscriptionTimeoutMultiplier() {
     return subscriptionTimeoutMultiplier;
   }
+
+  public int calculateRetryAttempts(Throwable cause) {
+
+    int maxRetryAttempts = getRetryAttempts();
+
+    if (maxRetryAttempts == PoolFactory.DEFAULT_RETRY_ATTEMPTS) {
+      // If the retryAttempt is set to default(-1). Try executing on all servers once.
+      // As calculating number of servers involves sending message to locator, it is
+      // done only when there is an exception.
+      if (cause instanceof ServerConnectivityException
+          && (cause.getMessage().contains(ConnectionManagerImpl.BORROW_CONN_ERROR_MSG)
+              || cause.getMessage().contains(ConnectionManagerImpl.UNEXPECTED_SOCKET_CLOSED_MSG))) {
+        // The client was unable to establish a connection before sending the
+        // request.
+        maxRetryAttempts = getConnectionSource().getAllServers().size();
+      } else {
+        // The request was sent once.
+        maxRetryAttempts = getConnectionSource().getAllServers().size() - 1;
+      }
+    }
+
+    return maxRetryAttempts;
+  }
+
 }
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ServerRegionProxy.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ServerRegionProxy.java
index dd658ea..4ea852c 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ServerRegionProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ServerRegionProxy.java
@@ -699,7 +699,7 @@ public class ServerRegionProxy extends ServerProxy implements ServerRegionDataAc
                     hasResult, emptySet(), true, timeoutMs);
 
             ExecuteRegionFunctionSingleHopOp.execute(pool, region, serverRegionExecutor,
-                resultCollector, serverToBuckets, retryAttempts, function.isHA(),
+                resultCollector, serverToBuckets, function.isHA(),
                 regionFunctionSingleHopOpFunction, executeRegionFunctionOpSupplier);
           }
         } else {
@@ -725,7 +725,7 @@ public class ServerRegionProxy extends ServerProxy implements ServerRegionDataAc
                     hasResult, emptySet(), isBucketFilter, timeoutMs);
 
             ExecuteRegionFunctionSingleHopOp.execute(pool, region,
-                serverRegionExecutor, resultCollector, serverToFilterMap, retryAttempts,
+                serverRegionExecutor, resultCollector, serverToFilterMap,
                 function.isHA(), regionFunctionSingleHopOpFunction,
                 executeRegionFunctionOpSupplier);
           }
@@ -786,7 +786,7 @@ public class ServerRegionProxy extends ServerProxy implements ServerRegionDataAc
                     emptySet(), true, isHA, optimizeForWrite, timeoutMs);
 
             ExecuteRegionFunctionSingleHopOp.execute(pool, region,
-                serverRegionExecutor, resultCollector, serverToBuckets, retryAttempts, isHA,
+                serverRegionExecutor, resultCollector, serverToBuckets, isHA,
                 regionFunctionSingleHopOpFunction, executeRegionFunctionOpSupplier);
           }
 
@@ -810,7 +810,7 @@ public class ServerRegionProxy extends ServerProxy implements ServerRegionDataAc
                     emptySet(), isBucketsAsFilter, isHA, optimizeForWrite, timeoutMs);
 
             ExecuteRegionFunctionSingleHopOp.execute(pool, region,
-                serverRegionExecutor, resultCollector, serverToFilterMap, retryAttempts,
+                serverRegionExecutor, resultCollector, serverToFilterMap,
                 isHA, regionFunctionSingleHopOpFunction, executeRegionFunctionOpSupplier);
           }
         }
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/SingleHopClientExecutor.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/SingleHopClientExecutor.java
index e5050eb..3799d5d 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/SingleHopClientExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/SingleHopClientExecutor.java
@@ -30,7 +30,6 @@ import org.apache.geode.GemFireException;
 import org.apache.geode.InternalGemFireException;
 import org.apache.geode.annotations.internal.MakeNotStatic;
 import org.apache.geode.cache.CacheClosedException;
-import org.apache.geode.cache.client.PoolFactory;
 import org.apache.geode.cache.client.ServerConnectivityException;
 import org.apache.geode.cache.client.ServerOperationException;
 import org.apache.geode.cache.client.internal.GetAllOp.GetAllOpImpl;
@@ -50,6 +49,8 @@ public class SingleHopClientExecutor {
 
   private static final Logger logger = LogService.getLogger();
 
+  private static final int MAX_RETRY_INITIAL_VALUE = -1;
+
   @MakeNotStatic
   static final ExecutorService execService =
       LoggingExecutors.newCachedThreadPool("Function Execution Thread-", true);
@@ -89,11 +90,10 @@ public class SingleHopClientExecutor {
 
   static int submitAllHA(List callableTasks, LocalRegion region, boolean isHA,
       ResultCollector rc, Set<String> failedNodes,
-      final int retryAttemptsArg,
       final PoolImpl pool) {
 
-    ClientMetadataService cms = region.getCache().getClientMetadataService();
-    int maxRetryAttempts = 0;
+    ClientMetadataService cms;
+    int maxRetryAttempts = MAX_RETRY_INITIAL_VALUE;
 
     if (callableTasks != null && !callableTasks.isEmpty()) {
       List futures = null;
@@ -120,15 +120,8 @@ public class SingleHopClientExecutor {
             throw new InternalGemFireException(e.getMessage());
           } catch (ExecutionException ee) {
 
-            if (maxRetryAttempts == 0) {
-              maxRetryAttempts = retryAttemptsArg;
-            }
-
-            if (maxRetryAttempts == PoolFactory.DEFAULT_RETRY_ATTEMPTS) {
-              // If the retryAttempt is set to default(-1). Try it on all servers once.
-              // Calculating number of servers when function is re-executed as it involves
-              // messaging locator.
-              maxRetryAttempts = pool.getConnectionSource().getAllServers().size() - 1;
+            if (maxRetryAttempts == MAX_RETRY_INITIAL_VALUE) {
+              maxRetryAttempts = pool.calculateRetryAttempts(ee.getCause());
             }
 
             if (ee.getCause() instanceof InternalFunctionInvocationTargetException) {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java
index 438980c..cc07570 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java
@@ -70,6 +70,9 @@ import org.apache.geode.security.GemFireSecurityException;
 public class ConnectionManagerImpl implements ConnectionManager {
   private static final Logger logger = LogService.getLogger();
   private static final int NOT_WAITING = -1;
+  public static final String BORROW_CONN_ERROR_MSG = "Could not create a new connection to server ";
+  public static final String UNEXPECTED_SOCKET_CLOSED_MSG =
+      "Pool unexpected closed socket on server";
 
   private final String poolName;
   private final PoolStats poolStats;
@@ -321,8 +324,7 @@ public class ConnectionManagerImpl implements ConnectionManager {
       return connection;
     }
 
-    throw new ServerConnectivityException(
-        "Could not create a new connection to server " + server);
+    throw new ServerConnectivityException(BORROW_CONN_ERROR_MSG + server);
   }
 
   @Override
diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionOpRetryTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionOpRetryTest.java
index ae9ae13..bb11a04 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionOpRetryTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionOpRetryTest.java
@@ -228,9 +228,8 @@ public class ExecuteFunctionOpRetryTest {
     testSupport = new ExecuteFunctionTestSupport(haStatus, failureModeArg,
         (pool, failureMode) -> ExecuteFunctionTestSupport.thenThrow(when(pool
             .execute(ArgumentMatchers.<AbstractOp>any(), ArgumentMatchers.anyInt())),
-            failureMode));
-
-    when(testSupport.getExecutablePool().getRetryAttempts()).thenReturn(retryAttempts);
+            failureMode),
+        retryAttempts);
 
     args = null;
     memberMappedArg = mock(MemberMappedArgument.class);
diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionTestSupport.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionTestSupport.java
index c91816f..1d0cad5 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionTestSupport.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionTestSupport.java
@@ -15,6 +15,7 @@
 package org.apache.geode.cache.client.internal;
 
 import static org.apache.geode.cache.client.internal.ExecuteFunctionTestSupport.HAStatus.HA;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -91,8 +92,6 @@ class ExecuteFunctionTestSupport {
    * This method has to be {@code static} because it is called before
    * {@link ExecuteFunctionTestSupport} is constructed.
    *
-   * @param whenPoolExecute is the {@link OngoingStubbing} for (one of the ) {@code execute()}
-   *        methods on {@link PoolImpl}
    * @param failureMode is the {@link FailureMode} that determines the kind of exception
    *        to {@code throw}
    */
@@ -149,7 +148,7 @@ class ExecuteFunctionTestSupport {
   ExecuteFunctionTestSupport(
       final HAStatus haStatus,
       final FailureMode failureMode,
-      final BiConsumer<PoolImpl, FailureMode> addPoolMockBehavior) {
+      final BiConsumer<PoolImpl, FailureMode> addPoolMockBehavior, Integer retryAttempts) {
 
     final List<ServerLocation> servers = (List<ServerLocation>) mock(List.class);
     when(servers.size()).thenReturn(ExecuteFunctionTestSupport.NUMBER_OF_SERVERS);
@@ -174,6 +173,9 @@ class ExecuteFunctionTestSupport {
 
     executablePool = mock(PoolImpl.class);
     when(executablePool.getConnectionSource()).thenReturn(connectionSource);
+    when(executablePool.getRetryAttempts()).thenReturn(retryAttempts);
+    when(executablePool.calculateRetryAttempts(any(ServerConnectivityException.class)))
+        .thenCallRealMethod();
 
     addPoolMockBehavior.accept(executablePool, failureMode);
   }
diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOpRetryTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOpRetryTest.java
index 74f6748..e92bad1 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOpRetryTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOpRetryTest.java
@@ -309,7 +309,7 @@ public class ExecuteRegionFunctionOpRetryTest {
             default:
               throw new AssertionError("unknown FailureMode type: " + failureMode);
           }
-        });
+        }, retryAttempts);
 
     executeFunctionMultiHopAndValidate(haStatus, functionIdentifierType, retryAttempts,
         testSupport.getExecutablePool(),
@@ -325,7 +325,8 @@ public class ExecuteRegionFunctionOpRetryTest {
     testSupport = new ExecuteFunctionTestSupport(haStatus, failureModeArg,
         (pool, failureMode) -> ExecuteFunctionTestSupport.thenThrow(when(pool
             .execute(ArgumentMatchers.<AbstractOp>any(), ArgumentMatchers.anyInt())),
-            failureMode));
+            failureMode),
+        retryAttempts);
 
     reExecuteFunctionMultiHopAndValidate(haStatus, functionIdentifierType, retryAttempts,
         testSupport.getExecutablePool(),
diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOpRetryTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOpRetryTest.java
index aef00fb..5649b1b 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOpRetryTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOpRetryTest.java
@@ -137,7 +137,7 @@ public class ExecuteRegionFunctionSingleHopOpRetryTest {
   }
 
   private void createMocks(final HAStatus haStatus,
-      final FailureMode failureModeArg) {
+      final FailureMode failureModeArg, Integer retryAttempts) {
 
     testSupport = new ExecuteFunctionTestSupport(haStatus, failureModeArg,
         (pool, failureMode) -> ExecuteFunctionTestSupport.thenThrow(when(
@@ -146,7 +146,8 @@ public class ExecuteRegionFunctionSingleHopOpRetryTest {
                 ArgumentMatchers.any(),
                 ArgumentMatchers.anyBoolean(),
                 ArgumentMatchers.anyBoolean())),
-            failureMode));
+            failureMode),
+        retryAttempts);
 
     serverToFilterMap = new HashMap<>();
     serverToFilterMap.put(new ServerLocation("host1", 10), new HashSet<>());
@@ -158,7 +159,7 @@ public class ExecuteRegionFunctionSingleHopOpRetryTest {
       final int retryAttempts, final int expectTries,
       final FailureMode failureMode) {
 
-    createMocks(haStatus, failureMode);
+    createMocks(haStatus, failureMode, retryAttempts);
 
     executeFunctionSingleHopAndValidate(haStatus, functionIdentifierType, retryAttempts,
         testSupport.getExecutablePool(),
@@ -182,7 +183,6 @@ public class ExecuteRegionFunctionSingleHopOpRetryTest {
             () -> ignoreServerConnectivityException(() -> ExecuteRegionFunctionSingleHopOp.execute(
                 executablePool, testSupport.getRegion(),
                 executor, resultCollector, serverToFilterMap,
-                retryAttempts,
                 testSupport.toBoolean(haStatus),
                 executor1 -> new ExecuteRegionFunctionSingleHopOp.ExecuteRegionFunctionSingleHopOpImpl(
                     testSupport.getRegion().getFullPath(), FUNCTION_NAME,
@@ -199,7 +199,6 @@ public class ExecuteRegionFunctionSingleHopOpRetryTest {
         ignoreServerConnectivityException(
             () -> ExecuteRegionFunctionSingleHopOp.execute(executablePool, testSupport.getRegion(),
                 executor, resultCollector, serverToFilterMap,
-                retryAttempts,
                 function.isHA(),
                 executor1 -> new ExecuteRegionFunctionSingleHopOp.ExecuteRegionFunctionSingleHopOpImpl(
                     testSupport.getRegion().getFullPath(), function,
diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/PoolImplTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/PoolImplTest.java
new file mode 100644
index 0000000..92ab66f
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/PoolImplTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.client.internal;
+
+import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.Statistics;
+import org.apache.geode.cache.client.PoolFactory;
+import org.apache.geode.cache.client.ServerConnectivityException;
+import org.apache.geode.cache.client.internal.pooling.ConnectionManagerImpl;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.membership.gms.membership.HostAddress;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.PoolFactoryImpl;
+import org.apache.geode.internal.cache.PoolManagerImpl;
+import org.apache.geode.internal.monitoring.ThreadsMonitoring;
+import org.apache.geode.internal.net.SSLConfigurationFactory;
+import org.apache.geode.internal.security.SecurableCommunicationChannel;
+import org.apache.geode.test.junit.categories.ClientServerTest;
+
+@Category({ClientServerTest.class})
+public class PoolImplTest {
+
+  @Test
+  public void calculateRetryAttemptsDoesNotDecrementRetryCountForFailureWithUnexpectedSocketClose() {
+    List servers = mock(List.class);
+    when(servers.size()).thenReturn(1);
+    ConnectionSource connectionSource = mock(ConnectionSource.class);
+    when(connectionSource.getAllServers()).thenReturn(servers);
+    ServerConnectivityException serverConnectivityException =
+        mock(ServerConnectivityException.class);
+    when(serverConnectivityException.getMessage())
+        .thenReturn(ConnectionManagerImpl.UNEXPECTED_SOCKET_CLOSED_MSG);
+
+    PoolImpl poolImpl = spy(getPool(PoolFactory.DEFAULT_RETRY_ATTEMPTS));
+    when(poolImpl.getConnectionSource()).thenReturn(connectionSource);
+
+    assertThat(poolImpl.calculateRetryAttempts(serverConnectivityException)).isEqualTo(1);
+  }
+
+  @Test
+  public void calculateRetryAttemptsDoesNotDecrementRetryCountForFailureDuringBorrowConnection() {
+    List servers = mock(List.class);
+    when(servers.size()).thenReturn(1);
+    ConnectionSource connectionSource = mock(ConnectionSource.class);
+    when(connectionSource.getAllServers()).thenReturn(servers);
+    ServerConnectivityException serverConnectivityException =
+        mock(ServerConnectivityException.class);
+    when(serverConnectivityException.getMessage())
+        .thenReturn(ConnectionManagerImpl.BORROW_CONN_ERROR_MSG);
+
+    PoolImpl poolImpl = spy(getPool(PoolFactory.DEFAULT_RETRY_ATTEMPTS));
+    when(poolImpl.getConnectionSource()).thenReturn(connectionSource);
+
+    assertThat(poolImpl.calculateRetryAttempts(serverConnectivityException)).isEqualTo(1);
+  }
+
+  @Test
+  public void calculateRetryAttemptsDecrementsRetryCountForFailureAfterSendingTheRequest() {
+    List servers = mock(List.class);
+    when(servers.size()).thenReturn(1);
+    ConnectionSource connectionSource = mock(ConnectionSource.class);
+    when(connectionSource.getAllServers()).thenReturn(servers);
+    ServerConnectivityException serverConnectivityException =
+        mock(ServerConnectivityException.class);
+    when(serverConnectivityException.getMessage()).thenReturn("Timeout Exception");
+
+    PoolImpl poolImpl = spy(getPool(PoolFactory.DEFAULT_RETRY_ATTEMPTS));
+    when(poolImpl.getConnectionSource()).thenReturn(connectionSource);
+
+    assertThat(poolImpl.calculateRetryAttempts(serverConnectivityException)).isEqualTo(0);
+  }
+
+  @Test
+  public void calculateRetryAttemptsReturnsTheRetyCountConfiguredWithPool() {
+    int retryCount = 1;
+    List servers = mock(List.class);
+    when(servers.size()).thenReturn(1);
+    ConnectionSource connectionSource = mock(ConnectionSource.class);
+    when(connectionSource.getAllServers()).thenReturn(servers);
+    ServerConnectivityException serverConnectivityException =
+        mock(ServerConnectivityException.class);
+    when(serverConnectivityException.getMessage()).thenReturn("Timeout Exception");
+
+    PoolImpl poolImpl = spy(getPool(retryCount));
+    when(poolImpl.getConnectionSource()).thenReturn(connectionSource);
+
+    assertThat(poolImpl.calculateRetryAttempts(serverConnectivityException)).isEqualTo(retryCount);
+  }
+
+  private PoolImpl getPool(int retryAttemptsAttribute) {
+    final DistributionConfig distributionConfig = mock(DistributionConfig.class);
+    doReturn(new SecurableCommunicationChannel[] {}).when(distributionConfig)
+        .getSecurableCommunicationChannels();
+
+    SSLConfigurationFactory.setDistributionConfig(distributionConfig);
+
+    final Properties properties = new Properties();
+    properties.put(DURABLE_CLIENT_ID, "1");
+
+    final Statistics statistics = mock(Statistics.class);
+
+    final PoolFactoryImpl.PoolAttributes poolAttributes =
+        mock(PoolFactoryImpl.PoolAttributes.class);
+
+    /*
+     * These are the minimum pool attributes required
+     * so that basic validation and setup completes successfully. The values of
+     * these attributes have no importance to the assertions of the test itself.
+     */
+    doReturn(1).when(poolAttributes).getMaxConnections();
+    doReturn((long) 10e8).when(poolAttributes).getPingInterval();
+    doReturn(retryAttemptsAttribute).when(poolAttributes).getRetryAttempts();
+
+    final CancelCriterion cancelCriterion = mock(CancelCriterion.class);
+
+    final InternalCache internalCache = mock(InternalCache.class);
+    doReturn(cancelCriterion).when(internalCache).getCancelCriterion();
+
+    final InternalDistributedSystem internalDistributedSystem =
+        mock(InternalDistributedSystem.class);
+    doReturn(distributionConfig).when(internalDistributedSystem).getConfig();
+    doReturn(properties).when(internalDistributedSystem).getProperties();
+    doReturn(statistics).when(internalDistributedSystem).createAtomicStatistics(any(), anyString());
+
+    final PoolManagerImpl poolManager = mock(PoolManagerImpl.class);
+    doReturn(true).when(poolManager).isNormal();
+
+    final ThreadsMonitoring tMonitoring = mock(ThreadsMonitoring.class);
+
+    return PoolImpl.create(poolManager, "pool", poolAttributes, new LinkedList<HostAddress>(),
+        internalDistributedSystem, internalCache, tMonitoring);
+  }
+
+}
diff --git a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java
index ac315bc..5cdeadc 100644
--- a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java
+++ b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java
@@ -14,67 +14,43 @@
  */
 package org.apache.geode.cache.lucene;
 
-import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
-import static org.apache.geode.test.dunit.Assert.fail;
-import static org.junit.Assert.assertEquals;
 
 import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
-import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.logging.log4j.Logger;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import org.apache.geode.cache.GemFireCache;
 import org.apache.geode.cache.RegionShortcut;
-import org.apache.geode.cache.client.ClientCache;
-import org.apache.geode.cache.client.ClientCacheFactory;
-import org.apache.geode.cache.client.ClientRegionFactory;
-import org.apache.geode.cache.client.ClientRegionShortcut;
-import org.apache.geode.cache.lucene.internal.LuceneServiceImpl;
-import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.cache30.CacheSerializableRunnable;
-import org.apache.geode.distributed.Locator;
-import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.internal.AvailablePortHelper;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.serialization.Version;
 import org.apache.geode.logging.internal.log4j.api.LogService;
 import org.apache.geode.test.dunit.DistributedTestUtils;
 import org.apache.geode.test.dunit.Host;
-import org.apache.geode.test.dunit.IgnoredException;
-import org.apache.geode.test.dunit.Invoke;
 import org.apache.geode.test.dunit.NetworkUtils;
 import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.internal.DUnitLauncher;
-import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
 import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
 import org.apache.geode.test.version.TestVersion;
 import org.apache.geode.test.version.VersionManager;
 
 @RunWith(Parameterized.class)
 @Parameterized.UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class)
-public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4DistributedTestCase {
+public abstract class LuceneSearchWithRollingUpgradeDUnit
+    extends LuceneSearchWithRollingUpgradeTestBase {
 
 
-  @Parameterized.Parameters(name = "from_v{0}, with reindex={1}")
+  @Parameterized.Parameters(name = "from_v{0}, with reindex={1}, singleHopEnabled={2}")
   public static Collection<Object[]> data() {
     Collection<String> luceneVersions = getLuceneVersions();
     Collection<Object[]> rval = new ArrayList<>();
     luceneVersions.forEach(v -> {
-      rval.add(new Object[] {v, true});
-      rval.add(new Object[] {v, false});
+      rval.add(new Object[] {v, true, true});
+      rval.add(new Object[] {v, false, true});
     });
     return rval;
   }
@@ -84,6 +60,10 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
     // Lucene Compatibility checks start with Apache Geode v1.2.0
     // Removing the versions older than v1.2.0
     result.removeIf(s -> TestVersion.compare(s, "1.2.0") < 0);
+
+    // The changes relating to GEODE-7258 are not applied on 1.10.0, so the rolling
+    // upgrade from 1.10.0 is skipped. The change was verified by rolling from develop to develop.
+    result.removeIf(s -> TestVersion.compare(s, "1.10.0") == 0);
     if (result.size() < 1) {
       throw new RuntimeException("No older versions of Geode were found to test against");
     } else {
@@ -92,15 +72,6 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
     return result;
   }
 
-  private File[] testingDirs = new File[3];
-
-  protected static String INDEX_NAME = "index";
-
-  private static String diskDir = "LuceneSearchWithRollingUpgradeDUnit";
-
-  // Each vm will have a cache object
-  protected static Object cache;
-
   // the old version of Geode we're testing against
   @Parameterized.Parameter()
   public String oldVersion;
@@ -108,137 +79,8 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
   @Parameterized.Parameter(1)
   public Boolean reindex;
 
-  private void deleteVMFiles() {
-    System.out.println("deleting files in vm" + VM.getCurrentVMNum());
-    File pwd = new File(".");
-    for (File entry : pwd.listFiles()) {
-      try {
-        if (entry.isDirectory()) {
-          FileUtils.deleteDirectory(entry);
-        } else {
-          entry.delete();
-        }
-      } catch (Exception e) {
-        System.out.println("Could not delete " + entry + ": " + e.getMessage());
-      }
-    }
-  }
-
-  private void deleteWorkingDirFiles() {
-    Invoke.invokeInEveryVM("delete files", () -> deleteVMFiles());
-  }
-
-  @Override
-  public void postSetUp() {
-    deleteWorkingDirFiles();
-    IgnoredException.addIgnoredException(
-        "cluster configuration service not available|ConflictingPersistentDataException");
-  }
-
-
-  Properties getLocatorPropertiesPre91(String locatorsString) {
-    Properties props = new Properties();
-    props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
-    props.setProperty(DistributionConfig.LOCATORS_NAME, locatorsString);
-    props.setProperty(DistributionConfig.LOG_LEVEL_NAME, DUnitLauncher.logLevel);
-    props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "true");
-    return props;
-  }
-
-  VM rollClientToCurrentAndCreateRegion(VM oldClient, ClientRegionShortcut shortcut,
-      String regionName, String[] hostNames, int[] locatorPorts, boolean subscriptionEnabled) {
-    VM rollClient = rollClientToCurrent(oldClient, hostNames, locatorPorts, subscriptionEnabled);
-    // recreate region on "rolled" client
-    invokeRunnableInVMs(invokeCreateClientRegion(regionName, shortcut), rollClient);
-    return rollClient;
-  }
-
-  private VM rollClientToCurrent(VM oldClient, String[] hostNames, int[] locatorPorts,
-      boolean subscriptionEnabled) {
-    oldClient.invoke(invokeCloseCache());
-    VM rollClient = Host.getHost(0).getVM(VersionManager.CURRENT_VERSION, oldClient.getId());
-    rollClient.invoke(invokeCreateClientCache(getClientSystemProperties(), hostNames, locatorPorts,
-        subscriptionEnabled));
-    rollClient.invoke(invokeAssertVersion(Version.CURRENT_ORDINAL));
-    return rollClient;
-  }
-
-  CacheSerializableRunnable invokeCreateClientRegion(final String regionName,
-      final ClientRegionShortcut shortcut) {
-    return new CacheSerializableRunnable("execute: createClientRegion") {
-      @Override
-      public void run2() {
-        try {
-          createClientRegion((GemFireCache) LuceneSearchWithRollingUpgradeDUnit.cache, regionName,
-              shortcut);
-        } catch (Exception e) {
-          fail("Error creating client region", e);
-        }
-      }
-    };
-  }
-
-  private static void createClientRegion(GemFireCache cache, String regionName,
-      ClientRegionShortcut shortcut) {
-    ClientRegionFactory rf = ((ClientCache) cache).createClientRegionFactory(shortcut);
-    rf.create(regionName);
-  }
-
-  CacheSerializableRunnable invokeStartCacheServer(final int port) {
-    return new CacheSerializableRunnable("execute: startCacheServer") {
-      @Override
-      public void run2() {
-        try {
-          startCacheServer((GemFireCache) LuceneSearchWithRollingUpgradeDUnit.cache, port);
-        } catch (Exception e) {
-          fail("Error creating cache", e);
-        }
-      }
-    };
-  }
-
-  private static void startCacheServer(GemFireCache cache, int port) throws Exception {
-    CacheServer cacheServer = ((GemFireCacheImpl) cache).addCacheServer();
-    cacheServer.setPort(port);
-    cacheServer.start();
-  }
-
-  CacheSerializableRunnable invokeCreateClientCache(final Properties systemProperties,
-      final String[] hosts, final int[] ports, boolean subscriptionEnabled) {
-    return new CacheSerializableRunnable("execute: createClientCache") {
-      @Override
-      public void run2() {
-        try {
-          LuceneSearchWithRollingUpgradeDUnit.cache =
-              createClientCache(systemProperties, hosts, ports, subscriptionEnabled);
-        } catch (Exception e) {
-          fail("Error creating client cache", e);
-        }
-      }
-    };
-  }
-
-  Properties getClientSystemProperties() {
-    Properties p = new Properties();
-    p.setProperty("mcast-port", "0");
-    return p;
-  }
-
-
-  private static ClientCache createClientCache(Properties systemProperties, String[] hosts,
-      int[] ports, boolean subscriptionEnabled) {
-    ClientCacheFactory cf = new ClientCacheFactory(systemProperties);
-    if (subscriptionEnabled) {
-      cf.setPoolSubscriptionEnabled(true);
-      cf.setPoolSubscriptionRedundancy(-1);
-    }
-    int hostsLength = hosts.length;
-    for (int i = 0; i < hostsLength; i++) {
-      cf.addPoolLocator(hosts[i], ports[i]);
-    }
-
-    return cf.create();
-  }
+  @Parameterized.Parameter(2)
+  public Boolean singleHopEnabled;
 
   // We start an "old" locator and old servers
   // We roll the locator
@@ -303,7 +145,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
           locatorString);
 
       server1 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server1, regionType,
-          testingDirs[0], shortcutName, regionName, locatorPorts);
+          testingDirs[0], shortcutName, regionName, locatorPorts, reindex);
       verifyLuceneQueryResultInEachVM(regionName, expectedRegionSize, server1);
       expectedRegionSize += 5;
       putSerializableObjectAndVerifyLuceneQueryResult(server1, regionName, expectedRegionSize, 5,
@@ -313,7 +155,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
           20, server1, server3);
 
       server2 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server2, regionType,
-          testingDirs[1], shortcutName, regionName, locatorPorts);
+          testingDirs[1], shortcutName, regionName, locatorPorts, reindex);
       verifyLuceneQueryResultInEachVM(regionName, expectedRegionSize, server2);
       expectedRegionSize += 5;
       putSerializableObjectAndVerifyLuceneQueryResult(server2, regionName, expectedRegionSize, 15,
@@ -323,7 +165,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
           30, server2, server3);
 
       server3 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server3, regionType,
-          testingDirs[2], shortcutName, regionName, locatorPorts);
+          testingDirs[2], shortcutName, regionName, locatorPorts, reindex);
       verifyLuceneQueryResultInEachVM(regionName, expectedRegionSize, server3);
       putSerializableObjectAndVerifyLuceneQueryResult(server3, regionName, expectedRegionSize, 15,
           25, server1, server2);
@@ -340,627 +182,4 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
     }
   }
 
-  void putSerializableObjectAndVerifyLuceneQueryResult(VM putter, String regionName,
-      int expectedRegionSize, int start, int end, VM... vms) throws Exception {
-    // do puts
-    putSerializableObject(putter, regionName, start, end);
-
-    // verify present in others
-    verifyLuceneQueryResultInEachVM(regionName, expectedRegionSize, vms);
-  }
-
-  void putSerializableObject(VM putter, String regionName, int start, int end)
-      throws Exception {
-    for (int i = start; i < end; i++) {
-      Class aClass = Thread.currentThread().getContextClassLoader()
-          .loadClass("org.apache.geode.cache.query.data.Portfolio");
-      Constructor portfolioConstructor = aClass.getConstructor(int.class);
-      Object serializableObject = portfolioConstructor.newInstance(i);
-      putter.invoke(invokePut(regionName, i, serializableObject));
-    }
-  }
-
-  private void waitForRegionToHaveExpectedSize(String regionName, int expectedRegionSize) {
-    await().untilAsserted(() -> {
-      Object region =
-          cache.getClass().getMethod("getRegion", String.class).invoke(cache, regionName);
-      int regionSize = (int) region.getClass().getMethod("size").invoke(region);
-      assertEquals("Region size not as expected after 60 seconds", expectedRegionSize,
-          regionSize);
-    });
-  }
-
-  void verifyLuceneQueryResults(String regionName, int expectedRegionSize)
-      throws Exception {
-    Class luceneServiceProvider = Thread.currentThread().getContextClassLoader()
-        .loadClass("org.apache.geode.cache.lucene.LuceneServiceProvider");
-    Method getLuceneService = luceneServiceProvider.getMethod("get", GemFireCache.class);
-    Object luceneService = getLuceneService.invoke(luceneServiceProvider, cache);
-    luceneService.getClass()
-        .getMethod("waitUntilFlushed", String.class, String.class, long.class, TimeUnit.class)
-        .invoke(luceneService, INDEX_NAME, regionName, 60, TimeUnit.SECONDS);
-    Method createLuceneQueryFactoryMethod =
-        luceneService.getClass().getMethod("createLuceneQueryFactory");
-    createLuceneQueryFactoryMethod.setAccessible(true);
-    Object luceneQueryFactory = createLuceneQueryFactoryMethod.invoke(luceneService);
-    Object luceneQuery = luceneQueryFactory.getClass()
-        .getMethod("create", String.class, String.class, String.class, String.class)
-        .invoke(luceneQueryFactory, INDEX_NAME, regionName, "active", "status");
-
-    Collection resultsActive = executeLuceneQuery(luceneQuery);
-
-    luceneQuery = luceneQueryFactory.getClass()
-        .getMethod("create", String.class, String.class, String.class, String.class)
-        .invoke(luceneQueryFactory, INDEX_NAME, regionName, "inactive", "status");
-
-    Collection resultsInactive = executeLuceneQuery(luceneQuery);
-
-    assertEquals("Result size not as expected ", expectedRegionSize,
-        resultsActive.size() + resultsInactive.size());
-  }
-
-  private Collection executeLuceneQuery(Object luceneQuery)
-      throws IllegalAccessException, InvocationTargetException, NoSuchMethodException {
-    Collection results = null;
-    int retryCount = 10;
-    while (true) {
-      try {
-        results = (Collection) luceneQuery.getClass().getMethod("findKeys").invoke(luceneQuery);
-        break;
-      } catch (Exception ex) {
-        if (!ex.getCause().getMessage().contains("currently indexing")) {
-          throw ex;
-        }
-        if (--retryCount == 0) {
-          throw ex;
-        }
-      }
-    }
-    return results;
-
-  }
-
-  private void verifyLuceneQueryResultInEachVM(String regionName, int expectedRegionSize,
-      VM... vms) {
-    for (VM vm : vms) {
-      vm.invoke(() -> waitForRegionToHaveExpectedSize(regionName, expectedRegionSize));
-      vm.invoke(() -> verifyLuceneQueryResults(regionName, expectedRegionSize));
-    }
-
-  }
-
-  void invokeRunnableInVMs(CacheSerializableRunnable runnable, VM... vms) {
-    for (VM vm : vms) {
-      vm.invoke(runnable);
-    }
-  }
-
-  // Used to close cache and make sure we attempt on all vms even if some do not have a cache
-  void invokeRunnableInVMs(boolean catchErrors, CacheSerializableRunnable runnable,
-      VM... vms) {
-    for (VM vm : vms) {
-      try {
-        vm.invoke(runnable);
-      } catch (Exception e) {
-        if (!catchErrors) {
-          throw e;
-        }
-      }
-    }
-  }
-
-  private VM rollServerToCurrent(VM oldServer, int[] locatorPorts) {
-    // Roll the server
-    oldServer.invoke(invokeCloseCache());
-    VM rollServer = Host.getHost(0).getVM(VersionManager.CURRENT_VERSION, oldServer.getId());
-    rollServer.invoke(invokeCreateCache(locatorPorts == null ? getSystemPropertiesPost71()
-        : getSystemPropertiesPost71(locatorPorts)));
-    rollServer.invoke(invokeAssertVersion(Version.CURRENT_ORDINAL));
-    return rollServer;
-  }
-
-  VM rollServerToCurrentCreateLuceneIndexAndCreateRegion(VM oldServer, String regionType,
-      File diskdir, String shortcutName, String regionName, int[] locatorPorts) {
-    VM rollServer = rollServerToCurrent(oldServer, locatorPorts);
-    return createLuceneIndexAndRegionOnRolledServer(regionType, diskdir, shortcutName, regionName,
-        rollServer);
-  }
-
-  private VM createLuceneIndexAndRegionOnRolledServer(String regionType, File diskdir,
-      String shortcutName, String regionName, VM rollServer) {
-
-    Boolean serializeIt = reindex;
-    rollServer.invoke(() -> LuceneServiceImpl.LUCENE_REINDEX = serializeIt);
-    rollServer.invoke(() -> createLuceneIndex(cache, regionName, INDEX_NAME));
-    // recreate region on "rolled" server
-    if ((regionType.equals("persistentPartitioned"))) {
-      CacheSerializableRunnable runnable =
-          invokeCreatePersistentPartitionedRegion(regionName, diskdir);
-      invokeRunnableInVMs(runnable, rollServer);
-    } else {
-      invokeRunnableInVMs(invokeCreateRegion(regionName, shortcutName), rollServer);
-    }
-    rollServer.invoke(invokeRebalance());
-    return rollServer;
-  }
-
-  VM rollServerToCurrentAndCreateRegionOnly(VM oldServer, String regionType, File diskdir,
-      String shortcutName, String regionName, int[] locatorPorts) {
-    VM rollServer = rollServerToCurrent(oldServer, locatorPorts);
-    // recreate region on "rolled" server
-    if ((regionType.equals("persistentPartitioned"))) {
-      CacheSerializableRunnable runnable =
-          invokeCreatePersistentPartitionedRegion(regionName, diskdir);
-      invokeRunnableInVMs(runnable, rollServer);
-    } else {
-      invokeRunnableInVMs(invokeCreateRegion(regionName, shortcutName), rollServer);
-    }
-    rollServer.invoke(invokeRebalance());
-    return rollServer;
-  }
-
-  VM rollLocatorToCurrent(VM oldLocator, final String serverHostName, final int port,
-      final String testName, final String locatorString) {
-    // Roll the locator
-    oldLocator.invoke(invokeStopLocator());
-    VM rollLocator = Host.getHost(0).getVM(VersionManager.CURRENT_VERSION, oldLocator.getId());
-    final Properties props = new Properties();
-    props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
-    rollLocator.invoke(invokeStartLocator(serverHostName, port, testName, locatorString, props));
-    return rollLocator;
-  }
-
-  // Due to licensing changes
-  private Properties getSystemPropertiesPost71() {
-    Properties props = getSystemProperties();
-    return props;
-  }
-
-  // Due to licensing changes
-  private Properties getSystemPropertiesPost71(int[] locatorPorts) {
-    Properties props = getSystemProperties(locatorPorts);
-    return props;
-  }
-
-  private Properties getSystemProperties() {
-    Properties props = DistributedTestUtils.getAllDistributedSystemProperties(new Properties());
-    props.remove("disable-auto-reconnect");
-    props.remove(DistributionConfig.OFF_HEAP_MEMORY_SIZE_NAME);
-    props.remove(DistributionConfig.LOCK_MEMORY_NAME);
-    return props;
-  }
-
-  Properties getSystemProperties(int[] locatorPorts) {
-    Properties p = new Properties();
-    String locatorString = getLocatorString(locatorPorts);
-    p.setProperty("locators", locatorString);
-    p.setProperty("mcast-port", "0");
-    return p;
-  }
-
-  static String getLocatorString(int locatorPort) {
-    String locatorString = getDUnitLocatorAddress() + "[" + locatorPort + "]";
-    return locatorString;
-  }
-
-  static String getLocatorString(int[] locatorPorts) {
-    StringBuilder locatorString = new StringBuilder();
-    int numLocators = locatorPorts.length;
-    for (int i = 0; i < numLocators; i++) {
-      locatorString.append(getLocatorString(locatorPorts[i]));
-      if (i + 1 < numLocators) {
-        locatorString.append(",");
-      }
-    }
-    return locatorString.toString();
-  }
-
-  private CacheSerializableRunnable invokeStartLocator(final String serverHostName, final int port,
-      final String testName, final String locatorsString, final Properties props) {
-    return new CacheSerializableRunnable("execute: startLocator") {
-      @Override
-      public void run2() {
-        try {
-          startLocator(serverHostName, port, testName, locatorsString, props);
-        } catch (Exception e) {
-          throw new RuntimeException(e);
-        }
-      }
-    };
-  }
-
-  CacheSerializableRunnable invokeStartLocator(final String serverHostName, final int port,
-      final Properties props) {
-    return new CacheSerializableRunnable("execute: startLocator") {
-      @Override
-      public void run2() {
-        try {
-          startLocator(serverHostName, port, props);
-        } catch (Exception e) {
-          fail("Error starting locators", e);
-        }
-      }
-    };
-  }
-
-  CacheSerializableRunnable invokeCreateCache(final Properties systemProperties) {
-    return new CacheSerializableRunnable("execute: createCache") {
-      @Override
-      public void run2() {
-        try {
-          LuceneSearchWithRollingUpgradeDUnit.cache = createCache(systemProperties);
-        } catch (Exception e) {
-          throw new RuntimeException(e);
-        }
-      }
-    };
-  }
-
-  private CacheSerializableRunnable invokeAssertVersion(final short version) {
-    return new CacheSerializableRunnable("execute: assertVersion") {
-      @Override
-      public void run2() {
-        try {
-          assertVersion(LuceneSearchWithRollingUpgradeDUnit.cache, version);
-        } catch (Exception e) {
-          throw new RuntimeException(e);
-        }
-      }
-    };
-  }
-
-  CacheSerializableRunnable invokeCreateRegion(final String regionName,
-      final String shortcutName) {
-    return new CacheSerializableRunnable("execute: createRegion") {
-      @Override
-      public void run2() {
-        try {
-          createRegion(LuceneSearchWithRollingUpgradeDUnit.cache, regionName, shortcutName);
-        } catch (Exception e) {
-          throw new RuntimeException(e);
-        }
-      }
-    };
-  }
-
-  private CacheSerializableRunnable invokeCreatePersistentPartitionedRegion(final String regionName,
-      final File diskstore) {
-    return new CacheSerializableRunnable("execute: createPersistentPartitonedRegion") {
-      @Override
-      public void run2() {
-        try {
-          createPersistentPartitonedRegion(LuceneSearchWithRollingUpgradeDUnit.cache, regionName,
-              diskstore);
-        } catch (Exception e) {
-          throw new RuntimeException(e);
-        }
-      }
-    };
-  }
-
-  private CacheSerializableRunnable invokePut(final String regionName, final Object key,
-      final Object value) {
-    return new CacheSerializableRunnable("execute: put") {
-      @Override
-      public void run2() {
-        try {
-          put(LuceneSearchWithRollingUpgradeDUnit.cache, regionName, key, value);
-        } catch (Exception e) {
-          throw new RuntimeException(e);
-        }
-      }
-    };
-  }
-
-  CacheSerializableRunnable invokeStopLocator() {
-    return new CacheSerializableRunnable("execute: stopLocator") {
-      @Override
-      public void run2() {
-        try {
-          stopLocator();
-        } catch (Exception e) {
-          throw new RuntimeException(e);
-        }
-      }
-    };
-  }
-
-  CacheSerializableRunnable invokeCloseCache() {
-    return new CacheSerializableRunnable("execute: closeCache") {
-      @Override
-      public void run2() {
-        try {
-          closeCache(LuceneSearchWithRollingUpgradeDUnit.cache);
-        } catch (Exception e) {
-          throw new RuntimeException(e);
-        }
-      }
-    };
-  }
-
-  private CacheSerializableRunnable invokeRebalance() {
-    return new CacheSerializableRunnable("execute: rebalance") {
-      @Override
-      public void run2() {
-        try {
-          rebalance(LuceneSearchWithRollingUpgradeDUnit.cache);
-        } catch (Exception e) {
-          throw new RuntimeException(e);
-        }
-      }
-    };
-  }
-
-  private void deleteDiskStores() {
-    try {
-      FileUtils.deleteDirectory(new File(diskDir).getAbsoluteFile());
-    } catch (IOException e) {
-      throw new Error("Error deleting files", e);
-    }
-  }
-
-  private static Object createCache(Properties systemProperties) throws Exception {
-
-    Class distConfigClass = Thread.currentThread().getContextClassLoader()
-        .loadClass("org.apache.geode.distributed.internal.DistributionConfigImpl");
-    boolean disableConfig = true;
-    try {
-      distConfigClass.getDeclaredField("useSharedConfiguration");
-    } catch (NoSuchFieldException e) {
-      disableConfig = false;
-    }
-    if (disableConfig) {
-      systemProperties.put(DistributionConfig.USE_CLUSTER_CONFIGURATION_NAME, "false");
-    }
-
-    Class cacheFactoryClass = Thread.currentThread().getContextClassLoader()
-        .loadClass("org.apache.geode.cache.CacheFactory");
-    Constructor constructor = cacheFactoryClass.getConstructor(Properties.class);
-    constructor.setAccessible(true);
-    Object cacheFactory = constructor.newInstance(systemProperties);
-
-    Method createMethod = cacheFactoryClass.getMethod("create");
-    createMethod.setAccessible(true);
-    Object cache = createMethod.invoke(cacheFactory);
-    return cache;
-  }
-
-  private static Object getRegion(Object cache, String regionName) throws Exception {
-    return cache.getClass().getMethod("getRegion", String.class).invoke(cache, regionName);
-  }
-
-  private static Object put(Object cache, String regionName, Object key, Object value)
-      throws Exception {
-    Object region = getRegion(cache, regionName);
-    return region.getClass().getMethod("put", Object.class, Object.class).invoke(region, key,
-        value);
-  }
-
-  private static void createRegion(Object cache, String regionName, String shortcutName)
-      throws Exception {
-    Class aClass = Thread.currentThread().getContextClassLoader()
-        .loadClass("org.apache.geode.cache.RegionShortcut");
-    Object[] enumConstants = aClass.getEnumConstants();
-    Object shortcut = null;
-    int length = enumConstants.length;
-    for (int i = 0; i < length; i++) {
-      Object constant = enumConstants[i];
-      if (((Enum) constant).name().equals(shortcutName)) {
-        shortcut = constant;
-        break;
-      }
-    }
-
-    Method createRegionFactoryMethod = cache.getClass().getMethod("createRegionFactory", aClass);
-    createRegionFactoryMethod.setAccessible(true);
-    Object regionFactory = createRegionFactoryMethod.invoke(cache, shortcut);
-    Method createMethod = regionFactory.getClass().getMethod("create", String.class);
-    createMethod.setAccessible(true);
-    createMethod.invoke(regionFactory, regionName);
-  }
-
-  static void createLuceneIndex(Object cache, String regionName, String indexName)
-      throws Exception {
-    Class luceneServiceProvider = Thread.currentThread().getContextClassLoader()
-        .loadClass("org.apache.geode.cache.lucene.LuceneServiceProvider");
-    Method getLuceneService = luceneServiceProvider.getMethod("get", GemFireCache.class);
-    Object luceneService = getLuceneService.invoke(luceneServiceProvider, cache);
-    Method createLuceneIndexFactoryMethod =
-        luceneService.getClass().getMethod("createIndexFactory");
-    createLuceneIndexFactoryMethod.setAccessible(true);
-    Object luceneIndexFactory = createLuceneIndexFactoryMethod.invoke(luceneService);
-    luceneIndexFactory.getClass().getMethod("addField", String.class).invoke(luceneIndexFactory,
-        "status");
-    luceneIndexFactory.getClass().getMethod("create", String.class, String.class)
-        .invoke(luceneIndexFactory, indexName, regionName);
-  }
-
-  static void createLuceneIndexOnExistingRegion(Object cache, String regionName,
-      String indexName) throws Exception {
-    Class luceneServiceProvider = Thread.currentThread().getContextClassLoader()
-        .loadClass("org.apache.geode.cache.lucene.LuceneServiceProvider");
-    Method getLuceneService = luceneServiceProvider.getMethod("get", GemFireCache.class);
-    Object luceneService = getLuceneService.invoke(luceneServiceProvider, cache);
-    Method createLuceneIndexFactoryMethod =
-        luceneService.getClass().getMethod("createIndexFactory");
-    createLuceneIndexFactoryMethod.setAccessible(true);
-    Object luceneIndexFactory = createLuceneIndexFactoryMethod.invoke(luceneService);
-    luceneIndexFactory.getClass().getMethod("addField", String.class).invoke(luceneIndexFactory,
-        "status");
-    luceneIndexFactory.getClass().getMethod("create", String.class, String.class, boolean.class)
-        .invoke(luceneIndexFactory, indexName, regionName, true);
-  }
-
-  private static void createPersistentPartitonedRegion(Object cache, String regionName,
-      File diskStore) throws Exception {
-    Object store = cache.getClass().getMethod("findDiskStore", String.class).invoke(cache, "store");
-    Class dataPolicyObject = Thread.currentThread().getContextClassLoader()
-        .loadClass("org.apache.geode.cache.DataPolicy");
-    Object dataPolicy = dataPolicyObject.getField("PERSISTENT_PARTITION").get(null);
-    if (store == null) {
-      Object dsf = cache.getClass().getMethod("createDiskStoreFactory").invoke(cache);
-      dsf.getClass().getMethod("setMaxOplogSize", long.class).invoke(dsf, 1L);
-      dsf.getClass().getMethod("setDiskDirs", File[].class).invoke(dsf,
-          new Object[] {new File[] {diskStore.getAbsoluteFile()}});
-      dsf.getClass().getMethod("create", String.class).invoke(dsf, "store");
-    }
-    Object rf = cache.getClass().getMethod("createRegionFactory").invoke(cache);
-    rf.getClass().getMethod("setDiskStoreName", String.class).invoke(rf, "store");
-    rf.getClass().getMethod("setDataPolicy", dataPolicy.getClass()).invoke(rf, dataPolicy);
-    rf.getClass().getMethod("create", String.class).invoke(rf, regionName);
-  }
-
-  private static void assertVersion(Object cache, short ordinal) throws Exception {
-    Class idmClass = Thread.currentThread().getContextClassLoader()
-        .loadClass("org.apache.geode.distributed.internal.membership.InternalDistributedMember");
-    Method getDSMethod = cache.getClass().getMethod("getDistributedSystem");
-    getDSMethod.setAccessible(true);
-    Object ds = getDSMethod.invoke(cache);
-
-    Method getDistributedMemberMethod = ds.getClass().getMethod("getDistributedMember");
-    getDistributedMemberMethod.setAccessible(true);
-    Object member = getDistributedMemberMethod.invoke(ds);
-    Method getVersionObjectMethod = member.getClass().getMethod("getVersionObject");
-    getVersionObjectMethod.setAccessible(true);
-    Object thisVersion = getVersionObjectMethod.invoke(member);
-    Method getOrdinalMethod = thisVersion.getClass().getMethod("ordinal");
-    getOrdinalMethod.setAccessible(true);
-    short thisOrdinal = (Short) getOrdinalMethod.invoke(thisVersion);
-    if (ordinal != thisOrdinal) {
-      throw new Error(
-          "Version ordinal:" + thisOrdinal + " was not the expected ordinal of:" + ordinal);
-    }
-  }
-
-  private static void stopCacheServers(Object cache) throws Exception {
-    Method getCacheServersMethod = cache.getClass().getMethod("getCacheServers");
-    getCacheServersMethod.setAccessible(true);
-    List cacheServers = (List) getCacheServersMethod.invoke(cache);
-    Method stopMethod = null;
-    for (Object cs : cacheServers) {
-      if (stopMethod == null) {
-        stopMethod = cs.getClass().getMethod("stop");
-      }
-      stopMethod.setAccessible(true);
-      stopMethod.invoke(cs);
-    }
-  }
-
-  private static void closeCache(Object cache) throws Exception {
-    if (cache == null) {
-      return;
-    }
-    Method isClosedMethod = cache.getClass().getMethod("isClosed");
-    isClosedMethod.setAccessible(true);
-    boolean cacheClosed = (Boolean) isClosedMethod.invoke(cache);
-    if (cache != null && !cacheClosed) {
-      stopCacheServers(cache);
-      Method method = cache.getClass().getMethod("close");
-      method.setAccessible(true);
-      method.invoke(cache);
-      long startTime = System.currentTimeMillis();
-      while (!cacheClosed && System.currentTimeMillis() - startTime < 30000) {
-        try {
-          Thread.sleep(1000);
-          Method cacheClosedMethod = cache.getClass().getMethod("isClosed");
-          cacheClosedMethod.setAccessible(true);
-          cacheClosed = (Boolean) cacheClosedMethod.invoke(cache);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-        }
-      }
-    }
-  }
-
-  private static void rebalance(Object cache) throws Exception {
-    Method getRMMethod = cache.getClass().getMethod("getResourceManager");
-    getRMMethod.setAccessible(true);
-    Object manager = getRMMethod.invoke(cache);
-
-    Method createRebalanceFactoryMethod = manager.getClass().getMethod("createRebalanceFactory");
-    createRebalanceFactoryMethod.setAccessible(true);
-    Object rebalanceFactory = createRebalanceFactoryMethod.invoke(manager);
-    Method m = rebalanceFactory.getClass().getMethod("start");
-    m.setAccessible(true);
-    Object op = m.invoke(rebalanceFactory);
-
-    // Wait until the rebalance is complete
-    try {
-      Method getResultsMethod = op.getClass().getMethod("getResults");
-      getResultsMethod.setAccessible(true);
-      Object results = getResultsMethod.invoke(op);
-      Method getTotalTimeMethod = results.getClass().getMethod("getTotalTime");
-      getTotalTimeMethod.setAccessible(true);
-      System.out.println("Took " + getTotalTimeMethod.invoke(results) + " milliseconds\n");
-      Method getTotalBucketsMethod = results.getClass().getMethod("getTotalBucketTransferBytes");
-      getTotalBucketsMethod.setAccessible(true);
-      System.out.println("Transfered " + getTotalBucketsMethod.invoke(results) + "bytes\n");
-    } catch (Exception e) {
-      Thread.currentThread().interrupt();
-      throw e;
-    }
-  }
-
-  /**
-   * Starts a locator with given configuration.
-   */
-  private static void startLocator(final String serverHostName, final int port,
-      final String testName, final String locatorsString, final Properties props) throws Exception {
-    props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
-    props.setProperty(DistributionConfig.LOCATORS_NAME, locatorsString);
-    Logger logger = LogService.getLogger();
-    props.setProperty(DistributionConfig.LOG_LEVEL_NAME, logger.getLevel().name());
-
-    InetAddress bindAddr;
-    try {
-      bindAddr = InetAddress.getByName(serverHostName);// getServerHostName(vm.getHost()));
-    } catch (UnknownHostException uhe) {
-      throw new Error("While resolving bind address ", uhe);
-    }
-
-    File logFile = new File(testName + "-locator" + port + ".log");
-    Class locatorClass = Thread.currentThread().getContextClassLoader()
-        .loadClass("org.apache.geode.distributed.Locator");
-    Method startLocatorAndDSMethod =
-        locatorClass.getMethod("startLocatorAndDS", int.class, File.class, InetAddress.class,
-            Properties.class, boolean.class, boolean.class, String.class);
-    startLocatorAndDSMethod.setAccessible(true);
-    startLocatorAndDSMethod.invoke(null, port, logFile, bindAddr, props, true, true, null);
-  }
-
-  private static void startLocator(final String serverHostName, final int port, Properties props)
-      throws Exception {
-
-
-    InetAddress bindAddr = null;
-    try {
-      bindAddr = InetAddress.getByName(serverHostName);// getServerHostName(vm.getHost()));
-    } catch (UnknownHostException uhe) {
-      throw new Error("While resolving bind address ", uhe);
-    }
-
-    Locator.startLocatorAndDS(port, new File(""), bindAddr, props, true, true, null);
-    Thread.sleep(5000); // bug in 1.0 - cluster config service not immediately available
-  }
-
-  private static void stopLocator() throws Exception {
-    Class internalLocatorClass = Thread.currentThread().getContextClassLoader()
-        .loadClass("org.apache.geode.distributed.internal.InternalLocator");
-    Method locatorMethod = internalLocatorClass.getMethod("getLocator");
-    locatorMethod.setAccessible(true);
-    Object locator = locatorMethod.invoke(null);
-    Method stopLocatorMethod = locator.getClass().getMethod("stop");
-    stopLocatorMethod.setAccessible(true);
-    stopLocatorMethod.invoke(locator);
-  }
-
-  /**
-   * Get the port that the standard dunit locator is listening on.
-   *
-   * @return locator address
-   */
-  private static String getDUnitLocatorAddress() {
-    return Host.getHost(0).getHostName();
-  }
-
 }
diff --git a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeTestBase.java
similarity index 75%
copy from geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java
copy to geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeTestBase.java
index ac315bc..a996142 100644
--- a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java
+++ b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeTestBase.java
@@ -16,6 +16,7 @@ package org.apache.geode.cache.lucene;
 
 import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
 import static org.apache.geode.test.dunit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 
 import java.io.File;
@@ -25,7 +26,6 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
@@ -33,82 +33,45 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.logging.log4j.Logger;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
 import org.apache.geode.cache.GemFireCache;
-import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.cache.client.ClientCache;
 import org.apache.geode.cache.client.ClientCacheFactory;
 import org.apache.geode.cache.client.ClientRegionFactory;
 import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.client.internal.ClientMetadataService;
 import org.apache.geode.cache.lucene.internal.LuceneServiceImpl;
 import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.cache30.CacheSerializableRunnable;
 import org.apache.geode.distributed.Locator;
 import org.apache.geode.distributed.internal.DistributionConfig;
-import org.apache.geode.internal.AvailablePortHelper;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.serialization.Version;
-import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
 import org.apache.geode.test.dunit.DistributedTestUtils;
 import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.IgnoredException;
 import org.apache.geode.test.dunit.Invoke;
-import org.apache.geode.test.dunit.NetworkUtils;
 import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.dunit.internal.DUnitLauncher;
 import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
-import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
-import org.apache.geode.test.version.TestVersion;
 import org.apache.geode.test.version.VersionManager;
 
-@RunWith(Parameterized.class)
-@Parameterized.UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class)
-public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4DistributedTestCase {
+public abstract class LuceneSearchWithRollingUpgradeTestBase extends JUnit4DistributedTestCase {
 
-
-  @Parameterized.Parameters(name = "from_v{0}, with reindex={1}")
-  public static Collection<Object[]> data() {
-    Collection<String> luceneVersions = getLuceneVersions();
-    Collection<Object[]> rval = new ArrayList<>();
-    luceneVersions.forEach(v -> {
-      rval.add(new Object[] {v, true});
-      rval.add(new Object[] {v, false});
-    });
-    return rval;
-  }
-
-  private static Collection<String> getLuceneVersions() {
-    List<String> result = VersionManager.getInstance().getVersionsWithoutCurrent();
-    // Lucene Compatibility checks start with Apache Geode v1.2.0
-    // Removing the versions older than v1.2.0
-    result.removeIf(s -> TestVersion.compare(s, "1.2.0") < 0);
-    if (result.size() < 1) {
-      throw new RuntimeException("No older versions of Geode were found to test against");
-    } else {
-      System.out.println("running against these versions: " + result);
-    }
-    return result;
-  }
-
-  private File[] testingDirs = new File[3];
+  protected File[] testingDirs = new File[3];
 
   protected static String INDEX_NAME = "index";
 
-  private static String diskDir = "LuceneSearchWithRollingUpgradeDUnit";
+  protected static String diskDir = "LuceneSearchWithRollingUpgradeTestBase";
 
   // Each vm will have a cache object
   protected static Object cache;
 
-  // the old version of Geode we're testing against
-  @Parameterized.Parameter()
-  public String oldVersion;
-
-  @Parameterized.Parameter(1)
-  public Boolean reindex;
-
-  private void deleteVMFiles() {
+  protected void deleteVMFiles() {
     System.out.println("deleting files in vm" + VM.getCurrentVMNum());
     File pwd = new File(".");
     for (File entry : pwd.listFiles()) {
@@ -124,7 +87,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
     }
   }
 
-  private void deleteWorkingDirFiles() {
+  protected void deleteWorkingDirFiles() {
     Invoke.invokeInEveryVM("delete files", () -> deleteVMFiles());
   }
 
@@ -145,20 +108,24 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
     return props;
   }
 
-  VM rollClientToCurrentAndCreateRegion(VM oldClient, ClientRegionShortcut shortcut,
-      String regionName, String[] hostNames, int[] locatorPorts, boolean subscriptionEnabled) {
-    VM rollClient = rollClientToCurrent(oldClient, hostNames, locatorPorts, subscriptionEnabled);
+  VM rollClientToCurrentAndCreateRegion(VM oldClient,
+      ClientRegionShortcut shortcut,
+      String regionName, String[] hostNames, int[] locatorPorts,
+      boolean subscriptionEnabled, boolean singleHopEnabled) {
+    VM rollClient = rollClientToCurrent(oldClient, hostNames, locatorPorts, subscriptionEnabled,
+        singleHopEnabled);
     // recreate region on "rolled" client
     invokeRunnableInVMs(invokeCreateClientRegion(regionName, shortcut), rollClient);
     return rollClient;
   }
 
-  private VM rollClientToCurrent(VM oldClient, String[] hostNames, int[] locatorPorts,
-      boolean subscriptionEnabled) {
+  protected VM rollClientToCurrent(VM oldClient, String[] hostNames,
+      int[] locatorPorts,
+      boolean subscriptionEnabled, boolean singleHopEnabled) {
     oldClient.invoke(invokeCloseCache());
     VM rollClient = Host.getHost(0).getVM(VersionManager.CURRENT_VERSION, oldClient.getId());
     rollClient.invoke(invokeCreateClientCache(getClientSystemProperties(), hostNames, locatorPorts,
-        subscriptionEnabled));
+        subscriptionEnabled, singleHopEnabled));
     rollClient.invoke(invokeAssertVersion(Version.CURRENT_ORDINAL));
     return rollClient;
   }
@@ -169,7 +136,8 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
       @Override
       public void run2() {
         try {
-          createClientRegion((GemFireCache) LuceneSearchWithRollingUpgradeDUnit.cache, regionName,
+          createClientRegion((GemFireCache) LuceneSearchWithRollingUpgradeTestBase.cache,
+              regionName,
               shortcut);
         } catch (Exception e) {
           fail("Error creating client region", e);
@@ -178,7 +146,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
     };
   }
 
-  private static void createClientRegion(GemFireCache cache, String regionName,
+  protected static void createClientRegion(GemFireCache cache, String regionName,
       ClientRegionShortcut shortcut) {
     ClientRegionFactory rf = ((ClientCache) cache).createClientRegionFactory(shortcut);
     rf.create(regionName);
@@ -189,7 +157,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
       @Override
       public void run2() {
         try {
-          startCacheServer((GemFireCache) LuceneSearchWithRollingUpgradeDUnit.cache, port);
+          startCacheServer((GemFireCache) LuceneSearchWithRollingUpgradeTestBase.cache, port);
         } catch (Exception e) {
           fail("Error creating cache", e);
         }
@@ -197,20 +165,23 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
     };
   }
 
-  private static void startCacheServer(GemFireCache cache, int port) throws Exception {
+  protected static void startCacheServer(GemFireCache cache, int port) throws Exception {
     CacheServer cacheServer = ((GemFireCacheImpl) cache).addCacheServer();
     cacheServer.setPort(port);
     cacheServer.start();
   }
 
-  CacheSerializableRunnable invokeCreateClientCache(final Properties systemProperties,
-      final String[] hosts, final int[] ports, boolean subscriptionEnabled) {
+  CacheSerializableRunnable invokeCreateClientCache(
+      final Properties systemProperties,
+      final String[] hosts, final int[] ports, boolean subscriptionEnabled,
+      boolean singleHopEnabled) {
     return new CacheSerializableRunnable("execute: createClientCache") {
       @Override
       public void run2() {
         try {
-          LuceneSearchWithRollingUpgradeDUnit.cache =
-              createClientCache(systemProperties, hosts, ports, subscriptionEnabled);
+          LuceneSearchWithRollingUpgradeTestBase.cache =
+              createClientCache(systemProperties, hosts, ports, subscriptionEnabled,
+                  singleHopEnabled);
         } catch (Exception e) {
           fail("Error creating client cache", e);
         }
@@ -225,13 +196,15 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
   }
 
 
-  private static ClientCache createClientCache(Properties systemProperties, String[] hosts,
-      int[] ports, boolean subscriptionEnabled) {
+  protected static ClientCache createClientCache(Properties systemProperties,
+      String[] hosts,
+      int[] ports, boolean subscriptionEnabled, boolean singleHopEnabled) {
     ClientCacheFactory cf = new ClientCacheFactory(systemProperties);
     if (subscriptionEnabled) {
       cf.setPoolSubscriptionEnabled(true);
       cf.setPoolSubscriptionRedundancy(-1);
     }
+    cf.setPoolPRSingleHopEnabled(singleHopEnabled);
     int hostsLength = hosts.length;
     for (int i = 0; i < hostsLength; i++) {
       cf.addPoolLocator(hosts[i], ports[i]);
@@ -240,105 +213,6 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
     return cf.create();
   }
 
-  // We start an "old" locator and old servers
-  // We roll the locator
-  // Now we roll all the servers from old to new
-  void executeLuceneQueryWithServerRollOvers(String regionType, String startingVersion)
-      throws Exception {
-    final Host host = Host.getHost(0);
-    VM server1 = host.getVM(startingVersion, 0);
-    VM server2 = host.getVM(startingVersion, 1);
-    VM server3 = host.getVM(startingVersion, 2);
-    VM locator = host.getVM(startingVersion, 3);
-
-
-    String regionName = "aRegion";
-    String shortcutName = null;
-    if ((regionType.equals("partitionedRedundant"))) {
-      shortcutName = RegionShortcut.PARTITION_REDUNDANT.name();
-    } else if ((regionType.equals("persistentPartitioned"))) {
-      shortcutName = RegionShortcut.PARTITION_PERSISTENT.name();
-      for (int i = 0; i < testingDirs.length; i++) {
-        testingDirs[i] = new File(diskDir, "diskStoreVM_" + String.valueOf(host.getVM(i).getId()))
-            .getAbsoluteFile();
-        if (!testingDirs[i].exists()) {
-          System.out.println(" Creating diskdir for server: " + i);
-          testingDirs[i].mkdirs();
-        }
-      }
-    }
-
-    int[] locatorPorts = AvailablePortHelper.getRandomAvailableTCPPorts(1);
-    String hostName = NetworkUtils.getServerHostName(host);
-    String locatorString = getLocatorString(locatorPorts);
-    final Properties locatorProps = new Properties();
-    // configure all class loaders for each vm
-
-    try {
-      locator.invoke(invokeStartLocator(hostName, locatorPorts[0], getTestMethodName(),
-          locatorString, locatorProps));
-      invokeRunnableInVMs(invokeCreateCache(getSystemProperties(locatorPorts)), server1, server2,
-          server3);
-
-      // Create Lucene Index
-      server1.invoke(() -> createLuceneIndex(cache, regionName, INDEX_NAME));
-      server2.invoke(() -> createLuceneIndex(cache, regionName, INDEX_NAME));
-      server3.invoke(() -> createLuceneIndex(cache, regionName, INDEX_NAME));
-
-      // create region
-      if ((regionType.equals("persistentPartitioned"))) {
-        for (int i = 0; i < testingDirs.length; i++) {
-          CacheSerializableRunnable runnable =
-              invokeCreatePersistentPartitionedRegion(regionName, testingDirs[i]);
-          invokeRunnableInVMs(runnable, host.getVM(i));
-        }
-      } else {
-        invokeRunnableInVMs(invokeCreateRegion(regionName, shortcutName), server1, server2,
-            server3);
-      }
-      int expectedRegionSize = 10;
-      putSerializableObjectAndVerifyLuceneQueryResult(server1, regionName, expectedRegionSize, 0,
-          10, server2, server3);
-      locator = rollLocatorToCurrent(locator, hostName, locatorPorts[0], getTestMethodName(),
-          locatorString);
-
-      server1 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server1, regionType,
-          testingDirs[0], shortcutName, regionName, locatorPorts);
-      verifyLuceneQueryResultInEachVM(regionName, expectedRegionSize, server1);
-      expectedRegionSize += 5;
-      putSerializableObjectAndVerifyLuceneQueryResult(server1, regionName, expectedRegionSize, 5,
-          15, server2, server3);
-      expectedRegionSize += 5;
-      putSerializableObjectAndVerifyLuceneQueryResult(server2, regionName, expectedRegionSize, 10,
-          20, server1, server3);
-
-      server2 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server2, regionType,
-          testingDirs[1], shortcutName, regionName, locatorPorts);
-      verifyLuceneQueryResultInEachVM(regionName, expectedRegionSize, server2);
-      expectedRegionSize += 5;
-      putSerializableObjectAndVerifyLuceneQueryResult(server2, regionName, expectedRegionSize, 15,
-          25, server1, server3);
-      expectedRegionSize += 5;
-      putSerializableObjectAndVerifyLuceneQueryResult(server3, regionName, expectedRegionSize, 20,
-          30, server2, server3);
-
-      server3 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server3, regionType,
-          testingDirs[2], shortcutName, regionName, locatorPorts);
-      verifyLuceneQueryResultInEachVM(regionName, expectedRegionSize, server3);
-      putSerializableObjectAndVerifyLuceneQueryResult(server3, regionName, expectedRegionSize, 15,
-          25, server1, server2);
-      putSerializableObjectAndVerifyLuceneQueryResult(server1, regionName, expectedRegionSize, 20,
-          30, server1, server2, server3);
-
-
-    } finally {
-      invokeRunnableInVMs(true, invokeStopLocator(), locator);
-      invokeRunnableInVMs(true, invokeCloseCache(), server1, server2, server3);
-      if ((regionType.equals("persistentPartitioned"))) {
-        deleteDiskStores();
-      }
-    }
-  }
 
   void putSerializableObjectAndVerifyLuceneQueryResult(VM putter, String regionName,
       int expectedRegionSize, int start, int end, VM... vms) throws Exception {
@@ -370,6 +244,15 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
     });
   }
 
+  void updateClientSingleHopMetadata(String regionName) {
+    ClientMetadataService cms = ((InternalCache) cache)
+        .getClientMetadataService();
+    cms.scheduleGetPRMetaData(
+        (LocalRegion) ((InternalCache) cache).getRegion(regionName), true);
+    GeodeAwaitility.await("Awaiting ClientMetadataService.isMetadataStable()")
+        .untilAsserted(() -> assertThat(cms.isMetadataStable()).isTrue());
+  }
+
   void verifyLuceneQueryResults(String regionName, int expectedRegionSize)
       throws Exception {
     Class luceneServiceProvider = Thread.currentThread().getContextClassLoader()
@@ -399,7 +282,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
         resultsActive.size() + resultsInactive.size());
   }
 
-  private Collection executeLuceneQuery(Object luceneQuery)
+  protected Collection executeLuceneQuery(Object luceneQuery)
       throws IllegalAccessException, InvocationTargetException, NoSuchMethodException {
     Collection results = null;
     int retryCount = 10;
@@ -420,7 +303,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
 
   }
 
-  private void verifyLuceneQueryResultInEachVM(String regionName, int expectedRegionSize,
+  protected void verifyLuceneQueryResultInEachVM(String regionName, int expectedRegionSize,
       VM... vms) {
     for (VM vm : vms) {
       vm.invoke(() -> waitForRegionToHaveExpectedSize(regionName, expectedRegionSize));
@@ -459,15 +342,18 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
     return rollServer;
   }
 
-  VM rollServerToCurrentCreateLuceneIndexAndCreateRegion(VM oldServer, String regionType,
-      File diskdir, String shortcutName, String regionName, int[] locatorPorts) {
+  VM rollServerToCurrentCreateLuceneIndexAndCreateRegion(VM oldServer,
+      String regionType,
+      File diskdir, String shortcutName, String regionName, int[] locatorPorts,
+      boolean reindex) {
     VM rollServer = rollServerToCurrent(oldServer, locatorPorts);
     return createLuceneIndexAndRegionOnRolledServer(regionType, diskdir, shortcutName, regionName,
-        rollServer);
+        rollServer, reindex);
   }
 
-  private VM createLuceneIndexAndRegionOnRolledServer(String regionType, File diskdir,
-      String shortcutName, String regionName, VM rollServer) {
+  private VM createLuceneIndexAndRegionOnRolledServer(String regionType,
+      File diskdir,
+      String shortcutName, String regionName, VM rollServer, boolean reindex) {
 
     Boolean serializeIt = reindex;
     rollServer.invoke(() -> LuceneServiceImpl.LUCENE_REINDEX = serializeIt);
@@ -555,7 +441,8 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
     return locatorString.toString();
   }
 
-  private CacheSerializableRunnable invokeStartLocator(final String serverHostName, final int port,
+  protected CacheSerializableRunnable invokeStartLocator(final String serverHostName,
+      final int port,
       final String testName, final String locatorsString, final Properties props) {
     return new CacheSerializableRunnable("execute: startLocator") {
       @Override
@@ -588,7 +475,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
       @Override
       public void run2() {
         try {
-          LuceneSearchWithRollingUpgradeDUnit.cache = createCache(systemProperties);
+          LuceneSearchWithRollingUpgradeTestBase.cache = createCache(systemProperties);
         } catch (Exception e) {
           throw new RuntimeException(e);
         }
@@ -601,7 +488,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
       @Override
       public void run2() {
         try {
-          assertVersion(LuceneSearchWithRollingUpgradeDUnit.cache, version);
+          assertVersion(LuceneSearchWithRollingUpgradeTestBase.cache, version);
         } catch (Exception e) {
           throw new RuntimeException(e);
         }
@@ -615,7 +502,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
       @Override
       public void run2() {
         try {
-          createRegion(LuceneSearchWithRollingUpgradeDUnit.cache, regionName, shortcutName);
+          createRegion(LuceneSearchWithRollingUpgradeTestBase.cache, regionName, shortcutName);
         } catch (Exception e) {
           throw new RuntimeException(e);
         }
@@ -623,13 +510,14 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
     };
   }
 
-  private CacheSerializableRunnable invokeCreatePersistentPartitionedRegion(final String regionName,
+  protected CacheSerializableRunnable invokeCreatePersistentPartitionedRegion(
+      final String regionName,
       final File diskstore) {
     return new CacheSerializableRunnable("execute: createPersistentPartitonedRegion") {
       @Override
       public void run2() {
         try {
-          createPersistentPartitonedRegion(LuceneSearchWithRollingUpgradeDUnit.cache, regionName,
+          createPersistentPartitonedRegion(LuceneSearchWithRollingUpgradeTestBase.cache, regionName,
               diskstore);
         } catch (Exception e) {
           throw new RuntimeException(e);
@@ -638,13 +526,13 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
     };
   }
 
-  private CacheSerializableRunnable invokePut(final String regionName, final Object key,
+  protected CacheSerializableRunnable invokePut(final String regionName, final Object key,
       final Object value) {
     return new CacheSerializableRunnable("execute: put") {
       @Override
       public void run2() {
         try {
-          put(LuceneSearchWithRollingUpgradeDUnit.cache, regionName, key, value);
+          put(LuceneSearchWithRollingUpgradeTestBase.cache, regionName, key, value);
         } catch (Exception e) {
           throw new RuntimeException(e);
         }
@@ -670,7 +558,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
       @Override
       public void run2() {
         try {
-          closeCache(LuceneSearchWithRollingUpgradeDUnit.cache);
+          closeCache(LuceneSearchWithRollingUpgradeTestBase.cache);
         } catch (Exception e) {
           throw new RuntimeException(e);
         }
@@ -678,12 +566,12 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
     };
   }
 
-  private CacheSerializableRunnable invokeRebalance() {
+  protected CacheSerializableRunnable invokeRebalance() {
     return new CacheSerializableRunnable("execute: rebalance") {
       @Override
       public void run2() {
         try {
-          rebalance(LuceneSearchWithRollingUpgradeDUnit.cache);
+          rebalance(LuceneSearchWithRollingUpgradeTestBase.cache);
         } catch (Exception e) {
           throw new RuntimeException(e);
         }
@@ -691,7 +579,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
     };
   }
 
-  private void deleteDiskStores() {
+  protected void deleteDiskStores() {
     try {
       FileUtils.deleteDirectory(new File(diskDir).getAbsoluteFile());
     } catch (IOException e) {
@@ -699,7 +587,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
     }
   }
 
-  private static Object createCache(Properties systemProperties) throws Exception {
+  protected static Object createCache(Properties systemProperties) throws Exception {
 
     Class distConfigClass = Thread.currentThread().getContextClassLoader()
         .loadClass("org.apache.geode.distributed.internal.DistributionConfigImpl");
@@ -725,18 +613,18 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
     return cache;
   }
 
-  private static Object getRegion(Object cache, String regionName) throws Exception {
+  protected static Object getRegion(Object cache, String regionName) throws Exception {
     return cache.getClass().getMethod("getRegion", String.class).invoke(cache, regionName);
   }
 
-  private static Object put(Object cache, String regionName, Object key, Object value)
+  protected static Object put(Object cache, String regionName, Object key, Object value)
       throws Exception {
     Object region = getRegion(cache, regionName);
     return region.getClass().getMethod("put", Object.class, Object.class).invoke(region, key,
         value);
   }
 
-  private static void createRegion(Object cache, String regionName, String shortcutName)
+  protected static void createRegion(Object cache, String regionName, String shortcutName)
       throws Exception {
     Class aClass = Thread.currentThread().getContextClassLoader()
         .loadClass("org.apache.geode.cache.RegionShortcut");
@@ -791,7 +679,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
         .invoke(luceneIndexFactory, indexName, regionName, true);
   }
 
-  private static void createPersistentPartitonedRegion(Object cache, String regionName,
+  protected static void createPersistentPartitonedRegion(Object cache, String regionName,
       File diskStore) throws Exception {
     Object store = cache.getClass().getMethod("findDiskStore", String.class).invoke(cache, "store");
     Class dataPolicyObject = Thread.currentThread().getContextClassLoader()
@@ -810,7 +698,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
     rf.getClass().getMethod("create", String.class).invoke(rf, regionName);
   }
 
-  private static void assertVersion(Object cache, short ordinal) throws Exception {
+  protected static void assertVersion(Object cache, short ordinal) throws Exception {
     Class idmClass = Thread.currentThread().getContextClassLoader()
         .loadClass("org.apache.geode.distributed.internal.membership.InternalDistributedMember");
     Method getDSMethod = cache.getClass().getMethod("getDistributedSystem");
@@ -832,7 +720,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
     }
   }
 
-  private static void stopCacheServers(Object cache) throws Exception {
+  protected static void stopCacheServers(Object cache) throws Exception {
     Method getCacheServersMethod = cache.getClass().getMethod("getCacheServers");
     getCacheServersMethod.setAccessible(true);
     List cacheServers = (List) getCacheServersMethod.invoke(cache);
@@ -846,7 +734,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
     }
   }
 
-  private static void closeCache(Object cache) throws Exception {
+  protected static void closeCache(Object cache) throws Exception {
     if (cache == null) {
       return;
     }
@@ -872,7 +760,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
     }
   }
 
-  private static void rebalance(Object cache) throws Exception {
+  protected static void rebalance(Object cache) throws Exception {
     Method getRMMethod = cache.getClass().getMethod("getResourceManager");
     getRMMethod.setAccessible(true);
     Object manager = getRMMethod.invoke(cache);
@@ -904,7 +792,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
   /**
    * Starts a locator with given configuration.
    */
-  private static void startLocator(final String serverHostName, final int port,
+  protected static void startLocator(final String serverHostName, final int port,
       final String testName, final String locatorsString, final Properties props) throws Exception {
     props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
     props.setProperty(DistributionConfig.LOCATORS_NAME, locatorsString);
@@ -928,7 +816,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
     startLocatorAndDSMethod.invoke(null, port, logFile, bindAddr, props, true, true, null);
   }
 
-  private static void startLocator(final String serverHostName, final int port, Properties props)
+  protected static void startLocator(final String serverHostName, final int port, Properties props)
       throws Exception {
 
 
@@ -943,7 +831,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
     Thread.sleep(5000); // bug in 1.0 - cluster config service not immediately available
   }
 
-  private static void stopLocator() throws Exception {
+  protected static void stopLocator() throws Exception {
     Class internalLocatorClass = Thread.currentThread().getContextClassLoader()
         .loadClass("org.apache.geode.distributed.internal.InternalLocator");
     Method locatorMethod = internalLocatorClass.getMethod("getLocator");
@@ -959,7 +847,7 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
    *
    * @return locator address
    */
-  private static String getDUnitLocatorAddress() {
+  protected static String getDUnitLocatorAddress() {
     return Host.getHost(0).getHostName();
   }
 
diff --git a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultAfterTwoLocatorsWithTwoServersAreRolled.java b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultAfterTwoLocatorsWithTwoServersAreRolled.java
index cb68055..8178ea6 100644
--- a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultAfterTwoLocatorsWithTwoServersAreRolled.java
+++ b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultAfterTwoLocatorsWithTwoServersAreRolled.java
@@ -84,7 +84,7 @@ public class RollingUpgradeQueryReturnsCorrectResultAfterTwoLocatorsWithTwoServe
           locatorString);
 
       server1 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server1, regionType, null,
-          shortcut.name(), regionName, locatorPorts);
+          shortcut.name(), regionName, locatorPorts, reindex);
       expectedRegionSize += 10;
       putSerializableObjectAndVerifyLuceneQueryResult(server2, regionName, expectedRegionSize, 15,
           25, server2);
@@ -97,7 +97,7 @@ public class RollingUpgradeQueryReturnsCorrectResultAfterTwoLocatorsWithTwoServe
           30, server1);
 
       server2 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server2, regionType, null,
-          shortcut.name(), regionName, locatorPorts);
+          shortcut.name(), regionName, locatorPorts, reindex);
       expectedRegionSize += 5;
       putSerializableObjectAndVerifyLuceneQueryResult(server2, regionName, expectedRegionSize, 25,
           35, server1, server2);
diff --git a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOverAllBucketsCreated.java b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRestartedFromCurrentVersion.java
similarity index 74%
copy from geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOverAllBucketsCreated.java
copy to geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRestartedFromCurrentVersion.java
index 3acba6c..38fe3c4 100644
--- a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOverAllBucketsCreated.java
+++ b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRestartedFromCurrentVersion.java
@@ -17,8 +17,12 @@ package org.apache.geode.cache.lucene;
 import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
 import static org.junit.Assert.assertTrue;
 
-import org.junit.Ignore;
+import java.util.ArrayList;
+import java.util.Collection;
+
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.cache.client.ClientRegionShortcut;
@@ -28,31 +32,40 @@ import org.apache.geode.test.dunit.DistributedTestUtils;
 import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.NetworkUtils;
 import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
+import org.apache.geode.test.version.VersionManager;
+
+@RunWith(Parameterized.class)
+@Parameterized.UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class)
+public class RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRestartedFromCurrentVersion
+    extends LuceneSearchWithRollingUpgradeTestBase {
+
+  @Parameterized.Parameter()
+  public Boolean reindex;
+
+  @Parameterized.Parameter(1)
+  public Boolean singleHopEnabled;
+
+  @Parameterized.Parameters(name = "currentVersion, reindex={0}, singleHopEnabled={1}")
+  public static Collection<Object[]> data() {
+    Collection<Object[]> rval = new ArrayList<>();
+    rval.add(new Object[] {true, true});
+    rval.add(new Object[] {true, false});
+    rval.add(new Object[] {false, true});
+    rval.add(new Object[] {false, false});
+    return rval;
+  }
 
-public class RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOverAllBucketsCreated
-    extends LuceneSearchWithRollingUpgradeDUnit {
-
-  @Ignore("Disabled until GEODE-7258 is fixed")
   @Test
-  public void test()
+  public void functionsFailOverWhenRestartOneServer()
       throws Exception {
-    // This test verifies the upgrade from lucene 6 to 7 doesn't cause any issues. Without any
-    // changes to accomodate this upgrade, this test will fail with an IndexFormatTooNewException.
-    //
-    // The main sequence in this test that causes the failure is:
-    //
-    // - start two servers with old version using Lucene 6
-    // - roll one server to new version server using Lucene 7
-    // - do puts into primary buckets in new server which creates entries in the fileAndChunk region
-    // with Lucene 7 format
-    // - stop the new version server which causes the old version server to become primary for those
-    // buckets
-    // - do a query which causes the IndexFormatTooNewException to be thrown
+    // Since the changes relating to GEODE-7258 is not applied on 1.10.0,
+    // use this test to roll from develop to develop to verify.
     final Host host = Host.getHost(0);
-    VM locator = host.getVM(oldVersion, 0);
-    VM server1 = host.getVM(oldVersion, 1);
-    VM server2 = host.getVM(oldVersion, 2);
-    VM client = host.getVM(oldVersion, 3);
+    VM locator = host.getVM(VersionManager.CURRENT_VERSION, 0);
+    VM server1 = host.getVM(VersionManager.CURRENT_VERSION, 1);
+    VM server2 = host.getVM(VersionManager.CURRENT_VERSION, 2);
+    VM client = host.getVM(VersionManager.CURRENT_VERSION, 3);
 
     final String regionName = "aRegion";
     String regionType = "partitionedRedundant";
@@ -85,7 +98,8 @@ public class RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRol
       invokeRunnableInVMs(invokeStartCacheServer(csPorts[0]), server1);
       invokeRunnableInVMs(invokeStartCacheServer(csPorts[1]), server2);
       invokeRunnableInVMs(
-          invokeCreateClientCache(getClientSystemProperties(), hostNames, locatorPorts, false),
+          invokeCreateClientCache(getClientSystemProperties(), hostNames, locatorPorts, false,
+              singleHopEnabled),
           client);
 
       // Create the index on the servers
@@ -107,10 +121,13 @@ public class RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRol
       locator = rollLocatorToCurrent(locator, hostName, locatorPorts[0], getTestMethodName(),
           locatorString);
       server1 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server1, regionType, null,
-          shortcut.name(), regionName, locatorPorts);
+          shortcut.name(), regionName, locatorPorts, reindex);
 
       // Execute a query on the client and verify the results. This also waits until flushed.
-      client.invoke(() -> verifyLuceneQueryResults(regionName, numObjects));
+      client.invoke(() -> {
+        updateClientSingleHopMetadata(regionName);
+        verifyLuceneQueryResults(regionName, numObjects);
+      });
 
       // Put some objects on the client. This will update the document to the latest lucene version
       putSerializableObject(client, regionName, 0, numObjects);
@@ -128,4 +145,5 @@ public class RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRol
       invokeRunnableInVMs(true, invokeCloseCache(), client, server2);
     }
   }
+
 }
diff --git a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOver.java b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOver.java
index 95c0498..df1e329 100644
--- a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOver.java
+++ b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOver.java
@@ -70,7 +70,8 @@ public class RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRol
       invokeRunnableInVMs(invokeStartCacheServer(csPorts[1]), server3);
 
       invokeRunnableInVMs(
-          invokeCreateClientCache(getClientSystemProperties(), hostNames, locatorPorts, false),
+          invokeCreateClientCache(getClientSystemProperties(), hostNames, locatorPorts, false,
+              singleHopEnabled),
           client);
       server2.invoke(() -> createLuceneIndex(cache, regionName, INDEX_NAME));
       server3.invoke(() -> createLuceneIndex(cache, regionName, INDEX_NAME));
@@ -87,7 +88,7 @@ public class RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRol
           locatorString);
 
       server3 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server3, regionType, null,
-          shortcut.name(), regionName, locatorPorts);
+          shortcut.name(), regionName, locatorPorts, reindex);
       invokeRunnableInVMs(invokeStartCacheServer(csPorts[1]), server3);
       expectedRegionSize += 10;
       putSerializableObjectAndVerifyLuceneQueryResult(client, regionName, expectedRegionSize, 20,
@@ -97,7 +98,7 @@ public class RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRol
           40, server2);
 
       server2 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server2, regionType, null,
-          shortcut.name(), regionName, locatorPorts);
+          shortcut.name(), regionName, locatorPorts, reindex);
       invokeRunnableInVMs(invokeStartCacheServer(csPorts[0]), server2);
       expectedRegionSize += 10;
       putSerializableObjectAndVerifyLuceneQueryResult(client, regionName, expectedRegionSize, 40,
@@ -107,7 +108,7 @@ public class RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRol
           60, server3);
 
       client = rollClientToCurrentAndCreateRegion(client, ClientRegionShortcut.PROXY, regionName,
-          hostNames, locatorPorts, false);
+          hostNames, locatorPorts, false, singleHopEnabled);
       expectedRegionSize += 10;
       putSerializableObjectAndVerifyLuceneQueryResult(client, regionName, expectedRegionSize, 60,
           70, server2, server3);
diff --git a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOverAllBucketsCreated.java b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOverAllBucketsCreated.java
index 3acba6c..430e8fa 100644
--- a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOverAllBucketsCreated.java
+++ b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOverAllBucketsCreated.java
@@ -17,7 +17,6 @@ package org.apache.geode.cache.lucene;
 import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
 import static org.junit.Assert.assertTrue;
 
-import org.junit.Ignore;
 import org.junit.Test;
 
 import org.apache.geode.cache.RegionShortcut;
@@ -32,7 +31,6 @@ import org.apache.geode.test.dunit.VM;
 public class RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOverAllBucketsCreated
     extends LuceneSearchWithRollingUpgradeDUnit {
 
-  @Ignore("Disabled until GEODE-7258 is fixed")
   @Test
   public void test()
       throws Exception {
@@ -85,7 +83,8 @@ public class RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRol
       invokeRunnableInVMs(invokeStartCacheServer(csPorts[0]), server1);
       invokeRunnableInVMs(invokeStartCacheServer(csPorts[1]), server2);
       invokeRunnableInVMs(
-          invokeCreateClientCache(getClientSystemProperties(), hostNames, locatorPorts, false),
+          invokeCreateClientCache(getClientSystemProperties(), hostNames, locatorPorts, false,
+              singleHopEnabled),
           client);
 
       // Create the index on the servers
@@ -107,7 +106,7 @@ public class RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRol
       locator = rollLocatorToCurrent(locator, hostName, locatorPorts[0], getTestMethodName(),
           locatorString);
       server1 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server1, regionType, null,
-          shortcut.name(), regionName, locatorPorts);
+          shortcut.name(), regionName, locatorPorts, reindex);
 
       // Execute a query on the client and verify the results. This also waits until flushed.
       client.invoke(() -> verifyLuceneQueryResults(regionName, numObjects));