You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by rv...@apache.org on 2015/04/28 23:40:16 UTC
[11/51] [partial] incubator-geode git commit: Init
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteFunctionOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteFunctionOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteFunctionOp.java
new file mode 100755
index 0000000..deefb4c
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteFunctionOp.java
@@ -0,0 +1,644 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.client.ServerConnectivityException;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.cache.execute.Function;
+import com.gemstone.gemfire.cache.execute.FunctionException;
+import com.gemstone.gemfire.cache.execute.ResultCollector;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.execute.AbstractExecution;
+import com.gemstone.gemfire.internal.cache.execute.FunctionStats;
+import com.gemstone.gemfire.internal.cache.execute.InternalFunctionException;
+import com.gemstone.gemfire.internal.cache.execute.InternalFunctionInvocationTargetException;
+import com.gemstone.gemfire.internal.cache.execute.MemberMappedArgument;
+import com.gemstone.gemfire.internal.cache.execute.ServerFunctionExecutor;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ChunkedMessage;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Executes the function on server (possibly without region/cache).<br>
+ * Also gets the result back from the server
+ * @author Suranjan Kumar
+ * @since 5.8
+ */
+
+public class ExecuteFunctionOp {
+
+ private static final Logger logger = LogService.getLogger();
+
+ /** index of allMembers in flags[] */
+ public static final int ALL_MEMBERS_INDEX = 0;
+ /** index of ignoreFailedMembers in flags[] */
+ public static final int IGNORE_FAILED_MEMBERS_INDEX = 1;
+
+ private ExecuteFunctionOp() {
+ // no instances allowed
+ }
+
+ /**
+ * Does a execute Function on a server using connections from the given pool
+ * to communicate with the server.
+ * @param pool the pool to use to communicate with the server.
+ * @param function of the function to be executed
+ * @param args specified arguments to the application function
+ */
+ public static void execute(final PoolImpl pool, Function function,
+ ServerFunctionExecutor executor, Object args,
+ MemberMappedArgument memberMappedArg, boolean allServers, byte hasResult,
+ ResultCollector rc, boolean isFnSerializationReqd,
+ UserAttributes attributes, String[] groups) {
+ final AbstractOp op = new ExecuteFunctionOpImpl(function, args,
+ memberMappedArg, hasResult, rc, isFnSerializationReqd, (byte)0, groups, allServers, executor.isIgnoreDepartedMembers());
+
+ if (allServers && groups.length == 0) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("ExecuteFunctionOp#execute : Sending Function Execution Message:{} to all servers using pool: {}", op.getMessage(), pool);
+ }
+ List callableTasks = constructAndGetFunctionTasks(pool, function, args,
+ memberMappedArg, hasResult, rc, isFnSerializationReqd, attributes);
+
+ SingleHopClientExecutor.submitAll(callableTasks);
+ } else {
+ boolean reexecuteForServ = false;
+ AbstractOp reexecOp = null;
+ int retryAttempts = 0;
+ boolean reexecute = false;
+ int maxRetryAttempts = 0;
+ if(function.isHA())
+ maxRetryAttempts = pool.getRetryAttempts();
+
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+ do {
+ try {
+ if (reexecuteForServ) {
+ if (isDebugEnabled) {
+ logger.debug("ExecuteFunctionOp#execute.reexecuteForServ : Sending Function Execution Message:{} to server using pool: {} with groups:{} all members:{} ignoreFailedMembers:{}", op.getMessage(), pool, Arrays.toString(groups), allServers, executor.isIgnoreDepartedMembers());
+ }
+ reexecOp = new ExecuteFunctionOpImpl(function, args,
+ memberMappedArg, hasResult, rc, isFnSerializationReqd,
+ (byte)1/* isReExecute */, groups, allServers, executor.isIgnoreDepartedMembers());
+ pool.execute(reexecOp, 0);
+ } else {
+ if (isDebugEnabled) {
+ logger.debug("ExecuteFunctionOp#execute : Sending Function Execution Message:{} to server using pool: {} with groups:{} all members:{} ignoreFailedMembers:{}", op.getMessage(), pool, Arrays.toString(groups), allServers, executor.isIgnoreDepartedMembers());
+ }
+
+ pool.execute(op, 0);
+ }
+ reexecute = false;
+ reexecuteForServ = false;
+ } catch (InternalFunctionInvocationTargetException e) {
+ if (isDebugEnabled) {
+ logger.debug("ExecuteFunctionOp#execute : Received InternalFunctionInvocationTargetException. The failed node is {}", e.getFailedNodeSet());
+ }
+ reexecute = true;
+ rc.clearResults();
+ } catch (ServerConnectivityException se) {
+ retryAttempts++;
+
+ if (isDebugEnabled) {
+ logger.debug("ExecuteFunctionOp#execute : Received ServerConnectivityException. The exception is {} The retryAttempt is : {} maxRetryAttempts {}", se, retryAttempts, maxRetryAttempts);
+ }
+ if (se instanceof ServerOperationException) {
+ throw se;
+ }
+ if ((retryAttempts > maxRetryAttempts && maxRetryAttempts != -1))
+ throw se;
+
+ reexecuteForServ = true;
+ rc.clearResults();
+ }
+ } while (reexecuteForServ);
+
+ if (reexecute && function.isHA()) {
+ ExecuteFunctionOp.reexecute(pool, function,
+ executor, rc, hasResult, isFnSerializationReqd, maxRetryAttempts - 1, groups, allServers);
+ }
+ }
+ }
+
+ public static void execute(final PoolImpl pool, String functionId,
+ ServerFunctionExecutor executor, Object args,
+ MemberMappedArgument memberMappedArg, boolean allServers, byte hasResult,
+ ResultCollector rc, boolean isFnSerializationReqd, boolean isHA,
+ boolean optimizeForWrite, UserAttributes properties, String[] groups) {
+ final AbstractOp op = new ExecuteFunctionOpImpl(functionId, args,
+ memberMappedArg, hasResult, rc, isFnSerializationReqd, isHA,
+ optimizeForWrite, (byte)0, groups, allServers, executor.isIgnoreDepartedMembers());
+ if (allServers && groups.length == 0) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("ExecuteFunctionOp#execute : Sending Function Execution Message:{} to all servers using pool: {}", op.getMessage(), pool);
+ }
+ List callableTasks = constructAndGetFunctionTasks(pool, functionId, args,
+ memberMappedArg, hasResult, rc, isFnSerializationReqd, isHA,
+ optimizeForWrite, properties);
+
+ SingleHopClientExecutor.submitAll(callableTasks);
+ } else {
+ boolean reexecuteForServ = false;
+ AbstractOp reexecOp = null;
+ int retryAttempts = 0;
+ boolean reexecute = false;
+ int maxRetryAttempts = 0;
+ if(isHA){
+ maxRetryAttempts = pool.getRetryAttempts();
+ }
+
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+ do {
+ try {
+ if (reexecuteForServ) {
+ reexecOp = new ExecuteFunctionOpImpl(functionId, args,
+ memberMappedArg, hasResult, rc, isFnSerializationReqd, isHA,
+ optimizeForWrite, (byte)1, groups, allServers, executor.isIgnoreDepartedMembers());
+ pool.execute(reexecOp, 0);
+ } else {
+ if (isDebugEnabled) {
+ logger.debug("ExecuteFunctionOp#execute : Sending Function Execution Message:{} to server using pool:{} with groups:{} all members:{} ignoreFailedMembers:{}", op.getMessage(), pool, Arrays.toString(groups), allServers, executor.isIgnoreDepartedMembers());
+ }
+ pool.execute(op, 0);
+ }
+ reexecute = false;
+ reexecuteForServ = false;
+ } catch (InternalFunctionInvocationTargetException e) {
+ if (isDebugEnabled) {
+ logger.debug("ExecuteFunctionOp#execute : Received InternalFunctionInvocationTargetException. The failed node is {}", e.getFailedNodeSet());
+ }
+ reexecute = true;
+ rc.clearResults();
+ } catch (ServerConnectivityException se) {
+ retryAttempts++;
+
+ if (isDebugEnabled) {
+ logger.debug("ExecuteFunctionOp#execute : Received ServerConnectivityException. The exception is {} The retryAttempt is : {} maxRetryAttempts {}", se, retryAttempts, maxRetryAttempts);
+ }
+ if (se instanceof ServerOperationException) {
+ throw se;
+ }
+ if ((retryAttempts > maxRetryAttempts && maxRetryAttempts != -1))
+ throw se;
+
+ reexecuteForServ = true;
+ rc.clearResults();
+ }
+ } while (reexecuteForServ);
+
+ if (reexecute && isHA) {
+ ExecuteFunctionOp.reexecute(pool, functionId, executor, rc, hasResult,
+ isFnSerializationReqd, maxRetryAttempts - 1, args, isHA,
+ optimizeForWrite, groups, allServers);
+ }
+ }
+ }
+
+ public static void reexecute(ExecutablePool pool, Function function,
+ ServerFunctionExecutor serverExecutor, ResultCollector resultCollector,
+ byte hasResult, boolean isFnSerializationReqd, int maxRetryAttempts, String[] groups, boolean allMembers) {
+ boolean reexecute = true;
+ int retryAttempts = 0;
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+ do {
+ reexecute = false;
+ AbstractOp reExecuteOp = new ExecuteFunctionOpImpl(function, serverExecutor.getArguments(),
+ serverExecutor.getMemberMappedArgument(), hasResult, resultCollector, isFnSerializationReqd, (byte)1, groups, allMembers, serverExecutor.isIgnoreDepartedMembers());
+ if (isDebugEnabled) {
+ logger.debug("ExecuteFunction#reexecute : Sending Function Execution Message:{} to Server using pool:{} with groups:{} all members:{} ignoreFailedMembers:{}", reExecuteOp.getMessage(), pool, Arrays.toString(groups), allMembers, serverExecutor.isIgnoreDepartedMembers());
+ }
+ try {
+ pool.execute(reExecuteOp,0);
+ }
+ catch (InternalFunctionInvocationTargetException e) {
+ if (isDebugEnabled) {
+ logger.debug("ExecuteFunctionOp#reexecute : Recieved InternalFunctionInvocationTargetException. The failed nodes are {}", e.getFailedNodeSet());
+ }
+ reexecute = true;
+ resultCollector.clearResults();
+ }
+ catch (ServerConnectivityException se) {
+ if (isDebugEnabled) {
+ logger.debug("ExecuteFunctionOp#reexecute : Received ServerConnectivity Exception.");
+ }
+
+ if(se instanceof ServerOperationException){
+ throw se;
+ }
+ retryAttempts++;
+ if (retryAttempts > maxRetryAttempts && maxRetryAttempts != -2)
+ throw se;
+
+ reexecute = true;
+ resultCollector.clearResults();
+ }
+ } while (reexecute);
+ }
+
+ public static void reexecute(ExecutablePool pool, String functionId,
+ ServerFunctionExecutor serverExecutor, ResultCollector resultCollector,
+ byte hasResult, boolean isFnSerializationReqd, int maxRetryAttempts,
+ Object args, boolean isHA, boolean optimizeForWrite, String[] groups, boolean allMembers) {
+ boolean reexecute = true;
+ int retryAttempts = 0;
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+ do {
+ reexecute = false;
+
+ final AbstractOp op = new ExecuteFunctionOpImpl(functionId, args,
+ serverExecutor.getMemberMappedArgument(), hasResult, resultCollector, isFnSerializationReqd, isHA, optimizeForWrite, (byte)1, groups, allMembers, serverExecutor.isIgnoreDepartedMembers());
+
+ if (isDebugEnabled) {
+ logger.debug("ExecuteFunction#reexecute : Sending Function Execution Message:{} to Server using pool:{} with groups:{} all members:{} ignoreFailedMembers:{}", op.getMessage(), pool, Arrays.toString(groups), allMembers, serverExecutor.isIgnoreDepartedMembers());
+ }
+ try {
+ pool.execute(op,0);
+ }
+ catch (InternalFunctionInvocationTargetException e) {
+ if (isDebugEnabled) {
+ logger.debug("ExecuteFunctionOp#reexecute : Recieved InternalFunctionInvocationTargetException. The failed nodes are {}", e.getFailedNodeSet());
+ }
+ reexecute = true;
+ resultCollector.clearResults();
+ }
+ catch (ServerConnectivityException se) {
+ if (isDebugEnabled) {
+ logger.debug("ExecuteFunctionOp#reexecute : Received ServerConnectivity Exception.");
+ }
+
+ if(se instanceof ServerOperationException){
+ throw se;
+ }
+ retryAttempts++;
+ if (retryAttempts > maxRetryAttempts && maxRetryAttempts != -2)
+ throw se;
+
+ reexecute = true;
+ resultCollector.clearResults();
+ }
+ } while (reexecute);
+ }
+
+ static List constructAndGetFunctionTasks(final PoolImpl pool,
+ final Function function, Object args,
+ MemberMappedArgument memberMappedArg, byte hasResult, ResultCollector rc,
+ boolean isFnSerializationReqd, UserAttributes attributes) {
+ final List<SingleHopOperationCallable> tasks = new ArrayList<SingleHopOperationCallable>();
+ ArrayList<ServerLocation> servers = null;
+ if (pool.getLocators() == null || pool.getLocators().isEmpty()) {
+ servers = ((ExplicitConnectionSourceImpl)pool.getConnectionSource())
+ .getAllServers();
+ }
+ else {
+ servers = ((AutoConnectionSourceImpl)pool.getConnectionSource())
+ .findAllServers(); // n/w call on locator
+ }
+
+ for (ServerLocation server : servers) {
+ final AbstractOp op = new ExecuteFunctionOpImpl(function,
+ args, memberMappedArg, hasResult, rc, isFnSerializationReqd, (byte)0, null/*onGroups does not use single-hop for now*/, false, false);
+ SingleHopOperationCallable task = new SingleHopOperationCallable(server, pool, op, attributes);
+ tasks.add(task);
+ }
+ return tasks;
+ }
+
+ static List constructAndGetFunctionTasks(final PoolImpl pool,
+ final String functionId, Object args,
+ MemberMappedArgument memberMappedArg, byte hasResult, ResultCollector rc,
+ boolean isFnSerializationReqd, boolean isHA, boolean optimizeForWrite, UserAttributes properties) {
+ final List<SingleHopOperationCallable> tasks = new ArrayList<SingleHopOperationCallable>();
+ ArrayList<ServerLocation> servers = null;
+ if (pool.getLocators() == null || pool.getLocators().isEmpty()) {
+ servers = ((ExplicitConnectionSourceImpl)pool.getConnectionSource())
+ .getAllServers();
+ }
+ else {
+ servers = ((AutoConnectionSourceImpl)pool.getConnectionSource())
+ .findAllServers(); // n/w call on locator
+ }
+
+ for (ServerLocation server : servers) {
+ final AbstractOp op = new ExecuteFunctionOpImpl(functionId,
+ args, memberMappedArg, hasResult, rc, isFnSerializationReqd, isHA, optimizeForWrite, (byte)0,null/*onGroups does not use single-hop for now*/, false, false);
+ SingleHopOperationCallable task = new SingleHopOperationCallable(server, pool, op, properties);
+ tasks.add(task);
+ }
+ return tasks;
+ }
+
+ static byte[] getByteArrayForFlags(boolean... flags) {
+ byte[] retVal = null;
+ if (flags.length > 0) {
+ retVal = new byte[flags.length];
+ for (int i=0; i<flags.length; i++) {
+ if (flags[i]) {
+ retVal[i] = 1;
+ } else {
+ retVal[i] = 0;
+ }
+ }
+ }
+ return retVal;
+ }
+
+ static class ExecuteFunctionOpImpl extends AbstractOp {
+
+ private ResultCollector resultCollector;
+
+ //To get the instance of the Function Statistics we need the function name or instance
+ private String functionId;
+
+ private Function function;
+
+ private Object args;
+
+ private MemberMappedArgument memberMappedArg;
+
+ private byte hasResult;
+
+ private boolean isFnSerializationReqd;
+
+ private String[] groups;
+
+ /**
+ * [0] = allMembers
+ * [1] = ignoreFailedMembers
+ */
+ private byte[] flags;
+
+ /**
+ * number of parts in the request message
+ */
+ private static final int MSG_PARTS = 6;
+
+ /**
+ * @throws com.gemstone.gemfire.SerializationException if serialization fails
+ */
+ public ExecuteFunctionOpImpl(Function function, Object args,
+ MemberMappedArgument memberMappedArg, byte hasResult,
+ ResultCollector rc, boolean isFnSerializationReqd, byte isReexecute, String[] groups, boolean allMembers, boolean ignoreFailedMembers) {
+ super(MessageType.EXECUTE_FUNCTION, MSG_PARTS);
+ byte fnState = AbstractExecution.getFunctionState(function.isHA(),
+ function.hasResult(), function.optimizeForWrite());
+
+ addBytes(isReexecute, fnState);
+ if(isFnSerializationReqd){
+ getMessage().addStringOrObjPart(function);
+ }
+ else{
+ getMessage().addStringOrObjPart(function.getId());
+ }
+ getMessage().addObjPart(args);
+ getMessage().addObjPart(memberMappedArg);
+ getMessage().addObjPart(groups);
+ this.flags = getByteArrayForFlags(allMembers, ignoreFailedMembers);
+ getMessage().addBytesPart(this.flags);
+ resultCollector = rc;
+ if(isReexecute == 1) {
+ resultCollector.clearResults();
+ }
+ this.functionId = function.getId();
+ this.function = function;
+ this.args = args;
+ this.memberMappedArg = memberMappedArg;
+ this.hasResult = fnState;
+ this.isFnSerializationReqd = isFnSerializationReqd;
+ this.groups = groups;
+ }
+
+ public ExecuteFunctionOpImpl(String functionId, Object args2,
+ MemberMappedArgument memberMappedArg, byte hasResult,
+ ResultCollector rc, boolean isFnSerializationReqd, boolean isHA,
+ boolean optimizeForWrite, byte isReexecute, String[] groups, boolean allMembers, boolean ignoreFailedMembers) {
+ super(MessageType.EXECUTE_FUNCTION, MSG_PARTS);
+ byte fnState = AbstractExecution.getFunctionState(isHA,
+ hasResult == (byte)1 ? true : false, optimizeForWrite);
+
+ addBytes(isReexecute, fnState);
+ getMessage().addStringOrObjPart(functionId);
+ getMessage().addObjPart(args2);
+ getMessage().addObjPart(memberMappedArg);
+ getMessage().addObjPart(groups);
+ this.flags = getByteArrayForFlags(allMembers, ignoreFailedMembers);
+ getMessage().addBytesPart(this.flags);
+ resultCollector = rc;
+ if(isReexecute == 1) {
+ resultCollector.clearResults();
+ }
+ this.functionId = functionId;
+ this.args = args2;
+ this.memberMappedArg = memberMappedArg;
+ this.hasResult = fnState;
+ this.isFnSerializationReqd = isFnSerializationReqd;
+ this.groups = groups;
+ }
+
+ public ExecuteFunctionOpImpl(ExecuteFunctionOpImpl op, byte isReexecute) {
+ super(MessageType.EXECUTE_FUNCTION, MSG_PARTS);
+ this.resultCollector = op.resultCollector;
+ this.function = op.function;
+ this.functionId = op.functionId;
+ this.hasResult = op.hasResult;
+ this.args = op.args;
+ this.memberMappedArg = op.memberMappedArg;
+ this.isFnSerializationReqd = op.isFnSerializationReqd;
+ this.groups = op.groups;
+ this.flags = op.flags;
+
+ addBytes(isReexecute, this.hasResult);
+ if(this.isFnSerializationReqd){
+ getMessage().addStringOrObjPart(function);
+ }
+ else{
+ getMessage().addStringOrObjPart(function.getId());
+ }
+ getMessage().addObjPart(this.args);
+ getMessage().addObjPart(this.memberMappedArg);
+ getMessage().addObjPart(this.groups);
+ getMessage().addBytesPart(this.flags);
+ if(isReexecute == 1) {
+ resultCollector.clearResults();
+ }
+ }
+
+ private void addBytes(byte isReexecute, byte fnStateOrHasResult) {
+ if (GemFireCacheImpl.getClientFunctionTimeout() == GemFireCacheImpl.DEFAULT_CLIENT_FUNCTION_TIMEOUT) {
+ if (isReexecute == 1) {
+ getMessage().addBytesPart(new byte[] { AbstractExecution.getReexecuteFunctionState(fnStateOrHasResult) });
+ } else {
+ getMessage().addBytesPart(new byte[] { fnStateOrHasResult });
+ }
+ } else {
+ byte[] bytes = new byte[5];
+ if (isReexecute == 1) {
+ bytes[0] = AbstractExecution.getReexecuteFunctionState(fnStateOrHasResult);
+ } else {
+ bytes[0] = fnStateOrHasResult;
+ }
+ Part.encodeInt(GemFireCacheImpl.getClientFunctionTimeout(), bytes, 1);
+ getMessage().addBytesPart(bytes);
+ }
+ }
+
+ /**
+ * ignoreFaileMember flag is at index 1
+ */
+ private boolean getIgnoreFailedMembers() {
+ boolean ignoreFailedMembers = false;
+ if (this.flags != null && this.flags.length > 1) {
+ if (this.flags[IGNORE_FAILED_MEMBERS_INDEX] == 1) {
+ ignoreFailedMembers = true;
+ }
+ }
+ return ignoreFailedMembers;
+ }
+
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ ChunkedMessage executeFunctionResponseMsg = (ChunkedMessage)msg;
+ try {
+ // Read the header which describes the type of message following
+ executeFunctionResponseMsg.readHeader();
+ switch (executeFunctionResponseMsg.getMessageType()) {
+ case MessageType.EXECUTE_FUNCTION_RESULT:
+ if (logger.isDebugEnabled()) {
+ logger.debug("ExecuteFunctionOpImpl#processResponse: received message of type EXECUTE_FUNCTION_RESULT.");
+ }
+ // Read the chunk
+ do{
+ executeFunctionResponseMsg.receiveChunk();
+ Object resultResponse = executeFunctionResponseMsg.getPart(0)
+ .getObject();
+ Object result;
+ if (resultResponse instanceof ArrayList) {
+ result = ((ArrayList)resultResponse).get(0);
+ }
+ else {
+ result = resultResponse;
+ }
+ if (result instanceof FunctionException) {
+ //String s = "While performing a remote " + getOpName();
+ FunctionException ex = ((FunctionException)result);
+ if (ex instanceof InternalFunctionException || getIgnoreFailedMembers()) {
+ Throwable cause = ex.getCause() == null ? ex : ex.getCause();
+ DistributedMember memberID = (DistributedMember)((ArrayList)resultResponse)
+ .get(1);
+ this.resultCollector.addResult(memberID, cause);
+ FunctionStats.getFunctionStats(this.functionId, null)
+ .incResultsReceived();
+ continue;
+ }
+ else {
+ throw ex;
+ }
+ }else if (result instanceof Throwable) {
+ String s = "While performing a remote " + getOpName();
+ throw new ServerOperationException(s, (Throwable)result);
+ // Get the exception toString part.
+ // This was added for c++ thin client and not used in java
+ //Part exceptionToStringPart = msg.getPart(1);
+ }
+ else {
+ DistributedMember memberID = (DistributedMember)((ArrayList)resultResponse)
+ .get(1);
+ synchronized (resultCollector) {
+ resultCollector.addResult(memberID, result);
+ }
+ FunctionStats.getFunctionStats(this.functionId, null)
+ .incResultsReceived();
+ }
+ }while(!executeFunctionResponseMsg.isLastChunk());
+ if (logger.isDebugEnabled()) {
+ logger.debug("ExecuteFunctionOpImpl#processResponse: received all the results from server successfully.");
+ }
+ return null;
+ case MessageType.EXCEPTION:
+ if (logger.isDebugEnabled()) {
+ logger.debug("ExecuteFunctionOpImpl#processResponse: received message of type EXCEPTION");
+ }
+ // Read the chunk
+ executeFunctionResponseMsg.receiveChunk();
+ Part part0 = executeFunctionResponseMsg.getPart(0);
+ Object obj = part0.getObject();
+ if (obj instanceof FunctionException) {
+ FunctionException ex = ((FunctionException)obj);
+ throw ex;
+ }
+ else {
+ String s = ": While performing a remote execute Function" + ((Throwable)obj).getMessage();
+ throw new ServerOperationException(s, (Throwable)obj);
+ }
+ case MessageType.EXECUTE_FUNCTION_ERROR:
+ if (logger.isDebugEnabled()) {
+ logger.debug("ExecuteFunctionOpImpl#processResponse: received message of type EXECUTE_FUNCTION_ERROR");
+ }
+ // Read the chunk
+ executeFunctionResponseMsg.receiveChunk();
+ String errorMessage = executeFunctionResponseMsg.getPart(0)
+ .getString();
+ throw new ServerOperationException(errorMessage);
+ default:
+ throw new InternalGemFireError(
+ LocalizedStrings.Op_UNKNOWN_MESSAGE_TYPE_0
+ .toLocalizedString(
+ Integer.valueOf(executeFunctionResponseMsg.getMessageType())));
+ }
+ }
+ finally {
+ executeFunctionResponseMsg.clear();
+ }
+ }
+
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return msgType == MessageType.EXECUTE_FUNCTION_ERROR;
+ }
+
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startExecuteFunction();
+ }
+
+ protected String getOpName() {
+ return "executeFunction";
+ }
+
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endExecuteFunctionSend(start, hasFailed());
+ }
+
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endExecuteFunction(start, hasTimedOut(), hasFailed());
+ }
+
+ @Override
+ protected Message createResponseMessage() {
+ return new ChunkedMessage(1, Version.CURRENT);
+ }
+ }
+
+ public static final int MAX_FE_THREADS = Integer.getInteger(
+ "DistributionManager.MAX_FE_THREADS",
+ Math.max(Runtime.getRuntime().availableProcessors() * 4, 16)).intValue();
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionNoAckOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionNoAckOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionNoAckOp.java
new file mode 100755
index 0000000..76840ce
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionNoAckOp.java
@@ -0,0 +1,218 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.execute.Function;
+import com.gemstone.gemfire.cache.execute.FunctionException;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.execute.AbstractExecution;
+import com.gemstone.gemfire.internal.cache.execute.MemberMappedArgument;
+import com.gemstone.gemfire.internal.cache.execute.ServerRegionFunctionExecutor;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+
+/**
+ * Does a Execution of function on server region
+ * It does not get the resul from the server (follows Fire&Forget approch)
+ * @author Kishor Bachhav
+ * @since 5.8Beta
+ */
+public class ExecuteRegionFunctionNoAckOp {
+
+ private static final Logger logger = LogService.getLogger();
+
+ private ExecuteRegionFunctionNoAckOp() {
+ // no instances allowed
+ }
+
+ /**
+ * Does a execute Function on a server using connections from the given pool
+ * to communicate with the server.
+ * @param pool the pool to use to communicate with the server.
+ * @param region the name of the region to do the put on
+ * @param function to be executed
+ * @param serverRegionExecutor which will return argument and filter
+ */
+ public static void execute(ExecutablePool pool, String region,
+ Function function, ServerRegionFunctionExecutor serverRegionExecutor,
+ byte hasResult) {
+ AbstractOp op = new ExecuteRegionFunctionNoAckOpImpl(region, function,
+ serverRegionExecutor, hasResult);
+ try {
+ if (logger.isDebugEnabled()) {
+ logger.debug("ExecuteRegionFunctionNoAckOp#execute : Sending Function Execution Message: {} to Server using pool: {}", op.getMessage(), pool);
+ }
+ pool.execute(op);
+ }
+ catch (Exception ex) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("ExecuteRegionFunctionNoAckOp#execute : Exception occured while Sending Function Execution Message: {} to server using pool: {}", op.getMessage(), pool, ex);
+ }
+ if (ex.getMessage() != null)
+ throw new FunctionException(ex.getMessage(), ex);
+ else
+ throw new FunctionException(
+ "Unexpected exception during function execution:", ex);
+ }
+ }
+
+ public static void execute(ExecutablePool pool, String region,
+ String functionId, ServerRegionFunctionExecutor serverRegionExecutor,
+ byte hasResult, boolean isHA, boolean optimizeForWrite) {
+ AbstractOp op = new ExecuteRegionFunctionNoAckOpImpl(region, functionId,
+ serverRegionExecutor, hasResult, isHA, optimizeForWrite);
+ try {
+ if (logger.isDebugEnabled()) {
+ logger.debug("ExecuteRegionFunctionNoAckOp#execute : Sending Function Execution Message: {} to Server using pool: {}", op.getMessage(), pool);
+ }
+ pool.execute(op);
+ }
+ catch (Exception ex) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("ExecuteRegionFunctionNoAckOp#execute : Exception occured while Sending Function Execution Message: {} to server using pool: {}", op.getMessage(), pool, ex);
+ }
+ if (ex.getMessage() != null)
+ throw new FunctionException(ex.getMessage(), ex);
+ else
+ throw new FunctionException(
+ "Unexpected exception during function execution:", ex);
+ }
+ }
+
+ private static class ExecuteRegionFunctionNoAckOpImpl extends AbstractOp {
+ private final boolean executeOnBucketSet ;
+
+ /**
+ * @throws com.gemstone.gemfire.SerializationException if serialization fails
+ */
+ public ExecuteRegionFunctionNoAckOpImpl(String region, Function function,
+ ServerRegionFunctionExecutor serverRegionExecutor, byte hasResult) {
+ super(MessageType.EXECUTE_REGION_FUNCTION, 8 + serverRegionExecutor
+ .getFilter().size());
+ byte isReExecute = 0;
+ int removedNodesSize = 0;
+ byte functionState = AbstractExecution.getFunctionState(function.isHA(),
+ function.hasResult(), function.optimizeForWrite());
+ Set routingObjects = serverRegionExecutor.getFilter();
+ Object args = serverRegionExecutor.getArguments();
+ MemberMappedArgument memberMappedArg = serverRegionExecutor.getMemberMappedArgument();
+ getMessage().addBytesPart(new byte[]{functionState});
+ getMessage().addStringPart(region);
+ if(serverRegionExecutor.isFnSerializationReqd()){
+ getMessage().addStringOrObjPart(function);
+ }
+ else{
+ getMessage().addStringOrObjPart(function.getId());
+ }
+ getMessage().addObjPart(args);
+ getMessage().addObjPart(memberMappedArg);
+
+ this.executeOnBucketSet = serverRegionExecutor.getExecuteOnBucketSetFlag();
+// byte flags = this.executeOnBucketSet ?
+// (byte)(0x00 | Op.BUCKETS_AS_FILTER_MASK) : 0x00;
+// flags = isReExecute == 1? (byte)(flags | Op.IS_REXECUTE_MASK) : flags;
+ byte flags = ExecuteFunctionHelper.createFlags(executeOnBucketSet, isReExecute);
+
+ getMessage().addBytesPart(new byte[]{flags});
+ getMessage().addIntPart(routingObjects.size());
+ for (Object key : routingObjects) {
+ getMessage().addStringOrObjPart(key);
+ }
+ getMessage().addIntPart(removedNodesSize);
+ }
+
+ public ExecuteRegionFunctionNoAckOpImpl(String region, String functionId,
+ ServerRegionFunctionExecutor serverRegionExecutor, byte hasResult, boolean isHA, boolean optimizeForWrite) {
+ super(MessageType.EXECUTE_REGION_FUNCTION, 8 + serverRegionExecutor
+ .getFilter().size());
+ byte isReExecute = 0;
+ int removedNodesSize = 0;
+ byte functionState = AbstractExecution.getFunctionState(isHA,
+ hasResult==(byte)1?true:false, optimizeForWrite);
+
+ Set routingObjects = serverRegionExecutor.getFilter();
+ Object args = serverRegionExecutor.getArguments();
+ MemberMappedArgument memberMappedArg = serverRegionExecutor.getMemberMappedArgument();
+ getMessage().addBytesPart(new byte[]{functionState});
+ getMessage().addStringPart(region);
+ getMessage().addStringOrObjPart(functionId);
+ getMessage().addObjPart(args);
+ getMessage().addObjPart(memberMappedArg);
+ this.executeOnBucketSet = serverRegionExecutor.getExecuteOnBucketSetFlag();
+// byte flags = this.executeOnBucketSet ?
+// (byte)(0x00 | Op.BUCKETS_AS_FILTER_MASK) : 0x00;
+// flags = isReExecute == 1? (byte)(flags | Op.IS_REXECUTE_MASK) : flags;
+ byte flags = ExecuteFunctionHelper.createFlags(executeOnBucketSet, isReExecute);
+
+ getMessage().addBytesPart(new byte[]{flags});
+ getMessage().addIntPart(routingObjects.size());
+ for (Object key : routingObjects) {
+ getMessage().addStringOrObjPart(key);
+ }
+ getMessage().addIntPart(removedNodesSize);
+ }
+
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ final int msgType = msg.getMessageType();
+ if (msgType == MessageType.REPLY) {
+ return null;
+ }
+ else {
+ Part part = msg.getPart(0);
+ if (msgType == MessageType.EXCEPTION) {
+ Throwable t = (Throwable)part.getObject();
+ logger.warn(LocalizedMessage.create(LocalizedStrings.EXECUTE_FUNCTION_NO_HAS_RESULT_RECEIVED_EXCEPTION), t);
+ }
+ else if (isErrorResponse(msgType)) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.EXECUTE_FUNCTION_NO_HAS_RESULT_RECEIVED_EXCEPTION)); // TODO:LOG:FIXED: used to include part.getString()); but it wouldn't have been printed
+ }
+ else {
+ throw new InternalGemFireError("Unexpected message type "
+ + MessageType.getString(msgType));
+ }
+ return null;
+ }
+
+ }
+
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return msgType == MessageType.EXECUTE_REGION_FUNCTION_ERROR;
+ }
+
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startExecuteFunction();
+ }
+
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endExecuteFunctionSend(start, hasFailed());
+ }
+
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endExecuteFunction(start, hasTimedOut(), hasFailed());
+ }
+
+ @Override
+ protected Message createResponseMessage() {
+ return new Message(1, Version.CURRENT);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionOp.java
new file mode 100755
index 0000000..199e11a
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionOp.java
@@ -0,0 +1,615 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.client.ServerConnectivityException;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.cache.client.internal.ExecuteRegionFunctionSingleHopOp.ExecuteRegionFunctionSingleHopOpImpl;
+import com.gemstone.gemfire.cache.execute.Function;
+import com.gemstone.gemfire.cache.execute.FunctionException;
+import com.gemstone.gemfire.cache.execute.ResultCollector;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.execute.AbstractExecution;
+import com.gemstone.gemfire.internal.cache.execute.FunctionStats;
+import com.gemstone.gemfire.internal.cache.execute.InternalFunctionException;
+import com.gemstone.gemfire.internal.cache.execute.MemberMappedArgument;
+import com.gemstone.gemfire.internal.cache.execute.InternalFunctionInvocationTargetException;
+import com.gemstone.gemfire.internal.cache.execute.ServerRegionFunctionExecutor;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ChunkedMessage;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * Does a Execution of function on server region.<br>
+ * It alos gets result from the server
+ * @author Kishor Bachhav
+ * @since 5.8Beta
+ */
+public class ExecuteRegionFunctionOp {
+
+ private static final Logger logger = LogService.getLogger();
+ private ExecuteRegionFunctionOp() {
+ // no instances allowed
+ }
+
+ /**
+ * Does a execute Function on a server using connections from the given pool
+ * to communicate with the server.
+ * @param pool the pool to use to communicate with the server.
+ * @param region the name of the region to do the put on
+ * @param function to be executed
+ * @param serverRegionExecutor which will return argument and filter
+ * @param resultCollector is used to collect the results from the Server
+ */
+ public static void execute(ExecutablePool pool, String region,
+ Function function, ServerRegionFunctionExecutor serverRegionExecutor,
+ ResultCollector resultCollector, byte hasResult, int mRetryAttempts) {
+ AbstractOp op = new ExecuteRegionFunctionOpImpl(region, function, serverRegionExecutor,
+ resultCollector, hasResult, new HashSet<String>());
+
+ int retryAttempts = 0;
+ boolean reexecute = false;
+ boolean reexecuteForServ = false;
+ Set<String> failedNodes = new HashSet<String>();
+ AbstractOp reexecOp = null;
+ int maxRetryAttempts = 0;
+ if (function.isHA()) {
+ maxRetryAttempts = mRetryAttempts;
+ }
+
+ final boolean isDebugEnabled =logger.isDebugEnabled();
+ do {
+ try {
+ if (reexecuteForServ) {
+ reexecOp = new ExecuteRegionFunctionOpImpl(
+ (ExecuteRegionFunctionOpImpl)op, (byte)1/* isReExecute */,
+ failedNodes);
+ pool.execute(reexecOp, 0);
+ }
+ else {
+ pool.execute(op, 0);
+ }
+ reexecute = false;
+ reexecuteForServ = false;
+ }
+ catch (InternalFunctionInvocationTargetException e) {
+ if (isDebugEnabled) {
+ logger.debug("ExecuteRegionFunctionOp#execute : Received InternalFunctionInvocationTargetException. The failed nodes are {}", e.getFailedNodeSet());
+ }
+ reexecute = true;
+ resultCollector.clearResults();
+ Set<String> failedNodesIds = e.getFailedNodeSet();
+ failedNodes.clear();
+ if (failedNodesIds != null) {
+ failedNodes.addAll(failedNodesIds);
+ }
+ }
+ catch (ServerConnectivityException se) {
+ retryAttempts++;
+ if (isDebugEnabled) {
+ logger.debug("ExecuteRegionFunctionOp#execute : Received ServerConnectivityException. The exception is {} The retryattempt is : {} maxRetryAttempts {}", se, retryAttempts, maxRetryAttempts );
+ }
+ if(se instanceof ServerOperationException){
+ throw se;
+ }
+ if ((retryAttempts > maxRetryAttempts && maxRetryAttempts !=-1) /*|| !function.isHA()*/)
+ throw se;
+
+ reexecuteForServ = true;
+ resultCollector.clearResults();
+ failedNodes.clear();
+ }
+ }
+ while(reexecuteForServ);
+
+ if ( reexecute && function.isHA()) {
+ ExecuteRegionFunctionOp.reexecute(pool, region, function,
+ serverRegionExecutor, resultCollector, hasResult, failedNodes,
+ maxRetryAttempts - 1);
+ }
+ }
+
+ public static void execute(ExecutablePool pool, String region,
+ String function, ServerRegionFunctionExecutor serverRegionExecutor,
+ ResultCollector resultCollector, byte hasResult, int mRetryAttempts, boolean isHA, boolean optimizeForWrite) {
+ AbstractOp op = new ExecuteRegionFunctionOpImpl(region, function, serverRegionExecutor,
+ resultCollector, hasResult, new HashSet<String>(), isHA, optimizeForWrite, true);
+
+ int retryAttempts = 0;
+ boolean reexecute = false;
+ boolean reexecuteForServ = false;
+ Set<String> failedNodes = new HashSet<String>();
+ AbstractOp reexecOp = null;
+ int maxRetryAttempts = 0;
+ if (isHA) {
+ maxRetryAttempts = mRetryAttempts;
+ }
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+ do{
+ try {
+ if (reexecuteForServ) {
+ reexecOp = new ExecuteRegionFunctionOpImpl(
+ (ExecuteRegionFunctionOpImpl)op, (byte)1/* isReExecute */,
+ failedNodes);
+ pool.execute(reexecOp, 0);
+ }
+ else {
+ pool.execute(op, 0);
+ }
+ reexecute = false;
+ reexecuteForServ = false;
+ }
+ catch (InternalFunctionInvocationTargetException e) {
+ if (isDebugEnabled) {
+ logger.debug("ExecuteRegionFunctionOp#execute : Received InternalFunctionInvocationTargetException. The failed nodes are {}", e.getFailedNodeSet());
+ }
+ reexecute = true;
+ resultCollector.clearResults();
+ Set<String> failedNodesIds = e.getFailedNodeSet();
+ failedNodes.clear();
+ if (failedNodesIds != null) {
+ failedNodes.addAll(failedNodesIds);
+ }
+ }
+ catch (ServerConnectivityException se) {
+ if (isDebugEnabled) {
+ logger.debug("ExecuteRegionFunctionOp#execute : Received ServerConnectivityException. The exception is {} The retryattempt is : {} maxRetryAttempts {}", se, retryAttempts, maxRetryAttempts);
+ }
+ if(se instanceof ServerOperationException){
+ throw se;
+ }
+ retryAttempts++;
+ if ((retryAttempts > maxRetryAttempts && maxRetryAttempts != -1) /*|| !isHA*/)
+ throw se;
+
+ reexecute = true;
+ resultCollector.clearResults();
+ failedNodes.clear();
+ }
+ }
+ while(reexecuteForServ);
+
+ if ( reexecute && isHA) {
+ ExecuteRegionFunctionOp.reexecute(pool, region, function,
+ serverRegionExecutor, resultCollector, hasResult, failedNodes,
+ maxRetryAttempts - 1, isHA, optimizeForWrite);
+ }
+ }
+
+ public static void reexecute(ExecutablePool pool, String region,
+ Function function, ServerRegionFunctionExecutor serverRegionExecutor,
+ ResultCollector resultCollector, byte hasResult, Set<String> failedNodes, int maxRetryAttempts) {
+ AbstractOp op = new ExecuteRegionFunctionOpImpl(region, function, serverRegionExecutor,
+ resultCollector, hasResult, new HashSet<String>());
+
+ boolean reexecute = true;
+ int retryAttempts = 0;
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+ do {
+ reexecute = false;
+ AbstractOp reExecuteOp = new ExecuteRegionFunctionOpImpl(
+ (ExecuteRegionFunctionOpImpl)op, (byte)1/*isReExecute*/, failedNodes);
+ if (isDebugEnabled) {
+ logger.debug("ExecuteRegionFunction#reexecute: Sending Function Execution Message: {} to Server using pool: {} with failed nodes: {}", reExecuteOp.getMessage(), pool, failedNodes);
+ }
+ try {
+ pool.execute(reExecuteOp,0);
+ }
+ catch (InternalFunctionInvocationTargetException e) {
+ if (isDebugEnabled) {
+ logger.debug("ExecuteRegionFunctionOp#reexecute : Received InternalFunctionInvocationTargetException. The failed nodes are {}", e.getFailedNodeSet());
+ }
+ reexecute = true;
+ resultCollector.clearResults();
+ Set<String> failedNodesIds = e.getFailedNodeSet();
+ failedNodes.clear();
+ if (failedNodesIds != null) {
+ failedNodes.addAll(failedNodesIds);
+ }
+ }
+ catch (ServerConnectivityException se) {
+ if (isDebugEnabled) {
+ logger.debug("ExecuteRegionFunctionOp#reexecute : Received ServerConnectivity Exception.");
+ }
+
+ if(se instanceof ServerOperationException){
+ throw se;
+ }
+ retryAttempts++;
+ if (retryAttempts > maxRetryAttempts && maxRetryAttempts != -2)
+ throw se;
+
+ reexecute = true;
+ resultCollector.clearResults();
+ failedNodes.clear();
+ }
+ } while (reexecute);
+ }
+
+ public static void reexecute(ExecutablePool pool, String region,
+ String function, ServerRegionFunctionExecutor serverRegionExecutor,
+ ResultCollector resultCollector, byte hasResult, Set<String> failedNodes, int maxRetryAttempts, boolean isHA, boolean optimizeForWrite) {
+ AbstractOp op = new ExecuteRegionFunctionOpImpl(region, function, serverRegionExecutor,
+ resultCollector, hasResult, new HashSet<String>(), isHA, optimizeForWrite, true);
+
+ boolean reexecute = true;
+ int retryAttempts = 0;
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+ do {
+ reexecute = false;
+
+ AbstractOp reExecuteOp = new ExecuteRegionFunctionOpImpl(
+ (ExecuteRegionFunctionOpImpl)op, (byte)1/*isReExecute*/, failedNodes);
+ if (isDebugEnabled) {
+ logger.debug("ExecuteRegionFunction#reexecute : Sending Function Execution Message: {} to Server using pool: {}", reExecuteOp.getMessage(), pool);
+ }
+ try {
+ pool.execute(reExecuteOp,0);
+ }
+ catch (InternalFunctionInvocationTargetException e) {
+ if (isDebugEnabled) {
+ logger.debug("ExecuteRegionFunctionOp#reexecute : Received InternalFunctionInvocationTargetException. The failed nodes are {}", e.getFailedNodeSet());
+ }
+ reexecute = true;
+ resultCollector.clearResults();
+ Set<String> failedNodesIds = e.getFailedNodeSet();
+ failedNodes.clear();
+ if (failedNodesIds != null) {
+ failedNodes.addAll(failedNodesIds);
+ }
+ }
+ catch (ServerConnectivityException se) {
+ if (isDebugEnabled) {
+ logger.debug("ExecuteRegionFunctionOp#reexecute : Received ServerConnectivityException. The exception is {} The retryattempt is : {} maxRetryAttempts {}", se, retryAttempts, maxRetryAttempts);
+ }
+ if(se instanceof ServerOperationException){
+ throw se;
+ }
+ retryAttempts++;
+ if (retryAttempts > maxRetryAttempts && maxRetryAttempts != -2)
+ throw se;
+
+ reexecute = true;
+ resultCollector.clearResults();
+ failedNodes.clear();
+ }
+ } while (reexecute);
+ }
+ static class ExecuteRegionFunctionOpImpl extends AbstractOp {
+
+ // To collect the results from the server
+ private final ResultCollector resultCollector;
+
+ //To get the instance of the Function Statistics we need the function name or instance
+ private Function function;
+
+ private byte isReExecute = 0;
+
+ private final String regionName;
+
+ private final ServerRegionFunctionExecutor executor;
+
+ private final byte hasResult;
+
+ private Set<String> failedNodes = new HashSet<String>();
+
+ private final String functionId;
+ private final boolean executeOnBucketSet;
+
+ /**
+ * @param removedNodes TODO
+ * @throws com.gemstone.gemfire.SerializationException if serialization fails
+ */
+ public ExecuteRegionFunctionOpImpl(String region, Function function,
+ ServerRegionFunctionExecutor serverRegionExecutor, ResultCollector rc,
+ byte hasResult, Set<String> removedNodes) {
+ super(MessageType.EXECUTE_REGION_FUNCTION, 8
+ + serverRegionExecutor.getFilter().size() + removedNodes.size());
+ Set routingObjects = serverRegionExecutor.getFilter();
+ Object args = serverRegionExecutor.getArguments();
+ byte functionState = AbstractExecution.getFunctionState(function.isHA(),
+ function.hasResult(), function.optimizeForWrite());
+ MemberMappedArgument memberMappedArg = serverRegionExecutor
+ .getMemberMappedArgument();
+
+ addBytes(functionState);
+ getMessage().addStringPart(region);
+ if (serverRegionExecutor.isFnSerializationReqd()) {
+ getMessage().addStringOrObjPart(function);
+ }
+ else {
+ getMessage().addStringOrObjPart(function.getId());
+ }
+ getMessage().addObjPart(args);
+ getMessage().addObjPart(memberMappedArg);
+ this.executeOnBucketSet = serverRegionExecutor.getExecuteOnBucketSetFlag();
+ byte flags = ExecuteFunctionHelper.createFlags(executeOnBucketSet, isReExecute);
+
+ getMessage().addBytesPart(new byte[] {flags});
+ getMessage().addIntPart(routingObjects.size());
+ for (Object key : routingObjects) {
+ getMessage().addStringOrObjPart(key);
+ }
+ getMessage().addIntPart(removedNodes.size());
+ for (Object nodes : removedNodes) {
+ getMessage().addStringOrObjPart(nodes);
+ }
+
+ this.resultCollector = rc;
+ this.regionName = region;
+ this.function = function;
+ this.functionId = function.getId();
+ this.executor = serverRegionExecutor;
+ this.hasResult = functionState;
+ this.failedNodes = removedNodes;
+ }
+
+ public ExecuteRegionFunctionOpImpl(String region, String function,
+ ServerRegionFunctionExecutor serverRegionExecutor, ResultCollector rc,
+ byte hasResult, Set<String> removedNodes, boolean isHA, boolean optimizeForWrite, boolean calculateFnState ) {
+ super(MessageType.EXECUTE_REGION_FUNCTION, 8
+ + serverRegionExecutor.getFilter().size() + removedNodes.size());
+ Set routingObjects = serverRegionExecutor.getFilter();
+ byte functionState = hasResult;
+ if(calculateFnState){
+ functionState = AbstractExecution.getFunctionState(isHA,
+ hasResult == (byte)1 ? true : false, optimizeForWrite);
+ }
+ Object args = serverRegionExecutor.getArguments();
+ MemberMappedArgument memberMappedArg = serverRegionExecutor
+ .getMemberMappedArgument();
+ addBytes(functionState);
+ getMessage().addStringPart(region);
+ getMessage().addStringOrObjPart(function);
+ getMessage().addObjPart(args);
+ getMessage().addObjPart(memberMappedArg);
+
+ this.executeOnBucketSet = serverRegionExecutor.getExecuteOnBucketSetFlag();
+// byte flags = this.executeOnBucketSet ?
+// (byte)(0x00 | ExecuteFunctionHelper.BUCKETS_AS_FILTER_MASK) : 0x00;
+// flags = isReExecute == 1? (byte)(flags | ExecuteFunctionHelper.IS_REXECUTE_MASK) : flags;
+ byte flags = ExecuteFunctionHelper.createFlags(executeOnBucketSet, isReExecute);
+
+ getMessage().addBytesPart(new byte[] { flags });
+ getMessage().addIntPart(routingObjects.size());
+ for (Object key : routingObjects) {
+ getMessage().addStringOrObjPart(key);
+ }
+ getMessage().addIntPart(removedNodes.size());
+ for (Object nodes : removedNodes) {
+ getMessage().addStringOrObjPart(nodes);
+ }
+
+ this.resultCollector = rc;
+ this.regionName = region;
+ this.functionId = function;
+ this.executor = serverRegionExecutor;
+ this.hasResult = functionState;
+ this.failedNodes = removedNodes;
+ }
+
+ public ExecuteRegionFunctionOpImpl(
+ ExecuteRegionFunctionSingleHopOpImpl newop) {
+ this(newop.getRegionName(), newop.getFunctionId(), newop.getExecutor(),
+ newop.getResultCollector(), newop.getHasResult(),
+ new HashSet<String>(), newop.isHA(), newop
+ .optimizeForWrite(), false);
+ }
+
+ public ExecuteRegionFunctionOpImpl(ExecuteRegionFunctionOpImpl op,
+ byte isReExecute, Set<String> removedNodes) {
+ super(MessageType.EXECUTE_REGION_FUNCTION, 8
+ + op.executor.getFilter().size() + removedNodes.size());
+ this.isReExecute = isReExecute;
+ this.resultCollector = op.resultCollector;
+ this.function = op.function;
+ this.functionId = op.functionId;
+ this.regionName = op.regionName;
+ this.executor = op.executor;
+ this.hasResult = op.hasResult;
+ this.failedNodes = op.failedNodes;
+ this.executeOnBucketSet = op.executeOnBucketSet;
+ if (isReExecute == 1) {
+ this.resultCollector.endResults();
+ this.resultCollector.clearResults();
+ }
+
+ Set routingObjects = executor.getFilter();
+ Object args = executor.getArguments();
+ MemberMappedArgument memberMappedArg = executor.getMemberMappedArgument();
+ getMessage().clear();
+ addBytes(this.hasResult);
+ getMessage().addStringPart(this.regionName);
+ if (executor.isFnSerializationReqd()) {
+ getMessage().addStringOrObjPart(function);
+ }
+ else {
+ getMessage().addStringOrObjPart(functionId);
+ }
+ getMessage().addObjPart(args);
+ getMessage().addObjPart(memberMappedArg);
+// byte flags = this.executeOnBucketSet ?
+// (byte)(0x00 | Op.BUCKETS_AS_FILTER_MASK) : 0x00;
+// flags = isReExecute == 1? (byte)(flags | Op.IS_REXECUTE_MASK) : flags;
+ byte flags = ExecuteFunctionHelper.createFlags(executeOnBucketSet, isReExecute);
+
+ getMessage().addBytesPart(new byte[] {flags });
+ getMessage().addIntPart(routingObjects.size());
+ for (Object key : routingObjects) {
+ getMessage().addStringOrObjPart(key);
+ }
+ getMessage().addIntPart(removedNodes.size());
+ for (Object nodes : removedNodes) {
+ getMessage().addStringOrObjPart(nodes);
+ }
+ }
+
+ private void addBytes(byte functionStateOrHasResult) {
+ if (GemFireCacheImpl.getClientFunctionTimeout() == GemFireCacheImpl.DEFAULT_CLIENT_FUNCTION_TIMEOUT) {
+ getMessage().addBytesPart(new byte[] { functionStateOrHasResult });
+ } else {
+ byte[] bytes = new byte[5];
+ bytes[0] = functionStateOrHasResult;
+ Part.encodeInt(GemFireCacheImpl.getClientFunctionTimeout(), bytes, 1);
+ getMessage().addBytesPart(bytes);
+ }
+ }
+
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ ChunkedMessage executeFunctionResponseMsg = (ChunkedMessage)msg;
+ // Read the header which describes the type of message following
+ try {
+ executeFunctionResponseMsg.readHeader();
+ switch (executeFunctionResponseMsg.getMessageType()) {
+ case MessageType.EXECUTE_REGION_FUNCTION_RESULT:
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+ if (isDebugEnabled) {
+ logger.debug("ExecuteRegionFunctionOpImpl#processResponse: received message of type EXECUTE_REGION_FUNCTION_RESULT. The number of parts are : {}", executeFunctionResponseMsg.getNumberOfParts());
+ }
+ // Read the chunk
+ do {
+ executeFunctionResponseMsg.receiveChunk();
+ Object resultResponse = executeFunctionResponseMsg.getPart(0)
+ .getObject();
+ Object result;
+ if (resultResponse instanceof ArrayList) {
+ result = ((ArrayList)resultResponse).get(0);
+ }
+ else {
+ result = resultResponse;
+ }
+ if (result instanceof FunctionException) {
+ FunctionException ex = ((FunctionException)result);
+ if (ex instanceof InternalFunctionException) {
+ Throwable cause = ex.getCause();
+ DistributedMember memberID = (DistributedMember)((ArrayList)resultResponse)
+ .get(1);
+ this.resultCollector
+ .addResult(memberID, cause);
+ FunctionStats.getFunctionStats(this.functionId,
+ this.executor.getRegion().getSystem())
+ .incResultsReceived();
+ continue;
+ }
+ else if (((FunctionException)result).getCause() instanceof InternalFunctionInvocationTargetException) {
+ InternalFunctionInvocationTargetException ifite = (InternalFunctionInvocationTargetException)ex
+ .getCause();
+ this.failedNodes.addAll(ifite.getFailedNodeSet());
+ }
+ executeFunctionResponseMsg.clear();
+ throw ex;
+ }
+ else if (result instanceof Throwable) {
+ String s = "While performing a remote " + getOpName();
+ executeFunctionResponseMsg.clear();
+ throw new ServerOperationException(s, (Throwable)result);
+ }
+ else {
+ DistributedMember memberID = (DistributedMember)((ArrayList)resultResponse)
+ .get(1);
+ this.resultCollector.addResult(memberID, result);
+ FunctionStats.getFunctionStats(this.functionId,
+ this.executor.getRegion().getSystem()).incResultsReceived();
+ }
+ } while (!executeFunctionResponseMsg.isLastChunk());
+ if (isDebugEnabled) {
+ logger.debug("ExecuteRegionFunctionOpImpl#processResponse: received all the results from server successfully.");
+ }
+ this.resultCollector.endResults();
+ return null;
+
+ case MessageType.EXCEPTION:
+ if (logger.isDebugEnabled()) {
+ logger.debug("ExecuteRegionFunctionOpImpl#processResponse: received message of type EXCEPTION. The number of parts are : {}", executeFunctionResponseMsg.getNumberOfParts());
+ }
+
+ // Read the chunk
+ executeFunctionResponseMsg.receiveChunk();
+ Part part0 = executeFunctionResponseMsg.getPart(0);
+ Object obj = part0.getObject();
+ if (obj instanceof FunctionException) {
+ FunctionException ex = ((FunctionException)obj);
+ if (((FunctionException)obj).getCause() instanceof InternalFunctionInvocationTargetException) {
+ InternalFunctionInvocationTargetException ifite = (InternalFunctionInvocationTargetException)ex
+ .getCause();
+ this.failedNodes.addAll(ifite.getFailedNodeSet());
+ }
+ executeFunctionResponseMsg.clear();
+ throw ex;
+ }
+ else if (obj instanceof Throwable) {
+ executeFunctionResponseMsg.clear();
+ String s = "While performing a remote " + getOpName();
+ throw new ServerOperationException(s, (Throwable)obj);
+ }
+ break;
+ case MessageType.EXECUTE_REGION_FUNCTION_ERROR:
+ if (logger.isDebugEnabled()) {
+ logger.debug("ExecuteRegionFunctionOpImpl#processResponse: received message of type EXECUTE_REGION_FUNCTION_ERROR");
+ }
+ // Read the chunk
+ executeFunctionResponseMsg.receiveChunk();
+ String errorMessage = executeFunctionResponseMsg.getPart(0)
+ .getString();
+ executeFunctionResponseMsg.clear();
+ throw new ServerOperationException(errorMessage);
+ default:
+ throw new InternalGemFireError("Unknown message type "
+ + executeFunctionResponseMsg.getMessageType());
+ }
+ }
+ finally {
+ executeFunctionResponseMsg.clear();
+ }
+ return null;
+ }
+
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return msgType == MessageType.EXECUTE_REGION_FUNCTION_ERROR;
+ }
+
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startExecuteFunction();
+ }
+
+ protected String getOpName() {
+ return "executeRegionFunction";
+ }
+
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endExecuteFunctionSend(start, hasFailed());
+ }
+
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endExecuteFunction(start, hasTimedOut(), hasFailed());
+ }
+
+ @Override
+ protected Message createResponseMessage() {
+ return new ChunkedMessage(1, Version.CURRENT);
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java
new file mode 100644
index 0000000..5ea03f8
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java
@@ -0,0 +1,491 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.cache.execute.Function;
+import com.gemstone.gemfire.cache.execute.FunctionException;
+import com.gemstone.gemfire.cache.execute.ResultCollector;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.execute.AbstractExecution;
+import com.gemstone.gemfire.internal.cache.execute.FunctionStats;
+import com.gemstone.gemfire.internal.cache.execute.InternalFunctionException;
+import com.gemstone.gemfire.internal.cache.execute.InternalFunctionInvocationTargetException;
+import com.gemstone.gemfire.internal.cache.execute.MemberMappedArgument;
+import com.gemstone.gemfire.internal.cache.execute.ServerRegionFunctionExecutor;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ChunkedMessage;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * @author skumar
+ * @since 6.5
+ */
+public class ExecuteRegionFunctionSingleHopOp {
+
+ private static final Logger logger = LogService.getLogger();
+
+ private ExecuteRegionFunctionSingleHopOp() {
+ }
+
+ public static void execute(ExecutablePool pool, Region region,
+ Function function, ServerRegionFunctionExecutor serverRegionExecutor,
+ ResultCollector resultCollector, byte hasResult,
+ Map<ServerLocation, ? extends HashSet> serverToFilterMap, int mRetryAttempts, boolean allBuckets) {
+
+ boolean reexecute = false;
+ Set<String> failedNodes = new HashSet<String>();
+ int maxRetryAttempts = 0;
+ if (function.isHA()) {
+ maxRetryAttempts = mRetryAttempts;
+ }
+ ClientMetadataService cms = ((GemFireCacheImpl)region.getCache())
+ .getClientMetadataService();
+
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+ if (isDebugEnabled) {
+ logger.debug("ExecuteRegionFunctionSingleHopOp#execute : The serverToFilterMap is : {}", serverToFilterMap);
+ }
+ List<SingleHopOperationCallable> callableTasks = constructAndGetExecuteFunctionTasks(region.getFullPath(),
+ serverRegionExecutor, serverToFilterMap, (PoolImpl)pool, function,
+ hasResult, resultCollector, cms, allBuckets);
+
+ reexecute = SingleHopClientExecutor.submitAllHA(callableTasks,
+ (LocalRegion)region, resultCollector, failedNodes);
+
+ if (isDebugEnabled) {
+ logger.debug("ExecuteRegionFunctionSingleHopOp#execute : The size of callableTask is : {}", callableTasks.size());
+ }
+
+ if (reexecute ) {
+ resultCollector.clearResults();
+ if(function.isHA()) {
+ ExecuteRegionFunctionOp.reexecute(pool, region.getFullPath(), function,
+ serverRegionExecutor, resultCollector, hasResult, failedNodes,
+ maxRetryAttempts - 1);
+ }
+ else {
+ ExecuteRegionFunctionOp.execute(pool, region.getFullPath(), function,
+ serverRegionExecutor, resultCollector, hasResult,
+ maxRetryAttempts - 1);
+ }
+ }
+
+ resultCollector.endResults();
+ }
+
+ public static void execute(ExecutablePool pool, Region region,
+ String functionId, ServerRegionFunctionExecutor serverRegionExecutor,
+ ResultCollector resultCollector, byte hasResult,
+ Map<ServerLocation, ? extends HashSet> serverToFilterMap,
+ int mRetryAttempts, boolean allBuckets, boolean isHA,
+ boolean optimizeForWrite) {
+
+ boolean reexecute = false;
+ Set<String> failedNodes = new HashSet<String>();
+ int maxRetryAttempts = 0;
+ if (isHA) {
+ maxRetryAttempts = mRetryAttempts;
+ }
+ ClientMetadataService cms = ((GemFireCacheImpl)region.getCache())
+ .getClientMetadataService();
+
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+ if (isDebugEnabled) {
+ logger.debug("ExecuteRegionFunctionSingleHopOp#execute : The serverToFilterMap is : {}", serverToFilterMap);
+ }
+ List<SingleHopOperationCallable> callableTasks = constructAndGetExecuteFunctionTasks(region.getFullPath(),
+ serverRegionExecutor, serverToFilterMap, (PoolImpl)pool, functionId,
+ hasResult, resultCollector, cms, allBuckets, isHA,optimizeForWrite);
+
+ reexecute = SingleHopClientExecutor.submitAllHA(callableTasks,
+ (LocalRegion)region, resultCollector, failedNodes);
+
+ if (isDebugEnabled) {
+ logger.debug("ExecuteRegionFunctionSingleHopOp#execute : The size of callableTask is: {}, reexecute={}", callableTasks.size(), reexecute);
+ }
+
+ if (reexecute) {
+ resultCollector.clearResults();
+ if (isHA) {
+ ExecuteRegionFunctionOp.reexecute(pool, region.getFullPath(),
+ functionId, serverRegionExecutor, resultCollector, hasResult,
+ failedNodes, maxRetryAttempts - 1, isHA, optimizeForWrite);
+ }
+ else {
+ ExecuteRegionFunctionOp.execute(pool, region.getFullPath(), functionId,
+ serverRegionExecutor, resultCollector, hasResult,
+ maxRetryAttempts - 1, isHA, optimizeForWrite);
+ }
+ }
+
+ resultCollector.endResults();
+ }
+
+
+ static List<SingleHopOperationCallable> constructAndGetExecuteFunctionTasks(String region,
+ ServerRegionFunctionExecutor serverRegionExecutor,
+ final Map<ServerLocation, ? extends HashSet> serverToFilterMap,
+ final PoolImpl pool, final Function function, byte hasResult,
+ ResultCollector rc, ClientMetadataService cms, boolean allBucket) {
+ final List<SingleHopOperationCallable> tasks = new ArrayList<SingleHopOperationCallable>();
+ ArrayList<ServerLocation> servers = new ArrayList<ServerLocation>(
+ serverToFilterMap.keySet());
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Constructing tasks for the servers {}", servers);
+ }
+ for (ServerLocation server : servers) {
+ ServerRegionFunctionExecutor executor = (ServerRegionFunctionExecutor)serverRegionExecutor
+ .withFilter(serverToFilterMap.get(server));
+
+ AbstractOp op = new ExecuteRegionFunctionSingleHopOpImpl(region, function, executor, rc, hasResult, new HashSet<String>(),
+ allBucket);
+ SingleHopOperationCallable task = new SingleHopOperationCallable(
+ new ServerLocation(server.getHostName(), server.getPort()), pool, op, UserAttributes.userAttributes.get());
+ tasks.add(task);
+ }
+ return tasks;
+ }
+
+ static List<SingleHopOperationCallable> constructAndGetExecuteFunctionTasks(String region,
+ ServerRegionFunctionExecutor serverRegionExecutor,
+ final Map<ServerLocation, ? extends HashSet> serverToFilterMap,
+ final PoolImpl pool, final String functionId, byte hasResult,
+ ResultCollector rc, ClientMetadataService cms, boolean allBucket, boolean isHA, boolean optimizeForWrite) {
+ final List<SingleHopOperationCallable> tasks = new ArrayList<SingleHopOperationCallable>();
+ ArrayList<ServerLocation> servers = new ArrayList<ServerLocation>(
+ serverToFilterMap.keySet());
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Constructing tasks for the servers {}", servers);
+ }
+ for (ServerLocation server : servers) {
+ ServerRegionFunctionExecutor executor = (ServerRegionFunctionExecutor)serverRegionExecutor
+ .withFilter(serverToFilterMap.get(server));
+
+ AbstractOp op = new ExecuteRegionFunctionSingleHopOpImpl(region, functionId, executor, rc, hasResult, new HashSet<String>(),
+ allBucket, isHA, optimizeForWrite);
+ SingleHopOperationCallable task = new SingleHopOperationCallable(
+ new ServerLocation(server.getHostName(), server.getPort()), pool, op, UserAttributes.userAttributes.get());
+ tasks.add(task);
+ }
+ return tasks;
+ }
+
+ static class ExecuteRegionFunctionSingleHopOpImpl extends AbstractOp {
+
+ private final ResultCollector resultCollector;
+
+ private final String functionId;
+
+ private final String regionName;
+
+ private final ServerRegionFunctionExecutor executor;
+
+ private final byte hasResult;
+
+ private Set<String> failedNodes = new HashSet<String>();
+
+ private boolean isHA;
+
+ private boolean optimizeForWrite;
+
+ public ExecuteRegionFunctionSingleHopOpImpl(String region,
+ Function function, ServerRegionFunctionExecutor serverRegionExecutor,
+ ResultCollector rc, byte hasResult,
+ Set<String> removedNodes, boolean allBuckets) {
+ // What is this 8 that is getting added to filter and removednode sizes?
+ // It should have been used as a constant and documented
+ super(MessageType.EXECUTE_REGION_FUNCTION_SINGLE_HOP, 8
+ + serverRegionExecutor.getFilter().size() + removedNodes.size());
+ this.isHA = function.isHA();
+ this.optimizeForWrite = function.optimizeForWrite();
+ byte functionState = AbstractExecution.getFunctionState(function.isHA(),
+ function.hasResult() , function.optimizeForWrite());
+ Set routingObjects = serverRegionExecutor.getFilter();
+ Object args = serverRegionExecutor.getArguments();
+ MemberMappedArgument memberMappedArg = serverRegionExecutor
+ .getMemberMappedArgument();
+ addBytes(functionState);
+ getMessage().addStringPart(region);
+ if (serverRegionExecutor.isFnSerializationReqd()) {
+ getMessage().addStringOrObjPart(function);
+ }
+ else {
+ getMessage().addStringOrObjPart(function.getId());
+ }
+ getMessage().addObjPart(args);
+ getMessage().addObjPart(memberMappedArg);
+ getMessage().addBytesPart(new byte[] { allBuckets ? (byte)1 : (byte)0 });
+ getMessage().addIntPart(routingObjects.size());
+ for (Object key : routingObjects) {
+ if(allBuckets){
+ getMessage().addIntPart((Integer)key);
+ }
+ else {
+ getMessage().addStringOrObjPart(key);
+ }
+ }
+ getMessage().addIntPart(removedNodes.size());
+ for (Object nodes : removedNodes) {
+ getMessage().addStringOrObjPart(nodes);
+ }
+
+ this.resultCollector = rc;
+ this.regionName = region;
+ this.functionId = function.getId();
+ this.executor = serverRegionExecutor;
+ this.hasResult = functionState;
+ this.failedNodes = removedNodes;
+ }
+
+ public ExecuteRegionFunctionSingleHopOpImpl(String region,
+ String functionId, ServerRegionFunctionExecutor serverRegionExecutor,
+ ResultCollector rc, byte hasResult,
+ Set<String> removedNodes, boolean allBuckets, boolean isHA, boolean optimizeForWrite) {
+ // What is this 8 that is getting added to filter and removednode sizes?
+ // It should have been used as a constant and documented
+ super(MessageType.EXECUTE_REGION_FUNCTION_SINGLE_HOP, 8
+ + serverRegionExecutor.getFilter().size() + removedNodes.size());
+ this.isHA = isHA;
+ this.optimizeForWrite = optimizeForWrite;
+ Set routingObjects = serverRegionExecutor.getFilter();
+ Object args = serverRegionExecutor.getArguments();
+ byte functionState = AbstractExecution.getFunctionState(isHA,
+ hasResult == (byte)1 ? true : false, optimizeForWrite);
+ MemberMappedArgument memberMappedArg = serverRegionExecutor
+ .getMemberMappedArgument();
+ addBytes(functionState);
+ getMessage().addStringPart(region);
+ getMessage().addStringOrObjPart(functionId);
+ getMessage().addObjPart(args);
+ getMessage().addObjPart(memberMappedArg);
+ getMessage().addBytesPart(new byte[] { allBuckets ? (byte)1 : (byte)0 });
+ getMessage().addIntPart(routingObjects.size());
+ for (Object key : routingObjects) {
+ if(allBuckets){
+ getMessage().addIntPart((Integer)key);
+ }
+ else {
+ getMessage().addStringOrObjPart(key);
+ }
+ }
+ getMessage().addIntPart(removedNodes.size());
+ for (Object nodes : removedNodes) {
+ getMessage().addStringOrObjPart(nodes);
+ }
+
+ this.resultCollector = rc;
+ this.regionName = region;
+ this.functionId = functionId;
+ this.executor = serverRegionExecutor;
+ this.hasResult = functionState;
+ this.failedNodes = removedNodes;
+ }
+
+ private void addBytes(byte functionState) {
+ if (GemFireCacheImpl.getClientFunctionTimeout() == GemFireCacheImpl.DEFAULT_CLIENT_FUNCTION_TIMEOUT) {
+ getMessage().addBytesPart(new byte[] { functionState });
+ } else {
+ byte[] bytes = new byte[5];
+ bytes[0] = functionState;
+ Part.encodeInt(GemFireCacheImpl.getClientFunctionTimeout(), bytes, 1);
+ getMessage().addBytesPart(bytes);
+ }
+ }
+
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ ChunkedMessage executeFunctionResponseMsg = (ChunkedMessage)msg;
+ try {
+ executeFunctionResponseMsg.readHeader();
+ switch (executeFunctionResponseMsg.getMessageType()) {
+ case MessageType.EXECUTE_REGION_FUNCTION_RESULT:
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+ if (isDebugEnabled) {
+ logger.debug("ExecuteRegionFunctionSingleHopOpImpl#processResponse: received message of type EXECUTE_REGION_FUNCTION_RESULT.");
+ }
+ do {
+ executeFunctionResponseMsg.receiveChunk();
+ Object resultResponse = executeFunctionResponseMsg.getPart(0)
+ .getObject();
+ Object result;
+ if (resultResponse instanceof ArrayList) {
+ result = ((ArrayList)resultResponse).get(0);
+ }
+ else {
+ result = resultResponse;
+ }
+
+ if (result instanceof FunctionException) {
+ FunctionException ex = ((FunctionException)result);
+ if (isDebugEnabled) {
+ logger.debug("ExecuteRegionFunctionSingleHopOpImpl#processResponse: received Exception. {}", ex.getCause());
+ }
+ if (ex instanceof InternalFunctionException) {
+ Throwable cause = ex.getCause();
+ DistributedMember memberID = (DistributedMember)((ArrayList)resultResponse)
+ .get(1);
+ this.resultCollector
+ .addResult(memberID, cause);
+ FunctionStats.getFunctionStats(this.functionId,
+ this.executor.getRegion().getSystem())
+ .incResultsReceived();
+ continue;
+ }
+ else if (((FunctionException)result).getCause() instanceof InternalFunctionInvocationTargetException) {
+ InternalFunctionInvocationTargetException ifite = (InternalFunctionInvocationTargetException)ex
+ .getCause();
+ this.failedNodes.addAll(ifite.getFailedNodeSet());
+ }
+ if (!ex.getMessage().equals("Buckets are null"))
+ throw ex;
+
+ return null;
+ }
+ else if (result instanceof Throwable) {
+ String s = "While performing a remote " + getOpName();
+ throw new ServerOperationException(s, (Throwable)result);
+ }
+ else {
+ DistributedMember memberID = (DistributedMember)((ArrayList)resultResponse)
+ .get(1);
+ synchronized (this.resultCollector) {
+ this.resultCollector
+ .addResult(memberID, result);
+ }
+ FunctionStats.getFunctionStats(this.functionId,
+ this.executor.getRegion().getSystem()).incResultsReceived();
+ }
+ } while (!executeFunctionResponseMsg.isLastChunk());
+ if (isDebugEnabled) {
+ logger.debug("ExecuteRegionFunctionSingleHopOpImpl#processResponse: received all the results from server successfully.");
+ }
+ return null;
+
+ case MessageType.EXCEPTION:
+ if (logger.isDebugEnabled()) {
+ logger.debug("ExecuteRegionFunctionSingleHopOpImpl#processResponse: received message of type EXCEPTION");
+ }
+ executeFunctionResponseMsg.receiveChunk();
+ Part part0 = executeFunctionResponseMsg.getPart(0);
+ Object obj = part0.getObject();
+
+ if (obj instanceof FunctionException) {
+ FunctionException ex = ((FunctionException)obj);
+ if (((FunctionException)obj).getCause() instanceof InternalFunctionInvocationTargetException) {
+ InternalFunctionInvocationTargetException ifite = (InternalFunctionInvocationTargetException)ex
+ .getCause();
+ this.failedNodes.addAll(ifite.getFailedNodeSet());
+ }
+ if (!ex.getMessage().equals("Buckets are null")) {
+ throw ex;
+ }
+ return null;
+ }
+ else if (obj instanceof Throwable) {
+ String s = "While performing a remote " + getOpName();
+ throw new ServerOperationException(s, (Throwable)obj);
+ }
+ break;
+ case MessageType.EXECUTE_REGION_FUNCTION_ERROR:
+ if (logger.isDebugEnabled()) {
+ logger.debug("ExecuteRegionFunctionSingleHopOpImpl#processResponse: received message of type EXECUTE_REGION_FUNCTION_ERROR");
+ }
+ executeFunctionResponseMsg.receiveChunk();
+ String errorMessage = executeFunctionResponseMsg.getPart(0)
+ .getString();
+ throw new ServerOperationException(errorMessage);
+
+ default:
+ throw new InternalGemFireError("Unknown message type "
+ + executeFunctionResponseMsg.getMessageType());
+ }
+ }
+ finally {
+ executeFunctionResponseMsg.clear();
+ }
+ return null;
+ }
+
+ ResultCollector getResultCollector() {
+ return this.resultCollector;
+ }
+
+ String getFunctionId() {
+ return this.functionId;
+ }
+
+ String getRegionName() {
+ return this.regionName;
+ }
+
+ ServerRegionFunctionExecutor getExecutor() {
+ return this.executor;
+ }
+
+ byte getHasResult() {
+ return this.hasResult;
+ }
+
+ boolean isHA() {
+ return this.isHA;
+ }
+
+ boolean optimizeForWrite() {
+ return this.optimizeForWrite;
+ }
+
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return msgType == MessageType.EXECUTE_REGION_FUNCTION_ERROR;
+ }
+
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startExecuteFunction();
+ }
+
+ protected String getOpName() {
+ return "executeRegionFunctionSingleHop";
+ }
+
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endExecuteFunctionSend(start, hasFailed());
+ }
+
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endExecuteFunction(start, hasTimedOut(), hasFailed());
+ }
+
+ @Override
+ protected Message createResponseMessage() {
+ return new ChunkedMessage(1, Version.CURRENT);
+ }
+
+ }
+}