You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by jb...@apache.org on 2019/06/22 01:56:32 UTC

[geode] branch develop updated: GEODE-6798: Refactoring of client function execution logic (#3710)

This is an automated email from the ASF dual-hosted git repository.

jbarrett pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 18530da  GEODE-6798: Refactoring of client function execution logic (#3710)
18530da is described below

commit 18530da5ae33fc2c922bb4a2aeb5e00a57eaa31e
Author: albertogpz <al...@est.tech>
AuthorDate: Sat Jun 22 03:56:19 2019 +0200

    GEODE-6798: Refactoring of client function execution logic (#3710)
    
    * Some unit tests for the ExecuteRegionFunctionOpImpl constructors
    * Do not allocate failedNodes Collection unless necessary
---
 .../client/internal/ExecuteRegionFunctionOp.java   | 335 +++++++--------------
 .../internal/ExecuteRegionFunctionOpImplTest.java  | 128 ++++++++
 2 files changed, 233 insertions(+), 230 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOp.java
index 6c95b02..641fd26 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOp.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOp.java
@@ -18,6 +18,7 @@ package org.apache.geode.cache.client.internal;
 import static org.apache.geode.internal.cache.execute.AbstractExecution.DEFAULT_CLIENT_FUNCTION_TIMEOUT;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -63,49 +64,32 @@ public class ExecuteRegionFunctionOp {
     // no instances allowed
   }
 
-  /**
-   * Does a execute Function on a server using connections from the given pool to communicate with
-   * the server.
-   *
-   * @param pool the pool to use to communicate with the server.
-   * @param region the name of the region to do the put on
-   * @param function to be executed
-   * @param serverRegionExecutor which will return argument and filter
-   * @param resultCollector is used to collect the results from the Server
-   * @param timeoutMs timeout in milliseconds
-   */
-  public static void execute(ExecutablePool pool, String region, Function function,
+  private static void execute(ExecutablePool pool, String region, String function,
       ServerRegionFunctionExecutor serverRegionExecutor, ResultCollector resultCollector,
-      byte hasResult, int mRetryAttempts, final int timeoutMs) {
-
-    ExecuteRegionFunctionOpImpl op =
-        new ExecuteRegionFunctionOpImpl(region, function, serverRegionExecutor,
-            resultCollector, hasResult, new HashSet<>(), timeoutMs);
-    boolean reexecute = false;
-    boolean reexecuteForServ = false;
-    Set<String> failedNodes = new HashSet<>();
-    AbstractOp reexecOp;
-
-    int maxRetryAttempts = mRetryAttempts;
-    if (!function.isHA()) {
+      byte hasResult, int maxRetryAttempts, boolean isHA, boolean optimizeForWrite,
+      ExecuteRegionFunctionOpImpl op, boolean isReexecute, Set<String> failedNodes) {
+
+    if (!isHA) {
       maxRetryAttempts = 0;
     }
 
     do {
       try {
-        if (reexecuteForServ) {
-          reexecOp = new ExecuteRegionFunctionOpImpl(op,
+        if (isReexecute) {
+          failedNodes = ensureMutability(failedNodes);
+          op = new ExecuteRegionFunctionOpImpl(op,
               (byte) 1/* isReExecute */, failedNodes);
-          pool.execute(reexecOp, 0);
-        } else {
-          pool.execute(op, 0);
         }
-        reexecute = false;
-        reexecuteForServ = false;
+        pool.execute(op, 0);
+        return;
       } catch (InternalFunctionInvocationTargetException e) {
-        reexecute = true;
         resultCollector.clearResults();
+        if (!isHA) {
+          return;
+        }
+        isReexecute = true;
         Set<String> failedNodesIds = e.getFailedNodeSet();
+        failedNodes = ensureMutability(failedNodes);
         failedNodes.clear();
         if (failedNodesIds != null) {
           failedNodes.addAll(failedNodesIds);
@@ -125,126 +109,68 @@ public class ExecuteRegionFunctionOp {
           throw se;
         }
 
-        reexecuteForServ = true;
+        isReexecute = true;
         resultCollector.clearResults();
+        failedNodes = ensureMutability(failedNodes);
         failedNodes.clear();
       }
-    } while (reexecuteForServ);
+    } while (true);
+  }
 
-    if (reexecute && function.isHA()) {
-      ExecuteRegionFunctionOp.reexecute(pool, region, function, serverRegionExecutor,
-          resultCollector, hasResult, failedNodes, maxRetryAttempts, timeoutMs);
+  private static Set<String> ensureMutability(final Set<String> failedNodes) {
+    if (failedNodes == Collections.EMPTY_SET) {
+      return new HashSet<>();
     }
+    return failedNodes;
   }
 
-  public static void execute(ExecutablePool pool, String region, String function,
+  /**
+   * Executes a Function on a server using connections from the given pool to communicate with
+   * the server.
+   *
+   * @param pool the pool to use to communicate with the server.
+   * @param region the name of the region to do the put on
+   * @param function to be executed
+   * @param serverRegionExecutor which will return argument and filter
+   * @param resultCollector is used to collect the results from the Server
+   * @param hasResult byte flag indicating whether the function returns a result
+   * @param maxRetryAttempts Maximum number of retry attempts
+   * @param timeoutMs timeout in milliseconds
+   */
+  static void execute(ExecutablePool pool, String region, Function function,
       ServerRegionFunctionExecutor serverRegionExecutor, ResultCollector resultCollector,
-      byte hasResult, int mRetryAttempts, boolean isHA, boolean optimizeForWrite,
-      final int timeoutMs) {
+      byte hasResult, int maxRetryAttempts, final int timeoutMs) {
 
-    ExecuteRegionFunctionOpImpl op =
-        new ExecuteRegionFunctionOpImpl(region, function, serverRegionExecutor,
-            resultCollector, hasResult, new HashSet<>(), isHA, optimizeForWrite, true, timeoutMs);
-    boolean reexecute = false;
-    boolean reexecuteForServ = false;
-    Set<String> failedNodes = new HashSet<>();
-    AbstractOp reexecOp;
+    ExecuteRegionFunctionOpImpl op = new ExecuteRegionFunctionOpImpl(region, function,
+        serverRegionExecutor, resultCollector, timeoutMs);
 
-    int maxRetryAttempts = mRetryAttempts;
-    if (!isHA) {
-      maxRetryAttempts = 0;
-    }
-
-    do {
-      try {
-        if (reexecuteForServ) {
-          reexecOp = new ExecuteRegionFunctionOpImpl(op,
-              (byte) 1/* isReExecute */, failedNodes);
-          pool.execute(reexecOp, 0);
-        } else {
-          pool.execute(op, 0);
-        }
-        reexecute = false;
-        reexecuteForServ = false;
-      } catch (InternalFunctionInvocationTargetException e) {
-        reexecute = true;
-        resultCollector.clearResults();
-        Set<String> failedNodesIds = e.getFailedNodeSet();
-        failedNodes.clear();
-        if (failedNodesIds != null) {
-          failedNodes.addAll(failedNodesIds);
-        }
-      } catch (ServerOperationException | NoAvailableServersException failedException) {
-        throw failedException;
-      } catch (ServerConnectivityException se) {
-
-        if (maxRetryAttempts == PoolFactory.DEFAULT_RETRY_ATTEMPTS) {
-          // If the retryAttempt is set to default(-1). Try it on all servers once.
-          // Calculating number of servers when function is re-executed as it involves
-          // messaging locator.
-          maxRetryAttempts = ((PoolImpl) pool).getConnectionSource().getAllServers().size() - 1;
-        }
+    execute(pool, region, function.getId(), serverRegionExecutor, resultCollector, hasResult,
+        maxRetryAttempts, function.isHA(), function.optimizeForWrite(), op, false,
+        Collections.emptySet());
+  }
 
-        if ((maxRetryAttempts--) < 1) {
-          throw se;
-        }
+  static void execute(ExecutablePool pool, String region, String function,
+      ServerRegionFunctionExecutor serverRegionExecutor, ResultCollector resultCollector,
+      byte hasResult, int maxRetryAttempts, boolean isHA, boolean optimizeForWrite,
+      final int timeoutMs) {
 
-        reexecuteForServ = true;
-        resultCollector.clearResults();
-        failedNodes.clear();
-      }
-    } while (reexecuteForServ);
+    ExecuteRegionFunctionOpImpl op = new ExecuteRegionFunctionOpImpl(region, function,
+        serverRegionExecutor, resultCollector, hasResult, isHA, optimizeForWrite,
+        true, timeoutMs);
 
-    if (reexecute && isHA) {
-      ExecuteRegionFunctionOp.reexecute(pool, region, function, serverRegionExecutor,
-          resultCollector, hasResult, failedNodes, maxRetryAttempts, isHA, optimizeForWrite,
-          timeoutMs);
-    }
+    execute(pool, region, function, serverRegionExecutor, resultCollector, hasResult,
+        maxRetryAttempts, isHA, optimizeForWrite, op, false, Collections.emptySet());
   }
 
   static void reexecute(ExecutablePool pool, String region, Function function,
       ServerRegionFunctionExecutor serverRegionExecutor, ResultCollector resultCollector,
       byte hasResult, Set<String> failedNodes, int retryAttempts, final int timeoutMs) {
 
-    ExecuteRegionFunctionOpImpl op =
-        new ExecuteRegionFunctionOpImpl(region, function, serverRegionExecutor,
-            resultCollector, hasResult, new HashSet<>(), timeoutMs);
-    boolean reexecute = true;
-    int maxRetryAttempts = retryAttempts;
-
-    do {
-      AbstractOp reExecuteOp =
-          new ExecuteRegionFunctionOpImpl(op, (byte) 1/* isReExecute */, failedNodes);
-
-      try {
-        pool.execute(reExecuteOp, 0);
-        reexecute = false;
-      } catch (InternalFunctionInvocationTargetException e) {
-        resultCollector.clearResults();
-        Set<String> failedNodesIds = e.getFailedNodeSet();
-        failedNodes.clear();
-        if (failedNodesIds != null) {
-          failedNodes.addAll(failedNodesIds);
-        }
-      } catch (ServerOperationException | NoAvailableServersException failedException) {
-        throw failedException;
-      } catch (ServerConnectivityException se) {
-
-        if (maxRetryAttempts == PoolFactory.DEFAULT_RETRY_ATTEMPTS) {
-          // If the retryAttempt is set to default(-1). Try it on all servers once.
-          // Calculating number of servers when function is re-executed as it involves
-          // messaging locator.
-          maxRetryAttempts = ((PoolImpl) pool).getConnectionSource().getAllServers().size() - 1;
-        }
-
-        if ((maxRetryAttempts--) < 1) {
-          throw se;
-        }
+    ExecuteRegionFunctionOpImpl op = new ExecuteRegionFunctionOpImpl(region, function,
+        serverRegionExecutor, resultCollector, timeoutMs);
 
-        resultCollector.clearResults();
-        failedNodes.clear();
-      }
-    } while (reexecute);
+    execute(pool, region, function.getId(), serverRegionExecutor, resultCollector, hasResult,
+        retryAttempts, function.isHA(), function.optimizeForWrite(), op, true, failedNodes);
   }
 
   static void reexecute(ExecutablePool pool, String region, String function,
@@ -252,45 +178,12 @@ public class ExecuteRegionFunctionOp {
       byte hasResult, Set<String> failedNodes, int retryAttempts, boolean isHA,
       boolean optimizeForWrite, final int timeoutMs) {
 
-    ExecuteRegionFunctionOpImpl op =
-        new ExecuteRegionFunctionOpImpl(region, function, serverRegionExecutor,
-            resultCollector, hasResult, new HashSet<>(), isHA, optimizeForWrite, true, timeoutMs);
-    boolean reexecute = true;
-    int maxRetryAttempts = retryAttempts;
-
-    do {
-      ExecuteRegionFunctionOpImpl reExecuteOp =
-          new ExecuteRegionFunctionOpImpl(op, (byte) 1/* isReExecute */, failedNodes);
-
-      try {
-        pool.execute(reExecuteOp, 0);
-        reexecute = false;
-      } catch (InternalFunctionInvocationTargetException e) {
-        resultCollector.clearResults();
-        Set<String> failedNodesIds = e.getFailedNodeSet();
-        failedNodes.clear();
-        if (failedNodesIds != null) {
-          failedNodes.addAll(failedNodesIds);
-        }
-      } catch (ServerOperationException | NoAvailableServersException failedException) {
-        throw failedException;
-      } catch (ServerConnectivityException se) {
+    ExecuteRegionFunctionOpImpl op = new ExecuteRegionFunctionOpImpl(region, function,
+        serverRegionExecutor, resultCollector, hasResult, isHA,
+        optimizeForWrite, true, timeoutMs);
 
-        if (maxRetryAttempts == PoolFactory.DEFAULT_RETRY_ATTEMPTS) {
-          // If the retryAttempt is set to default(-1). Try it on all servers once.
-          // Calculating number of servers when function is re-executed as it involves
-          // messaging locator.
-          maxRetryAttempts = ((PoolImpl) pool).getConnectionSource().getAllServers().size() - 1;
-        }
-
-        if ((maxRetryAttempts--) < 1) {
-          throw se;
-        }
-
-        resultCollector.clearResults();
-        failedNodes.clear();
-      }
-    } while (reexecute);
+    execute(pool, region, function, serverRegionExecutor, resultCollector, hasResult,
+        retryAttempts, isHA, optimizeForWrite, op, true, failedNodes);
   }
 
   static class ExecuteRegionFunctionOpImpl extends AbstractOpWithTimeout {
@@ -319,29 +212,28 @@ public class ExecuteRegionFunctionOp {
 
     private FunctionException functionException;
 
+    private static final int PART_COUNT = 8;
 
-    ExecuteRegionFunctionOpImpl(String region, Function function,
-        ServerRegionFunctionExecutor serverRegionExecutor, ResultCollector rc, byte hasResult,
-        Set<String> removedNodes, final int timeoutMs) {
-      super(MessageType.EXECUTE_REGION_FUNCTION,
-          8 + serverRegionExecutor.getFilter().size() + removedNodes.size(), timeoutMs);
+    private static int getMessagePartCount(int filterSize, int removedNodesSize) {
+      return PART_COUNT + filterSize + removedNodesSize;
+    }
+
+    private void fillMessage(String region, Function function, String functionId,
+        ServerRegionFunctionExecutor serverRegionExecutor,
+        Set<String> removedNodes, byte functionState, byte flags) {
       Set routingObjects = serverRegionExecutor.getFilter();
       Object args = serverRegionExecutor.getArguments();
-      byte functionState = AbstractExecution.getFunctionState(function.isHA(), function.hasResult(),
-          function.optimizeForWrite());
       MemberMappedArgument memberMappedArg = serverRegionExecutor.getMemberMappedArgument();
-
       addBytes(functionState);
       getMessage().addStringPart(region, true);
-      if (serverRegionExecutor.isFnSerializationReqd()) {
+      if (function != null && serverRegionExecutor.isFnSerializationReqd()) {
         getMessage().addStringOrObjPart(function);
       } else {
-        getMessage().addStringOrObjPart(function.getId());
+        getMessage().addStringOrObjPart(functionId);
       }
+
       getMessage().addObjPart(args);
       getMessage().addObjPart(memberMappedArg);
-      executeOnBucketSet = serverRegionExecutor.getExecuteOnBucketSetFlag();
-      byte flags = ExecuteFunctionHelper.createFlags(executeOnBucketSet, isReExecute);
 
       getMessage().addBytesPart(new byte[] {flags});
       getMessage().addIntPart(routingObjects.size());
@@ -352,14 +244,27 @@ public class ExecuteRegionFunctionOp {
       for (Object nodes : removedNodes) {
         getMessage().addStringOrObjPart(nodes);
       }
+    }
 
+    ExecuteRegionFunctionOpImpl(String region, Function function,
+        ServerRegionFunctionExecutor serverRegionExecutor, ResultCollector rc,
+        final int timeoutMs) {
+      super(MessageType.EXECUTE_REGION_FUNCTION,
+          getMessagePartCount(serverRegionExecutor.getFilter().size(), 0), timeoutMs);
+      executeOnBucketSet = serverRegionExecutor.getExecuteOnBucketSetFlag();
+      byte flags = ExecuteFunctionHelper.createFlags(executeOnBucketSet, isReExecute);
+      byte functionState = AbstractExecution.getFunctionState(function.isHA(), function.hasResult(),
+          function.optimizeForWrite());
+      failedNodes = Collections.emptySet();
+      fillMessage(region,
+          function, function.getId(),
+          serverRegionExecutor, failedNodes, functionState, flags);
       resultCollector = rc;
       regionName = region;
       this.function = function;
       functionId = function.getId();
       executor = serverRegionExecutor;
-      this.hasResult = functionState;
-      failedNodes = removedNodes;
+      hasResult = functionState;
       isHA = function.isHA();
     }
 
@@ -378,58 +283,45 @@ public class ExecuteRegionFunctionOp {
       isHA = true;
     }
 
-    ExecuteRegionFunctionOpImpl(String region, String function,
+    ExecuteRegionFunctionOpImpl(String region, String functionId,
         ServerRegionFunctionExecutor serverRegionExecutor, ResultCollector rc, byte hasResult,
-        Set<String> removedNodes, boolean isHA, boolean optimizeForWrite,
+        boolean isHA, boolean optimizeForWrite,
         boolean calculateFnState, final int timeoutMs) {
       super(MessageType.EXECUTE_REGION_FUNCTION,
-          8 + serverRegionExecutor.getFilter().size() + removedNodes.size(), timeoutMs);
-      Set routingObjects = serverRegionExecutor.getFilter();
+          getMessagePartCount(serverRegionExecutor.getFilter().size(), 0), timeoutMs);
+
       byte functionState = hasResult;
       if (calculateFnState) {
         functionState = AbstractExecution.getFunctionState(isHA,
             hasResult == (byte) 1, optimizeForWrite);
       }
-      Object args = serverRegionExecutor.getArguments();
-      MemberMappedArgument memberMappedArg = serverRegionExecutor.getMemberMappedArgument();
-      addBytes(functionState);
-      getMessage().addStringPart(region, true);
-      getMessage().addStringOrObjPart(function);
-      getMessage().addObjPart(args);
-      getMessage().addObjPart(memberMappedArg);
 
       executeOnBucketSet = serverRegionExecutor.getExecuteOnBucketSetFlag();
       byte flags = ExecuteFunctionHelper.createFlags(executeOnBucketSet, isReExecute);
 
-      getMessage().addBytesPart(new byte[] {flags});
-      getMessage().addIntPart(routingObjects.size());
-      for (Object key : routingObjects) {
-        getMessage().addStringOrObjPart(key);
-      }
-      getMessage().addIntPart(removedNodes.size());
-      for (Object nodes : removedNodes) {
-        getMessage().addStringOrObjPart(nodes);
-      }
+      failedNodes = Collections.emptySet();
+      fillMessage(region, null, functionId, serverRegionExecutor, failedNodes, functionState,
+          flags);
 
       resultCollector = rc;
       regionName = region;
-      functionId = function;
+      this.functionId = functionId;
       executor = serverRegionExecutor;
       this.hasResult = functionState;
-      failedNodes = removedNodes;
       this.isHA = isHA;
     }
 
     ExecuteRegionFunctionOpImpl(ExecuteRegionFunctionSingleHopOpImpl newop) {
       this(newop.getRegionName(), newop.getFunctionId(), newop.getExecutor(),
-          newop.getResultCollector(), newop.getHasResult(), new HashSet<>(), newop.isHA(),
+          newop.getResultCollector(), newop.getHasResult(), newop.isHA(),
           newop.optimizeForWrite(), false, newop.getTimeoutMs());
     }
 
     ExecuteRegionFunctionOpImpl(ExecuteRegionFunctionOpImpl op, byte isReExecute,
         Set<String> removedNodes) {
       super(MessageType.EXECUTE_REGION_FUNCTION,
-          8 + op.executor.getFilter().size() + removedNodes.size(), op.getTimeoutMs());
+          getMessagePartCount(op.executor.getFilter().size(), removedNodes.size()),
+          op.getTimeoutMs());
       this.isReExecute = isReExecute;
       resultCollector = op.resultCollector;
       function = op.function;
@@ -445,30 +337,10 @@ public class ExecuteRegionFunctionOp {
         resultCollector.clearResults();
       }
 
-      Set routingObjects = executor.getFilter();
-      Object args = executor.getArguments();
-      MemberMappedArgument memberMappedArg = executor.getMemberMappedArgument();
-      getMessage().clear();
-      addBytes(hasResult);
-      getMessage().addStringPart(regionName, true);
-      if (executor.isFnSerializationReqd()) {
-        getMessage().addStringOrObjPart(function);
-      } else {
-        getMessage().addStringOrObjPart(functionId);
-      }
-      getMessage().addObjPart(args);
-      getMessage().addObjPart(memberMappedArg);
       byte flags = ExecuteFunctionHelper.createFlags(executeOnBucketSet, isReExecute);
 
-      getMessage().addBytesPart(new byte[] {flags});
-      getMessage().addIntPart(routingObjects.size());
-      for (Object key : routingObjects) {
-        getMessage().addStringOrObjPart(key);
-      }
-      getMessage().addIntPart(removedNodes.size());
-      for (Object nodes : removedNodes) {
-        getMessage().addStringOrObjPart(nodes);
-      }
+      fillMessage(regionName, function, functionId,
+          executor, removedNodes, hasResult, flags);
     }
 
     private void addBytes(byte functionStateOrHasResult) {
@@ -524,6 +396,7 @@ public class ExecuteRegionFunctionOp {
                     .getCause() instanceof InternalFunctionInvocationTargetException) {
                   InternalFunctionInvocationTargetException ifite =
                       (InternalFunctionInvocationTargetException) ex.getCause();
+                  failedNodes = ensureMutability(failedNodes);
                   failedNodes.addAll(ifite.getFailedNodeSet());
                   addFunctionException((FunctionException) result);
                 } else {
@@ -555,6 +428,7 @@ public class ExecuteRegionFunctionOp {
                     if (resultResponse instanceof ArrayList) {
                       DistributedMember memberID =
                           (DistributedMember) ((ArrayList) resultResponse).get(1);
+                      failedNodes = ensureMutability(failedNodes);
                       failedNodes.add(memberID.getId());
                     }
                     functionException = new FunctionException(fite);
@@ -611,6 +485,7 @@ public class ExecuteRegionFunctionOp {
                   .getCause() instanceof InternalFunctionInvocationTargetException) {
                 InternalFunctionInvocationTargetException ifite =
                     (InternalFunctionInvocationTargetException) ex.getCause();
+                failedNodes = ensureMutability(failedNodes);
                 failedNodes.addAll(ifite.getFailedNodeSet());
               }
               throw ex;
diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOpImplTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOpImplTest.java
new file mode 100644
index 0000000..72b5748
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOpImplTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.client.internal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import junitparams.JUnitParamsRunner;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.client.internal.ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.internal.cache.execute.ServerRegionFunctionExecutor;
+import org.apache.geode.test.junit.categories.ClientServerTest;
+
+
+/**
+ * Tests for the ExecuteRegionFunctionOpImpl class
+ */
+@Category({ClientServerTest.class})
+@RunWith(JUnitParamsRunner.class)
+public class ExecuteRegionFunctionOpImplTest {
+
+  @Test
+  public void testExecuteRegionFunctionOpImplWithFunction() {
+    ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl op = createOpWithFunctionTwoFilters();
+
+    int numberOfParts = 10;
+    assertEquals(numberOfParts, op.getMessage().getNumberOfParts());
+    for (int i = 0; i < numberOfParts; i++) {
+      assertNotNull(op.getMessage().getPart(i));
+    }
+    assertNull(op.getMessage().getPart(numberOfParts));
+  }
+
+  @Test
+  public void testExecuteRegionFunctionOpImplWithFunctionIdCalculateFnState() {
+    ExecuteRegionFunctionOpImpl op = createOpWithFunctionIdOneFilter();
+
+    int numberOfParts = 9;
+    assertEquals(numberOfParts, op.getMessage().getNumberOfParts());
+    for (int i = 0; i < numberOfParts; i++) {
+      assertNotNull(op.getMessage().getPart(i));
+    }
+    assertNull(op.getMessage().getPart(numberOfParts));
+  }
+
+  @Test
+  public void testExecuteRegionFunctionOpImplWithOpAndIsReexecute() {
+    ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl op = createOpWithFunctionTwoFilters();
+
+    HashSet<String> removedNodes = new HashSet(Arrays.asList("node1", "node2", "node3"));
+
+    ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl newOp =
+        new ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl(op, (byte) 1, removedNodes);
+
+    int numberOfParts = 13;
+    assertEquals(numberOfParts, newOp.getMessage().getNumberOfParts());
+    for (int i = 0; i < numberOfParts; i++) {
+      assertNotNull(newOp.getMessage().getPart(i));
+    }
+    assertNull(newOp.getMessage().getPart(numberOfParts));
+  }
+
+  private ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl createOpWithFunctionTwoFilters() {
+    String region = "testRegion";
+    String functionId = "testFunctionId";
+    Function function = mock(Function.class);
+    ServerRegionFunctionExecutor serverRegionExecutor = mock(ServerRegionFunctionExecutor.class);
+    Set filter = new HashSet(Arrays.asList("one", "two"));
+    when(serverRegionExecutor.getFilter()).thenReturn(filter);
+    byte functionState = (byte) 1;
+    byte flags = (byte) 2;
+    ResultCollector resultCollector = mock(ResultCollector.class);
+    int timeoutMs = 100;
+
+    ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl op =
+        new ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl(region, function,
+            serverRegionExecutor, resultCollector,
+            timeoutMs);
+    return op;
+  }
+
+  private ExecuteRegionFunctionOpImpl createOpWithFunctionIdOneFilter() {
+    String region = "testRegion";
+    String functionId = "testFunctionId";
+    Function function = null;
+    ServerRegionFunctionExecutor serverRegionExecutor = mock(ServerRegionFunctionExecutor.class);
+    Set filter = new HashSet(Arrays.asList("one"));
+    when(serverRegionExecutor.getFilter()).thenReturn(filter);
+    byte functionState = (byte) 1;
+    byte flags = (byte) 1;
+    byte hasResult = (byte) 1;
+    boolean isHA = false;
+    ResultCollector resultCollector = mock(ResultCollector.class);
+    int timeoutMs = 100;
+    boolean optimizeForWrite = true;
+    boolean isReexecute = false;
+
+    ExecuteRegionFunctionOpImpl op = new ExecuteRegionFunctionOpImpl(region, functionId,
+        serverRegionExecutor, resultCollector, hasResult, isHA, optimizeForWrite,
+        true, timeoutMs);
+    return op;
+  }
+
+}