You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by jb...@apache.org on 2019/08/14 21:28:23 UTC
[geode] 01/02: Cleanup
This is an automated email from the ASF dual-hosted git repository.
jbarrett pushed a commit to branch spike/slow-functions
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 6396f3850099a6beb70ab6c4409b76fdb74b78d6
Author: Jacob Barrett <jb...@pivotal.io>
AuthorDate: Wed Aug 14 14:27:41 2019 -0700
Cleanup
---
.../internal/cache/control/HeapMemoryMonitor.java | 7 +-
.../internal/cache/execute/AbstractExecution.java | 30 ++------
.../cache/execute/MemberFunctionExecutor.java | 87 ++++++++++------------
3 files changed, 51 insertions(+), 73 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java
index 29f0025..109dd8d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java
@@ -675,7 +675,8 @@ public class HeapMemoryMonitor implements NotificationListener, MemoryMonitor {
});
}
- protected Set<DistributedMember> getHeapCriticalMembersFrom(Set<DistributedMember> members) {
+ protected Set<DistributedMember> getHeapCriticalMembersFrom(
+ Set<? extends DistributedMember> members) {
Set<DistributedMember> criticalMembers = getCriticalMembers();
criticalMembers.retainAll(members);
return criticalMembers;
@@ -694,7 +695,7 @@ public class HeapMemoryMonitor implements NotificationListener, MemoryMonitor {
checkForLowMemory(function, targetMembers);
}
- public void checkForLowMemory(Function function, Set<DistributedMember> dest) {
+ public void checkForLowMemory(Function function, Set<? extends DistributedMember> dest) {
LowMemoryException exception = createLowMemoryIfNeeded(function, dest);
if (exception != null) {
throw exception;
@@ -708,7 +709,7 @@ public class HeapMemoryMonitor implements NotificationListener, MemoryMonitor {
}
public LowMemoryException createLowMemoryIfNeeded(Function function,
- Set<DistributedMember> memberSet) {
+ Set<? extends DistributedMember> memberSet) {
if (function.optimizeForWrite()
&& !MemoryThresholds.isLowMemoryExceptionDisabled()) {
Set<DistributedMember> criticalMembersFrom = getHeapCriticalMembersFrom(memberSet);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/AbstractExecution.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/AbstractExecution.java
index b5f55cc..bafd555 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/AbstractExecution.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/AbstractExecution.java
@@ -16,7 +16,6 @@
package org.apache.geode.internal.cache.execute;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -37,6 +36,7 @@ import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.cache.execute.ResultSender;
import org.apache.geode.cache.query.QueryInvalidException;
+import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.DistributionManager;
@@ -68,8 +68,6 @@ public abstract class AbstractExecution implements InternalExecution {
protected Set filter = new HashSet();
- protected boolean hasRoutingObjects;
-
protected volatile boolean isReExecute = false;
volatile boolean isClientServerMode = false;
@@ -189,10 +187,6 @@ public abstract class AbstractExecution implements InternalExecution {
this.isReExecute = isReExecute;
}
- public boolean isMemberMappedArgument() {
- return isMemberMappedArgument;
- }
-
public Object getArgumentsForMember(String memberId) {
if (!isMemberMappedArgument) {
return args;
@@ -249,15 +243,6 @@ public abstract class AbstractExecution implements InternalExecution {
return isFnSerializationReqd;
}
- public Collection<InternalDistributedMember> getExecutionNodes() {
- return executionNodes;
- }
-
- public void setRequireExecutionNodes(ExecutionNodesListener listener) {
- executionNodes = Collections.emptySet();
- executionNodesListener = listener;
- }
-
public void setExecutionNodes(Set<InternalDistributedMember> nodes) {
if (executionNodes != null) {
executionNodes = nodes;
@@ -348,7 +333,7 @@ public abstract class AbstractExecution implements InternalExecution {
} else {
functionException = new FunctionException(fite);
}
- handleException(functionException, fn, cx, sender, dm);
+ handleException(functionException, fn, sender, dm);
} catch (BucketMovedException bme) {
FunctionException functionException;
if (fn.isHA()) {
@@ -357,13 +342,13 @@ public abstract class AbstractExecution implements InternalExecution {
} else {
functionException = new FunctionException(bme);
}
- handleException(functionException, fn, cx, sender, dm);
+ handleException(functionException, fn, sender, dm);
} catch (VirtualMachineError e) {
SystemFailure.initiateFailure(e);
throw e;
} catch (Throwable t) {
SystemFailure.checkFailure();
- handleException(t, fn, cx, sender, dm);
+ handleException(t, fn, sender, dm);
}
}
@@ -446,7 +431,8 @@ public abstract class AbstractExecution implements InternalExecution {
* @throws TransactionException if more than one nodes are targeted within a transaction
* @throws LowMemoryException if the set contains a heap critical member
*/
- public abstract void validateExecution(Function function, Set targetMembers);
+ public abstract void validateExecution(Function function,
+ Set<? extends DistributedMember> targetMembers);
public LocalResultCollector<?, ?> getLocalResultCollector(Function function,
final ResultCollector<?, ?> rc) {
@@ -482,7 +468,7 @@ public abstract class AbstractExecution implements InternalExecution {
}
private void handleException(Throwable functionException, final Function fn,
- final FunctionContext cx, final ResultSender sender, DistributionManager dm) {
+ final ResultSender sender, DistributionManager dm) {
FunctionStats stats = FunctionStats.getFunctionStats(fn.getId(), dm.getSystem());
if (logger.isDebugEnabled()) {
@@ -515,7 +501,7 @@ public abstract class AbstractExecution implements InternalExecution {
*
* @return timeout in milliseconds.
*/
- protected int getTimeoutMs() {
+ int getTimeoutMs() {
return timeoutMs;
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java
index a18c560..573c40c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java
@@ -17,7 +17,6 @@ package org.apache.geode.internal.cache.execute;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.Set;
import org.apache.geode.cache.TransactionDataNotColocatedException;
@@ -38,67 +37,66 @@ import org.apache.geode.internal.cache.InternalCache;
public class MemberFunctionExecutor extends AbstractExecution {
- protected InternalDistributedSystem ds;
+ protected InternalDistributedSystem distributedSystem;
- protected Set members;
+ protected Set<InternalDistributedMember> members;
private ServerToClientFunctionResultSender sender;
- public MemberFunctionExecutor(DistributedSystem s) {
- this.ds = (InternalDistributedSystem) s;
- this.members = this.ds.getDistributionManager().getNormalDistributionManagerIds();
+ MemberFunctionExecutor(DistributedSystem distributedSystem) {
+ this(distributedSystem, ((InternalDistributedSystem) distributedSystem).getDistributionManager()
+ .getNormalDistributionManagerIds());
}
- public MemberFunctionExecutor(DistributedSystem s, DistributedMember m) {
- this.ds = (InternalDistributedSystem) s;
- this.members = Collections.singleton(m);
+ MemberFunctionExecutor(DistributedSystem distributedSystem, DistributedMember distributedMember) {
+ this(distributedSystem, Collections.singleton((InternalDistributedMember) distributedMember));
}
- public MemberFunctionExecutor(DistributedSystem s, Set m) {
- this.ds = (InternalDistributedSystem) s;
- this.members = m;
+ MemberFunctionExecutor(DistributedSystem distributedSystem,
+ Set<? extends DistributedMember> members) {
+ this.distributedSystem = (InternalDistributedSystem) distributedSystem;
+ this.members = (Set<InternalDistributedMember>) members;
}
- public MemberFunctionExecutor(DistributedSystem s, Set m,
+ public MemberFunctionExecutor(DistributedSystem distributedSystem,
+ Set<? extends DistributedMember> members,
ServerToClientFunctionResultSender sender) {
- this(s, m);
+ this(distributedSystem, members);
this.sender = sender;
}
private MemberFunctionExecutor(MemberFunctionExecutor memFunctionExecutor) {
super(memFunctionExecutor);
- this.ds = memFunctionExecutor.ds;
- this.members = new HashSet();
- this.members.addAll(memFunctionExecutor.members);
- this.sender = memFunctionExecutor.sender;
+ distributedSystem = memFunctionExecutor.distributedSystem;
+ members = new HashSet<>(memFunctionExecutor.members);
+ sender = memFunctionExecutor.sender;
}
private MemberFunctionExecutor(MemberFunctionExecutor memberFunctionExecutor,
MemberMappedArgument argument) {
this(memberFunctionExecutor);
- this.memberMappedArg = argument;
- this.isMemberMappedArgument = true;
+ memberMappedArg = argument;
+ isMemberMappedArgument = true;
}
private MemberFunctionExecutor(MemberFunctionExecutor memberFunctionExecutor,
ResultCollector rs) {
this(memberFunctionExecutor);
- this.rc = rs;
+ rc = rs;
}
private MemberFunctionExecutor(MemberFunctionExecutor memberFunctionExecutor, Object arguments) {
this(memberFunctionExecutor);
- this.args = arguments;
+ args = arguments;
}
- @SuppressWarnings("unchecked")
private ResultCollector executeFunction(final Function function,
ResultCollector resultCollector) {
- final DistributionManager dm = this.ds.getDistributionManager();
- final Set dest = new HashSet(this.members);
+ final DistributionManager dm = distributedSystem.getDistributionManager();
+ final Set<InternalDistributedMember> dest = new HashSet<>(members);
if (dest.isEmpty()) {
throw new FunctionException(
String.format("No member found for executing function : %s.",
@@ -108,7 +106,7 @@ public class MemberFunctionExecutor extends AbstractExecution {
setExecutionNodes(dest);
final InternalDistributedMember localVM =
- this.ds.getDistributionManager().getDistributionManagerId();
+ distributedSystem.getDistributionManager().getDistributionManagerId();
final LocalResultCollector<?, ?> localRC = getLocalResultCollector(function, resultCollector);
boolean remoteOnly = false;
boolean localOnly = false;
@@ -129,7 +127,7 @@ public class MemberFunctionExecutor extends AbstractExecution {
boolean isTx = false;
InternalCache cache = GemFireCacheImpl.getInstance();
if (cache != null) {
- isTx = cache.getTxManager().getTXState() == null ? false : true;
+ isTx = cache.getTxManager().getTXState() != null;
}
final FunctionContext context = new FunctionContextImpl(cache, function.getId(),
getArgumentsForMember(localVM.getId()), resultSender);
@@ -137,25 +135,23 @@ public class MemberFunctionExecutor extends AbstractExecution {
}
if (!dest.isEmpty()) {
- HashMap<InternalDistributedMember, Object> memberArgs =
- new HashMap<InternalDistributedMember, Object>();
- Iterator<DistributedMember> iter = dest.iterator();
- while (iter.hasNext()) {
- InternalDistributedMember recip = (InternalDistributedMember) iter.next();
- memberArgs.put(recip, getArgumentsForMember(recip.getId()));
+ HashMap<InternalDistributedMember, Object> memberArgs = new HashMap<>();
+ for (InternalDistributedMember distributedMember : dest) {
+ memberArgs.put(distributedMember, getArgumentsForMember(distributedMember.getId()));
}
Assert.assertTrue(memberArgs.size() == dest.size());
- MemberFunctionResultWaiter resultReceiver = new MemberFunctionResultWaiter(this.ds, localRC,
- function, memberArgs, dest, resultSender);
+ MemberFunctionResultWaiter resultReceiver =
+ new MemberFunctionResultWaiter(distributedSystem, localRC,
+ function, memberArgs, dest, resultSender);
- ResultCollector reply = resultReceiver.getFunctionResultFrom(dest, function, this);
- return reply;
+ return resultReceiver.getFunctionResultFrom(dest, function, this);
}
return localRC;
}
@Override
- public void validateExecution(final Function function, final Set dest) {
+ public void validateExecution(final Function function,
+ final Set<? extends DistributedMember> dest) {
final InternalCache cache = GemFireCacheImpl.getInstance();
if (cache == null) {
return;
@@ -170,7 +166,7 @@ public class MemberFunctionExecutor extends AbstractExecution {
throw new UnsupportedOperationException(
"Client function execution on members is not supported with transaction");
}
- DistributedMember funcTarget = (DistributedMember) dest.iterator().next();
+ DistributedMember funcTarget = dest.iterator().next();
DistributedMember target = cache.getTxManager().getTXState().getTarget();
if (target == null) {
cache.getTxManager().getTXState().setTarget(funcTarget);
@@ -249,25 +245,20 @@ public class MemberFunctionExecutor extends AbstractExecution {
}
@Override
- public boolean isMemberMappedArgument() {
- return this.isMemberMappedArgument;
- }
-
- @Override
public Object getArgumentsForMember(String memberId) {
if (!isMemberMappedArgument) {
- return this.args;
+ return args;
} else {
- return this.memberMappedArg.getArgumentsForMember(memberId);
+ return memberMappedArg.getArgumentsForMember(memberId);
}
}
@Override
public MemberMappedArgument getMemberMappedArgument() {
- return this.memberMappedArg;
+ return memberMappedArg;
}
public ServerToClientFunctionResultSender getServerResultSender() {
- return this.sender;
+ return sender;
}
}