You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2019/10/18 17:31:40 UTC

[geode] branch feature/GEODE-7258 created (now e9ebda3)

This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a change to branch feature/GEODE-7258
in repository https://gitbox.apache.org/repos/asf/geode.git.


      at e9ebda3  GEODE-7258: The function retry logic is modified to handle exception thrown, while trying to connect to a server thats shutdown/closed.

This branch includes the following new commits:

     new e9ebda3  GEODE-7258: The function retry logic is modified to handle exception thrown, while trying to connect to a server thats shutdown/closed.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[geode] 01/01: GEODE-7258: The function retry logic is modified to handle exception thrown, while trying to connect to a server thats shutdown/closed.

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-7258
in repository https://gitbox.apache.org/repos/asf/geode.git

commit e9ebda3b4fb2b64ed708019f987e6ffc8c8202d2
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Fri Oct 18 10:27:46 2019 -0700

    GEODE-7258: The function retry logic is modified to handle exception
    thrown, while trying to connect to a server thats shutdown/closed.
    
        Co-authored-by: Anil <ag...@pivotal.io>
        Co-authored-by: Xiaojian Zhou <gz...@pivotal.io>
---
 .../cache/client/internal/ExecuteFunctionOp.java   | 10 ++---
 .../client/internal/ExecuteRegionFunctionOp.java   | 15 ++-----
 .../internal/ExecuteRegionFunctionSingleHopOp.java |  3 +-
 .../geode/cache/client/internal/PoolImpl.java      | 24 +++++++++++
 .../cache/client/internal/ServerRegionProxy.java   |  8 ++--
 .../client/internal/SingleHopClientExecutor.java   | 17 ++------
 .../internal/pooling/ConnectionManagerImpl.java    |  4 +-
 .../internal/ExecuteFunctionOpRetryTest.java       |  5 +--
 .../internal/ExecuteFunctionTestSupport.java       |  8 ++--
 .../internal/ExecuteRegionFunctionOpRetryTest.java |  5 ++-
 .../ExecuteRegionFunctionSingleHopOpRetryTest.java |  9 ++---
 .../LuceneSearchWithRollingUpgradeDUnit.java       | 46 +++++++++++++++-------
 ...tResultsAfterClientAndServersAreRolledOver.java |  5 ++-
 ...ntAndServersAreRolledOverAllBucketsCreated.java |  5 +--
 14 files changed, 93 insertions(+), 71 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteFunctionOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteFunctionOp.java
index 09f926c..91baa4d 100755
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteFunctionOp.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteFunctionOp.java
@@ -24,7 +24,6 @@ import java.util.function.Supplier;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.InternalGemFireError;
-import org.apache.geode.cache.client.PoolFactory;
 import org.apache.geode.cache.client.ServerConnectivityException;
 import org.apache.geode.cache.client.ServerOperationException;
 import org.apache.geode.cache.execute.Function;
