You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by rv...@apache.org on 2015/04/28 23:40:16 UTC

[11/51] [partial] incubator-geode git commit: Init

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteFunctionOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteFunctionOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteFunctionOp.java
new file mode 100755
index 0000000..deefb4c
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteFunctionOp.java
@@ -0,0 +1,644 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.client.ServerConnectivityException;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.cache.execute.Function;
+import com.gemstone.gemfire.cache.execute.FunctionException;
+import com.gemstone.gemfire.cache.execute.ResultCollector;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.execute.AbstractExecution;
+import com.gemstone.gemfire.internal.cache.execute.FunctionStats;
+import com.gemstone.gemfire.internal.cache.execute.InternalFunctionException;
+import com.gemstone.gemfire.internal.cache.execute.InternalFunctionInvocationTargetException;
+import com.gemstone.gemfire.internal.cache.execute.MemberMappedArgument;
+import com.gemstone.gemfire.internal.cache.execute.ServerFunctionExecutor;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ChunkedMessage;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Executes the function on server (possibly without region/cache).<br> 
+ * Also gets the result back from the server
+ * @author Suranjan Kumar
+ * @since 5.8
+ */
+
+public class ExecuteFunctionOp {
+
+  private static final Logger logger = LogService.getLogger();
+  
+  /** index of allMembers in flags[] */
+  public static final int ALL_MEMBERS_INDEX = 0;
+  /** index of ignoreFailedMembers in flags[] */
+  public static final int IGNORE_FAILED_MEMBERS_INDEX = 1;
+
+  /**
+   * Private constructor: the members of this class are used statically, so it
+   * is never instantiated.
+   */
+  private ExecuteFunctionOp() {
+    // no instances allowed
+  }
+  
+  /**
+   * Executes a function on one server, or on all servers, using connections
+   * from the given pool to communicate with the server.
+   * <p>
+   * When <code>allServers</code> is true and <code>groups</code> is empty, one
+   * task per server is built and submitted through
+   * <code>SingleHopClientExecutor.submitAll</code>. Otherwise the operation is
+   * sent with <code>pool.execute</code> and re-sent from this method when a
+   * {@link ServerConnectivityException} is caught, until it succeeds or the
+   * retry limit is exceeded.
+   *
+   * @param pool the pool to use to communicate with the server.
+   * @param function the function to be executed
+   * @param executor queried for its ignore-departed-members setting and handed
+   *        on to <code>reexecute</code>
+   * @param args specified arguments to the application function
+   * @param memberMappedArg passed through to the operation that is sent
+   * @param allServers selects the all-servers path (together with an empty
+   *        <code>groups</code>) and is also written into the request flags
+   * @param hasResult passed through to the operation that is sent
+   * @param rc collector handed to the operation; its results are cleared here
+   *        before every retry
+   * @param isFnSerializationReqd true to send the function object itself,
+   *        false to send only its id
+   * @param attributes passed to each per-server task on the all-servers path;
+   *        not used otherwise
+   * @param groups server groups sent with the request; must not be null
+   *        because its length is read
+   * @throws ServerConnectivityException rethrown when it is a
+   *         {@link ServerOperationException} or when the retry limit is exceeded
+   */
+  public static void execute(final PoolImpl pool, Function function,
+      ServerFunctionExecutor executor, Object args,
+      MemberMappedArgument memberMappedArg, boolean allServers, byte hasResult,
+      ResultCollector rc, boolean isFnSerializationReqd,
+      UserAttributes attributes, String[] groups) {
+    // On the all-servers path below this op is only used for its message in the debug log.
+    final AbstractOp op = new ExecuteFunctionOpImpl(function, args,
+        memberMappedArg, hasResult, rc, isFnSerializationReqd, (byte)0, groups, allServers, executor.isIgnoreDepartedMembers());
+
+    if (allServers && groups.length == 0) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("ExecuteFunctionOp#execute : Sending Function Execution Message:{} to all servers using pool: {}", op.getMessage(), pool);
+      }
+      List callableTasks = constructAndGetFunctionTasks(pool, function, args,
+          memberMappedArg, hasResult, rc, isFnSerializationReqd, attributes);
+
+      SingleHopClientExecutor.submitAll(callableTasks);
+    } else {
+      // reexecuteForServ: a connectivity failure happened, loop again with a re-execute op.
+      boolean reexecuteForServ = false;
+      AbstractOp reexecOp = null;
+      int retryAttempts = 0;
+      // reexecute: an InternalFunctionInvocationTargetException was caught; handled after the loop.
+      boolean reexecute = false;
+      // Non-HA functions get no retries; HA functions use the pool's retry setting.
+      int maxRetryAttempts = 0;
+      if(function.isHA())
+        maxRetryAttempts = pool.getRetryAttempts();
+      
+      final boolean isDebugEnabled = logger.isDebugEnabled();
+      do {
+        try {
+          if (reexecuteForServ) {
+            if (isDebugEnabled) {
+              logger.debug("ExecuteFunctionOp#execute.reexecuteForServ : Sending Function Execution Message:{} to server using pool: {} with groups:{} all members:{} ignoreFailedMembers:{}", op.getMessage(), pool, Arrays.toString(groups), allServers, executor.isIgnoreDepartedMembers());
+            }
+            // A fresh op is built for every retry, marked as a re-execution.
+            reexecOp = new ExecuteFunctionOpImpl(function, args,
+                memberMappedArg, hasResult, rc, isFnSerializationReqd,
+                (byte)1/* isReExecute */, groups, allServers, executor.isIgnoreDepartedMembers());
+            pool.execute(reexecOp, 0);
+          } else {
+            if (isDebugEnabled) {
+              logger.debug("ExecuteFunctionOp#execute : Sending Function Execution Message:{} to server using pool: {} with groups:{} all members:{} ignoreFailedMembers:{}", op.getMessage(), pool, Arrays.toString(groups), allServers, executor.isIgnoreDepartedMembers());
+            }
+
+            pool.execute(op, 0);
+          }
+          // Success: clear both flags so the loop ends and no re-execution follows.
+          reexecute = false;
+          reexecuteForServ = false;
+        } catch (InternalFunctionInvocationTargetException e) {
+          if (isDebugEnabled) {
+            logger.debug("ExecuteFunctionOp#execute : Received InternalFunctionInvocationTargetException. The failed node is {}", e.getFailedNodeSet());
+          }
+          // reexecuteForServ is left untouched here, so the loop only continues
+          // if a connectivity retry was already in progress.
+          reexecute = true;
+          rc.clearResults();
+        } catch (ServerConnectivityException se) {
+          retryAttempts++;
+
+          if (isDebugEnabled) {
+            logger.debug("ExecuteFunctionOp#execute : Received ServerConnectivityException. The exception is {} The retryAttempt is : {} maxRetryAttempts  {}", se, retryAttempts, maxRetryAttempts);
+          }
+          // ServerOperationException is never retried.
+          if (se instanceof ServerOperationException) {
+            throw se;
+          }
+          // A maxRetryAttempts of -1 disables the limit.
+          if ((retryAttempts > maxRetryAttempts && maxRetryAttempts != -1))
+            throw se;
+
+          reexecuteForServ = true;
+          rc.clearResults();
+        }
+      } while (reexecuteForServ);
+
+      // The limit is passed on reduced by one; reexecute() treats -2 as "no limit".
+      if (reexecute && function.isHA()) {
+        ExecuteFunctionOp.reexecute(pool, function,
+            executor, rc, hasResult, isFnSerializationReqd, maxRetryAttempts - 1, groups, allServers);
+      }
+    }
+  }
+  
+  /**
+   * Executes a function, identified by its id, on one server or on all
+   * servers, using connections from the given pool.
+   * <p>
+   * When <code>allServers</code> is true and <code>groups</code> is empty, one
+   * task per server is built and submitted through
+   * <code>SingleHopClientExecutor.submitAll</code>. Otherwise the operation is
+   * sent with <code>pool.execute</code> and re-sent from this method when a
+   * {@link ServerConnectivityException} is caught, until it succeeds or the
+   * retry limit is exceeded.
+   *
+   * @param pool the pool to use to communicate with the server.
+   * @param functionId id of the function to be executed
+   * @param executor queried for its ignore-departed-members setting and handed
+   *        on to <code>reexecute</code>
+   * @param args specified arguments to the application function
+   * @param memberMappedArg passed through to the operation that is sent
+   * @param allServers selects the all-servers path (together with an empty
+   *        <code>groups</code>) and is also written into the request flags
+   * @param hasResult passed through to the operation that is sent
+   * @param rc collector handed to the operation; its results are cleared here
+   *        before every retry
+   * @param isFnSerializationReqd passed through to the operation that is sent
+   * @param isHA when true the pool's retry-attempts setting is used as the
+   *        retry limit, otherwise no retry is allowed
+   * @param optimizeForWrite passed through to the operation that is sent
+   * @param properties passed to each per-server task on the all-servers path;
+   *        not used otherwise
+   * @param groups server groups sent with the request; must not be null
+   *        because its length is read
+   * @throws ServerConnectivityException rethrown when it is a
+   *         {@link ServerOperationException} or when the retry limit is exceeded
+   */
+  public static void execute(final PoolImpl pool, String functionId,
+      ServerFunctionExecutor executor, Object args,
+      MemberMappedArgument memberMappedArg, boolean allServers, byte hasResult,
+      ResultCollector rc, boolean isFnSerializationReqd, boolean isHA,
+      boolean optimizeForWrite, UserAttributes properties, String[] groups) {
+    // On the all-servers path below this op is only used for its message in the debug log.
+    final AbstractOp op = new ExecuteFunctionOpImpl(functionId, args,
+        memberMappedArg, hasResult, rc, isFnSerializationReqd, isHA,
+        optimizeForWrite, (byte)0, groups, allServers, executor.isIgnoreDepartedMembers());
+    if (allServers && groups.length == 0) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("ExecuteFunctionOp#execute : Sending Function Execution Message:{} to all servers using pool: {}", op.getMessage(), pool);
+      }
+      List callableTasks = constructAndGetFunctionTasks(pool, functionId, args,
+          memberMappedArg, hasResult, rc, isFnSerializationReqd, isHA,
+          optimizeForWrite, properties);
+
+      SingleHopClientExecutor.submitAll(callableTasks);
+    } else {
+      // reexecuteForServ: a connectivity failure happened, loop again with a re-execute op.
+      boolean reexecuteForServ = false;
+      AbstractOp reexecOp = null;
+      int retryAttempts = 0;
+      // reexecute: an InternalFunctionInvocationTargetException was caught; handled after the loop.
+      boolean reexecute = false;
+      // Non-HA executions get no retries; HA executions use the pool's retry setting.
+      int maxRetryAttempts = 0;
+      if(isHA){
+        maxRetryAttempts = pool.getRetryAttempts();
+      }
+      
+      final boolean isDebugEnabled = logger.isDebugEnabled();
+      do {
+        try {
+          if (reexecuteForServ) {
+            // A fresh op is built for every retry, marked as a re-execution.
+            reexecOp = new ExecuteFunctionOpImpl(functionId, args,
+                memberMappedArg, hasResult, rc, isFnSerializationReqd, isHA,
+                optimizeForWrite, (byte)1, groups, allServers, executor.isIgnoreDepartedMembers());
+            pool.execute(reexecOp, 0);
+          } else {
+            if (isDebugEnabled) {
+              logger.debug("ExecuteFunctionOp#execute : Sending Function Execution Message:{} to server using pool:{} with groups:{} all members:{} ignoreFailedMembers:{}", op.getMessage(), pool, Arrays.toString(groups), allServers, executor.isIgnoreDepartedMembers());
+            }
+            pool.execute(op, 0);
+          }
+          // Success: clear both flags so the loop ends and no re-execution follows.
+          reexecute = false;
+          reexecuteForServ = false;
+        } catch (InternalFunctionInvocationTargetException e) {
+          if (isDebugEnabled) {
+            logger.debug("ExecuteFunctionOp#execute : Received InternalFunctionInvocationTargetException. The failed node is {}", e.getFailedNodeSet());
+          }
+          // reexecuteForServ is left untouched here, so the loop only continues
+          // if a connectivity retry was already in progress.
+          reexecute = true;
+          rc.clearResults();
+        } catch (ServerConnectivityException se) {
+          retryAttempts++;
+
+          if (isDebugEnabled) {
+            logger.debug("ExecuteFunctionOp#execute : Received ServerConnectivityException. The exception is {} The retryAttempt is : {} maxRetryAttempts {}", se, retryAttempts, maxRetryAttempts);
+          }
+          // ServerOperationException is never retried.
+          if (se instanceof ServerOperationException) {
+            throw se;
+          }
+          // A maxRetryAttempts of -1 disables the limit.
+          if ((retryAttempts > maxRetryAttempts && maxRetryAttempts != -1))
+            throw se;
+
+          reexecuteForServ = true;
+          rc.clearResults();
+        }
+      } while (reexecuteForServ);
+
+      // The limit is passed on reduced by one; reexecute() treats -2 as "no limit".
+      if (reexecute && isHA) {
+        ExecuteFunctionOp.reexecute(pool, functionId, executor, rc, hasResult,
+            isFnSerializationReqd, maxRetryAttempts - 1, args, isHA,
+            optimizeForWrite, groups, allServers);
+      }
+    }
+  }
+  
+  /**
+   * Sends the function execution request again, marked as a re-execution, and
+   * keeps doing so until <code>pool.execute</code> completes without a
+   * retryable exception. The arguments and member-mapped argument are read
+   * from <code>serverExecutor</code> on every attempt, and the collector is
+   * cleared after every failed attempt.
+   *
+   * @param pool the pool used to send the request
+   * @param function the function to be executed
+   * @param serverExecutor source of the arguments, the member-mapped argument
+   *        and the ignore-departed-members setting
+   * @param resultCollector collector handed to every request
+   * @param hasResult passed through to the request
+   * @param isFnSerializationReqd passed through to the request
+   * @param maxRetryAttempts number of {@link ServerConnectivityException}s
+   *        tolerated before the exception is rethrown; -2 disables the limit
+   * @param groups server groups sent with the request
+   * @param allMembers written into the request flags
+   * @throws ServerConnectivityException rethrown when it is a
+   *         {@link ServerOperationException} or when the retry limit is exceeded
+   */
+  public static void reexecute(ExecutablePool pool, Function function,
+      ServerFunctionExecutor serverExecutor, ResultCollector resultCollector,
+      byte hasResult, boolean isFnSerializationReqd, int maxRetryAttempts, String[] groups, boolean allMembers) {
+    final boolean debugOn = logger.isDebugEnabled();
+    int connectivityFailures = 0;
+    while (true) {
+      final AbstractOp retryOp = new ExecuteFunctionOpImpl(function,
+          serverExecutor.getArguments(),
+          serverExecutor.getMemberMappedArgument(), hasResult,
+          resultCollector, isFnSerializationReqd, (byte)1, groups, allMembers,
+          serverExecutor.isIgnoreDepartedMembers());
+      if (debugOn) {
+        logger.debug("ExecuteFunction#reexecute : Sending Function Execution Message:{} to Server using pool:{} with groups:{} all members:{} ignoreFailedMembers:{}", retryOp.getMessage(), pool, Arrays.toString(groups), allMembers, serverExecutor.isIgnoreDepartedMembers());
+      }
+      try {
+        pool.execute(retryOp, 0);
+        // No retryable failure: we are done.
+        return;
+      } catch (InternalFunctionInvocationTargetException e) {
+        // Always retried; does not count against the retry limit.
+        if (debugOn) {
+          logger.debug("ExecuteFunctionOp#reexecute : Recieved InternalFunctionInvocationTargetException. The failed nodes are {}", e.getFailedNodeSet());
+        }
+      } catch (ServerConnectivityException se) {
+        if (debugOn) {
+          logger.debug("ExecuteFunctionOp#reexecute : Received ServerConnectivity Exception.");
+        }
+        // ServerOperationException is never retried.
+        if (se instanceof ServerOperationException) {
+          throw se;
+        }
+        connectivityFailures++;
+        final boolean limited = maxRetryAttempts != -2;
+        if (limited && connectivityFailures > maxRetryAttempts) {
+          throw se;
+        }
+      }
+      // Only reached after a retryable failure: drop partial results and try again.
+      resultCollector.clearResults();
+    }
+  }
+  
+  /**
+   * Sends the execution request for the function with the given id again,
+   * marked as a re-execution, and keeps doing so until
+   * <code>pool.execute</code> completes without a retryable exception. The
+   * collector is cleared after every failed attempt.
+   *
+   * @param pool the pool used to send the request
+   * @param functionId id of the function to be executed
+   * @param serverExecutor source of the member-mapped argument and the
+   *        ignore-departed-members setting
+   * @param resultCollector collector handed to every request
+   * @param hasResult passed through to the request
+   * @param isFnSerializationReqd passed through to the request
+   * @param maxRetryAttempts number of {@link ServerConnectivityException}s
+   *        tolerated before the exception is rethrown; -2 disables the limit
+   * @param args arguments for the function
+   * @param isHA passed through to the request
+   * @param optimizeForWrite passed through to the request
+   * @param groups server groups sent with the request
+   * @param allMembers written into the request flags
+   * @throws ServerConnectivityException rethrown when it is a
+   *         {@link ServerOperationException} or when the retry limit is exceeded
+   */
+  public static void reexecute(ExecutablePool pool, String functionId,
+      ServerFunctionExecutor serverExecutor, ResultCollector resultCollector,
+      byte hasResult, boolean isFnSerializationReqd, int maxRetryAttempts,
+      Object args, boolean isHA, boolean optimizeForWrite, String[] groups, boolean allMembers) {
+    final boolean debugOn = logger.isDebugEnabled();
+    int connectivityFailures = 0;
+    while (true) {
+      final AbstractOp retryOp = new ExecuteFunctionOpImpl(functionId, args,
+          serverExecutor.getMemberMappedArgument(), hasResult,
+          resultCollector, isFnSerializationReqd, isHA, optimizeForWrite,
+          (byte)1, groups, allMembers,
+          serverExecutor.isIgnoreDepartedMembers());
+      if (debugOn) {
+        logger.debug("ExecuteFunction#reexecute : Sending Function Execution Message:{} to Server using pool:{} with groups:{} all members:{} ignoreFailedMembers:{}", retryOp.getMessage(), pool, Arrays.toString(groups), allMembers, serverExecutor.isIgnoreDepartedMembers());
+      }
+      try {
+        pool.execute(retryOp, 0);
+        // No retryable failure: we are done.
+        return;
+      } catch (InternalFunctionInvocationTargetException e) {
+        // Always retried; does not count against the retry limit.
+        if (debugOn) {
+          logger.debug("ExecuteFunctionOp#reexecute : Recieved InternalFunctionInvocationTargetException. The failed nodes are {}", e.getFailedNodeSet());
+        }
+      } catch (ServerConnectivityException se) {
+        if (debugOn) {
+          logger.debug("ExecuteFunctionOp#reexecute : Received ServerConnectivity Exception.");
+        }
+        // ServerOperationException is never retried.
+        if (se instanceof ServerOperationException) {
+          throw se;
+        }
+        connectivityFailures++;
+        final boolean limited = maxRetryAttempts != -2;
+        if (limited && connectivityFailures > maxRetryAttempts) {
+          throw se;
+        }
+      }
+      // Only reached after a retryable failure: drop partial results and try again.
+      resultCollector.clearResults();
+    }
+  }
+
+  /**
+   * Builds one {@link SingleHopOperationCallable} per server known to the
+   * pool's connection source, each wrapping its own first-execution request
+   * for the given function.
+   *
+   * @return the tasks, in the order the servers were reported
+   */
+  static List constructAndGetFunctionTasks(final PoolImpl pool,
+      final Function function, Object args,
+      MemberMappedArgument memberMappedArg, byte hasResult, ResultCollector rc,
+      boolean isFnSerializationReqd, UserAttributes attributes) {
+    final List<SingleHopOperationCallable> callables = new ArrayList<SingleHopOperationCallable>();
+
+    // With locators configured the server list is looked up through the
+    // auto connection source; otherwise the explicit source is used.
+    final ArrayList<ServerLocation> serverLocations;
+    if (pool.getLocators() != null && !pool.getLocators().isEmpty()) {
+      AutoConnectionSourceImpl autoSource = (AutoConnectionSourceImpl)pool.getConnectionSource();
+      serverLocations = autoSource.findAllServers(); // n/w call on locator
+    } else {
+      ExplicitConnectionSourceImpl explicitSource = (ExplicitConnectionSourceImpl)pool.getConnectionSource();
+      serverLocations = explicitSource.getAllServers();
+    }
+
+    for (ServerLocation location : serverLocations) {
+      // onGroups does not use single-hop for now, hence the null groups.
+      final AbstractOp perServerOp = new ExecuteFunctionOpImpl(function, args,
+          memberMappedArg, hasResult, rc, isFnSerializationReqd, (byte)0,
+          null, false, false);
+      callables.add(new SingleHopOperationCallable(location, pool, perServerOp, attributes));
+    }
+    return callables;
+  }
+      
+  /**
+   * Builds one {@link SingleHopOperationCallable} per server known to the
+   * pool's connection source, each wrapping its own first-execution request
+   * for the function with the given id.
+   *
+   * @return the tasks, in the order the servers were reported
+   */
+  static List constructAndGetFunctionTasks(final PoolImpl pool,
+      final String functionId, Object args,
+      MemberMappedArgument memberMappedArg, byte hasResult, ResultCollector rc,
+      boolean isFnSerializationReqd, boolean isHA, boolean optimizeForWrite, UserAttributes properties) {
+    final List<SingleHopOperationCallable> callables = new ArrayList<SingleHopOperationCallable>();
+
+    // With locators configured the server list is looked up through the
+    // auto connection source; otherwise the explicit source is used.
+    final ArrayList<ServerLocation> serverLocations;
+    if (pool.getLocators() != null && !pool.getLocators().isEmpty()) {
+      AutoConnectionSourceImpl autoSource = (AutoConnectionSourceImpl)pool.getConnectionSource();
+      serverLocations = autoSource.findAllServers(); // n/w call on locator
+    } else {
+      ExplicitConnectionSourceImpl explicitSource = (ExplicitConnectionSourceImpl)pool.getConnectionSource();
+      serverLocations = explicitSource.getAllServers();
+    }
+
+    for (ServerLocation location : serverLocations) {
+      // onGroups does not use single-hop for now, hence the null groups.
+      final AbstractOp perServerOp = new ExecuteFunctionOpImpl(functionId, args,
+          memberMappedArg, hasResult, rc, isFnSerializationReqd, isHA,
+          optimizeForWrite, (byte)0, null, false, false);
+      callables.add(new SingleHopOperationCallable(location, pool, perServerOp, properties));
+    }
+    return callables;
+  }
+  
+  static byte[] getByteArrayForFlags(boolean... flags) {
+    byte[] retVal = null;
+    if (flags.length > 0) {
+      retVal = new byte[flags.length];
+      for (int i=0; i<flags.length; i++) {
+        if (flags[i]) {
+          retVal[i] = 1;
+        } else {
+          retVal[i] = 0;
+        }
+      }
+    }
+    return retVal;
+  }
+
+  static class ExecuteFunctionOpImpl extends AbstractOp {
+
+    private ResultCollector resultCollector;
+    
+   //To get the instance of the Function Statistics we need the function name or instance
+    private String functionId;
+
+    private Function function;
+
+    private Object args;
+
+    private MemberMappedArgument memberMappedArg;
+
+    private byte hasResult;
+
+    private boolean isFnSerializationReqd;
+
+    private String[] groups;
+
+    /**
+     * [0] = allMembers
+     * [1] = ignoreFailedMembers
+     */
+    private byte[] flags;
+
+    /**
+     * number of parts in the request message
+     */
+    private static final int MSG_PARTS = 6;
+
+    /**
+     * @throws com.gemstone.gemfire.SerializationException if serialization fails
+     */
+    public ExecuteFunctionOpImpl(Function function, Object args,
+        MemberMappedArgument memberMappedArg, byte hasResult,
+        ResultCollector rc, boolean isFnSerializationReqd, byte isReexecute, String[] groups, boolean allMembers, boolean ignoreFailedMembers) {
+      super(MessageType.EXECUTE_FUNCTION, MSG_PARTS);
+      byte fnState = AbstractExecution.getFunctionState(function.isHA(),
+          function.hasResult(), function.optimizeForWrite());
+
+      addBytes(isReexecute, fnState);
+      if(isFnSerializationReqd){
+        getMessage().addStringOrObjPart(function); 
+      }
+      else{
+        getMessage().addStringOrObjPart(function.getId()); 
+      }
+      getMessage().addObjPart(args);
+      getMessage().addObjPart(memberMappedArg);
+      getMessage().addObjPart(groups);
+      this.flags = getByteArrayForFlags(allMembers, ignoreFailedMembers);
+      getMessage().addBytesPart(this.flags);
+      resultCollector = rc;
+      if(isReexecute == 1) {
+        resultCollector.clearResults();
+      }
+      this.functionId = function.getId();
+      this.function = function;
+      this.args = args;
+      this.memberMappedArg = memberMappedArg;
+      this.hasResult = fnState;
+      this.isFnSerializationReqd = isFnSerializationReqd;
+      this.groups = groups;
+    }
+
+    public ExecuteFunctionOpImpl(String functionId, Object args2,
+        MemberMappedArgument memberMappedArg, byte hasResult,
+        ResultCollector rc, boolean isFnSerializationReqd, boolean isHA,
+        boolean optimizeForWrite, byte isReexecute, String[] groups, boolean allMembers, boolean ignoreFailedMembers) {
+      super(MessageType.EXECUTE_FUNCTION, MSG_PARTS);
+      byte fnState = AbstractExecution.getFunctionState(isHA,
+          hasResult == (byte)1 ? true : false, optimizeForWrite);
+
+      addBytes(isReexecute, fnState);
+      getMessage().addStringOrObjPart(functionId);
+      getMessage().addObjPart(args2);
+      getMessage().addObjPart(memberMappedArg);
+      getMessage().addObjPart(groups);
+      this.flags = getByteArrayForFlags(allMembers, ignoreFailedMembers);
+      getMessage().addBytesPart(this.flags);
+      resultCollector = rc;
+      if(isReexecute == 1) {
+        resultCollector.clearResults();
+      } 
+      this.functionId = functionId;
+      this.args = args2;
+      this.memberMappedArg = memberMappedArg;
+      this.hasResult = fnState;
+      this.isFnSerializationReqd = isFnSerializationReqd;
+      this.groups = groups;
+    }
+
+    /**
+     * Copy constructor: builds a new request message from the state of an
+     * existing operation, optionally marking it as a re-execution.
+     * <p>
+     * An operation that was created from a function id has no
+     * {@link Function} instance, so the id is sent whenever the instance is
+     * missing or serialization is not required.
+     *
+     * @param op the operation whose state is copied
+     * @param isReexecute 1 to mark the request as a re-execution, which also
+     *        clears the collector's results
+     */
+    public ExecuteFunctionOpImpl(ExecuteFunctionOpImpl op, byte isReexecute) {
+      super(MessageType.EXECUTE_FUNCTION, MSG_PARTS);
+      this.resultCollector = op.resultCollector;
+      this.function = op.function;
+      this.functionId = op.functionId;
+      // For ops built by the other constructors this field holds the computed function state.
+      this.hasResult = op.hasResult;
+      this.args = op.args;
+      this.memberMappedArg = op.memberMappedArg;
+      this.isFnSerializationReqd = op.isFnSerializationReqd;
+      this.groups = op.groups;
+      this.flags = op.flags;
+      
+      addBytes(isReexecute, this.hasResult);
+      // function is null when the source op was built from a function id;
+      // functionId is set by both other constructors, so fall back to it
+      // instead of dereferencing a null function.
+      if (this.isFnSerializationReqd && this.function != null) {
+        getMessage().addStringOrObjPart(this.function);
+      }
+      else {
+        getMessage().addStringOrObjPart(this.functionId);
+      }
+      getMessage().addObjPart(this.args);
+      getMessage().addObjPart(this.memberMappedArg);
+      getMessage().addObjPart(this.groups);
+      getMessage().addBytesPart(this.flags);
+      if(isReexecute == 1) {
+        resultCollector.clearResults();
+      }
+    }
+
+    private void addBytes(byte isReexecute, byte fnStateOrHasResult) {
+      if (GemFireCacheImpl.getClientFunctionTimeout() == GemFireCacheImpl.DEFAULT_CLIENT_FUNCTION_TIMEOUT) {
+        if (isReexecute == 1) {
+          getMessage().addBytesPart(new byte[] { AbstractExecution.getReexecuteFunctionState(fnStateOrHasResult) });
+        } else {
+          getMessage().addBytesPart(new byte[] { fnStateOrHasResult });
+        }
+      } else {
+        byte[] bytes = new byte[5];
+        if (isReexecute == 1) {
+          bytes[0] = AbstractExecution.getReexecuteFunctionState(fnStateOrHasResult);
+        } else {
+          bytes[0] = fnStateOrHasResult;
+        }
+        Part.encodeInt(GemFireCacheImpl.getClientFunctionTimeout(), bytes, 1);
+        getMessage().addBytesPart(bytes);
+      }
+    }
+
+    /**
+     * ignoreFaileMember flag is at index 1
+     */
+    private boolean getIgnoreFailedMembers() {
+      boolean ignoreFailedMembers = false;
+      if (this.flags != null && this.flags.length > 1) {
+        if (this.flags[IGNORE_FAILED_MEMBERS_INDEX] == 1) {
+          ignoreFailedMembers = true;
+        }
+      }
+      return ignoreFailedMembers;
+    }
+
+    /**
+     * Reads the chunked response of a function execution and feeds every
+     * received result into the result collector.
+     * <p>
+     * The response header determines the handling:
+     * <ul>
+     * <li>EXECUTE_FUNCTION_RESULT: chunks are read until the last chunk; each
+     * result is added to the collector, or thrown when it is an exception that
+     * is not to be collected.</li>
+     * <li>EXCEPTION: the single chunk's object is thrown, wrapped in a
+     * {@link ServerOperationException} unless it is a
+     * {@link FunctionException}.</li>
+     * <li>EXECUTE_FUNCTION_ERROR: the chunk's string is thrown as a
+     * {@link ServerOperationException}.</li>
+     * </ul>
+     * The response message is cleared in all cases.
+     *
+     * @param msg the response; must be a {@link ChunkedMessage}
+     * @return always null
+     * @throws FunctionException when the server sent one that is not collected
+     * @throws ServerOperationException for any other exception or error sent
+     *         by the server
+     * @throws InternalGemFireError when the message type is not one of the
+     *         three handled types
+     */
+    @Override  
+    protected Object processResponse(Message msg) throws Exception {      
+      ChunkedMessage executeFunctionResponseMsg = (ChunkedMessage)msg;
+      try {
+        // Read the header which describes the type of message following
+        executeFunctionResponseMsg.readHeader();
+        switch (executeFunctionResponseMsg.getMessageType()) {
+          case MessageType.EXECUTE_FUNCTION_RESULT:
+            if (logger.isDebugEnabled()) {
+              logger.debug("ExecuteFunctionOpImpl#processResponse: received message of type EXECUTE_FUNCTION_RESULT.");
+            }
+            // Read the chunk
+            do{
+              executeFunctionResponseMsg.receiveChunk();
+              Object resultResponse = executeFunctionResponseMsg.getPart(0)
+                  .getObject();
+              Object result;
+              // A list response carries the result at index 0 and the member id at index 1.
+              if (resultResponse instanceof ArrayList) {
+                result = ((ArrayList)resultResponse).get(0);
+              }
+              else {
+                result = resultResponse;
+              }
+              if (result instanceof FunctionException) {
+                //String s = "While performing a remote " + getOpName();
+                FunctionException ex = ((FunctionException)result);
+                // Collected instead of thrown: the cause (or the exception itself) becomes a result.
+                if (ex instanceof InternalFunctionException || getIgnoreFailedMembers()) {
+                  Throwable cause = ex.getCause() == null ? ex : ex.getCause();
+                  // NOTE(review): this cast assumes the response was a list; a bare
+                  // FunctionException response would fail here - confirm the server always sends a list.
+                  DistributedMember memberID = (DistributedMember)((ArrayList)resultResponse)
+                      .get(1);
+                  // NOTE(review): unlike the normal-result branch below, this addResult
+                  // is not synchronized on the collector - confirm that is intended.
+                  this.resultCollector.addResult(memberID, cause);
+                  FunctionStats.getFunctionStats(this.functionId, null)
+                      .incResultsReceived();
+                  // In a do/while, continue jumps to the isLastChunk() check.
+                  continue;
+                }
+                else {
+                  throw ex;
+                }
+              }else if (result instanceof Throwable) {
+                String s = "While performing a remote " + getOpName();
+                throw new ServerOperationException(s, (Throwable)result);
+                // Get the exception toString part.
+                // This was added for c++ thin client and not used in java
+                //Part exceptionToStringPart = msg.getPart(1);
+              }
+              else {
+                // NOTE(review): same list assumption as above for non-exception results.
+                DistributedMember memberID = (DistributedMember)((ArrayList)resultResponse)
+                    .get(1);
+                synchronized (resultCollector) {
+                  resultCollector.addResult(memberID, result);                    
+                }
+                FunctionStats.getFunctionStats(this.functionId, null)
+                    .incResultsReceived();
+              }
+            }while(!executeFunctionResponseMsg.isLastChunk());
+            if (logger.isDebugEnabled()) {
+              logger.debug("ExecuteFunctionOpImpl#processResponse: received all the results from server successfully.");
+            }
+            return null;          
+          case MessageType.EXCEPTION:
+            if (logger.isDebugEnabled()) {
+              logger.debug("ExecuteFunctionOpImpl#processResponse: received message of type EXCEPTION");
+            }
+            // Read the chunk
+            executeFunctionResponseMsg.receiveChunk();
+            Part part0 = executeFunctionResponseMsg.getPart(0);
+            Object obj = part0.getObject();
+            if (obj instanceof FunctionException) {
+              FunctionException ex = ((FunctionException)obj);
+              throw ex;
+            }
+            else {
+              // NOTE(review): the text starts with ": " and has no separator before the
+              // appended cause message - looks unintended, confirm before changing.
+              String s = ": While performing a remote execute Function" + ((Throwable)obj).getMessage();
+              throw new ServerOperationException(s, (Throwable)obj);              
+            }
+          case MessageType.EXECUTE_FUNCTION_ERROR:
+            if (logger.isDebugEnabled()) {
+              logger.debug("ExecuteFunctionOpImpl#processResponse: received message of type EXECUTE_FUNCTION_ERROR");
+            }
+            // Read the chunk
+            executeFunctionResponseMsg.receiveChunk();
+            String errorMessage = executeFunctionResponseMsg.getPart(0)
+                .getString();
+            throw new ServerOperationException(errorMessage);
+          default:
+            throw new InternalGemFireError(
+                LocalizedStrings.Op_UNKNOWN_MESSAGE_TYPE_0
+                  .toLocalizedString(
+                     Integer.valueOf(executeFunctionResponseMsg.getMessageType())));
+        }
+      }
+      finally {
+        // Runs on every exit path, including the throws above.
+        executeFunctionResponseMsg.clear();
+      }      
+    }
+
+    /**
+     * @return true when the given message type is EXECUTE_FUNCTION_ERROR
+     */
+    @Override  
+    protected boolean isErrorResponse(int msgType) {
+      return msgType == MessageType.EXECUTE_FUNCTION_ERROR;
+    }
+
+    /**
+     * Delegates to <code>stats.startExecuteFunction()</code>.
+     *
+     * @return the value returned by the statistics call
+     */
+    @Override  
+    protected long startAttempt(ConnectionStats stats) {
+      return stats.startExecuteFunction();
+    }
+    
+    /**
+     * @return the operation name used when building exception messages in
+     *         this class
+     */
+    protected String getOpName() {
+      return "executeFunction";
+    }
+
+    /**
+     * Delegates to <code>stats.endExecuteFunctionSend</code>, passing the
+     * start value and whether this operation has failed.
+     */
+    @Override  
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      stats.endExecuteFunctionSend(start, hasFailed());
+    }
+
+    /**
+     * Delegates to <code>stats.endExecuteFunction</code>, passing the start
+     * value and whether this operation has timed out or failed.
+     */
+    @Override  
+    protected void endAttempt(ConnectionStats stats, long start) {
+      stats.endExecuteFunction(start, hasTimedOut(), hasFailed());
+    }
+
+    /**
+     * @return a new {@link ChunkedMessage} created with 1 and the current
+     *         version, which {@link #processResponse} reads as the response
+     */
+    @Override  
+    protected Message createResponseMessage() {
+      return new ChunkedMessage(1, Version.CURRENT);
+    }
+  }
+  
+  /**
+   * Value of the <code>DistributionManager.MAX_FE_THREADS</code> system
+   * property. When the property is missing or is not a valid integer, the
+   * larger of 16 and four times the number of available processors is used.
+   */
+  public static final int MAX_FE_THREADS = Integer.getInteger(
+      "DistributionManager.MAX_FE_THREADS",
+      Math.max(Runtime.getRuntime().availableProcessors() * 4, 16)).intValue();
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionNoAckOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionNoAckOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionNoAckOp.java
new file mode 100755
index 0000000..76840ce
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionNoAckOp.java
@@ -0,0 +1,218 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.execute.Function;
+import com.gemstone.gemfire.cache.execute.FunctionException;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.execute.AbstractExecution;
+import com.gemstone.gemfire.internal.cache.execute.MemberMappedArgument;
+import com.gemstone.gemfire.internal.cache.execute.ServerRegionFunctionExecutor;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+
+/**
+ * Executes a function on a server region.
+ * It does not get the result from the server (follows the fire-and-forget approach).
+ * @author Kishor Bachhav
+ * @since 5.8Beta
+ */
+public class ExecuteRegionFunctionNoAckOp {
+
+  private static final Logger logger = LogService.getLogger();
+  
+  private ExecuteRegionFunctionNoAckOp() {
+    // no instances allowed: the execute entry points of this class are static
+  }
+
+  /**
+   * Does a execute Function on a server using connections from the given pool
+   * to communicate with the server.
+   * @param pool the pool to use to communicate with the server.
+   * @param region the name of the region to do the put on
+   * @param function to be executed
+   * @param serverRegionExecutor which will return argument and filter
+   */
+  public static void execute(ExecutablePool pool, String region,
+      Function function, ServerRegionFunctionExecutor serverRegionExecutor,
+      byte hasResult) {
+    AbstractOp op = new ExecuteRegionFunctionNoAckOpImpl(region, function,
+        serverRegionExecutor, hasResult);
+    try {
+      if (logger.isDebugEnabled()) {
+        logger.debug("ExecuteRegionFunctionNoAckOp#execute : Sending Function Execution Message: {} to Server using pool: {}", op.getMessage(), pool);
+      }
+      pool.execute(op);
+    }
+    catch (Exception ex) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("ExecuteRegionFunctionNoAckOp#execute : Exception occured while Sending Function Execution Message: {} to server using pool: {}", op.getMessage(), pool, ex);
+      }
+      if (ex.getMessage() != null)
+        throw new FunctionException(ex.getMessage(), ex);
+      else
+        throw new FunctionException(
+            "Unexpected exception during function execution:", ex);
+    }
+  }
+
+  public static void execute(ExecutablePool pool, String region,
+      String functionId, ServerRegionFunctionExecutor serverRegionExecutor,
+      byte hasResult, boolean isHA, boolean optimizeForWrite) {
+    AbstractOp op = new ExecuteRegionFunctionNoAckOpImpl(region, functionId,
+        serverRegionExecutor, hasResult, isHA, optimizeForWrite);
+    try {
+      if (logger.isDebugEnabled()) {
+        logger.debug("ExecuteRegionFunctionNoAckOp#execute : Sending Function Execution Message: {} to Server using pool: {}", op.getMessage(), pool);
+      }
+      pool.execute(op);
+    }
+    catch (Exception ex) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("ExecuteRegionFunctionNoAckOp#execute : Exception occured while Sending Function Execution Message: {} to server using pool: {}", op.getMessage(), pool, ex);
+      }
+      if (ex.getMessage() != null)
+        throw new FunctionException(ex.getMessage(), ex);
+      else
+        throw new FunctionException(
+            "Unexpected exception during function execution:", ex);
+    }
+  }
+  
+  private static class ExecuteRegionFunctionNoAckOpImpl extends AbstractOp {
+    private final boolean executeOnBucketSet ;
+    
+    /**
+     * @throws com.gemstone.gemfire.SerializationException if serialization fails
+     */
+    public ExecuteRegionFunctionNoAckOpImpl(String region, Function function,
+        ServerRegionFunctionExecutor serverRegionExecutor, byte hasResult) {
+      super(MessageType.EXECUTE_REGION_FUNCTION, 8 + serverRegionExecutor
+          .getFilter().size());
+      byte isReExecute = 0;
+      int removedNodesSize = 0;
+      byte functionState = AbstractExecution.getFunctionState(function.isHA(),
+          function.hasResult(), function.optimizeForWrite());
+      Set routingObjects = serverRegionExecutor.getFilter();
+      Object args = serverRegionExecutor.getArguments();
+      MemberMappedArgument memberMappedArg = serverRegionExecutor.getMemberMappedArgument();
+      getMessage().addBytesPart(new byte[]{functionState});
+      getMessage().addStringPart(region);
+      if(serverRegionExecutor.isFnSerializationReqd()){
+        getMessage().addStringOrObjPart(function); 
+      }
+      else{
+        getMessage().addStringOrObjPart(function.getId()); 
+      }
+      getMessage().addObjPart(args);
+      getMessage().addObjPart(memberMappedArg);
+      
+      this.executeOnBucketSet = serverRegionExecutor.getExecuteOnBucketSetFlag(); 
+//      byte flags = this.executeOnBucketSet ?
+//          (byte)(0x00 | Op.BUCKETS_AS_FILTER_MASK) : 0x00;
+//      flags = isReExecute == 1? (byte)(flags | Op.IS_REXECUTE_MASK) : flags;
+      byte flags = ExecuteFunctionHelper.createFlags(executeOnBucketSet, isReExecute);
+
+      getMessage().addBytesPart(new byte[]{flags});
+      getMessage().addIntPart(routingObjects.size());
+      for (Object key : routingObjects) {
+        getMessage().addStringOrObjPart(key);
+      }
+      getMessage().addIntPart(removedNodesSize);
+    }
+
+    public ExecuteRegionFunctionNoAckOpImpl(String region, String functionId,
+        ServerRegionFunctionExecutor serverRegionExecutor, byte hasResult, boolean isHA, boolean optimizeForWrite) {
+      super(MessageType.EXECUTE_REGION_FUNCTION, 8 + serverRegionExecutor
+          .getFilter().size());
+      byte isReExecute = 0;
+      int removedNodesSize = 0;
+      byte functionState = AbstractExecution.getFunctionState(isHA,
+          hasResult==(byte)1?true:false, optimizeForWrite);
+      
+      Set routingObjects = serverRegionExecutor.getFilter();
+      Object args = serverRegionExecutor.getArguments();
+      MemberMappedArgument memberMappedArg = serverRegionExecutor.getMemberMappedArgument();
+      getMessage().addBytesPart(new byte[]{functionState});
+      getMessage().addStringPart(region);
+      getMessage().addStringOrObjPart(functionId); 
+      getMessage().addObjPart(args);
+      getMessage().addObjPart(memberMappedArg);
+      this.executeOnBucketSet = serverRegionExecutor.getExecuteOnBucketSetFlag(); 
+//      byte flags = this.executeOnBucketSet ?
+//          (byte)(0x00 | Op.BUCKETS_AS_FILTER_MASK) : 0x00;
+//      flags = isReExecute == 1? (byte)(flags | Op.IS_REXECUTE_MASK) : flags;
+      byte flags = ExecuteFunctionHelper.createFlags(executeOnBucketSet, isReExecute);
+
+      getMessage().addBytesPart(new byte[]{flags});
+      getMessage().addIntPart(routingObjects.size());
+      for (Object key : routingObjects) {
+        getMessage().addStringOrObjPart(key);
+      }
+      getMessage().addIntPart(removedNodesSize);
+    }
+    
+    /**
+     * Handles the server's reply to a no-ack execution. A REPLY is accepted
+     * silently; an EXCEPTION or an error response is only logged as a warning;
+     * any other message type is treated as an internal error.
+     *
+     * @param msg the response message
+     * @return always null
+     * @throws InternalGemFireError if the message type is none of REPLY,
+     *         EXCEPTION or the error type recognised by isErrorResponse
+     */
+    @Override
+    protected Object processResponse(Message msg) throws Exception {
+      final int msgType = msg.getMessageType();
+      if (msgType == MessageType.REPLY) {
+        return null;
+      }
+
+      // part 0 is read before the remaining type checks
+      final Part firstPart = msg.getPart(0);
+      if (msgType == MessageType.EXCEPTION) {
+        final Throwable serverFailure = (Throwable)firstPart.getObject();
+        logger.warn(LocalizedMessage.create(LocalizedStrings.EXECUTE_FUNCTION_NO_HAS_RESULT_RECEIVED_EXCEPTION), serverFailure);
+        return null;
+      }
+      if (!isErrorResponse(msgType)) {
+        throw new InternalGemFireError("Unexpected message type "
+            + MessageType.getString(msgType));
+      }
+      // TODO:LOG: the text carried in part 0 is not included in this warning
+      logger.warn(LocalizedMessage.create(LocalizedStrings.EXECUTE_FUNCTION_NO_HAS_RESULT_RECEIVED_EXCEPTION));
+      return null;
+    }
+
+    /**
+     * Tells whether a response message type is the error reply of this operation.
+     *
+     * @param msgType the message type taken from the response
+     * @return true only when msgType equals MessageType.EXECUTE_REGION_FUNCTION_ERROR
+     */
+    @Override
+    protected boolean isErrorResponse(int msgType) {
+      return MessageType.EXECUTE_REGION_FUNCTION_ERROR == msgType;
+    }
+
+    @Override  
+    protected long startAttempt(ConnectionStats stats) {
+      return stats.startExecuteFunction();
+    }
+
+    @Override  
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      stats.endExecuteFunctionSend(start, hasFailed());
+    }
+
+    @Override  
+    protected void endAttempt(ConnectionStats stats, long start) {
+      stats.endExecuteFunction(start, hasTimedOut(), hasFailed());
+    }
+
+    @Override  
+    protected Message createResponseMessage() {
+      return new Message(1, Version.CURRENT);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionOp.java
new file mode 100755
index 0000000..199e11a
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionOp.java
@@ -0,0 +1,615 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.client.ServerConnectivityException;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.cache.client.internal.ExecuteRegionFunctionSingleHopOp.ExecuteRegionFunctionSingleHopOpImpl;
+import com.gemstone.gemfire.cache.execute.Function;
+import com.gemstone.gemfire.cache.execute.FunctionException;
+import com.gemstone.gemfire.cache.execute.ResultCollector;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.execute.AbstractExecution;
+import com.gemstone.gemfire.internal.cache.execute.FunctionStats;
+import com.gemstone.gemfire.internal.cache.execute.InternalFunctionException;
+import com.gemstone.gemfire.internal.cache.execute.MemberMappedArgument;
+import com.gemstone.gemfire.internal.cache.execute.InternalFunctionInvocationTargetException;
+import com.gemstone.gemfire.internal.cache.execute.ServerRegionFunctionExecutor;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ChunkedMessage;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * Executes a function on a server region.<br>
+ * It also gets the result from the server.
+ * @author Kishor Bachhav
+ * @since 5.8Beta
+ */
+public class ExecuteRegionFunctionOp {
+
+  private static final Logger logger = LogService.getLogger();
+  private ExecuteRegionFunctionOp() {
+    // no instances allowed: the execute and reexecute entry points of this class are static
+  }
+
+  /**
+   * Executes a Function on a server region using connections from the given
+   * pool, collecting results into the given collector.
+   * <p>
+   * A ServerConnectivityException that is not a ServerOperationException is
+   * retried inside this method with a re-execute operation until the number
+   * of failures exceeds the retry limit. The limit is mRetryAttempts when
+   * function.isHA() is true and 0 otherwise; a limit of -1 switches the
+   * count check off. An InternalFunctionInvocationTargetException records the
+   * failed nodes and, for an HA function, is followed by a call to
+   * reexecute with the limit reduced by one.
+   * @param pool the pool to use to communicate with the server.
+   * @param region the name of the region the function is executed on
+   * @param function to be executed
+   * @param serverRegionExecutor which will return argument and filter
+   * @param resultCollector is used to collect the results from the Server;
+   *        it is cleared before every retry
+   * @param hasResult passed through to the operation and to reexecute
+   * @param mRetryAttempts retry limit, only honoured when function.isHA()
+   */
+  public static void execute(ExecutablePool pool, String region,
+      Function function, ServerRegionFunctionExecutor serverRegionExecutor,
+      ResultCollector resultCollector, byte hasResult, int mRetryAttempts) {
+    AbstractOp op = new ExecuteRegionFunctionOpImpl(region, function, serverRegionExecutor,
+        resultCollector, hasResult, new HashSet<String>());
+
+    int retryAttempts = 0;
+    boolean reexecute = false; // set by the InternalFunctionInvocationTargetException handler
+    boolean reexecuteForServ = false; // set by the ServerConnectivityException handler; drives the loop
+    Set<String> failedNodes = new HashSet<String>();
+    AbstractOp reexecOp = null; 
+    int maxRetryAttempts = 0; // non-HA functions get no retries
+    if (function.isHA()) {
+      maxRetryAttempts = mRetryAttempts;
+    }
+    
+    final boolean isDebugEnabled =logger.isDebugEnabled();
+    do {
+    try {
+        if (reexecuteForServ) { // previous pass failed on connectivity: send a fresh re-execute op
+          reexecOp = new ExecuteRegionFunctionOpImpl(
+              (ExecuteRegionFunctionOpImpl)op, (byte)1/* isReExecute */,
+              failedNodes);
+          pool.execute(reexecOp, 0);
+        }
+        else {
+          pool.execute(op, 0);
+        }
+        reexecute = false; // success: clear both retry flags
+        reexecuteForServ = false;
+      }
+    catch (InternalFunctionInvocationTargetException e) {
+      if (isDebugEnabled) {
+        logger.debug("ExecuteRegionFunctionOp#execute : Received InternalFunctionInvocationTargetException. The failed nodes are {}", e.getFailedNodeSet());
+      }
+      reexecute = true; // note: reexecuteForServ is left unchanged by this handler
+      resultCollector.clearResults();
+      Set<String> failedNodesIds = e.getFailedNodeSet();
+      failedNodes.clear(); // replace, not accumulate, the failed node ids
+      if (failedNodesIds != null) {
+        failedNodes.addAll(failedNodesIds);
+      }
+    }
+    catch (ServerConnectivityException se) {
+      retryAttempts++; // counted before the ServerOperationException check below
+      if (isDebugEnabled) {
+        logger.debug("ExecuteRegionFunctionOp#execute : Received ServerConnectivityException. The exception is {} The retryattempt is : {} maxRetryAttempts {}", se, retryAttempts, maxRetryAttempts );
+      }
+      if(se instanceof ServerOperationException){ // never retried
+        throw se;
+      }
+      if ((retryAttempts > maxRetryAttempts && maxRetryAttempts !=-1) /*|| !function.isHA()*/) // -1 switches the count check off
+        throw se;
+
+      reexecuteForServ = true;
+      resultCollector.clearResults();
+      failedNodes.clear();
+    }
+    }
+    while(reexecuteForServ);
+
+    if ( reexecute && function.isHA()) {
+      ExecuteRegionFunctionOp.reexecute(pool, region, function,
+          serverRegionExecutor, resultCollector, hasResult, failedNodes,
+          maxRetryAttempts - 1); // reexecute tests its limit against -2, i.e. -1 minus one
+    }
+  }
+  
+  /**
+   * Executes a function, identified by its id, on a server region using
+   * connections from the given pool, collecting results into the given
+   * collector.
+   * <p>
+   * The operation is sent once. When it fails with an
+   * InternalFunctionInvocationTargetException, or with a
+   * ServerConnectivityException that is not a ServerOperationException and
+   * the retry limit allows another try, the result collector is cleared and,
+   * if isHA is true, the work is handed to reexecute with the limit reduced
+   * by one.
+   * <p>
+   * NOTE(review): the overload taking a Function object retries connectivity
+   * failures in place with a loop; this overload never did (its loop flag was
+   * never set), so the unreachable loop and re-execute branch were removed
+   * here without changing behaviour. Confirm which of the two retry
+   * strategies is the intended one.
+   * @param pool the pool to use to communicate with the server.
+   * @param region the name of the region the function is executed on
+   * @param function id of the function to be executed
+   * @param serverRegionExecutor which will return argument and filter
+   * @param resultCollector is used to collect the results from the Server;
+   *        it is cleared before a re-execution
+   * @param hasResult passed through to the operation and to reexecute
+   * @param mRetryAttempts retry limit, only honoured when isHA is true;
+   *        -1 switches the count check off
+   * @param isHA whether the function may be re-executed after a failure
+   * @param optimizeForWrite passed through to the operation and to reexecute
+   */
+  public static void execute(ExecutablePool pool, String region,
+      String function, ServerRegionFunctionExecutor serverRegionExecutor,
+      ResultCollector resultCollector, byte hasResult, int mRetryAttempts, boolean isHA, boolean optimizeForWrite) {
+    AbstractOp op = new ExecuteRegionFunctionOpImpl(region, function, serverRegionExecutor,
+        resultCollector, hasResult, new HashSet<String>(), isHA, optimizeForWrite, true);
+
+    int retryAttempts = 0;
+    boolean reexecute = false;
+    final Set<String> failedNodes = new HashSet<String>();
+    // non-HA functions get no retries
+    final int maxRetryAttempts = isHA ? mRetryAttempts : 0;
+    final boolean isDebugEnabled = logger.isDebugEnabled();
+
+    try {
+      pool.execute(op, 0);
+    }
+    catch (InternalFunctionInvocationTargetException e) {
+      if (isDebugEnabled) {
+        logger.debug("ExecuteRegionFunctionOp#execute : Received InternalFunctionInvocationTargetException. The failed nodes are {}", e.getFailedNodeSet());
+      }
+      reexecute = true;
+      resultCollector.clearResults();
+      Set<String> failedNodesIds = e.getFailedNodeSet();
+      failedNodes.clear();
+      if (failedNodesIds != null) {
+        failedNodes.addAll(failedNodesIds);
+      }
+    }
+    catch (ServerConnectivityException se) {
+      if (isDebugEnabled) {
+        logger.debug("ExecuteRegionFunctionOp#execute : Received ServerConnectivityException. The exception is {} The retryattempt is : {} maxRetryAttempts {}", se, retryAttempts, maxRetryAttempts);
+      }
+      // a ServerOperationException is never retried
+      if (se instanceof ServerOperationException) {
+        throw se;
+      }
+      retryAttempts++;
+      // -1 switches the count check off
+      if (retryAttempts > maxRetryAttempts && maxRetryAttempts != -1) {
+        throw se;
+      }
+
+      reexecute = true;
+      resultCollector.clearResults();
+      failedNodes.clear();
+    }
+
+    if (reexecute && isHA) {
+      // reexecute tests its limit against -2, i.e. -1 minus one
+      ExecuteRegionFunctionOp.reexecute(pool, region, function,
+          serverRegionExecutor, resultCollector, hasResult, failedNodes,
+          maxRetryAttempts - 1, isHA, optimizeForWrite);
+    }
+  }
+  
+  /**
+   * Re-sends a Function execution, marked as a re-execution, until one
+   * attempt completes without an InternalFunctionInvocationTargetException
+   * or a retryable ServerConnectivityException.
+   * @param pool the pool to use to communicate with the server.
+   * @param region the name of the region the function is executed on
+   * @param function to be executed
+   * @param serverRegionExecutor which will return argument and filter
+   * @param resultCollector is used to collect the results from the Server;
+   *        it is cleared before every further attempt
+   * @param hasResult passed through to the operation
+   * @param failedNodes node ids sent with each attempt; this set is modified
+   *        in place when an attempt fails
+   * @param maxRetryAttempts limit on connectivity failures; -2 switches the
+   *        count check off
+   */
+  public static void reexecute(ExecutablePool pool, String region,
+      Function function, ServerRegionFunctionExecutor serverRegionExecutor,
+      ResultCollector resultCollector, byte hasResult, Set<String> failedNodes, int maxRetryAttempts) {
+    // template whose state is copied into every re-execute attempt
+    final AbstractOp template = new ExecuteRegionFunctionOpImpl(region, function,
+        serverRegionExecutor, resultCollector, hasResult, new HashSet<String>());
+
+    int connectivityFailures = 0;
+    final boolean isDebugEnabled = logger.isDebugEnabled();
+    for (;;) {
+      final AbstractOp attempt = new ExecuteRegionFunctionOpImpl(
+          (ExecuteRegionFunctionOpImpl)template, (byte)1/*isReExecute*/, failedNodes);
+      if (isDebugEnabled) {
+        logger.debug("ExecuteRegionFunction#reexecute: Sending Function Execution Message: {} to Server using pool: {} with failed nodes: {}", attempt.getMessage(), pool, failedNodes);
+      }
+      try {
+        pool.execute(attempt, 0);
+        return; // the attempt went through, nothing left to retry
+      }
+      catch (InternalFunctionInvocationTargetException e) {
+        if (isDebugEnabled) {
+          logger.debug("ExecuteRegionFunctionOp#reexecute : Received InternalFunctionInvocationTargetException. The failed nodes are {}", e.getFailedNodeSet());
+        }
+        resultCollector.clearResults();
+        // replace, not accumulate, the failed node ids
+        final Set<String> reported = e.getFailedNodeSet();
+        failedNodes.clear();
+        if (reported != null) {
+          failedNodes.addAll(reported);
+        }
+      }
+      catch (ServerConnectivityException se) {
+        if (isDebugEnabled) {
+          logger.debug("ExecuteRegionFunctionOp#reexecute : Received ServerConnectivity Exception.");
+        }
+
+        // a ServerOperationException is never retried
+        if (se instanceof ServerOperationException) {
+          throw se;
+        }
+        connectivityFailures++;
+        final boolean unlimited = (maxRetryAttempts == -2);
+        if (!unlimited && connectivityFailures > maxRetryAttempts) {
+          throw se;
+        }
+
+        resultCollector.clearResults();
+        failedNodes.clear();
+      }
+    }
+  }
+  
+  /**
+   * Re-sends the execution of a function identified by its id, marked as a
+   * re-execution, until one attempt completes without an
+   * InternalFunctionInvocationTargetException or a retryable
+   * ServerConnectivityException.
+   * @param pool the pool to use to communicate with the server.
+   * @param region the name of the region the function is executed on
+   * @param function id of the function to be executed
+   * @param serverRegionExecutor which will return argument and filter
+   * @param resultCollector is used to collect the results from the Server;
+   *        it is cleared before every further attempt
+   * @param hasResult passed through to the operation
+   * @param failedNodes node ids sent with each attempt; this set is modified
+   *        in place when an attempt fails
+   * @param maxRetryAttempts limit on connectivity failures; -2 switches the
+   *        count check off
+   * @param isHA passed through to the operation
+   * @param optimizeForWrite passed through to the operation
+   */
+  public static void reexecute(ExecutablePool pool, String region,
+      String function, ServerRegionFunctionExecutor serverRegionExecutor,
+      ResultCollector resultCollector, byte hasResult, Set<String> failedNodes, int maxRetryAttempts, boolean isHA, boolean optimizeForWrite) {
+    // template whose state is copied into every re-execute attempt
+    final AbstractOp template = new ExecuteRegionFunctionOpImpl(region, function,
+        serverRegionExecutor, resultCollector, hasResult, new HashSet<String>(),
+        isHA, optimizeForWrite, true);
+
+    int connectivityFailures = 0;
+    final boolean isDebugEnabled = logger.isDebugEnabled();
+    for (;;) {
+      final AbstractOp attempt = new ExecuteRegionFunctionOpImpl(
+          (ExecuteRegionFunctionOpImpl)template, (byte)1/*isReExecute*/, failedNodes);
+      if (isDebugEnabled) {
+        logger.debug("ExecuteRegionFunction#reexecute : Sending Function Execution Message: {} to Server using pool: {}", attempt.getMessage(), pool);
+      }
+      try {
+        pool.execute(attempt, 0);
+        return; // the attempt went through, nothing left to retry
+      }
+      catch (InternalFunctionInvocationTargetException e) {
+        if (isDebugEnabled) {
+          logger.debug("ExecuteRegionFunctionOp#reexecute : Received InternalFunctionInvocationTargetException. The failed nodes are {}", e.getFailedNodeSet());
+        }
+        resultCollector.clearResults();
+        // replace, not accumulate, the failed node ids
+        final Set<String> reported = e.getFailedNodeSet();
+        failedNodes.clear();
+        if (reported != null) {
+          failedNodes.addAll(reported);
+        }
+      }
+      catch (ServerConnectivityException se) {
+        if (isDebugEnabled) {
+          // the count logged here is the one before this failure is added
+          logger.debug("ExecuteRegionFunctionOp#reexecute : Received ServerConnectivityException. The exception is {} The retryattempt is : {} maxRetryAttempts {}", se, connectivityFailures, maxRetryAttempts);
+        }
+        // a ServerOperationException is never retried
+        if (se instanceof ServerOperationException) {
+          throw se;
+        }
+        connectivityFailures++;
+        final boolean unlimited = (maxRetryAttempts == -2);
+        if (!unlimited && connectivityFailures > maxRetryAttempts) {
+          throw se;
+        }
+
+        resultCollector.clearResults();
+        failedNodes.clear();
+      }
+    }
+  }
+  static class ExecuteRegionFunctionOpImpl extends AbstractOp {
+
+    // To collect the results from the server
+    private final ResultCollector resultCollector;
+
+    //To get the instance of the Function Statistics we need the function name or instance
+    private Function function;
+
+    private byte isReExecute = 0;
+
+    private final String regionName;
+
+    private final ServerRegionFunctionExecutor executor;
+
+    private final byte hasResult;
+
+    private Set<String> failedNodes = new HashSet<String>();
+
+    private final String functionId;
+    private final boolean executeOnBucketSet;
+
+    /**
+     * Builds the EXECUTE_REGION_FUNCTION request for a Function object.
+     * The function state is computed from the function's own isHA, hasResult
+     * and optimizeForWrite values; the hasResult argument is not read, and the
+     * hasResult field of this op is set to that computed function state.
+     * @param region name of the region; also kept as regionName
+     * @param function the function; sent whole when the executor reports that
+     *        function serialization is required, otherwise only its id is sent
+     * @param serverRegionExecutor source of the filter, the arguments, the
+     *        member-mapped argument and the bucket-set flag
+     * @param rc collector kept as this op's resultCollector
+     * @param hasResult not read by this constructor
+     * @param removedNodes node ids written to the message after the routing
+     *        objects; the same set is kept as this op's failedNodes
+     * @throws com.gemstone.gemfire.SerializationException if serialization fails
+     */
+    public ExecuteRegionFunctionOpImpl(String region, Function function,
+        ServerRegionFunctionExecutor serverRegionExecutor, ResultCollector rc,
+        byte hasResult, Set<String> removedNodes) {
+      super(MessageType.EXECUTE_REGION_FUNCTION, 8
+          + serverRegionExecutor.getFilter().size() + removedNodes.size()); // 8 fixed parts + one per routing object + one per removed node
+      Set routingObjects = serverRegionExecutor.getFilter();
+      Object args = serverRegionExecutor.getArguments();
+      byte functionState = AbstractExecution.getFunctionState(function.isHA(),
+          function.hasResult(), function.optimizeForWrite());
+      MemberMappedArgument memberMappedArg = serverRegionExecutor
+          .getMemberMappedArgument();
+
+      addBytes(functionState); // state byte, plus the client function timeout when it is not the default
+      getMessage().addStringPart(region);
+      if (serverRegionExecutor.isFnSerializationReqd()) {
+        getMessage().addStringOrObjPart(function);
+      }
+      else {
+        getMessage().addStringOrObjPart(function.getId());
+      }
+      getMessage().addObjPart(args);
+      getMessage().addObjPart(memberMappedArg);
+      this.executeOnBucketSet = serverRegionExecutor.getExecuteOnBucketSetFlag(); 
+      byte flags = ExecuteFunctionHelper.createFlags(executeOnBucketSet, isReExecute); // isReExecute is the field, still 0 here
+
+      getMessage().addBytesPart(new byte[] {flags});
+      getMessage().addIntPart(routingObjects.size()); // count first, then each routing object
+      for (Object key : routingObjects) {
+        getMessage().addStringOrObjPart(key);
+      }
+      getMessage().addIntPart(removedNodes.size()); // count first, then each removed node
+      for (Object nodes : removedNodes) {
+        getMessage().addStringOrObjPart(nodes);
+      }
+
+      this.resultCollector = rc;
+      this.regionName = region;
+      this.function = function;
+      this.functionId = function.getId();
+      this.executor = serverRegionExecutor;
+      this.hasResult = functionState; // stores the computed state, not the hasResult argument
+      this.failedNodes = removedNodes;
+    }
+    
+    public ExecuteRegionFunctionOpImpl(String region, String function,
+        ServerRegionFunctionExecutor serverRegionExecutor, ResultCollector rc,
+        byte hasResult, Set<String> removedNodes, boolean isHA, boolean optimizeForWrite, boolean calculateFnState ) {
+      super(MessageType.EXECUTE_REGION_FUNCTION, 8
+          + serverRegionExecutor.getFilter().size() + removedNodes.size());
+      Set routingObjects = serverRegionExecutor.getFilter();
+      byte functionState = hasResult;
+      if(calculateFnState){
+         functionState = AbstractExecution.getFunctionState(isHA,
+          hasResult == (byte)1 ? true : false, optimizeForWrite);
+      }
+      Object args = serverRegionExecutor.getArguments();
+      MemberMappedArgument memberMappedArg = serverRegionExecutor
+          .getMemberMappedArgument();
+      addBytes(functionState);
+      getMessage().addStringPart(region);
+      getMessage().addStringOrObjPart(function);
+      getMessage().addObjPart(args);
+      getMessage().addObjPart(memberMappedArg);
+      
+      this.executeOnBucketSet = serverRegionExecutor.getExecuteOnBucketSetFlag(); 
+//      byte flags = this.executeOnBucketSet ? 
+//          (byte)(0x00 | ExecuteFunctionHelper.BUCKETS_AS_FILTER_MASK) : 0x00;
+//      flags = isReExecute == 1? (byte)(flags | ExecuteFunctionHelper.IS_REXECUTE_MASK) : flags;
+      byte flags = ExecuteFunctionHelper.createFlags(executeOnBucketSet, isReExecute);
+      
+      getMessage().addBytesPart(new byte[] { flags });
+      getMessage().addIntPart(routingObjects.size());
+      for (Object key : routingObjects) {
+        getMessage().addStringOrObjPart(key);
+      }
+      getMessage().addIntPart(removedNodes.size());
+      for (Object nodes : removedNodes) {
+        getMessage().addStringOrObjPart(nodes);
+      }
+
+      this.resultCollector = rc;
+      this.regionName = region;
+      this.functionId = function;
+      this.executor = serverRegionExecutor;
+      this.hasResult = functionState;
+      this.failedNodes = removedNodes;
+    }
+
+    public ExecuteRegionFunctionOpImpl(
+        ExecuteRegionFunctionSingleHopOpImpl newop) { // builds this op from the state exposed by a single-hop op
+      this(newop.getRegionName(), newop.getFunctionId(), newop.getExecutor(),
+          newop.getResultCollector(), newop.getHasResult(),
+          new HashSet<String>(), newop.isHA(), newop
+              .optimizeForWrite(), false); // new empty removed-node set; false = newop.getHasResult() is sent as the state byte without recomputing it
+    }
+
+    public ExecuteRegionFunctionOpImpl(ExecuteRegionFunctionOpImpl op,
+        byte isReExecute, Set<String> removedNodes) {
+      super(MessageType.EXECUTE_REGION_FUNCTION, 8
+          + op.executor.getFilter().size() + removedNodes.size());
+      this.isReExecute = isReExecute;
+      this.resultCollector = op.resultCollector;
+      this.function = op.function;
+      this.functionId = op.functionId;
+      this.regionName = op.regionName;
+      this.executor = op.executor;
+      this.hasResult = op.hasResult;
+      this.failedNodes = op.failedNodes;
+      this.executeOnBucketSet = op.executeOnBucketSet;
+      if (isReExecute == 1) {
+        this.resultCollector.endResults();
+        this.resultCollector.clearResults();
+      }
+
+      Set routingObjects = executor.getFilter();
+      Object args = executor.getArguments();
+      MemberMappedArgument memberMappedArg = executor.getMemberMappedArgument();
+      getMessage().clear();
+      addBytes(this.hasResult);
+      getMessage().addStringPart(this.regionName);
+      if (executor.isFnSerializationReqd()) {
+        getMessage().addStringOrObjPart(function);
+      }
+      else {
+        getMessage().addStringOrObjPart(functionId);
+      }
+      getMessage().addObjPart(args);
+      getMessage().addObjPart(memberMappedArg);
+//      byte flags = this.executeOnBucketSet ?
+//          (byte)(0x00 | Op.BUCKETS_AS_FILTER_MASK) : 0x00;
+//      flags = isReExecute == 1? (byte)(flags | Op.IS_REXECUTE_MASK) : flags;
+      byte flags = ExecuteFunctionHelper.createFlags(executeOnBucketSet, isReExecute);
+
+      getMessage().addBytesPart(new byte[] {flags });
+      getMessage().addIntPart(routingObjects.size());
+      for (Object key : routingObjects) {
+        getMessage().addStringOrObjPart(key);
+      }
+      getMessage().addIntPart(removedNodes.size());
+      for (Object nodes : removedNodes) {
+        getMessage().addStringOrObjPart(nodes);
+      }
+    }
+
+    private void addBytes(byte functionStateOrHasResult) {
+      if (GemFireCacheImpl.getClientFunctionTimeout() == GemFireCacheImpl.DEFAULT_CLIENT_FUNCTION_TIMEOUT) {
+        getMessage().addBytesPart(new byte[] { functionStateOrHasResult });
+      } else {
+        byte[] bytes = new byte[5];
+        bytes[0] = functionStateOrHasResult;
+        Part.encodeInt(GemFireCacheImpl.getClientFunctionTimeout(), bytes, 1);
+        getMessage().addBytesPart(bytes);
+      }
+    }
+
+    @Override
+    protected Object processResponse(Message msg) throws Exception {
+      ChunkedMessage executeFunctionResponseMsg = (ChunkedMessage)msg;
+      // Read the header which describes the type of message following
+      try {
+        executeFunctionResponseMsg.readHeader();
+        switch (executeFunctionResponseMsg.getMessageType()) {
+          case MessageType.EXECUTE_REGION_FUNCTION_RESULT:
+            final boolean isDebugEnabled = logger.isDebugEnabled();
+            if (isDebugEnabled) {
+              logger.debug("ExecuteRegionFunctionOpImpl#processResponse: received message of type EXECUTE_REGION_FUNCTION_RESULT. The number of parts are : {}", executeFunctionResponseMsg.getNumberOfParts());
+            }
+            // Read the chunk
+            do {
+              executeFunctionResponseMsg.receiveChunk();
+              Object resultResponse = executeFunctionResponseMsg.getPart(0)
+                  .getObject();
+              Object result;
+              if (resultResponse instanceof ArrayList) {
+                result = ((ArrayList)resultResponse).get(0);
+              }
+              else {
+                result = resultResponse;
+              }
+              if (result instanceof FunctionException) {
+                FunctionException ex = ((FunctionException)result);
+                if (ex instanceof InternalFunctionException) {
+                  Throwable cause = ex.getCause();
+                  DistributedMember memberID = (DistributedMember)((ArrayList)resultResponse)
+                      .get(1);
+                  this.resultCollector
+                      .addResult(memberID, cause);
+                  FunctionStats.getFunctionStats(this.functionId,
+                      this.executor.getRegion().getSystem())
+                      .incResultsReceived();
+                  continue;
+                }
+                else if (((FunctionException)result).getCause() instanceof InternalFunctionInvocationTargetException) {
+                  InternalFunctionInvocationTargetException ifite = (InternalFunctionInvocationTargetException)ex
+                      .getCause();
+                  this.failedNodes.addAll(ifite.getFailedNodeSet());
+                }
+                executeFunctionResponseMsg.clear();
+                throw ex;
+              }
+              else if (result instanceof Throwable) {
+                String s = "While performing a remote " + getOpName();
+                executeFunctionResponseMsg.clear();
+                throw new ServerOperationException(s, (Throwable)result);
+              }
+              else {
+                DistributedMember memberID = (DistributedMember)((ArrayList)resultResponse)
+                    .get(1);
+                this.resultCollector.addResult(memberID, result);
+                FunctionStats.getFunctionStats(this.functionId,
+                    this.executor.getRegion().getSystem()).incResultsReceived();
+              }
+            } while (!executeFunctionResponseMsg.isLastChunk());
+            if (isDebugEnabled) {
+              logger.debug("ExecuteRegionFunctionOpImpl#processResponse: received all the results from server successfully.");
+            }
+            this.resultCollector.endResults();
+            return null;
+
+          case MessageType.EXCEPTION:
+            if (logger.isDebugEnabled()) {
+              logger.debug("ExecuteRegionFunctionOpImpl#processResponse: received message of type EXCEPTION. The number of parts are : {}", executeFunctionResponseMsg.getNumberOfParts());
+            }
+
+            // Read the chunk
+            executeFunctionResponseMsg.receiveChunk();
+            Part part0 = executeFunctionResponseMsg.getPart(0);
+            Object obj = part0.getObject();
+            if (obj instanceof FunctionException) {
+              FunctionException ex = ((FunctionException)obj);
+              if (((FunctionException)obj).getCause() instanceof InternalFunctionInvocationTargetException) {
+                InternalFunctionInvocationTargetException ifite = (InternalFunctionInvocationTargetException)ex
+                    .getCause();
+                this.failedNodes.addAll(ifite.getFailedNodeSet());
+              }
+              executeFunctionResponseMsg.clear();
+              throw ex;
+            }
+            else if (obj instanceof Throwable) {
+              executeFunctionResponseMsg.clear();
+              String s = "While performing a remote " + getOpName();
+              throw new ServerOperationException(s, (Throwable)obj);
+            }
+            break;
+          case MessageType.EXECUTE_REGION_FUNCTION_ERROR:
+            if (logger.isDebugEnabled()) {
+              logger.debug("ExecuteRegionFunctionOpImpl#processResponse: received message of type EXECUTE_REGION_FUNCTION_ERROR");
+            }
+            // Read the chunk
+            executeFunctionResponseMsg.receiveChunk();
+            String errorMessage = executeFunctionResponseMsg.getPart(0)
+                .getString();
+            executeFunctionResponseMsg.clear();
+            throw new ServerOperationException(errorMessage);
+          default:
+            throw new InternalGemFireError("Unknown message type "
+                + executeFunctionResponseMsg.getMessageType());
+        }
+      }
+      finally {
+        executeFunctionResponseMsg.clear();
+      }
+      return null;
+    }
+
+    /**
+     * Tells whether a response message type is this operation's error type.
+     *
+     * @param msgType the message type of the received response
+     * @return {@code true} only when {@code msgType} equals
+     *         {@code MessageType.EXECUTE_REGION_FUNCTION_ERROR}
+     */
+    @Override
+    protected boolean isErrorResponse(int msgType) {
+      return MessageType.EXECUTE_REGION_FUNCTION_ERROR == msgType;
+    }
+
+    @Override
+    protected long startAttempt(ConnectionStats stats) {
+      return stats.startExecuteFunction();
+    }
+
+    protected String getOpName() {
+      return "executeRegionFunction";
+    }
+
+    @Override
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      stats.endExecuteFunctionSend(start, hasFailed());
+    }
+
+    @Override
+    protected void endAttempt(ConnectionStats stats, long start) {
+      stats.endExecuteFunction(start, hasTimedOut(), hasFailed());
+    }
+
+    @Override
+    protected Message createResponseMessage() {
+      return new ChunkedMessage(1, Version.CURRENT);
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java
new file mode 100644
index 0000000..5ea03f8
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java
@@ -0,0 +1,491 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.cache.execute.Function;
+import com.gemstone.gemfire.cache.execute.FunctionException;
+import com.gemstone.gemfire.cache.execute.ResultCollector;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.execute.AbstractExecution;
+import com.gemstone.gemfire.internal.cache.execute.FunctionStats;
+import com.gemstone.gemfire.internal.cache.execute.InternalFunctionException;
+import com.gemstone.gemfire.internal.cache.execute.InternalFunctionInvocationTargetException;
+import com.gemstone.gemfire.internal.cache.execute.MemberMappedArgument;
+import com.gemstone.gemfire.internal.cache.execute.ServerRegionFunctionExecutor;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ChunkedMessage;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.internal.logging.LogService;
+
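+/**
+ * Executes a region function from a client by building one operation per
+ * server found in the caller-supplied server-to-filter map and submitting
+ * those operations together through {@code SingleHopClientExecutor.submitAllHA}.
+ * When that submission reports that a re-execution is required, the work is
+ * handed over to {@code ExecuteRegionFunctionOp}.
+ *
+ * @author skumar
+ * @since 6.5
+ */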
+public class ExecuteRegionFunctionSingleHopOp {
+
+  private static final Logger logger = LogService.getLogger();
+  
+  /** Private: this class only offers static entry points and is never instantiated here. */
+  private ExecuteRegionFunctionSingleHopOp() {
+  }
+
+  public static void execute(ExecutablePool pool, Region region,
+      Function function, ServerRegionFunctionExecutor serverRegionExecutor,
+      ResultCollector resultCollector, byte hasResult,
+      Map<ServerLocation, ? extends HashSet> serverToFilterMap, int mRetryAttempts, boolean allBuckets) {
+
+    boolean reexecute = false;
+    Set<String> failedNodes = new HashSet<String>();
+    int maxRetryAttempts = 0;
+    if (function.isHA()) {
+      maxRetryAttempts = mRetryAttempts;
+    }
+    ClientMetadataService cms = ((GemFireCacheImpl)region.getCache())
+        .getClientMetadataService();
+
+    final boolean isDebugEnabled = logger.isDebugEnabled();
+    if (isDebugEnabled) {
+      logger.debug("ExecuteRegionFunctionSingleHopOp#execute : The serverToFilterMap is : {}", serverToFilterMap);
+    }
+    List<SingleHopOperationCallable> callableTasks = constructAndGetExecuteFunctionTasks(region.getFullPath(),
+        serverRegionExecutor, serverToFilterMap, (PoolImpl)pool, function,
+        hasResult, resultCollector, cms, allBuckets);
+
+    reexecute = SingleHopClientExecutor.submitAllHA(callableTasks,
+        (LocalRegion)region, resultCollector, failedNodes);
+
+    if (isDebugEnabled) {
+      logger.debug("ExecuteRegionFunctionSingleHopOp#execute : The size of callableTask is : {}", callableTasks.size());
+    }
+
+    if (reexecute ) {
+      resultCollector.clearResults();
+      if(function.isHA()) {
+      ExecuteRegionFunctionOp.reexecute(pool, region.getFullPath(), function,
+          serverRegionExecutor, resultCollector, hasResult, failedNodes,
+          maxRetryAttempts - 1);
+      }
+      else {
+        ExecuteRegionFunctionOp.execute(pool, region.getFullPath(), function,
+            serverRegionExecutor, resultCollector, hasResult,
+            maxRetryAttempts - 1);
+      }
+    }
+
+    resultCollector.endResults();
+  }
+
+  public static void execute(ExecutablePool pool, Region region,
+      String functionId, ServerRegionFunctionExecutor serverRegionExecutor,
+      ResultCollector resultCollector, byte hasResult,
+      Map<ServerLocation, ? extends HashSet> serverToFilterMap,
+      int mRetryAttempts, boolean allBuckets, boolean isHA,
+      boolean optimizeForWrite) {
+
+    boolean reexecute = false;
+    Set<String> failedNodes = new HashSet<String>();
+    int maxRetryAttempts = 0;
+    if (isHA) {
+      maxRetryAttempts = mRetryAttempts;
+    }
+    ClientMetadataService cms = ((GemFireCacheImpl)region.getCache())
+        .getClientMetadataService();
+
+    final boolean isDebugEnabled = logger.isDebugEnabled();
+    if (isDebugEnabled) {
+      logger.debug("ExecuteRegionFunctionSingleHopOp#execute : The serverToFilterMap is : {}", serverToFilterMap);
+    }
+    List<SingleHopOperationCallable> callableTasks = constructAndGetExecuteFunctionTasks(region.getFullPath(),
+        serverRegionExecutor, serverToFilterMap, (PoolImpl)pool, functionId,
+        hasResult, resultCollector, cms, allBuckets, isHA,optimizeForWrite);
+
+    reexecute = SingleHopClientExecutor.submitAllHA(callableTasks,
+        (LocalRegion)region, resultCollector, failedNodes);
+
+    if (isDebugEnabled) {
+      logger.debug("ExecuteRegionFunctionSingleHopOp#execute : The size of callableTask is: {}, reexecute={}", callableTasks.size(), reexecute);
+    }
+
+    if (reexecute) {
+      resultCollector.clearResults();
+      if (isHA) {
+        ExecuteRegionFunctionOp.reexecute(pool, region.getFullPath(),
+            functionId, serverRegionExecutor, resultCollector, hasResult,
+            failedNodes, maxRetryAttempts - 1, isHA, optimizeForWrite);
+      }
+      else {
+        ExecuteRegionFunctionOp.execute(pool, region.getFullPath(), functionId,
+            serverRegionExecutor, resultCollector, hasResult,
+            maxRetryAttempts - 1, isHA, optimizeForWrite);
+      }
+    }
+
+    resultCollector.endResults();
+  }
+
+  
+  /**
+   * Builds one {@code SingleHopOperationCallable} per server key of
+   * {@code serverToFilterMap}. Each task wraps a new
+   * {@code ExecuteRegionFunctionSingleHopOpImpl} whose executor carries the
+   * filter mapped to that server, and starts with an empty set of removed
+   * nodes.
+   *
+   * @param region full path of the region
+   * @param serverRegionExecutor base executor; {@code withFilter} is applied
+   *          per server
+   * @param serverToFilterMap the filter to use for each server
+   * @param pool the pool handed to every task
+   * @param function the function to execute
+   * @param hasResult passed to every created operation
+   * @param rc collector shared by all created operations
+   * @param cms not used by this method
+   * @param allBucket passed to every created operation
+   * @return the tasks, in the iteration order of the map's key set
+   */
+  static List<SingleHopOperationCallable> constructAndGetExecuteFunctionTasks(String region,
+      ServerRegionFunctionExecutor serverRegionExecutor,
+      final Map<ServerLocation, ? extends HashSet> serverToFilterMap,
+      final PoolImpl pool, final Function function, byte hasResult,
+      ResultCollector rc, ClientMetadataService cms, boolean allBucket) {
+    final List<SingleHopOperationCallable> result = new ArrayList<SingleHopOperationCallable>();
+    final ArrayList<ServerLocation> targets = new ArrayList<ServerLocation>(
+        serverToFilterMap.keySet());
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("Constructing tasks for the servers {}", targets);
+    }
+    for (ServerLocation target : targets) {
+      final HashSet filterForTarget = serverToFilterMap.get(target);
+      final ServerRegionFunctionExecutor filteredExecutor =
+          (ServerRegionFunctionExecutor)serverRegionExecutor.withFilter(filterForTarget);
+
+      final AbstractOp operation = new ExecuteRegionFunctionSingleHopOpImpl(region, function,
+          filteredExecutor, rc, hasResult, new HashSet<String>(), allBucket);
+      // The task receives a fresh ServerLocation built from host name and port only.
+      final ServerLocation destination = new ServerLocation(target.getHostName(), target.getPort());
+      result.add(new SingleHopOperationCallable(
+          destination, pool, operation, UserAttributes.userAttributes.get()));
+    }
+    return result;
+  }
+
+  /**
+   * Builds one {@code SingleHopOperationCallable} per server key of
+   * {@code serverToFilterMap} for a function referenced by id. Each task
+   * wraps a new {@code ExecuteRegionFunctionSingleHopOpImpl} whose executor
+   * carries the filter mapped to that server, and starts with an empty set of
+   * removed nodes.
+   *
+   * @param region full path of the region
+   * @param serverRegionExecutor base executor; {@code withFilter} is applied
+   *          per server
+   * @param serverToFilterMap the filter to use for each server
+   * @param pool the pool handed to every task
+   * @param functionId id of the function to execute
+   * @param hasResult passed to every created operation
+   * @param rc collector shared by all created operations
+   * @param cms not used by this method
+   * @param allBucket passed to every created operation
+   * @param isHA passed to every created operation
+   * @param optimizeForWrite passed to every created operation
+   * @return the tasks, in the iteration order of the map's key set
+   */
+  static List<SingleHopOperationCallable> constructAndGetExecuteFunctionTasks(String region,
+      ServerRegionFunctionExecutor serverRegionExecutor,
+      final Map<ServerLocation, ? extends HashSet> serverToFilterMap,
+      final PoolImpl pool, final String functionId, byte hasResult,
+      ResultCollector rc, ClientMetadataService cms, boolean allBucket, boolean isHA, boolean optimizeForWrite) {
+    final List<SingleHopOperationCallable> result = new ArrayList<SingleHopOperationCallable>();
+    final ArrayList<ServerLocation> targets = new ArrayList<ServerLocation>(
+        serverToFilterMap.keySet());
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("Constructing tasks for the servers {}", targets);
+    }
+    for (ServerLocation target : targets) {
+      final HashSet filterForTarget = serverToFilterMap.get(target);
+      final ServerRegionFunctionExecutor filteredExecutor =
+          (ServerRegionFunctionExecutor)serverRegionExecutor.withFilter(filterForTarget);
+
+      final AbstractOp operation = new ExecuteRegionFunctionSingleHopOpImpl(region, functionId,
+          filteredExecutor, rc, hasResult, new HashSet<String>(), allBucket, isHA, optimizeForWrite);
+      // The task receives a fresh ServerLocation built from host name and port only.
+      final ServerLocation destination = new ServerLocation(target.getHostName(), target.getPort());
+      result.add(new SingleHopOperationCallable(
+          destination, pool, operation, UserAttributes.userAttributes.get()));
+    }
+    return result;
+  }
+  
+  static class ExecuteRegionFunctionSingleHopOpImpl extends AbstractOp {
+
+    // Collector that receives every result read from the server response.
+    private final ResultCollector resultCollector;
+
+    // Id of the executed function; also used to look up the FunctionStats.
+    private final String functionId;
+
+    // Region name/path as given to the constructor.
+    private final String regionName;
+
+    // Executor supplying filter and arguments; its region is used to reach the FunctionStats.
+    private final ServerRegionFunctionExecutor executor;
+
+    // NOTE: despite the name, both constructors store the computed function-state byte here.
+    private final byte hasResult;
+
+    // Replaced in both constructors by the caller's removedNodes set; failed nodes
+    // reported by the server are added to it while the response is processed.
+    private Set<String> failedNodes = new HashSet<String>();
+
+    // Whether the function is highly available.
+    private boolean isHA; 
+    
+    // Whether the function is optimized for write.
+    private boolean optimizeForWrite;
+
+    public ExecuteRegionFunctionSingleHopOpImpl(String region,
+        Function function, ServerRegionFunctionExecutor serverRegionExecutor,
+        ResultCollector rc, byte hasResult,
+        Set<String> removedNodes, boolean allBuckets) {
+      // What is this 8 that is getting added to filter and removednode sizes?
+      // It should have been used as a constant and documented
+      super(MessageType.EXECUTE_REGION_FUNCTION_SINGLE_HOP, 8
+          + serverRegionExecutor.getFilter().size() + removedNodes.size());
+      this.isHA = function.isHA();
+      this.optimizeForWrite = function.optimizeForWrite();
+      byte functionState = AbstractExecution.getFunctionState(function.isHA(),
+          function.hasResult() , function.optimizeForWrite());
+      Set routingObjects = serverRegionExecutor.getFilter();
+      Object args = serverRegionExecutor.getArguments();
+      MemberMappedArgument memberMappedArg = serverRegionExecutor
+          .getMemberMappedArgument();
+      addBytes(functionState);
+      getMessage().addStringPart(region);
+      if (serverRegionExecutor.isFnSerializationReqd()) {
+        getMessage().addStringOrObjPart(function);
+      }
+      else {
+        getMessage().addStringOrObjPart(function.getId());
+      }
+      getMessage().addObjPart(args);
+      getMessage().addObjPart(memberMappedArg);
+      getMessage().addBytesPart(new byte[] { allBuckets ? (byte)1 : (byte)0 });
+      getMessage().addIntPart(routingObjects.size());
+      for (Object key : routingObjects) {
+        if(allBuckets){
+          getMessage().addIntPart((Integer)key);
+        }
+        else {
+          getMessage().addStringOrObjPart(key);
+        }
+      }
+      getMessage().addIntPart(removedNodes.size());
+      for (Object nodes : removedNodes) {
+        getMessage().addStringOrObjPart(nodes);
+      }
+
+      this.resultCollector = rc;
+      this.regionName = region;
+      this.functionId = function.getId();
+      this.executor = serverRegionExecutor;
+      this.hasResult = functionState;
+      this.failedNodes = removedNodes;
+    }
+
+    public ExecuteRegionFunctionSingleHopOpImpl(String region,
+        String functionId, ServerRegionFunctionExecutor serverRegionExecutor,
+        ResultCollector rc, byte hasResult,
+        Set<String> removedNodes, boolean allBuckets, boolean isHA, boolean optimizeForWrite) {
+      // What is this 8 that is getting added to filter and removednode sizes?
+      // It should have been used as a constant and documented
+      super(MessageType.EXECUTE_REGION_FUNCTION_SINGLE_HOP, 8
+          + serverRegionExecutor.getFilter().size() + removedNodes.size());
+      this.isHA = isHA;
+      this.optimizeForWrite = optimizeForWrite;
+      Set routingObjects = serverRegionExecutor.getFilter();
+      Object args = serverRegionExecutor.getArguments();
+      byte functionState = AbstractExecution.getFunctionState(isHA,
+          hasResult == (byte)1 ? true : false, optimizeForWrite);
+      MemberMappedArgument memberMappedArg = serverRegionExecutor
+          .getMemberMappedArgument();
+      addBytes(functionState);
+      getMessage().addStringPart(region);
+      getMessage().addStringOrObjPart(functionId);
+      getMessage().addObjPart(args);
+      getMessage().addObjPart(memberMappedArg);
+      getMessage().addBytesPart(new byte[] { allBuckets ? (byte)1 : (byte)0 });
+      getMessage().addIntPart(routingObjects.size());
+      for (Object key : routingObjects) {
+        if(allBuckets){
+          getMessage().addIntPart((Integer)key);
+        }
+        else {
+          getMessage().addStringOrObjPart(key);
+        }
+      }
+      getMessage().addIntPart(removedNodes.size());
+      for (Object nodes : removedNodes) {
+        getMessage().addStringOrObjPart(nodes);
+      }
+
+      this.resultCollector = rc;
+      this.regionName = region;
+      this.functionId = functionId;
+      this.executor = serverRegionExecutor;
+      this.hasResult = functionState;
+      this.failedNodes = removedNodes;
+    }
+
+    private void addBytes(byte functionState) {
+      if (GemFireCacheImpl.getClientFunctionTimeout() == GemFireCacheImpl.DEFAULT_CLIENT_FUNCTION_TIMEOUT) {
+        getMessage().addBytesPart(new byte[] { functionState });
+      } else {
+        byte[] bytes = new byte[5];
+        bytes[0] = functionState;
+        Part.encodeInt(GemFireCacheImpl.getClientFunctionTimeout(), bytes, 1);
+        getMessage().addBytesPart(bytes);
+      }
+    }
+
+    /**
+     * Reads the server's chunked reply and feeds the results to the collector.
+     * <ul>
+     * <li>{@code EXECUTE_REGION_FUNCTION_RESULT}: chunks are read until the
+     * last one. A regular result, or the cause of an
+     * {@code InternalFunctionException}, is added to the collector together
+     * with the member id found at index 1 of the chunk's list. Any other
+     * {@code FunctionException} is rethrown unless its message is
+     * "Buckets are null"; any other {@code Throwable} is wrapped in a
+     * {@code ServerOperationException}.</li>
+     * <li>{@code EXCEPTION}: one chunk is read and handled like the
+     * exception cases above; a payload that is not a {@code Throwable} is
+     * ignored.</li>
+     * <li>{@code EXECUTE_REGION_FUNCTION_ERROR}: one chunk is read and its
+     * string becomes the message of a {@code ServerOperationException}.</li>
+     * </ul>
+     * The response message is cleared in all cases.
+     *
+     * @param msg the response; must be a {@code ChunkedMessage}
+     * @return always {@code null}
+     * @throws FunctionException when the server sent one (other than the
+     *           "Buckets are null" case)
+     * @throws ServerOperationException for other server-side throwables and
+     *           for error responses
+     * @throws InternalGemFireError for an unknown message type
+     */
+    @Override
+    protected Object processResponse(Message msg) throws Exception {
+      ChunkedMessage executeFunctionResponseMsg = (ChunkedMessage)msg;
+      try {
+        executeFunctionResponseMsg.readHeader();
+        switch (executeFunctionResponseMsg.getMessageType()) {
+          case MessageType.EXECUTE_REGION_FUNCTION_RESULT:
+            final boolean isDebugEnabled = logger.isDebugEnabled();
+            if (isDebugEnabled) {
+              logger.debug("ExecuteRegionFunctionSingleHopOpImpl#processResponse: received message of type EXECUTE_REGION_FUNCTION_RESULT.");
+            }
+            do {
+              executeFunctionResponseMsg.receiveChunk();
+              Object resultResponse = executeFunctionResponseMsg.getPart(0)
+                  .getObject();
+              // A chunk is either a list holding [result, member id] or a bare object.
+              Object result;
+              if (resultResponse instanceof ArrayList) {
+                result = ((ArrayList)resultResponse).get(0);
+              }
+              else {
+                result = resultResponse;
+              }
+
+              if (result instanceof FunctionException) {
+                FunctionException ex = ((FunctionException)result);
+                if (isDebugEnabled) {
+                  logger.debug("ExecuteRegionFunctionSingleHopOpImpl#processResponse: received Exception. {}", ex.getCause());
+                }
+                if (ex instanceof InternalFunctionException) {
+                  // The wrapped cause is delivered to the collector as a result.
+                  // NOTE(review): unlike the regular-result branch below, this
+                  // addResult call is not synchronized on the collector -
+                  // confirm whether that is intended.
+                  Throwable cause = ex.getCause();
+                  DistributedMember memberID = (DistributedMember)((ArrayList)resultResponse)
+                      .get(1);
+                  this.resultCollector
+                      .addResult(memberID, cause);
+                  FunctionStats.getFunctionStats(this.functionId,
+                      this.executor.getRegion().getSystem())
+                      .incResultsReceived();
+                  continue;
+                }
+                rethrowUnlessBucketsAreNull(ex);
+                return null;
+              }
+              else if (result instanceof Throwable) {
+                String s = "While performing a remote " + getOpName();
+                throw new ServerOperationException(s, (Throwable)result);
+              }
+              else {
+                DistributedMember memberID = (DistributedMember)((ArrayList)resultResponse)
+                    .get(1);
+                synchronized (this.resultCollector) {
+                  this.resultCollector
+                      .addResult(memberID, result);
+                }
+                FunctionStats.getFunctionStats(this.functionId,
+                    this.executor.getRegion().getSystem()).incResultsReceived();
+              }
+            } while (!executeFunctionResponseMsg.isLastChunk());
+            if (isDebugEnabled) {
+              logger.debug("ExecuteRegionFunctionSingleHopOpImpl#processResponse: received all the results from server successfully.");
+            }
+            return null;
+
+          case MessageType.EXCEPTION:
+            if (logger.isDebugEnabled()) {
+              logger.debug("ExecuteRegionFunctionSingleHopOpImpl#processResponse: received message of type EXCEPTION");
+            }
+            executeFunctionResponseMsg.receiveChunk();
+            Part part0 = executeFunctionResponseMsg.getPart(0);
+            Object obj = part0.getObject();
+
+            if (obj instanceof FunctionException) {
+              rethrowUnlessBucketsAreNull((FunctionException)obj);
+              return null;
+            }
+            else if (obj instanceof Throwable) {
+              String s = "While performing a remote " + getOpName();
+              throw new ServerOperationException(s, (Throwable)obj);
+            }
+            break;
+          case MessageType.EXECUTE_REGION_FUNCTION_ERROR:
+            if (logger.isDebugEnabled()) {
+              logger.debug("ExecuteRegionFunctionSingleHopOpImpl#processResponse: received message of type EXECUTE_REGION_FUNCTION_ERROR");
+            }
+            executeFunctionResponseMsg.receiveChunk();
+            String errorMessage = executeFunctionResponseMsg.getPart(0)
+                .getString();
+            throw new ServerOperationException(errorMessage);
+
+          default:
+            throw new InternalGemFireError("Unknown message type "
+                + executeFunctionResponseMsg.getMessageType());
+        }
+      }
+      finally {
+        executeFunctionResponseMsg.clear();
+      }
+      return null;
+    }
+
+    /**
+     * Records the failed nodes carried by an
+     * {@code InternalFunctionInvocationTargetException} cause, then rethrows
+     * {@code ex} unless its message is exactly "Buckets are null".
+     *
+     * @param ex the function exception received from the server
+     */
+    private void rethrowUnlessBucketsAreNull(FunctionException ex) {
+      if (ex.getCause() instanceof InternalFunctionInvocationTargetException) {
+        InternalFunctionInvocationTargetException ifite = (InternalFunctionInvocationTargetException)ex
+            .getCause();
+        this.failedNodes.addAll(ifite.getFailedNodeSet());
+      }
+      // Constant-first comparison: getMessage() can be null, and a
+      // NullPointerException here would hide the real server exception.
+      if (!"Buckets are null".equals(ex.getMessage())) {
+        throw ex;
+      }
+    }
+
+    /** @return the result collector given to the constructor */
+    ResultCollector getResultCollector() {
+      return resultCollector;
+    }
+
+    /** @return the id of the function this operation executes */
+    String getFunctionId() {
+      return functionId;
+    }
+
+    /** @return the region name given to the constructor */
+    String getRegionName() {
+      return regionName;
+    }
+
+    /** @return the executor given to the constructor */
+    ServerRegionFunctionExecutor getExecutor() {
+      return executor;
+    }
+
+    /**
+     * @return the value of the {@code hasResult} field, which the
+     *         constructors fill with the computed function-state byte
+     */
+    byte getHasResult() {
+      return hasResult;
+    }
+
+    /** @return whether the executed function is highly available */
+    boolean isHA() {
+      return isHA;
+    }
+
+     /** @return whether the executed function is optimized for write */
+     boolean optimizeForWrite() {
+      return optimizeForWrite;
+    }
+    
+    /**
+     * Tells whether a response message type is this operation's error type.
+     *
+     * @param msgType the message type of the received response
+     * @return {@code true} only when {@code msgType} equals
+     *         {@code MessageType.EXECUTE_REGION_FUNCTION_ERROR}
+     */
+    @Override
+    protected boolean isErrorResponse(int msgType) {
+      return MessageType.EXECUTE_REGION_FUNCTION_ERROR == msgType;
+    }
+
+    @Override
+    protected long startAttempt(ConnectionStats stats) {
+      return stats.startExecuteFunction();
+    }
+
+    protected String getOpName() {
+      return "executeRegionFunctionSingleHop";
+    }
+
+    @Override
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      stats.endExecuteFunctionSend(start, hasFailed());
+    }
+
+    @Override
+    protected void endAttempt(ConnectionStats stats, long start) {
+      stats.endExecuteFunction(start, hasTimedOut(), hasFailed());
+    }
+
+    @Override
+    protected Message createResponseMessage() {
+      return new ChunkedMessage(1, Version.CURRENT);
+    }
+
+  }
+}