You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2016/07/08 15:51:25 UTC
[05/50] [abbrv] incubator-geode git commit: GEODE-1751: putting
security checks in all applicable client-server commands.
GEODE-1751: putting security checks in all applicable client-server commands.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/536c13bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/536c13bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/536c13bd
Branch: refs/heads/develop
Commit: 536c13bdef16db663194b33bb7ebd64dd78216b5
Parents: e504d97
Author: Jinmei Liao <ji...@pivotal.io>
Authored: Fri Jun 24 22:59:32 2016 -0700
Committer: Jinmei Liao <ji...@pivotal.io>
Committed: Fri Jun 24 22:59:32 2016 -0700
----------------------------------------------------------------------
.../cache/tier/sockets/command/Destroy65.java | 247 ++++++------
.../tier/sockets/command/ExecuteFunction.java | 7 +-
.../tier/sockets/command/ExecuteFunction65.java | 252 ++++++-------
.../tier/sockets/command/ExecuteFunction66.java | 345 ++++++++---------
.../sockets/command/ExecuteRegionFunction.java | 235 ++++++------
.../command/ExecuteRegionFunction65.java | 357 ++++++++----------
.../command/ExecuteRegionFunction66.java | 374 ++++++++-----------
.../command/ExecuteRegionFunctionSingleHop.java | 356 ++++++++----------
.../cache/tier/sockets/command/GetAll.java | 145 +++----
.../cache/tier/sockets/command/GetAll651.java | 157 ++++----
.../cache/tier/sockets/command/GetAll70.java | 292 ++++++++-------
.../sockets/command/GetAllWithCallback.java | 68 ++--
.../sockets/command/GetFunctionAttribute.java | 37 +-
.../cache/tier/sockets/command/Invalidate.java | 188 +++++-----
.../cache/tier/sockets/command/KeySet.java | 149 ++++----
.../cache/tier/sockets/command/Put61.java | 306 +++++++--------
.../cache/tier/sockets/command/Put65.java | 3 +-
.../cache/tier/sockets/command/PutAll.java | 49 +--
.../cache/tier/sockets/command/PutAll70.java | 5 +-
.../cache/tier/sockets/command/PutAll80.java | 1 +
.../tier/sockets/command/RegisterInterest.java | 42 +--
.../sockets/command/RegisterInterest61.java | 38 +-
.../cache/tier/sockets/command/RemoveAll.java | 4 +-
.../sockets/command/UnregisterInterest.java | 113 +++---
.../sockets/command/UnregisterInterestList.java | 119 +++---
.../internal/security/GeodeSecurityUtil.java | 7 +
.../gemfire/security/GeodePermission.java | 6 +-
...tegratedClientGetPutAuthDistributedTest.java | 18 +-
28 files changed, 1876 insertions(+), 2044 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/536c13bd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy65.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy65.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy65.java
index c88ea24..41cc0be 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy65.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy65.java
@@ -188,145 +188,146 @@ public class Destroy65 extends BaseCommand {
}
writeErrorResponse(msg, MessageType.DESTROY_DATA_ERROR, errMessage.toString(), servConn);
servConn.setAsTrue(RESPONDED);
- } else {
- LocalRegion region = (LocalRegion) crHelper.getRegion(regionName);
- if (region == null) {
- String reason = LocalizedStrings.Destroy__0_WAS_NOT_FOUND_DURING_DESTROY_REQUEST.toLocalizedString(regionName);
- writeRegionDestroyedEx(msg, regionName, reason, servConn);
- servConn.setAsTrue(RESPONDED);
- } else {
- // Destroy the entry
- ByteBuffer eventIdPartsBuffer = ByteBuffer.wrap(eventPart.getSerializedForm());
- long threadId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer);
- long sequenceId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer);
- EventID eventId = new EventID(servConn.getEventMemberIDByteArray(), threadId, sequenceId);
- EventIDHolder clientEvent = new EventIDHolder(eventId);
-
- Breadcrumbs.setEventId(eventId);
-
- // msg.isRetry might be set by v7.0 and later clients
- if (msg.isRetry()) {
- // if (logger.isDebugEnabled()) {
- // logger.debug("DEBUG: encountered isRetry in Destroy65");
- // }
- clientEvent.setPossibleDuplicate(true);
- if (region.getAttributes().getConcurrencyChecksEnabled()) {
- // recover the version tag from other servers
- clientEvent.setRegion(region);
- if (!recoverVersionTagForRetriedOperation(clientEvent)) {
- clientEvent.setPossibleDuplicate(false); // no-one has seen this event
- }
- }
- }
+ return;
+ }
+
+ LocalRegion region = (LocalRegion) crHelper.getRegion(regionName);
+ if (region == null) {
+ String reason = LocalizedStrings.Destroy__0_WAS_NOT_FOUND_DURING_DESTROY_REQUEST.toLocalizedString(regionName);
+ writeRegionDestroyedEx(msg, regionName, reason, servConn);
+ servConn.setAsTrue(RESPONDED);
+ return;
+ }
+
+ // for integrated security
+ GeodeSecurityUtil.authorizeRegionWrite(regionName, key.toString());
+
+ // Destroy the entry
+ ByteBuffer eventIdPartsBuffer = ByteBuffer.wrap(eventPart.getSerializedForm());
+ long threadId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer);
+ long sequenceId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer);
+ EventID eventId = new EventID(servConn.getEventMemberIDByteArray(), threadId, sequenceId);
+ EventIDHolder clientEvent = new EventIDHolder(eventId);
+
+ Breadcrumbs.setEventId(eventId);
- // for integrated security
- GeodeSecurityUtil.authorizeRegionWrite(regionName, key.toString());
+ // msg.isRetry might be set by v7.0 and later clients
+ if (msg.isRetry()) {
+ // if (logger.isDebugEnabled()) {
+ // logger.debug("DEBUG: encountered isRetry in Destroy65");
+ // }
+ clientEvent.setPossibleDuplicate(true);
+ if (region.getAttributes().getConcurrencyChecksEnabled()) {
+ // recover the version tag from other servers
+ clientEvent.setRegion(region);
+ if (!recoverVersionTagForRetriedOperation(clientEvent)) {
+ clientEvent.setPossibleDuplicate(false); // no-one has seen this event
+ }
+ }
+ }
+ try {
+ AuthorizeRequest authzRequest = servConn.getAuthzRequest();
+ if (authzRequest != null) {
+ // TODO SW: This is to handle DynamicRegionFactory destroy
+ // calls. Rework this when the semantics of DynamicRegionFactory are
+ // cleaned up.
+ if (DynamicRegionFactory.regionIsDynamicRegionList(regionName)) {
+ RegionDestroyOperationContext destroyContext = authzRequest.destroyRegionAuthorize((String) key, callbackArg);
+ callbackArg = destroyContext.getCallbackArg();
+ } else {
+ DestroyOperationContext destroyContext = authzRequest.destroyAuthorize(regionName, key, callbackArg);
+ callbackArg = destroyContext.getCallbackArg();
+ }
+ }
+ if (operation == null || operation == Operation.DESTROY) {
+ region.basicBridgeDestroy(key, callbackArg, servConn.getProxyID(), true, clientEvent);
+ } else {
+ // this throws exceptions if expectedOldValue checks fail
try {
- AuthorizeRequest authzRequest = servConn.getAuthzRequest();
- if (authzRequest != null) {
- // TODO SW: This is to handle DynamicRegionFactory destroy
- // calls. Rework this when the semantics of DynamicRegionFactory are
- // cleaned up.
- if (DynamicRegionFactory.regionIsDynamicRegionList(regionName)) {
- RegionDestroyOperationContext destroyContext = authzRequest.destroyRegionAuthorize((String) key, callbackArg);
- callbackArg = destroyContext.getCallbackArg();
- } else {
- DestroyOperationContext destroyContext = authzRequest.destroyAuthorize(regionName, key, callbackArg);
- callbackArg = destroyContext.getCallbackArg();
- }
+ if (expectedOldValue == null) {
+ expectedOldValue = Token.INVALID;
}
- if (operation == null || operation == Operation.DESTROY) {
- region.basicBridgeDestroy(key, callbackArg, servConn.getProxyID(), true, clientEvent);
- } else {
- // this throws exceptions if expectedOldValue checks fail
+ if (operation == Operation.REMOVE && msg.isRetry() && clientEvent.getVersionTag() != null) {
+ // the operation was successful last time it was tried, so there's
+ // no need to perform it again. Just return the version tag and
+ // success status
+ if (logger.isDebugEnabled()) {
+ logger.debug("remove(k,v) operation was successful last time with version {}", clientEvent.getVersionTag());
+ }
+ // try the operation anyway to ensure that it's been distributed to all servers
try {
- if (expectedOldValue == null) {
- expectedOldValue = Token.INVALID;
- }
- if (operation == Operation.REMOVE && msg.isRetry() && clientEvent.getVersionTag() != null) {
- // the operation was successful last time it was tried, so there's
- // no need to perform it again. Just return the version tag and
- // success status
- if (logger.isDebugEnabled()) {
- logger.debug("remove(k,v) operation was successful last time with version {}", clientEvent.getVersionTag());
- }
- // try the operation anyway to ensure that it's been distributed to all servers
- try {
- region.basicBridgeRemove(key, expectedOldValue, callbackArg, servConn.getProxyID(), true, clientEvent);
- } catch (EntryNotFoundException e) {
- // ignore, and don't set entryNotFoundForRemove because this was a successful
- // operation - bug #51664
- }
- } else {
- region.basicBridgeRemove(key, expectedOldValue, callbackArg, servConn.getProxyID(), true, clientEvent);
- if (logger.isDebugEnabled()) {
- logger.debug("region.remove succeeded");
- }
- }
+ region.basicBridgeRemove(key, expectedOldValue, callbackArg, servConn.getProxyID(), true, clientEvent);
} catch (EntryNotFoundException e) {
- servConn.setModificationInfo(true, regionName, key);
- if (logger.isDebugEnabled()) {
- logger.debug("writing entryNotFound response");
- }
- entryNotFoundForRemove = true;
+ // ignore, and don't set entryNotFoundForRemove because this was a successful
+ // operation - bug #51664
}
- }
- servConn.setModificationInfo(true, regionName, key);
- } catch (EntryNotFoundException e) {
- // Don't send an exception back to the client if this
- // exception happens. Just log it and continue.
- logger.info(LocalizedMessage.create(LocalizedStrings.Destroy_0_DURING_ENTRY_DESTROY_NO_ENTRY_WAS_FOUND_FOR_KEY_1, new Object[] {
- servConn.getName(),
- key
- }));
- entryNotFoundForRemove = true;
- } catch (RegionDestroyedException rde) {
- writeException(msg, rde, false, servConn);
- servConn.setAsTrue(RESPONDED);
- return;
- } catch (Exception e) {
- // If an interrupted exception is thrown , rethrow it
- checkForInterrupt(servConn, e);
-
- // If an exception occurs during the destroy, preserve the connection
- writeException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
- if (e instanceof GemFireSecurityException) {
- // Fine logging for security exceptions since these are already
- // logged by the security logger
+ } else {
+ region.basicBridgeRemove(key, expectedOldValue, callbackArg, servConn.getProxyID(), true, clientEvent);
if (logger.isDebugEnabled()) {
- logger.debug("{}: Unexpected Security exception", servConn.getName(), e);
+ logger.debug("region.remove succeeded");
}
- } else {
- logger.warn(LocalizedMessage.create(LocalizedStrings.Destroy_0_UNEXPECTED_EXCEPTION, servConn.getName()), e);
}
- return;
- }
-
- // Update the statistics and write the reply
- now = DistributionStats.getStatTime();
- stats.incProcessDestroyTime(now - start);
-
- if (region instanceof PartitionedRegion) {
- PartitionedRegion pr = (PartitionedRegion) region;
- if (pr.isNetworkHop() != (byte) 0) {
- writeReplyWithRefreshMetadata(msg, servConn, pr, entryNotFoundForRemove, pr.isNetworkHop(), clientEvent.getVersionTag());
- pr.setIsNetworkHop((byte) 0);
- pr.setMetadataVersion((byte) 0);
- } else {
- writeReply(msg, servConn, entryNotFoundForRemove | clientEvent.getIsRedestroyedEntry(), clientEvent.getVersionTag());
+ } catch (EntryNotFoundException e) {
+ servConn.setModificationInfo(true, regionName, key);
+ if (logger.isDebugEnabled()) {
+ logger.debug("writing entryNotFound response");
}
- } else {
- writeReply(msg, servConn, entryNotFoundForRemove | clientEvent.getIsRedestroyedEntry(), clientEvent.getVersionTag());
+ entryNotFoundForRemove = true;
}
- servConn.setAsTrue(RESPONDED);
+ }
+ servConn.setModificationInfo(true, regionName, key);
+ } catch (EntryNotFoundException e) {
+ // Don't send an exception back to the client if this
+ // exception happens. Just log it and continue.
+ logger.info(LocalizedMessage.create(LocalizedStrings.Destroy_0_DURING_ENTRY_DESTROY_NO_ENTRY_WAS_FOUND_FOR_KEY_1, new Object[] {
+ servConn.getName(), key
+ }));
+ entryNotFoundForRemove = true;
+ } catch (RegionDestroyedException rde) {
+ writeException(msg, rde, false, servConn);
+ servConn.setAsTrue(RESPONDED);
+ return;
+ } catch (Exception e) {
+ // If an interrupted exception is thrown , rethrow it
+ checkForInterrupt(servConn, e);
+
+ // If an exception occurs during the destroy, preserve the connection
+ writeException(msg, e, false, servConn);
+ servConn.setAsTrue(RESPONDED);
+ if (e instanceof GemFireSecurityException) {
+ // Fine logging for security exceptions since these are already
+ // logged by the security logger
if (logger.isDebugEnabled()) {
- logger.debug("{}: Sent destroy response for region {} key {}", servConn.getName(), regionName, key);
+ logger.debug("{}: Unexpected Security exception", servConn.getName(), e);
}
- stats.incWriteDestroyResponseTime(DistributionStats.getStatTime() - start);
+ } else {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.Destroy_0_UNEXPECTED_EXCEPTION, servConn.getName()), e);
+ }
+ return;
+ }
+
+ // Update the statistics and write the reply
+ now = DistributionStats.getStatTime();
+ stats.incProcessDestroyTime(now - start);
+
+ if (region instanceof PartitionedRegion) {
+ PartitionedRegion pr = (PartitionedRegion) region;
+ if (pr.isNetworkHop() != (byte) 0) {
+ writeReplyWithRefreshMetadata(msg, servConn, pr, entryNotFoundForRemove, pr.isNetworkHop(), clientEvent.getVersionTag());
+ pr.setIsNetworkHop((byte) 0);
+ pr.setMetadataVersion((byte) 0);
+ } else {
+ writeReply(msg, servConn, entryNotFoundForRemove | clientEvent.getIsRedestroyedEntry(), clientEvent.getVersionTag());
}
+ } else {
+ writeReply(msg, servConn, entryNotFoundForRemove | clientEvent.getIsRedestroyedEntry(), clientEvent.getVersionTag());
+ }
+ servConn.setAsTrue(RESPONDED);
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: Sent destroy response for region {} key {}", servConn.getName(), regionName, key);
}
+ stats.incWriteDestroyResponseTime(DistributionStats.getStatTime() - start);
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/536c13bd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteFunction.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteFunction.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteFunction.java
index 8ceb001..0f3bdec 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteFunction.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteFunction.java
@@ -53,6 +53,7 @@ import com.gemstone.gemfire.internal.cache.tier.sockets.ServerConnection;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.internal.security.AuthorizeRequest;
+import com.gemstone.gemfire.internal.security.GeodeSecurityUtil;
/**
* This is the base command which read the parts for the
@@ -110,7 +111,7 @@ public class ExecuteFunction extends BaseCommand {
sendError(hasResult, msg, message, servConn);
return;
}
- else {
+
// Execute function on the cache
try {
Function functionObject = null;
@@ -128,6 +129,9 @@ public class ExecuteFunction extends BaseCommand {
else {
functionObject = (Function)function;
}
+
+ GeodeSecurityUtil.authorizeFunctionExec(functionObject.getId());
+
FunctionStats stats = FunctionStats.getFunctionStats(functionObject.getId(), null);
// check if the caller is authorized to do this operation on server
@@ -217,7 +221,6 @@ public class ExecuteFunction extends BaseCommand {
final String message = e.getMessage();
sendException(hasResult, msg, message, servConn,e);
}
- }
}
private void sendException(byte hasResult, Message msg, String message,
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/536c13bd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteFunction65.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteFunction65.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteFunction65.java
index d2869d1..ff6cdd6 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteFunction65.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteFunction65.java
@@ -52,6 +52,7 @@ import com.gemstone.gemfire.internal.cache.tier.sockets.ServerConnection;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.internal.security.AuthorizeRequest;
+import com.gemstone.gemfire.internal.security.GeodeSecurityUtil;
/**
* @since GemFire 6.5
@@ -68,8 +69,7 @@ public class ExecuteFunction65 extends BaseCommand {
}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start)
- throws IOException {
+ public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException {
Object function = null;
Object args = null;
MemberMappedArgument memberMappedArg = null;
@@ -86,11 +86,10 @@ public class ExecuteFunction65 extends BaseCommand {
functionState = AbstractExecution.HA_HASRESULT_OPTIMIZEFORWRITE;
isReexecute = true;
}
-
- if(functionState != 1) {
+
+ if (functionState != 1) {
hasResult = (byte) ((functionState & 2) - 1);
- }
- else {
+ } else {
hasResult = functionState;
}
if (hasResult == 1) {
@@ -99,13 +98,12 @@ public class ExecuteFunction65 extends BaseCommand {
}
function = msg.getPart(1).getStringOrObject();
args = msg.getPart(2).getObject();
-
+
Part part = msg.getPart(3);
if (part != null) {
- memberMappedArg = (MemberMappedArgument)part.getObject();
+ memberMappedArg = (MemberMappedArgument) part.getObject();
}
- }
- catch (ClassNotFoundException exception) {
+ } catch (ClassNotFoundException exception) {
logger.warn(LocalizedMessage.create(LocalizedStrings.ExecuteFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), exception);
if (hasResult == 1) {
writeChunkedException(msg, exception, false, servConn);
@@ -114,157 +112,137 @@ public class ExecuteFunction65 extends BaseCommand {
}
}
if (function == null) {
- final String message =
- LocalizedStrings.ExecuteFunction_THE_INPUT_FUNCTION_FOR_THE_EXECUTE_FUNCTION_REQUEST_IS_NULL
- .toLocalizedString();
+ final String message = LocalizedStrings.ExecuteFunction_THE_INPUT_FUNCTION_FOR_THE_EXECUTE_FUNCTION_REQUEST_IS_NULL
+ .toLocalizedString();
logger.warn("{}: {}", servConn.getName(), message);
sendError(hasResult, msg, message, servConn);
return;
}
- else {
- // Execute function on the cache
- try {
- Function functionObject = null;
- if (function instanceof String) {
- functionObject = FunctionService.getFunction((String)function);
- if (functionObject == null) {
- final String message =
- LocalizedStrings.ExecuteFunction_FUNCTION_NAMED_0_IS_NOT_REGISTERED
- .toLocalizedString(function);
- logger.warn("{}: {}", servConn.getName(), message);
- sendError(hasResult, msg, message, servConn);
- return;
- }
- else {
- byte functionStateOnServerSide = AbstractExecution.getFunctionState(
- functionObject.isHA(), functionObject.hasResult(),
- functionObject.optimizeForWrite());
- if (logger.isDebugEnabled()) {
- logger.debug("Function State on server side: {} on client: {}", functionStateOnServerSide, functionState);
- }
- if (functionStateOnServerSide != functionState) {
- String message = LocalizedStrings.FunctionService_FUNCTION_ATTRIBUTE_MISMATCH_CLIENT_SERVER
- .toLocalizedString(function);
- logger.warn("{}: {}", servConn.getName(), message);
- sendError(hasResult, msg, message, servConn);
- return;
- }
- }
- }
- else {
- functionObject = (Function)function;
- }
- FunctionStats stats = FunctionStats.getFunctionStats(functionObject.getId(), null);
- // check if the caller is authorized to do this operation on server
- AuthorizeRequest authzRequest = servConn.getAuthzRequest();
- ExecuteFunctionOperationContext executeContext = null;
- if (authzRequest != null) {
- executeContext = authzRequest.executeFunctionAuthorize(functionObject
- .getId(), null, null, args, functionObject.optimizeForWrite());
- }
- ChunkedMessage m = servConn.getFunctionResponseMessage();
- m.setTransactionId(msg.getTransactionId());
- ResultSender resultSender = new ServerToClientFunctionResultSender65(m,
- MessageType.EXECUTE_FUNCTION_RESULT, servConn, functionObject, executeContext);
-
- InternalDistributedMember localVM = InternalDistributedSystem
- .getAnyInstance().getDistributedMember();
- FunctionContext context = null;
- if (memberMappedArg != null) {
- context = new FunctionContextImpl(functionObject.getId(),
- memberMappedArg.getArgumentsForMember(localVM.getId()),
- resultSender, isReexecute);
- }
- else {
- context = new FunctionContextImpl(functionObject.getId(), args,
- resultSender, isReexecute);
- }
- HandShake handShake = (HandShake)servConn.getHandshake();
- int earlierClientReadTimeout = handShake.getClientReadTimeout();
- handShake.setClientReadTimeout(0);
- try {
- long startExecution = stats.startTime();
- stats.startFunctionExecution(functionObject.hasResult());
+ // Execute function on the cache
+ try {
+ Function functionObject = null;
+ if (function instanceof String) {
+ functionObject = FunctionService.getFunction((String) function);
+ if (functionObject == null) {
+ final String message = LocalizedStrings.ExecuteFunction_FUNCTION_NAMED_0_IS_NOT_REGISTERED.toLocalizedString(function);
+ logger.warn("{}: {}", servConn.getName(), message);
+ sendError(hasResult, msg, message, servConn);
+ return;
+ } else {
+ byte functionStateOnServerSide = AbstractExecution.getFunctionState(functionObject.isHA(), functionObject.hasResult(), functionObject
+ .optimizeForWrite());
if (logger.isDebugEnabled()) {
- logger.debug("Executing Function on Server: {} with context: {}", servConn, context);
+ logger.debug("Function State on server side: {} on client: {}", functionStateOnServerSide, functionState);
}
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
- HeapMemoryMonitor hmm = ((InternalResourceManager) cache.getResourceManager()).getHeapMonitor();
- if (functionObject.optimizeForWrite() && cache != null &&
- hmm.getState().isCritical() &&
- !MemoryThresholds.isLowMemoryExceptionDisabled()) {
- Set<DistributedMember> sm = Collections.singleton((DistributedMember)cache.getMyId());
- Exception e = new LowMemoryException(LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
- .toLocalizedString(new Object[] {functionObject.getId(), sm}), sm);
-
- sendException(hasResult, msg, e.getMessage(), servConn, e);
+ if (functionStateOnServerSide != functionState) {
+ String message = LocalizedStrings.FunctionService_FUNCTION_ATTRIBUTE_MISMATCH_CLIENT_SERVER.toLocalizedString(function);
+ logger.warn("{}: {}", servConn.getName(), message);
+ sendError(hasResult, msg, message, servConn);
return;
}
- functionObject.execute(context);
- if (!((ServerToClientFunctionResultSender65)resultSender)
- .isLastResultReceived() && functionObject.hasResult()) {
- throw new FunctionException(
- LocalizedStrings.ExecuteFunction_THE_FUNCTION_0_DID_NOT_SENT_LAST_RESULT
- .toString(functionObject.getId()));
- }
- stats.endFunctionExecution(startExecution,
- functionObject.hasResult());
- }
- catch (FunctionException functionException) {
- stats.endFunctionExecutionWithException(functionObject.hasResult());
- throw functionException;
- }
- catch (Exception exception) {
- stats.endFunctionExecutionWithException(functionObject.hasResult());
- throw new FunctionException(exception);
- }
- finally{
- handShake.setClientReadTimeout(earlierClientReadTimeout);
}
+ } else {
+ functionObject = (Function) function;
}
- catch (IOException ioException) {
- logger.warn(LocalizedMessage.create(LocalizedStrings.ExecuteFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), ioException);
- String message = LocalizedStrings.ExecuteFunction_SERVER_COULD_NOT_SEND_THE_REPLY.toLocalizedString();
- sendException(hasResult, msg, message, servConn, ioException);
+
+ GeodeSecurityUtil.authorizeFunctionExec(functionObject.getId());
+
+ FunctionStats stats = FunctionStats.getFunctionStats(functionObject.getId(), null);
+ // check if the caller is authorized to do this operation on server
+ AuthorizeRequest authzRequest = servConn.getAuthzRequest();
+ ExecuteFunctionOperationContext executeContext = null;
+ if (authzRequest != null) {
+ executeContext = authzRequest.executeFunctionAuthorize(functionObject.getId(), null, null, args, functionObject.optimizeForWrite());
}
- catch (InternalFunctionInvocationTargetException internalfunctionException) {
- // Fix for #44709: User should not be aware of
- // InternalFunctionInvocationTargetException. No instance of
- // InternalFunctionInvocationTargetException is giving useful
- // information to user to take any corrective action hence logging
- // this at fine level logging
- // 1> In case of HA FucntionInvocationTargetException thrown. Since
- // it is HA, function will be reexecuted on right node
- // 2> in case of HA member departed
+ ChunkedMessage m = servConn.getFunctionResponseMessage();
+ m.setTransactionId(msg.getTransactionId());
+ ResultSender resultSender = new ServerToClientFunctionResultSender65(m, MessageType.EXECUTE_FUNCTION_RESULT, servConn, functionObject, executeContext);
+
+ InternalDistributedMember localVM = InternalDistributedSystem.getAnyInstance().getDistributedMember();
+ FunctionContext context = null;
+
+ if (memberMappedArg != null) {
+ context = new FunctionContextImpl(functionObject.getId(), memberMappedArg.getArgumentsForMember(localVM.getId()), resultSender, isReexecute);
+ } else {
+ context = new FunctionContextImpl(functionObject.getId(), args, resultSender, isReexecute);
+ }
+ HandShake handShake = (HandShake) servConn.getHandshake();
+ int earlierClientReadTimeout = handShake.getClientReadTimeout();
+ handShake.setClientReadTimeout(0);
+ try {
+ long startExecution = stats.startTime();
+ stats.startFunctionExecution(functionObject.hasResult());
if (logger.isDebugEnabled()) {
- logger.debug(LocalizedMessage.create(LocalizedStrings.ExecuteFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, new Object[] { function }), internalfunctionException);
+ logger.debug("Executing Function on Server: {} with context: {}", servConn, context);
}
- final String message = internalfunctionException.getMessage();
- sendException(hasResult, msg, message, servConn, internalfunctionException);
- }
- catch (Exception e) {
- logger.warn(LocalizedMessage.create(LocalizedStrings.ExecuteFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), e);
- final String message = e.getMessage();
- sendException(hasResult, msg, message, servConn,e);
+ GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ HeapMemoryMonitor hmm = ((InternalResourceManager) cache.getResourceManager()).getHeapMonitor();
+ if (functionObject.optimizeForWrite() && cache != null &&
+ hmm.getState().isCritical() &&
+ !MemoryThresholds.isLowMemoryExceptionDisabled()) {
+ Set<DistributedMember> sm = Collections.singleton((DistributedMember) cache.getMyId());
+ Exception e = new LowMemoryException(LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1.toLocalizedString(new Object[] {
+ functionObject.getId(),
+ sm
+ }), sm);
+
+ sendException(hasResult, msg, e.getMessage(), servConn, e);
+ return;
+ }
+ functionObject.execute(context);
+ if (!((ServerToClientFunctionResultSender65) resultSender).isLastResultReceived() && functionObject.hasResult()) {
+ throw new FunctionException(LocalizedStrings.ExecuteFunction_THE_FUNCTION_0_DID_NOT_SENT_LAST_RESULT.toString(functionObject
+ .getId()));
+ }
+ stats.endFunctionExecution(startExecution, functionObject.hasResult());
+ } catch (FunctionException functionException) {
+ stats.endFunctionExecutionWithException(functionObject.hasResult());
+ throw functionException;
+ } catch (Exception exception) {
+ stats.endFunctionExecutionWithException(functionObject.hasResult());
+ throw new FunctionException(exception);
+ } finally {
+ handShake.setClientReadTimeout(earlierClientReadTimeout);
+ }
+ } catch (IOException ioException) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.ExecuteFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), ioException);
+ String message = LocalizedStrings.ExecuteFunction_SERVER_COULD_NOT_SEND_THE_REPLY.toLocalizedString();
+ sendException(hasResult, msg, message, servConn, ioException);
+ } catch (InternalFunctionInvocationTargetException internalfunctionException) {
+ // Fix for #44709: User should not be aware of
+ // InternalFunctionInvocationTargetException. No instance of
+ // InternalFunctionInvocationTargetException is giving useful
+ // information to user to take any corrective action hence logging
+ // this at fine level logging
+ // 1> In case of HA FucntionInvocationTargetException thrown. Since
+ // it is HA, function will be reexecuted on right node
+ // 2> in case of HA member departed
+ if (logger.isDebugEnabled()) {
+ logger.debug(LocalizedMessage.create(LocalizedStrings.ExecuteFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, new Object[] {
+ function
+ }), internalfunctionException);
}
+ final String message = internalfunctionException.getMessage();
+ sendException(hasResult, msg, message, servConn, internalfunctionException);
+ } catch (Exception e) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.ExecuteFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), e);
+ final String message = e.getMessage();
+ sendException(hasResult, msg, message, servConn, e);
}
}
- private void sendException(byte hasResult, Message msg, String message,
- ServerConnection servConn, Throwable e) throws IOException {
+ private void sendException(byte hasResult, Message msg, String message, ServerConnection servConn, Throwable e)
+ throws IOException {
if (hasResult == 1) {
- writeFunctionResponseException(msg, MessageType.EXCEPTION, message,
- servConn, e);
+ writeFunctionResponseException(msg, MessageType.EXCEPTION, message, servConn, e);
servConn.setAsTrue(RESPONDED);
}
}
- private void sendError(byte hasResult, Message msg, String message,
- ServerConnection servConn) throws IOException {
+ private void sendError(byte hasResult, Message msg, String message, ServerConnection servConn) throws IOException {
if (hasResult == 1) {
- writeFunctionResponseError(msg, MessageType.EXECUTE_FUNCTION_ERROR,
- message, servConn);
+ writeFunctionResponseError(msg, MessageType.EXECUTE_FUNCTION_ERROR, message, servConn);
servConn.setAsTrue(RESPONDED);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/536c13bd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteFunction66.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteFunction66.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteFunction66.java
index 3a20bc0..d5f3660 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteFunction66.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteFunction66.java
@@ -62,6 +62,7 @@ import com.gemstone.gemfire.internal.cache.tier.sockets.ServerConnection;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.internal.security.AuthorizeRequest;
+import com.gemstone.gemfire.internal.security.GeodeSecurityUtil;
/**
* @since GemFire 6.6
@@ -72,18 +73,16 @@ public class ExecuteFunction66 extends BaseCommand {
protected static volatile boolean ASYNC_TX_WARNING_ISSUED = false;
- static final ExecutorService execService = Executors
- .newCachedThreadPool(new ThreadFactory() {
- AtomicInteger threadNum = new AtomicInteger();
+ static final ExecutorService execService = Executors.newCachedThreadPool(new ThreadFactory() {
+ AtomicInteger threadNum = new AtomicInteger();
+
+ public Thread newThread(final Runnable r) {
+ Thread result = new Thread(r, "Function Execution Thread-" + threadNum.incrementAndGet());
+ result.setDaemon(true);
+ return result;
+ }
+ });
- public Thread newThread(final Runnable r) {
- Thread result = new Thread(r, "Function Execution Thread-"
- + threadNum.incrementAndGet());
- result.setDaemon(true);
- return result;
- }
- });
-
public static Command getCommand() {
return singleton;
}
@@ -92,8 +91,7 @@ public class ExecuteFunction66 extends BaseCommand {
}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start)
- throws IOException {
+ public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException {
Object function = null;
Object args = null;
MemberMappedArgument memberMappedArg = null;
@@ -114,16 +112,14 @@ public class ExecuteFunction66 extends BaseCommand {
if (functionState == AbstractExecution.HA_HASRESULT_NO_OPTIMIZEFORWRITE_REEXECUTE) {
functionState = AbstractExecution.HA_HASRESULT_NO_OPTIMIZEFORWRITE;
isReexecute = true;
- }
- else if (functionState == AbstractExecution.HA_HASRESULT_OPTIMIZEFORWRITE_REEXECUTE) {
+ } else if (functionState == AbstractExecution.HA_HASRESULT_OPTIMIZEFORWRITE_REEXECUTE) {
functionState = AbstractExecution.HA_HASRESULT_OPTIMIZEFORWRITE;
isReexecute = true;
}
if (functionState != 1) {
- hasResult = (byte)((functionState & 2) - 1);
- }
- else {
+ hasResult = (byte) ((functionState & 2) - 1);
+ } else {
hasResult = functionState;
}
if (hasResult == 1) {
@@ -135,173 +131,153 @@ public class ExecuteFunction66 extends BaseCommand {
Part part = msg.getPart(3);
if (part != null) {
- memberMappedArg = (MemberMappedArgument)part.getObject();
+ memberMappedArg = (MemberMappedArgument) part.getObject();
}
groups = getGroups(msg);
allMembers = getAllMembers(msg);
ignoreFailedMembers = getIgnoreFailedMembers(msg);
- }
- catch (ClassNotFoundException exception) {
+ } catch (ClassNotFoundException exception) {
logger.warn(LocalizedMessage.create(LocalizedStrings.ExecuteFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), exception);
if (hasResult == 1) {
writeChunkedException(msg, exception, false, servConn);
- }
- else {
+ } else {
writeException(msg, exception, false, servConn);
}
servConn.setAsTrue(RESPONDED);
return;
}
+
if (function == null) {
final String message = LocalizedStrings.ExecuteFunction_THE_INPUT_FUNCTION_FOR_THE_EXECUTE_FUNCTION_REQUEST_IS_NULL
- .toLocalizedString();
- logger.warn(LocalizedMessage.create(LocalizedStrings.TWO_ARG_COLON, new Object[] { servConn.getName(), message }));
+ .toLocalizedString();
+ logger.warn(LocalizedMessage.create(LocalizedStrings.TWO_ARG_COLON, new Object[] {
+ servConn.getName(),
+ message
+ }));
sendError(hasResult, msg, message, servConn);
return;
}
- else {
- // Execute function on the cache
- try {
- Function functionObject = null;
- if (function instanceof String) {
- functionObject = FunctionService.getFunction((String)function);
- if (functionObject == null) {
- final String message = LocalizedStrings.ExecuteFunction_FUNCTION_NAMED_0_IS_NOT_REGISTERED
- .toLocalizedString(function);
+
+ // Execute function on the cache
+ try {
+ Function functionObject = null;
+ if (function instanceof String) {
+ functionObject = FunctionService.getFunction((String) function);
+ if (functionObject == null) {
+ final String message = LocalizedStrings.ExecuteFunction_FUNCTION_NAMED_0_IS_NOT_REGISTERED.toLocalizedString(function);
+ logger.warn("{}: {}", servConn.getName(), message);
+ sendError(hasResult, msg, message, servConn);
+ return;
+ } else {
+ byte functionStateOnServerSide = AbstractExecution.getFunctionState(functionObject.isHA(), functionObject.hasResult(), functionObject
+ .optimizeForWrite());
+ if (logger.isDebugEnabled()) {
+ logger.debug("Function State on server side: {} on client: {}", functionStateOnServerSide, functionState);
+ }
+ if (functionStateOnServerSide != functionState) {
+ String message = LocalizedStrings.FunctionService_FUNCTION_ATTRIBUTE_MISMATCH_CLIENT_SERVER.toLocalizedString(function);
logger.warn("{}: {}", servConn.getName(), message);
sendError(hasResult, msg, message, servConn);
return;
}
- else {
- byte functionStateOnServerSide = AbstractExecution
- .getFunctionState(functionObject.isHA(), functionObject
- .hasResult(), functionObject.optimizeForWrite());
- if (logger.isDebugEnabled()) {
- logger.debug("Function State on server side: {} on client: {}", functionStateOnServerSide, functionState);
- }
- if (functionStateOnServerSide != functionState) {
- String message = LocalizedStrings.FunctionService_FUNCTION_ATTRIBUTE_MISMATCH_CLIENT_SERVER
- .toLocalizedString(function);
- logger.warn("{}: {}", servConn.getName(), message);
- sendError(hasResult, msg, message, servConn);
- return;
- }
- }
}
- else {
- functionObject = (Function)function;
- }
- FunctionStats stats = FunctionStats.getFunctionStats(functionObject
- .getId(), null);
- // check if the caller is authorized to do this operation on server
- AuthorizeRequest authzRequest = servConn.getAuthzRequest();
- ExecuteFunctionOperationContext executeContext = null;
- if (authzRequest != null) {
- executeContext = authzRequest.executeFunctionAuthorize(functionObject
- .getId(), null, null, args, functionObject.optimizeForWrite());
- }
- ChunkedMessage m = servConn.getFunctionResponseMessage();
- m.setTransactionId(msg.getTransactionId());
- ServerToClientFunctionResultSender resultSender = new ServerToClientFunctionResultSender65(m,
- MessageType.EXECUTE_FUNCTION_RESULT, servConn, functionObject,
- executeContext);
-
- InternalDistributedMember localVM = InternalDistributedSystem
- .getAnyInstance().getDistributedMember();
- FunctionContext context = null;
-
- if (memberMappedArg != null) {
- context = new FunctionContextImpl(functionObject.getId(),
- memberMappedArg.getArgumentsForMember(localVM.getId()),
- resultSender, isReexecute);
- }
- else {
- context = new FunctionContextImpl(functionObject.getId(), args,
- resultSender, isReexecute);
+ } else {
+ functionObject = (Function) function;
+ }
+
+ GeodeSecurityUtil.authorizeFunctionExec(functionObject.getId());
+
+ FunctionStats stats = FunctionStats.getFunctionStats(functionObject.getId(), null);
+ // check if the caller is authorized to do this operation on server
+ AuthorizeRequest authzRequest = servConn.getAuthzRequest();
+ ExecuteFunctionOperationContext executeContext = null;
+ if (authzRequest != null) {
+ executeContext = authzRequest.executeFunctionAuthorize(functionObject.getId(), null, null, args, functionObject.optimizeForWrite());
+ }
+ ChunkedMessage m = servConn.getFunctionResponseMessage();
+ m.setTransactionId(msg.getTransactionId());
+ ServerToClientFunctionResultSender resultSender = new ServerToClientFunctionResultSender65(m, MessageType.EXECUTE_FUNCTION_RESULT, servConn, functionObject, executeContext);
+
+ InternalDistributedMember localVM = InternalDistributedSystem.getAnyInstance().getDistributedMember();
+ FunctionContext context = null;
+
+ if (memberMappedArg != null) {
+ context = new FunctionContextImpl(functionObject.getId(), memberMappedArg.getArgumentsForMember(localVM.getId()), resultSender, isReexecute);
+ } else {
+ context = new FunctionContextImpl(functionObject.getId(), args, resultSender, isReexecute);
+ }
+ HandShake handShake = (HandShake) servConn.getHandshake();
+ int earlierClientReadTimeout = handShake.getClientReadTimeout();
+ handShake.setClientReadTimeout(functionTimeout);
+ try {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Executing Function on Server: {} with context: {}", servConn, context);
}
- HandShake handShake = (HandShake)servConn.getHandshake();
- int earlierClientReadTimeout = handShake.getClientReadTimeout();
- handShake.setClientReadTimeout(functionTimeout);
- try {
- if (logger.isDebugEnabled()) {
- logger.debug("Executing Function on Server: {} with context: {}", servConn, context);
- }
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
- HeapMemoryMonitor hmm = ((InternalResourceManager) cache.getResourceManager()).getHeapMonitor();
- if (functionObject.optimizeForWrite() && cache != null
- && hmm.getState().isCritical()
- && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
- Set<DistributedMember> sm = Collections
- .singleton((DistributedMember)cache.getMyId());
- Exception e = new LowMemoryException(
- LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
- .toLocalizedString(new Object[] { functionObject.getId(),
- sm }), sm);
-
- sendException(hasResult, msg, e.getMessage(), servConn, e);
- return;
- }
- /**
- * if cache is null, then either cache has not yet been created on
- * this node or it is a shutdown scenario.
- */
- DM dm = null;
- if (cache != null) {
- dm = cache.getDistributionManager();
- }
- if (groups != null && groups.length > 0) {
- executeFunctionOnGroups(function, args, groups, allMembers,
- functionObject, resultSender, ignoreFailedMembers);
- } else {
- executeFunctionaLocally(functionObject, context,
- (ServerToClientFunctionResultSender65)resultSender, dm, stats);
- }
+ GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ HeapMemoryMonitor hmm = ((InternalResourceManager) cache.getResourceManager()).getHeapMonitor();
+ if (functionObject.optimizeForWrite() && cache != null && hmm.getState()
+ .isCritical() && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
+ Set<DistributedMember> sm = Collections.singleton((DistributedMember) cache.getMyId());
+ Exception e = new LowMemoryException(LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1.toLocalizedString(new Object[] {
+ functionObject.getId(), sm
+ }), sm);
- if (!functionObject.hasResult()) {
- writeReply(msg, servConn);
- }
+ sendException(hasResult, msg, e.getMessage(), servConn, e);
+ return;
}
- catch (FunctionException functionException) {
- stats.endFunctionExecutionWithException(functionObject.hasResult());
- throw functionException;
+ /**
+ * if cache is null, then either cache has not yet been created on
+ * this node or it is a shutdown scenario.
+ */
+ DM dm = null;
+ if (cache != null) {
+ dm = cache.getDistributionManager();
}
- catch (Exception exception) {
- stats.endFunctionExecutionWithException(functionObject.hasResult());
- throw new FunctionException(exception);
+ if (groups != null && groups.length > 0) {
+ executeFunctionOnGroups(function, args, groups, allMembers, functionObject, resultSender, ignoreFailedMembers);
+ } else {
+ executeFunctionaLocally(functionObject, context, (ServerToClientFunctionResultSender65) resultSender, dm, stats);
}
- finally {
- handShake.setClientReadTimeout(earlierClientReadTimeout);
+
+ if (!functionObject.hasResult()) {
+ writeReply(msg, servConn);
}
+ } catch (FunctionException functionException) {
+ stats.endFunctionExecutionWithException(functionObject.hasResult());
+ throw functionException;
+ } catch (Exception exception) {
+ stats.endFunctionExecutionWithException(functionObject.hasResult());
+ throw new FunctionException(exception);
+ } finally {
+ handShake.setClientReadTimeout(earlierClientReadTimeout);
}
- catch (IOException ioException) {
- logger.warn(LocalizedMessage.create(LocalizedStrings.ExecuteFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), ioException);
- String message = LocalizedStrings.ExecuteFunction_SERVER_COULD_NOT_SEND_THE_REPLY
- .toLocalizedString();
- sendException(hasResult, msg, message, servConn, ioException);
- }
- catch (InternalFunctionInvocationTargetException internalfunctionException) {
- // Fix for #44709: User should not be aware of
- // InternalFunctionInvocationTargetException. No instance of
- // InternalFunctionInvocationTargetException is giving useful
- // information to user to take any corrective action hence logging
- // this at fine level logging
- // 1> When bucket is moved
- // 2> Incase of HA FucntionInvocationTargetException thrown. Since
- // it is HA, fucntion will be reexecuted on right node
- // 3> Multiple target nodes found for single hop operation
- // 4> in case of HA member departed
- if (logger.isDebugEnabled()) {
- logger.debug(LocalizedMessage.create(LocalizedStrings.ExecuteFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, new Object[] { function }), internalfunctionException);
- }
- final String message = internalfunctionException.getMessage();
- sendException(hasResult, msg, message, servConn, internalfunctionException);
- }
- catch (Exception e) {
- logger.warn(LocalizedMessage.create(LocalizedStrings.ExecuteFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), e);
- final String message = e.getMessage();
- sendException(hasResult, msg, message, servConn, e);
+ } catch (IOException ioException) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.ExecuteFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), ioException);
+ String message = LocalizedStrings.ExecuteFunction_SERVER_COULD_NOT_SEND_THE_REPLY.toLocalizedString();
+ sendException(hasResult, msg, message, servConn, ioException);
+ } catch (InternalFunctionInvocationTargetException internalfunctionException) {
+ // Fix for #44709: User should not be aware of
+ // InternalFunctionInvocationTargetException. No instance of
+ // InternalFunctionInvocationTargetException is giving useful
+ // information to user to take any corrective action hence logging
+ // this at fine level logging
+ // 1> When bucket is moved
+ // 2> Incase of HA FucntionInvocationTargetException thrown. Since
+ // it is HA, fucntion will be reexecuted on right node
+ // 3> Multiple target nodes found for single hop operation
+ // 4> in case of HA member departed
+ if (logger.isDebugEnabled()) {
+ logger.debug(LocalizedMessage.create(LocalizedStrings.ExecuteFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, new Object[] {
+ function
+ }), internalfunctionException);
}
+ final String message = internalfunctionException.getMessage();
+ sendException(hasResult, msg, message, servConn, internalfunctionException);
+ } catch (Exception e) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.ExecuteFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), e);
+ final String message = e.getMessage();
+ sendException(hasResult, msg, message, servConn, e);
}
}
@@ -313,9 +289,13 @@ public class ExecuteFunction66 extends BaseCommand {
return false;
}
- protected void executeFunctionOnGroups(Object function, Object args,
- String[] groups, boolean allMembers, Function functionObject,
- ServerToClientFunctionResultSender resultSender, boolean ignoreFailedMembers) {
+ protected void executeFunctionOnGroups(Object function,
+ Object args,
+ String[] groups,
+ boolean allMembers,
+ Function functionObject,
+ ServerToClientFunctionResultSender resultSender,
+ boolean ignoreFailedMembers) {
throw new InternalGemFireError();
}
@@ -324,19 +304,18 @@ public class ExecuteFunction66 extends BaseCommand {
}
private void executeFunctionaLocally(final Function fn,
- final FunctionContext cx,
- final ServerToClientFunctionResultSender65 sender, DM dm,
- final FunctionStats stats) throws IOException {
+ final FunctionContext cx,
+ final ServerToClientFunctionResultSender65 sender,
+ DM dm,
+ final FunctionStats stats) throws IOException {
long startExecution = stats.startTime();
stats.startFunctionExecution(fn.hasResult());
if (fn.hasResult()) {
fn.execute(cx);
- if (!((ServerToClientFunctionResultSender65)sender)
- .isLastResultReceived() && fn.hasResult()) {
- throw new FunctionException(
- LocalizedStrings.ExecuteFunction_THE_FUNCTION_0_DID_NOT_SENT_LAST_RESULT
- .toString(fn.getId()));
+ if (!((ServerToClientFunctionResultSender65) sender).isLastResultReceived() && fn.hasResult()) {
+ throw new FunctionException(LocalizedStrings.ExecuteFunction_THE_FUNCTION_0_DID_NOT_SENT_LAST_RESULT.toString(fn
+ .getId()));
}
} else {
/**
@@ -351,13 +330,9 @@ public class ExecuteFunction66 extends BaseCommand {
if (txState != null) {
cache = GemFireCacheImpl.getExisting("executing function");
cache.getTxManager().masqueradeAs(txState);
- if (cache.getLoggerI18n().warningEnabled()
- && !ASYNC_TX_WARNING_ISSUED) {
+ if (cache.getLoggerI18n().warningEnabled() && !ASYNC_TX_WARNING_ISSUED) {
ASYNC_TX_WARNING_ISSUED = true;
- cache
- .getLoggerI18n()
- .warning(
- LocalizedStrings.ExecuteFunction66_TRANSACTIONAL_FUNCTION_WITHOUT_RESULT);
+ cache.getLoggerI18n().warning(LocalizedStrings.ExecuteFunction66_TRANSACTIONAL_FUNCTION_WITHOUT_RESULT);
}
}
fn.execute(cx);
@@ -372,7 +347,9 @@ public class ExecuteFunction66 extends BaseCommand {
// 2> in case of HA member departed
stats.endFunctionExecutionWithException(fn.hasResult());
if (logger.isDebugEnabled()) {
- logger.debug(LocalizedMessage.create(LocalizedStrings.ExecuteFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, new Object[] { fn }), internalfunctionException);
+ logger.debug(LocalizedMessage.create(LocalizedStrings.ExecuteFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, new Object[] {
+ fn
+ }), internalfunctionException);
}
} catch (FunctionException functionException) {
stats.endFunctionExecutionWithException(fn.hasResult());
@@ -395,34 +372,28 @@ public class ExecuteFunction66 extends BaseCommand {
*/
execService.execute(functionExecution);
} else {
- final DistributionManager newDM = (DistributionManager)dm;
+ final DistributionManager newDM = (DistributionManager) dm;
newDM.getFunctionExcecutor().execute(functionExecution);
}
}
stats.endFunctionExecution(startExecution, fn.hasResult());
}
- private void sendException(byte hasResult, Message msg, String message,
- ServerConnection servConn, Throwable e) throws IOException {
+ private void sendException(byte hasResult, Message msg, String message, ServerConnection servConn, Throwable e)
+ throws IOException {
if (hasResult == 1) {
- writeFunctionResponseException(msg, MessageType.EXCEPTION, message,
- servConn, e);
- }
- else {
+ writeFunctionResponseException(msg, MessageType.EXCEPTION, message, servConn, e);
+ } else {
writeException(msg, e, false, servConn);
}
servConn.setAsTrue(RESPONDED);
}
- private void sendError(byte hasResult, Message msg, String message,
- ServerConnection servConn) throws IOException {
+ private void sendError(byte hasResult, Message msg, String message, ServerConnection servConn) throws IOException {
if (hasResult == 1) {
- writeFunctionResponseError(msg, MessageType.EXECUTE_FUNCTION_ERROR,
- message, servConn);
- }
- else {
- writeErrorResponse(msg, MessageType.EXECUTE_FUNCTION_ERROR, message,
- servConn);
+ writeFunctionResponseError(msg, MessageType.EXECUTE_FUNCTION_ERROR, message, servConn);
+ } else {
+ writeErrorResponse(msg, MessageType.EXECUTE_FUNCTION_ERROR, message, servConn);
}
servConn.setAsTrue(RESPONDED);
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/536c13bd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteRegionFunction.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteRegionFunction.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteRegionFunction.java
index 40f0ee5..6889e32 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteRegionFunction.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteRegionFunction.java
@@ -18,7 +18,6 @@
package com.gemstone.gemfire.internal.cache.tier.sockets.command;
import java.io.IOException;
-import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;
@@ -47,6 +46,7 @@ import com.gemstone.gemfire.internal.cache.tier.sockets.ServerConnection;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.internal.security.AuthorizeRequest;
+import com.gemstone.gemfire.internal.security.GeodeSecurityUtil;
/**
* This is the base command which reads the parts for the
@@ -54,7 +54,7 @@ import com.gemstone.gemfire.internal.security.AuthorizeRequest;
* region.<br>
* If the hasResult byte is 1, then this command send back the result after the
* execution to the client else do not send the reply back to the client
- *
+ *
* @since GemFire 5.8LA
*/
public class ExecuteRegionFunction extends BaseCommand {
@@ -69,8 +69,7 @@ public class ExecuteRegionFunction extends BaseCommand {
}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start)
- throws IOException {
+ public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException {
String regionName = null;
Object function = null;
Object args = null;
@@ -92,7 +91,7 @@ public class ExecuteRegionFunction extends BaseCommand {
if (part != null) {
Object obj = part.getObject();
if (obj instanceof MemberMappedArgument) {
- memberMappedArg = (MemberMappedArgument)obj;
+ memberMappedArg = (MemberMappedArgument) obj;
}
}
filterSize = msg.getPart(5).getInt();
@@ -103,9 +102,8 @@ public class ExecuteRegionFunction extends BaseCommand {
filter.add(msg.getPart(partNumber + i).getStringOrObject());
}
}
-
- }
- catch (ClassNotFoundException exception) {
+
+ } catch (ClassNotFoundException exception) {
logger.warn(LocalizedMessage.create(LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), exception);
if (hasResult == 1) {
writeChunkedException(msg, exception, false, servConn);
@@ -125,143 +123,126 @@ public class ExecuteRegionFunction extends BaseCommand {
sendError(hasResult, msg, message, servConn);
return;
}
- else {
- Region region = crHelper.getRegion(regionName);
- if (region == null) {
- String message =
- LocalizedStrings.ExecuteRegionFunction_THE_REGION_NAMED_0_WAS_NOT_FOUND_DURING_EXECUTE_FUNCTION_REQUEST
- .toLocalizedString(regionName);
- logger.warn("{}: {}", servConn.getName(), message);
- sendError(hasResult, msg, message, servConn);
- return;
- }
- HandShake handShake = (HandShake)servConn.getHandshake();
- int earlierClientReadTimeout = handShake.getClientReadTimeout();
- handShake.setClientReadTimeout(0);
- ServerToClientFunctionResultSender resultSender = null;
- Function functionObject = null;
- try {
- if (function instanceof String) {
- functionObject = FunctionService.getFunction((String)function);
- if (functionObject == null) {
- String message = LocalizedStrings.
- ExecuteRegionFunction_THE_FUNCTION_0_HAS_NOT_BEEN_REGISTERED
- .toLocalizedString(function);
- logger.warn("{}: {}", servConn.getName(), message);
- sendError(hasResult, msg, message, servConn);
- return;
- }
- }
- else {
- functionObject = (Function)function;
- }
- // check if the caller is authorized to do this operation on server
- AuthorizeRequest authzRequest = servConn.getAuthzRequest();
- final String functionName = functionObject.getId();
- final String regionPath = region.getFullPath();
- ExecuteFunctionOperationContext executeContext = null;
- if (authzRequest != null) {
- executeContext = authzRequest.executeFunctionAuthorize(functionName,
- regionPath, filter, args, functionObject.optimizeForWrite());
- }
-
- //Construct execution
- AbstractExecution execution = (AbstractExecution)FunctionService.onRegion(region);
- ChunkedMessage m = servConn.getFunctionResponseMessage();
- m.setTransactionId(msg.getTransactionId());
- resultSender = new ServerToClientFunctionResultSender(m,
- MessageType.EXECUTE_REGION_FUNCTION_RESULT, servConn,functionObject,executeContext);
-
-
- if (execution instanceof PartitionedRegionFunctionExecutor) {
- execution = new PartitionedRegionFunctionExecutor(
- (PartitionedRegion)region, filter, args, memberMappedArg,
- resultSender, null, false);
- }
- else {
- execution = new DistributedRegionFunctionExecutor(
- (DistributedRegion)region, filter, args, memberMappedArg,
- resultSender);
- }
-
- if (logger.isDebugEnabled()) {
- logger.debug("Executing Function: {} on Server: {} with Execution: {}", functionObject.getId(), servConn, execution);
+
+ Region region = crHelper.getRegion(regionName);
+ if (region == null) {
+ String message = LocalizedStrings.ExecuteRegionFunction_THE_REGION_NAMED_0_WAS_NOT_FOUND_DURING_EXECUTE_FUNCTION_REQUEST
+ .toLocalizedString(regionName);
+ logger.warn("{}: {}", servConn.getName(), message);
+ sendError(hasResult, msg, message, servConn);
+ return;
+ }
+
+ HandShake handShake = (HandShake) servConn.getHandshake();
+ int earlierClientReadTimeout = handShake.getClientReadTimeout();
+ handShake.setClientReadTimeout(0);
+ ServerToClientFunctionResultSender resultSender = null;
+ Function functionObject = null;
+ try {
+ if (function instanceof String) {
+ functionObject = FunctionService.getFunction((String) function);
+ if (functionObject == null) {
+ String message = LocalizedStrings.
+ ExecuteRegionFunction_THE_FUNCTION_0_HAS_NOT_BEEN_REGISTERED.toLocalizedString(function);
+ logger.warn("{}: {}", servConn.getName(), message);
+ sendError(hasResult, msg, message, servConn);
+ return;
}
- if (hasResult == 1) {
- if (function instanceof String) {
- execution.execute((String)function).getResult();
- }
- else {
- execution.execute(functionObject).getResult();
- }
- }else {
- if (function instanceof String) {
- execution.execute((String)function);
- }
- else {
- execution.execute(functionObject);
- }
- }
- }
- catch (IOException ioe) {
- logger.warn(LocalizedMessage.create(LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), ioe);
- final String message = LocalizedStrings.
- ExecuteRegionFunction_SERVER_COULD_NOT_SEND_THE_REPLY
- .toLocalizedString();
- sendException(hasResult, msg, message, servConn,ioe);
+ } else {
+ functionObject = (Function) function;
}
- catch (InternalFunctionInvocationTargetException internalfunctionException) {
- // Fix for #44709: User should not be aware of
- // InternalFunctionInvocationTargetException. No instance of
- // InternalFunctionInvocationTargetException is giving useful
- // information to user to take any corrective action hence logging
- // this at fine level logging
- // 1> When bucket is moved
- // 2> Incase of HA FucntionInvocationTargetException thrown. Since
- // it is HA, fucntion will be reexecuted on right node
- // 3> Multiple target nodes found for single hop operation
- // 4> in case of HA member departed
- if (logger.isDebugEnabled()) {
- logger.debug(LocalizedMessage.create(LocalizedStrings.ExecuteFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, new Object[] { function }), internalfunctionException);
- }
- final String message = internalfunctionException.getMessage();
- sendException(hasResult, msg, message, servConn, internalfunctionException);
- }
- catch (FunctionException fe) {
- logger.warn(LocalizedMessage.create(LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), fe);
- String message = fe.getMessage();
- sendException(hasResult, msg, message, servConn,fe);
+ GeodeSecurityUtil.authorizeFunctionExec(functionObject.getId());
+
+ // check if the caller is authorized to do this operation on server
+ AuthorizeRequest authzRequest = servConn.getAuthzRequest();
+ final String functionName = functionObject.getId();
+ final String regionPath = region.getFullPath();
+ ExecuteFunctionOperationContext executeContext = null;
+ if (authzRequest != null) {
+ executeContext = authzRequest.executeFunctionAuthorize(functionName, regionPath, filter, args, functionObject.optimizeForWrite());
}
- catch (Exception e) {
- logger.warn(LocalizedMessage.create(LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), e);
- String message = e.getMessage();
- sendException(hasResult, msg, message, servConn,e);
+
+ //Construct execution
+ AbstractExecution execution = (AbstractExecution) FunctionService.onRegion(region);
+ ChunkedMessage m = servConn.getFunctionResponseMessage();
+ m.setTransactionId(msg.getTransactionId());
+ resultSender = new ServerToClientFunctionResultSender(m, MessageType.EXECUTE_REGION_FUNCTION_RESULT, servConn, functionObject, executeContext);
+
+
+ if (execution instanceof PartitionedRegionFunctionExecutor) {
+ execution = new PartitionedRegionFunctionExecutor((PartitionedRegion) region, filter, args, memberMappedArg, resultSender, null, false);
+ } else {
+ execution = new DistributedRegionFunctionExecutor((DistributedRegion) region, filter, args, memberMappedArg, resultSender);
}
- finally{
- handShake.setClientReadTimeout(earlierClientReadTimeout);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Executing Function: {} on Server: {} with Execution: {}", functionObject.getId(), servConn, execution);
+ }
+ if (hasResult == 1) {
+ if (function instanceof String) {
+ execution.execute((String) function).getResult();
+ } else {
+ execution.execute(functionObject).getResult();
+ }
+ } else {
+ if (function instanceof String) {
+ execution.execute((String) function);
+ } else {
+ execution.execute(functionObject);
+ }
+ }
+ } catch (IOException ioe) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), ioe);
+ final String message = LocalizedStrings.
+ ExecuteRegionFunction_SERVER_COULD_NOT_SEND_THE_REPLY.toLocalizedString();
+ sendException(hasResult, msg, message, servConn, ioe);
+ } catch (InternalFunctionInvocationTargetException internalfunctionException) {
+ // Fix for #44709: User should not be aware of
+ // InternalFunctionInvocationTargetException. No instance of
+ // InternalFunctionInvocationTargetException is giving useful
+ // information to user to take any corrective action hence logging
+ // this at fine level logging
+ // 1> When bucket is moved
+ // 2> Incase of HA FucntionInvocationTargetException thrown. Since
+ // it is HA, fucntion will be reexecuted on right node
+ // 3> Multiple target nodes found for single hop operation
+ // 4> in case of HA member departed
+ if (logger.isDebugEnabled()) {
+ logger.debug(LocalizedMessage.create(LocalizedStrings.ExecuteFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, new Object[] {
+ function
+ }), internalfunctionException);
}
+ final String message = internalfunctionException.getMessage();
+ sendException(hasResult, msg, message, servConn, internalfunctionException);
+ } catch (FunctionException fe) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), fe);
+ String message = fe.getMessage();
+
+ sendException(hasResult, msg, message, servConn, fe);
+ } catch (Exception e) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), e);
+ String message = e.getMessage();
+ sendException(hasResult, msg, message, servConn, e);
+ } finally {
+ handShake.setClientReadTimeout(earlierClientReadTimeout);
}
}
- private void sendException(byte hasResult, Message msg, String message,
- ServerConnection servConn, Throwable e) throws IOException {
+ private void sendException(byte hasResult, Message msg, String message, ServerConnection servConn, Throwable e)
+ throws IOException {
synchronized (msg) {
if (hasResult == 1) {
- writeFunctionResponseException(msg, MessageType.EXCEPTION, message,
- servConn, e);
+ writeFunctionResponseException(msg, MessageType.EXCEPTION, message, servConn, e);
servConn.setAsTrue(RESPONDED);
}
}
}
-
- private void sendError(byte hasResult, Message msg, String message,
- ServerConnection servConn) throws IOException {
+
+ private void sendError(byte hasResult, Message msg, String message, ServerConnection servConn) throws IOException {
synchronized (msg) {
if (hasResult == 1) {
- writeFunctionResponseError(msg,
- MessageType.EXECUTE_REGION_FUNCTION_ERROR, message, servConn);
+ writeFunctionResponseError(msg, MessageType.EXECUTE_REGION_FUNCTION_ERROR, message, servConn);
servConn.setAsTrue(RESPONDED);
}
}