@@ -83,8 +82,8 @@ public class ExecuteFunctionOp {
     } else {
 
       boolean reexecute = false;
+      int maxRetryAttempts = -1;
 
-      int maxRetryAttempts = pool.getRetryAttempts();
       if (!isHA) {
         maxRetryAttempts = 0;
       }
@@ -107,11 +106,8 @@ public class ExecuteFunctionOp {
 
         } catch (ServerConnectivityException se) {
 
-          if (maxRetryAttempts == PoolFactory.DEFAULT_RETRY_ATTEMPTS) {
-            // If the retryAttempt is set to default(-1). Try it on all servers once.
-            // Calculating number of servers when function is re-executed as it involves
-            // messaging locator.
-            maxRetryAttempts = pool.getConnectionSource().getAllServers().size() - 1;
+          if (maxRetryAttempts == -1) {
+            maxRetryAttempts = pool.calculateRetryAttempts(se);
           }
 
           if ((maxRetryAttempts--) < 1) {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOp.java
index eba38a6..776ea2e 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOp.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOp.java
@@ -27,7 +27,6 @@ import org.apache.logging.log4j.Logger;
 import org.apache.geode.InternalGemFireError;
 import org.apache.geode.cache.CacheClosedException;
 import org.apache.geode.cache.client.NoAvailableServersException;
-import org.apache.geode.cache.client.PoolFactory;
 import org.apache.geode.cache.client.ServerConnectivityException;
 import org.apache.geode.cache.client.ServerOperationException;
 import org.apache.geode.cache.client.internal.ExecuteRegionFunctionSingleHopOp.ExecuteRegionFunctionSingleHopOpImpl;
@@ -67,17 +66,14 @@ public class ExecuteRegionFunctionOp {
   /**
    * Does a execute Function on a server using connections from the given pool to communicate with
    * the server.
-   *
-   * @param pool the pool to use to communicate with the server.
-   * @param resultCollector is used to collect the results from the Server
-   * @param maxRetryAttempts Maximum number of retry attempts
    */
   static void execute(ExecutablePool pool,
       ResultCollector resultCollector,
-      int maxRetryAttempts, boolean isHA,
+      int retryAttempts, boolean isHA,
       ExecuteRegionFunctionOpImpl op, boolean isReexecute,
       Set<String> failedNodes) {
 
+    int maxRetryAttempts = retryAttempts > 0 ? retryAttempts : -1;
     if (!isHA) {
       maxRetryAttempts = 0;
     }
@@ -107,11 +103,8 @@ public class ExecuteRegionFunctionOp {
         throw failedException;
       } catch (ServerConnectivityException se) {
 
-        if (maxRetryAttempts == PoolFactory.DEFAULT_RETRY_ATTEMPTS) {
-          // If the retryAttempt is set to default(-1). Try it on all servers once.
-          // Calculating number of servers when function is re-executed as it involves
-          // messaging locator.
-          maxRetryAttempts = ((PoolImpl) pool).getConnectionSource().getAllServers().size() - 1;
+        if (maxRetryAttempts == -1) {
+          maxRetryAttempts = ((PoolImpl) pool).calculateRetryAttempts(se);
         }
 
         if ((maxRetryAttempts--) < 1) {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java
index eb162829..8fa8510 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java
@@ -66,7 +66,6 @@ public class ExecuteRegionFunctionSingleHopOp {
       ServerRegionFunctionExecutor serverRegionExecutor,
       ResultCollector resultCollector,
       Map<ServerLocation, ? extends HashSet> serverToFilterMap,
-      int mRetryAttempts,
       boolean isHA,
       final java.util.function.Function<ServerRegionFunctionExecutor, AbstractOp> regionFunctionSingleHopOpFunction,
       final Supplier<AbstractOp> executeRegionFunctionOpSupplier) {
@@ -87,7 +86,7 @@ public class ExecuteRegionFunctionSingleHopOp {
 
     final int retryAttempts =
         SingleHopClientExecutor.submitAllHA(callableTasks, (LocalRegion) region, isHA,
-            resultCollector, failedNodes, mRetryAttempts, ((PoolImpl) pool));
+            resultCollector, failedNodes, ((PoolImpl) pool));
 
     if (isDebugEnabled) {
       logger.debug(
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java
index 3dbd99f..bb12253 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java
@@ -41,6 +41,7 @@ import org.apache.geode.cache.NoSubscriptionServersAvailableException;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionService;
 import org.apache.geode.cache.client.Pool;
+import org.apache.geode.cache.client.PoolFactory;
 import org.apache.geode.cache.client.ServerConnectivityException;
 import org.apache.geode.cache.client.SubscriptionNotEnabledException;
 import org.apache.geode.cache.client.internal.pooling.ConnectionManager;
@@ -1581,4 +1582,27 @@ public class PoolImpl implements InternalPool {
   public int getSubscriptionTimeoutMultiplier() {
     return subscriptionTimeoutMultiplier;
   }
+
+  public int calculateRetryAttempts(Throwable cause) {
+
+    int maxRetryAttempts = getRetryAttempts();
+
+    if (maxRetryAttempts == PoolFactory.DEFAULT_RETRY_ATTEMPTS) {
+      // If the retryAttempt is set to default(-1). Try executing on all servers once.
+      // As calculating number of servers involves sending message to locator, it is
+      // done only when there is an exception.
+      if (cause instanceof ServerConnectivityException
+          && cause.getMessage().contains("Could not create a new connection")) {
+        // The client was unable to establish a connection before sending the
+        // request.
+        maxRetryAttempts = getConnectionSource().getAllServers().size();
+      } else {
+        // The request was sent once.
+        maxRetryAttempts = getConnectionSource().getAllServers().size() - 1;
+      }
+    }
+
+    return maxRetryAttempts;
+  }
+
 }
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ServerRegionProxy.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ServerRegionProxy.java
index 9481cb4..cd15529 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ServerRegionProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ServerRegionProxy.java
@@ -699,7 +699,7 @@ public class ServerRegionProxy extends ServerProxy implements ServerRegionDataAc
                     hasResult, emptySet(), true, timeoutMs);
 
             ExecuteRegionFunctionSingleHopOp.execute(pool, region, serverRegionExecutor,
-                resultCollector, serverToBuckets, retryAttempts, function.isHA(),
+                resultCollector, serverToBuckets, function.isHA(),
                 regionFunctionSingleHopOpFunction, executeRegionFunctionOpSupplier);
           }
         } else {
@@ -725,7 +725,7 @@ public class ServerRegionProxy extends ServerProxy implements ServerRegionDataAc
                     hasResult, emptySet(), isBucketFilter, timeoutMs);
 
             ExecuteRegionFunctionSingleHopOp.execute(pool, region,
-                serverRegionExecutor, resultCollector, serverToFilterMap, retryAttempts,
+                serverRegionExecutor, resultCollector, serverToFilterMap,
                 function.isHA(), regionFunctionSingleHopOpFunction,
                 executeRegionFunctionOpSupplier);
           }
@@ -786,7 +786,7 @@ public class ServerRegionProxy extends ServerProxy implements ServerRegionDataAc
                     emptySet(), true, isHA, optimizeForWrite, timeoutMs);
 
             ExecuteRegionFunctionSingleHopOp.execute(pool, region,
-                serverRegionExecutor, resultCollector, serverToBuckets, retryAttempts, isHA,
+                serverRegionExecutor, resultCollector, serverToBuckets, isHA,
                 regionFunctionSingleHopOpFunction, executeRegionFunctionOpSupplier);
           }
 
@@ -810,7 +810,7 @@ public class ServerRegionProxy extends ServerProxy implements ServerRegionDataAc
                     emptySet(), isBucketsAsFilter, isHA, optimizeForWrite, timeoutMs);
 
             ExecuteRegionFunctionSingleHopOp.execute(pool, region,
-                serverRegionExecutor, resultCollector, serverToFilterMap, retryAttempts,
+                serverRegionExecutor, resultCollector, serverToFilterMap,
                 isHA, regionFunctionSingleHopOpFunction, executeRegionFunctionOpSupplier);
           }
         }
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/SingleHopClientExecutor.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/SingleHopClientExecutor.java
index 40a128c..c4c3eb6 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/SingleHopClientExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/SingleHopClientExecutor.java
@@ -30,7 +30,6 @@ import org.apache.geode.GemFireException;
 import org.apache.geode.InternalGemFireException;
 import org.apache.geode.annotations.internal.MakeNotStatic;
 import org.apache.geode.cache.CacheClosedException;
-import org.apache.geode.cache.client.PoolFactory;
 import org.apache.geode.cache.client.ServerConnectivityException;
 import org.apache.geode.cache.client.ServerOperationException;
 import org.apache.geode.cache.client.internal.GetAllOp.GetAllOpImpl;
@@ -89,11 +88,10 @@ public class SingleHopClientExecutor {
 
   static int submitAllHA(List callableTasks, LocalRegion region, boolean isHA,
       ResultCollector rc, Set<String> failedNodes,
-      final int retryAttemptsArg,
       final PoolImpl pool) {
 
-    ClientMetadataService cms = region.getCache().getClientMetadataService();
-    int maxRetryAttempts = 0;
+    ClientMetadataService cms;
+    int maxRetryAttempts = -1;
 
     if (callableTasks != null && !callableTasks.isEmpty()) {
       List futures = null;
@@ -120,15 +118,8 @@ public class SingleHopClientExecutor {
             throw new InternalGemFireException(e.getMessage());
           } catch (ExecutionException ee) {
 
-            if (maxRetryAttempts == 0) {
-              maxRetryAttempts = retryAttemptsArg;
-            }
-
-            if (maxRetryAttempts == PoolFactory.DEFAULT_RETRY_ATTEMPTS) {
-              // If the retryAttempt is set to default(-1). Try it on all servers once.
-              // Calculating number of servers when function is re-executed as it involves
-              // messaging locator.
-              maxRetryAttempts = pool.getConnectionSource().getAllServers().size() - 1;
+            if (maxRetryAttempts == -1) {
+              maxRetryAttempts = pool.calculateRetryAttempts(ee.getCause());
             }
 
             if (ee.getCause() instanceof InternalFunctionInvocationTargetException) {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java
index a2a343f..e37899d 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java
@@ -70,6 +70,7 @@ import org.apache.geode.security.GemFireSecurityException;
 public class ConnectionManagerImpl implements ConnectionManager {
   private static final Logger logger = LogService.getLogger();
   private static final int NOT_WAITING = -1;
+  public static final String BORROW_CONN_ERROR_MSG = "Could not create a new connection to server ";
 
   private final String poolName;
   private final PoolStats poolStats;
@@ -321,8 +322,7 @@ public class ConnectionManagerImpl implements ConnectionManager {
       return connection;
     }
 
-    throw new ServerConnectivityException(
-        "Could not create a new connection to server " + server);
+    throw new ServerConnectivityException(BORROW_CONN_ERROR_MSG + server);
   }
 
   @Override
diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionOpRetryTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionOpRetryTest.java
index ae9ae13..bb11a04 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionOpRetryTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionOpRetryTest.java
@@ -228,9 +228,8 @@ public class ExecuteFunctionOpRetryTest {
     testSupport = new ExecuteFunctionTestSupport(haStatus, failureModeArg,
         (pool, failureMode) -> ExecuteFunctionTestSupport.thenThrow(when(pool
             .execute(ArgumentMatchers.<AbstractOp>any(), ArgumentMatchers.anyInt())),
-            failureMode));
-
-    when(testSupport.getExecutablePool().getRetryAttempts()).thenReturn(retryAttempts);
+            failureMode),
+        retryAttempts);
 
     args = null;
     memberMappedArg = mock(MemberMappedArgument.class);
diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionTestSupport.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionTestSupport.java
index c91816f..1d0cad5 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionTestSupport.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionTestSupport.java
@@ -15,6 +15,7 @@
 package org.apache.geode.cache.client.internal;
 
 import static org.apache.geode.cache.client.internal.ExecuteFunctionTestSupport.HAStatus.HA;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -91,8 +92,6 @@ class ExecuteFunctionTestSupport {
    * This method has to be {@code static} because it is called before
    * {@link ExecuteFunctionTestSupport} is constructed.
    *
-   * @param whenPoolExecute is the {@link OngoingStubbing} for (one of the ) {@code execute()}
-   *        methods on {@link PoolImpl}
    * @param failureMode is the {@link FailureMode} that determines the kind of exception
    *        to {@code throw}
    */
@@ -149,7 +148,7 @@ class ExecuteFunctionTestSupport {
   ExecuteFunctionTestSupport(
       final HAStatus haStatus,
       final FailureMode failureMode,
-      final BiConsumer<PoolImpl, FailureMode> addPoolMockBehavior) {
+      final BiConsumer<PoolImpl, FailureMode> addPoolMockBehavior, Integer retryAttempts) {
 
     final List<ServerLocation> servers = (List<ServerLocation>) mock(List.class);
     when(servers.size()).thenReturn(ExecuteFunctionTestSupport.NUMBER_OF_SERVERS);
@@ -174,6 +173,9 @@ class ExecuteFunctionTestSupport {
 
     executablePool = mock(PoolImpl.class);
     when(executablePool.getConnectionSource()).thenReturn(connectionSource);
+    when(executablePool.getRetryAttempts()).thenReturn(retryAttempts);
+    when(executablePool.calculateRetryAttempts(any(ServerConnectivityException.class)))
+        .thenCallRealMethod();
 
     addPoolMockBehavior.accept(executablePool, failureMode);
   }
diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOpRetryTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOpRetryTest.java
index 74f6748..e92bad1 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOpRetryTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOpRetryTest.java
@@ -309,7 +309,7 @@ public class ExecuteRegionFunctionOpRetryTest {
             default:
               throw new AssertionError("unknown FailureMode type: " + failureMode);
           }
-        });
+        }, retryAttempts);
 
     executeFunctionMultiHopAndValidate(haStatus, functionIdentifierType, retryAttempts,
         testSupport.getExecutablePool(),
@@ -325,7 +325,8 @@ public class ExecuteRegionFunctionOpRetryTest {
     testSupport = new ExecuteFunctionTestSupport(haStatus, failureModeArg,
         (pool, failureMode) -> ExecuteFunctionTestSupport.thenThrow(when(pool
             .execute(ArgumentMatchers.<AbstractOp>any(), ArgumentMatchers.anyInt())),
-            failureMode));
+            failureMode),
+        retryAttempts);
 
     reExecuteFunctionMultiHopAndValidate(haStatus, functionIdentifierType, retryAttempts,
         testSupport.getExecutablePool(),
diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOpRetryTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOpRetryTest.java
index aef00fb..5649b1b 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOpRetryTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOpRetryTest.java
@@ -137,7 +137,7 @@ public class ExecuteRegionFunctionSingleHopOpRetryTest {
   }
 
   private void createMocks(final HAStatus haStatus,
-      final FailureMode failureModeArg) {
+      final FailureMode failureModeArg, Integer retryAttempts) {
 
     testSupport = new ExecuteFunctionTestSupport(haStatus, failureModeArg,
         (pool, failureMode) -> ExecuteFunctionTestSupport.thenThrow(when(
@@ -146,7 +146,8 @@ public class ExecuteRegionFunctionSingleHopOpRetryTest {
                 ArgumentMatchers.any(),
                 ArgumentMatchers.anyBoolean(),
                 ArgumentMatchers.anyBoolean())),
-            failureMode));
+            failureMode),
+        retryAttempts);
 
     serverToFilterMap = new HashMap<>();
     serverToFilterMap.put(new ServerLocation("host1", 10), new HashSet<>());
@@ -158,7 +159,7 @@ public class ExecuteRegionFunctionSingleHopOpRetryTest {
       final int retryAttempts, final int expectTries,
       final FailureMode failureMode) {
 
-    createMocks(haStatus, failureMode);
+    createMocks(haStatus, failureMode, retryAttempts);
 
     executeFunctionSingleHopAndValidate(haStatus, functionIdentifierType, retryAttempts,
         testSupport.getExecutablePool(),
@@ -182,7 +183,6 @@ public class ExecuteRegionFunctionSingleHopOpRetryTest {
             () -> ignoreServerConnectivityException(() -> ExecuteRegionFunctionSingleHopOp.execute(
                 executablePool, testSupport.getRegion(),
                 executor, resultCollector, serverToFilterMap,
-                retryAttempts,
                 testSupport.toBoolean(haStatus),
                 executor1 -> new ExecuteRegionFunctionSingleHopOp.ExecuteRegionFunctionSingleHopOpImpl(
                     testSupport.getRegion().getFullPath(), FUNCTION_NAME,
@@ -199,7 +199,6 @@ public class ExecuteRegionFunctionSingleHopOpRetryTest {
         ignoreServerConnectivityException(
             () -> ExecuteRegionFunctionSingleHopOp.execute(executablePool, testSupport.getRegion(),
                 executor, resultCollector, serverToFilterMap,
-                retryAttempts,
                 function.isHA(),
                 executor1 -> new ExecuteRegionFunctionSingleHopOp.ExecuteRegionFunctionSingleHopOpImpl(
                     testSupport.getRegion().getFullPath(), function,
diff --git a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java
index 2a8cf9f..020961d 100644
--- a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java
+++ b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java
@@ -68,14 +68,16 @@ import org.apache.geode.test.version.VersionManager;
 public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4DistributedTestCase {
 
 
-  @Parameterized.Parameters(name = "from_v{0}, with reindex={1}")
+  @Parameterized.Parameters(name = "from_v{0}, with reindex={1}, singleHopEnabled={2}")
   public static Collection<Object[]> data() {
     Collection<String> luceneVersions = getLuceneVersions();
     Collection<Object[]> rval = new ArrayList<>();
     luceneVersions.forEach(v -> {
-      rval.add(new Object[] {v, true});
-      rval.add(new Object[] {v, false});
+      rval.add(new Object[] {v, true, true});
+      rval.add(new Object[] {v, false, true});
     });
+    rval.add(new Object[] {VersionManager.CURRENT_VERSION, true, true});
+    rval.add(new Object[] {VersionManager.CURRENT_VERSION, true, false});
     return rval;
   }
 
@@ -84,6 +86,10 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
     // Lucene Compatibility checks start with Apache Geode v1.2.0
     // Removing the versions older than v1.2.0
     result.removeIf(s -> TestVersion.compare(s, "1.2.0") < 0);
+
+    // The changes relating to GEODE-7258 is not applied on 1.10.0, skipping rolling
+    // upgrade for 1.10.0. The change was verified by rolling from develop to develop.
+    result.removeIf(s -> TestVersion.compare(s, "1.10.0") == 0);
     if (result.size() < 1) {
       throw new RuntimeException("No older versions of Geode were found to test against");
     } else {
@@ -108,6 +114,9 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
   @Parameterized.Parameter(1)
   public Boolean reindex;
 
+  @Parameterized.Parameter(2)
+  public Boolean singleHopEnabled;
+
   private void deleteVMFiles() {
     System.out.println("deleting files in vm" + VM.getCurrentVMNum());
     File pwd = new File(".");
@@ -145,20 +154,24 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
     return props;
   }
 
-  VM rollClientToCurrentAndCreateRegion(VM oldClient, ClientRegionShortcut shortcut,
-      String regionName, String[] hostNames, int[] locatorPorts, boolean subscriptionEnabled) {
-    VM rollClient = rollClientToCurrent(oldClient, hostNames, locatorPorts, subscriptionEnabled);
+  VM rollClientToCurrentAndCreateRegion(VM oldClient,
+      ClientRegionShortcut shortcut,
+      String regionName, String[] hostNames, int[] locatorPorts,
+      boolean subscriptionEnabled, boolean singleHopEnabled) {
+    VM rollClient = rollClientToCurrent(oldClient, hostNames, locatorPorts, subscriptionEnabled,
+        singleHopEnabled);
     // recreate region on "rolled" client
     invokeRunnableInVMs(invokeCreateClientRegion(regionName, shortcut), rollClient);
     return rollClient;
   }
 
-  private VM rollClientToCurrent(VM oldClient, String[] hostNames, int[] locatorPorts,
-      boolean subscriptionEnabled) {
+  private VM rollClientToCurrent(VM oldClient, String[] hostNames,
+      int[] locatorPorts,
+      boolean subscriptionEnabled, boolean singleHopEnabled) {
     oldClient.invoke(invokeCloseCache());
     VM rollClient = Host.getHost(0).getVM(VersionManager.CURRENT_VERSION, oldClient.getId());
     rollClient.invoke(invokeCreateClientCache(getClientSystemProperties(), hostNames, locatorPorts,
-        subscriptionEnabled));
+        subscriptionEnabled, singleHopEnabled));
     rollClient.invoke(invokeAssertVersion(Version.CURRENT_ORDINAL));
     return rollClient;
   }
@@ -203,14 +216,17 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
     cacheServer.start();
   }
 
-  CacheSerializableRunnable invokeCreateClientCache(final Properties systemProperties,
-      final String[] hosts, final int[] ports, boolean subscriptionEnabled) {
+  CacheSerializableRunnable invokeCreateClientCache(
+      final Properties systemProperties,
+      final String[] hosts, final int[] ports, boolean subscriptionEnabled,
+      boolean singleHopEnabled) {
     return new CacheSerializableRunnable("execute: createClientCache") {
       @Override
       public void run2() {
         try {
           LuceneSearchWithRollingUpgradeDUnit.cache =
-              createClientCache(systemProperties, hosts, ports, subscriptionEnabled);
+              createClientCache(systemProperties, hosts, ports, subscriptionEnabled,
+                  singleHopEnabled);
         } catch (Exception e) {
           fail("Error creating client cache", e);
         }
@@ -225,13 +241,15 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu
   }
 
 
-  private static ClientCache createClientCache(Properties systemProperties, String[] hosts,
-      int[] ports, boolean subscriptionEnabled) {
+  private static ClientCache createClientCache(Properties systemProperties,
+      String[] hosts,
+      int[] ports, boolean subscriptionEnabled, boolean singleHopEnabled) {
     ClientCacheFactory cf = new ClientCacheFactory(systemProperties);
     if (subscriptionEnabled) {
       cf.setPoolSubscriptionEnabled(true);
       cf.setPoolSubscriptionRedundancy(-1);
     }
+    cf.setPoolPRSingleHopEnabled(singleHopEnabled);
     int hostsLength = hosts.length;
     for (int i = 0; i < hostsLength; i++) {
       cf.addPoolLocator(hosts[i], ports[i]);
diff --git a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOver.java b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOver.java
index 95c0498..9ef5ca8 100644
--- a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOver.java
+++ b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOver.java
@@ -70,7 +70,8 @@ public class RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRol
       invokeRunnableInVMs(invokeStartCacheServer(csPorts[1]), server3);
 
       invokeRunnableInVMs(
-          invokeCreateClientCache(getClientSystemProperties(), hostNames, locatorPorts, false),
+          invokeCreateClientCache(getClientSystemProperties(), hostNames, locatorPorts, false,
+              singleHopEnabled),
           client);
       server2.invoke(() -> createLuceneIndex(cache, regionName, INDEX_NAME));
       server3.invoke(() -> createLuceneIndex(cache, regionName, INDEX_NAME));
@@ -107,7 +108,7 @@ public class RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRol
           60, server3);
 
       client = rollClientToCurrentAndCreateRegion(client, ClientRegionShortcut.PROXY, regionName,
-          hostNames, locatorPorts, false);
+          hostNames, locatorPorts, false, singleHopEnabled);
       expectedRegionSize += 10;
       putSerializableObjectAndVerifyLuceneQueryResult(client, regionName, expectedRegionSize, 60,
           70, server2, server3);
diff --git a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOverAllBucketsCreated.java b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOverAllBucketsCreated.java
index 3acba6c..1d7670c 100644
--- a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOverAllBucketsCreated.java
+++ b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOverAllBucketsCreated.java
@@ -17,7 +17,6 @@ package org.apache.geode.cache.lucene;
 import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
 import static org.junit.Assert.assertTrue;
 
-import org.junit.Ignore;
 import org.junit.Test;
 
 import org.apache.geode.cache.RegionShortcut;
@@ -32,7 +31,6 @@ import org.apache.geode.test.dunit.VM;
 public class RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOverAllBucketsCreated
     extends LuceneSearchWithRollingUpgradeDUnit {
 
-  @Ignore("Disabled until GEODE-7258 is fixed")
   @Test
   public void test()
       throws Exception {
@@ -85,7 +83,8 @@ public class RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRol
       invokeRunnableInVMs(invokeStartCacheServer(csPorts[0]), server1);
       invokeRunnableInVMs(invokeStartCacheServer(csPorts[1]), server2);
       invokeRunnableInVMs(
-          invokeCreateClientCache(getClientSystemProperties(), hostNames, locatorPorts, false),
+          invokeCreateClientCache(getClientSystemProperties(), hostNames, locatorPorts, false,
+              singleHopEnabled),
           client);
 
       // Create the index on the servers