You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by jb...@apache.org on 2019/08/14 21:28:23 UTC

[geode] 01/02: Cleanup

This is an automated email from the ASF dual-hosted git repository.

jbarrett pushed a commit to branch spike/slow-functions
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 6396f3850099a6beb70ab6c4409b76fdb74b78d6
Author: Jacob Barrett <jb...@pivotal.io>
AuthorDate: Wed Aug 14 14:27:41 2019 -0700

    Cleanup
---
 .../internal/cache/control/HeapMemoryMonitor.java  |  7 +-
 .../internal/cache/execute/AbstractExecution.java  | 30 ++------
 .../cache/execute/MemberFunctionExecutor.java      | 87 ++++++++++------------
 3 files changed, 51 insertions(+), 73 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java
index 29f0025..109dd8d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java
@@ -675,7 +675,8 @@ public class HeapMemoryMonitor implements NotificationListener, MemoryMonitor {
     });
   }
 
-  protected Set<DistributedMember> getHeapCriticalMembersFrom(Set<DistributedMember> members) {
+  protected Set<DistributedMember> getHeapCriticalMembersFrom(
+      Set<? extends DistributedMember> members) {
     Set<DistributedMember> criticalMembers = getCriticalMembers();
     criticalMembers.retainAll(members);
     return criticalMembers;
@@ -694,7 +695,7 @@ public class HeapMemoryMonitor implements NotificationListener, MemoryMonitor {
     checkForLowMemory(function, targetMembers);
   }
 
-  public void checkForLowMemory(Function function, Set<DistributedMember> dest) {
+  public void checkForLowMemory(Function function, Set<? extends DistributedMember> dest) {
     LowMemoryException exception = createLowMemoryIfNeeded(function, dest);
     if (exception != null) {
       throw exception;
@@ -708,7 +709,7 @@ public class HeapMemoryMonitor implements NotificationListener, MemoryMonitor {
   }
 
   public LowMemoryException createLowMemoryIfNeeded(Function function,
-      Set<DistributedMember> memberSet) {
+      Set<? extends DistributedMember> memberSet) {
     if (function.optimizeForWrite()
         && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
       Set<DistributedMember> criticalMembersFrom = getHeapCriticalMembersFrom(memberSet);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/AbstractExecution.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/AbstractExecution.java
index b5f55cc..bafd555 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/AbstractExecution.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/AbstractExecution.java
@@ -16,7 +16,6 @@
 package org.apache.geode.internal.cache.execute;
 
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -37,6 +36,7 @@ import org.apache.geode.cache.execute.FunctionService;
 import org.apache.geode.cache.execute.ResultCollector;
 import org.apache.geode.cache.execute.ResultSender;
 import org.apache.geode.cache.query.QueryInvalidException;
+import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.internal.ClusterDistributionManager;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.DistributionManager;
@@ -68,8 +68,6 @@ public abstract class AbstractExecution implements InternalExecution {
 
   protected Set filter = new HashSet();
 
-  protected boolean hasRoutingObjects;
-
   protected volatile boolean isReExecute = false;
 
   volatile boolean isClientServerMode = false;
@@ -189,10 +187,6 @@ public abstract class AbstractExecution implements InternalExecution {
     this.isReExecute = isReExecute;
   }
 
-  public boolean isMemberMappedArgument() {
-    return isMemberMappedArgument;
-  }
-
   public Object getArgumentsForMember(String memberId) {
     if (!isMemberMappedArgument) {
       return args;
@@ -249,15 +243,6 @@ public abstract class AbstractExecution implements InternalExecution {
     return isFnSerializationReqd;
   }
 
-  public Collection<InternalDistributedMember> getExecutionNodes() {
-    return executionNodes;
-  }
-
-  public void setRequireExecutionNodes(ExecutionNodesListener listener) {
-    executionNodes = Collections.emptySet();
-    executionNodesListener = listener;
-  }
-
   public void setExecutionNodes(Set<InternalDistributedMember> nodes) {
     if (executionNodes != null) {
       executionNodes = nodes;
@@ -348,7 +333,7 @@ public abstract class AbstractExecution implements InternalExecution {
       } else {
         functionException = new FunctionException(fite);
       }
-      handleException(functionException, fn, cx, sender, dm);
+      handleException(functionException, fn, sender, dm);
     } catch (BucketMovedException bme) {
       FunctionException functionException;
       if (fn.isHA()) {
@@ -357,13 +342,13 @@ public abstract class AbstractExecution implements InternalExecution {
       } else {
         functionException = new FunctionException(bme);
       }
-      handleException(functionException, fn, cx, sender, dm);
+      handleException(functionException, fn, sender, dm);
     } catch (VirtualMachineError e) {
       SystemFailure.initiateFailure(e);
       throw e;
     } catch (Throwable t) {
       SystemFailure.checkFailure();
-      handleException(t, fn, cx, sender, dm);
+      handleException(t, fn, sender, dm);
     }
   }
 
@@ -446,7 +431,8 @@ public abstract class AbstractExecution implements InternalExecution {
    * @throws TransactionException if more than one nodes are targeted within a transaction
    * @throws LowMemoryException if the set contains a heap critical member
    */
-  public abstract void validateExecution(Function function, Set targetMembers);
+  public abstract void validateExecution(Function function,
+      Set<? extends DistributedMember> targetMembers);
 
   public LocalResultCollector<?, ?> getLocalResultCollector(Function function,
       final ResultCollector<?, ?> rc) {
@@ -482,7 +468,7 @@ public abstract class AbstractExecution implements InternalExecution {
   }
 
   private void handleException(Throwable functionException, final Function fn,
-      final FunctionContext cx, final ResultSender sender, DistributionManager dm) {
+      final ResultSender sender, DistributionManager dm) {
     FunctionStats stats = FunctionStats.getFunctionStats(fn.getId(), dm.getSystem());
 
     if (logger.isDebugEnabled()) {
@@ -515,7 +501,7 @@ public abstract class AbstractExecution implements InternalExecution {
    *
    * @return timeout in milliseconds.
    */
-  protected int getTimeoutMs() {
+  int getTimeoutMs() {
     return timeoutMs;
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java
index a18c560..573c40c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java
@@ -17,7 +17,6 @@ package org.apache.geode.internal.cache.execute;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.Set;
 
 import org.apache.geode.cache.TransactionDataNotColocatedException;
@@ -38,67 +37,66 @@ import org.apache.geode.internal.cache.InternalCache;
 
 public class MemberFunctionExecutor extends AbstractExecution {
 
-  protected InternalDistributedSystem ds;
+  protected InternalDistributedSystem distributedSystem;
 
-  protected Set members;
+  protected Set<InternalDistributedMember> members;
 
   private ServerToClientFunctionResultSender sender;
 
-  public MemberFunctionExecutor(DistributedSystem s) {
-    this.ds = (InternalDistributedSystem) s;
-    this.members = this.ds.getDistributionManager().getNormalDistributionManagerIds();
+  MemberFunctionExecutor(DistributedSystem distributedSystem) {
+    this(distributedSystem, ((InternalDistributedSystem) distributedSystem).getDistributionManager()
+        .getNormalDistributionManagerIds());
   }
 
-  public MemberFunctionExecutor(DistributedSystem s, DistributedMember m) {
-    this.ds = (InternalDistributedSystem) s;
-    this.members = Collections.singleton(m);
+  MemberFunctionExecutor(DistributedSystem distributedSystem, DistributedMember distributedMember) {
+    this(distributedSystem, Collections.singleton((InternalDistributedMember) distributedMember));
   }
 
-  public MemberFunctionExecutor(DistributedSystem s, Set m) {
-    this.ds = (InternalDistributedSystem) s;
-    this.members = m;
+  MemberFunctionExecutor(DistributedSystem distributedSystem,
+      Set<? extends DistributedMember> members) {
+    this.distributedSystem = (InternalDistributedSystem) distributedSystem;
+    this.members = (Set<InternalDistributedMember>) members;
   }
 
-  public MemberFunctionExecutor(DistributedSystem s, Set m,
+  public MemberFunctionExecutor(DistributedSystem distributedSystem,
+      Set<? extends DistributedMember> members,
       ServerToClientFunctionResultSender sender) {
-    this(s, m);
+    this(distributedSystem, members);
     this.sender = sender;
   }
 
   private MemberFunctionExecutor(MemberFunctionExecutor memFunctionExecutor) {
     super(memFunctionExecutor);
-    this.ds = memFunctionExecutor.ds;
-    this.members = new HashSet();
-    this.members.addAll(memFunctionExecutor.members);
-    this.sender = memFunctionExecutor.sender;
+    distributedSystem = memFunctionExecutor.distributedSystem;
+    members = new HashSet<>(memFunctionExecutor.members);
+    sender = memFunctionExecutor.sender;
   }
 
   private MemberFunctionExecutor(MemberFunctionExecutor memberFunctionExecutor,
       MemberMappedArgument argument) {
     this(memberFunctionExecutor);
 
-    this.memberMappedArg = argument;
-    this.isMemberMappedArgument = true;
+    memberMappedArg = argument;
+    isMemberMappedArgument = true;
   }
 
   private MemberFunctionExecutor(MemberFunctionExecutor memberFunctionExecutor,
       ResultCollector rs) {
     this(memberFunctionExecutor);
 
-    this.rc = rs;
+    rc = rs;
   }
 
   private MemberFunctionExecutor(MemberFunctionExecutor memberFunctionExecutor, Object arguments) {
     this(memberFunctionExecutor);
 
-    this.args = arguments;
+    args = arguments;
   }
 
-  @SuppressWarnings("unchecked")
   private ResultCollector executeFunction(final Function function,
       ResultCollector resultCollector) {
-    final DistributionManager dm = this.ds.getDistributionManager();
-    final Set dest = new HashSet(this.members);
+    final DistributionManager dm = distributedSystem.getDistributionManager();
+    final Set<InternalDistributedMember> dest = new HashSet<>(members);
     if (dest.isEmpty()) {
       throw new FunctionException(
           String.format("No member found for executing function : %s.",
@@ -108,7 +106,7 @@ public class MemberFunctionExecutor extends AbstractExecution {
     setExecutionNodes(dest);
 
     final InternalDistributedMember localVM =
-        this.ds.getDistributionManager().getDistributionManagerId();
+        distributedSystem.getDistributionManager().getDistributionManagerId();
     final LocalResultCollector<?, ?> localRC = getLocalResultCollector(function, resultCollector);
     boolean remoteOnly = false;
     boolean localOnly = false;
@@ -129,7 +127,7 @@ public class MemberFunctionExecutor extends AbstractExecution {
       boolean isTx = false;
       InternalCache cache = GemFireCacheImpl.getInstance();
       if (cache != null) {
-        isTx = cache.getTxManager().getTXState() == null ? false : true;
+        isTx = cache.getTxManager().getTXState() != null;
       }
       final FunctionContext context = new FunctionContextImpl(cache, function.getId(),
           getArgumentsForMember(localVM.getId()), resultSender);
@@ -137,25 +135,23 @@ public class MemberFunctionExecutor extends AbstractExecution {
     }
 
     if (!dest.isEmpty()) {
-      HashMap<InternalDistributedMember, Object> memberArgs =
-          new HashMap<InternalDistributedMember, Object>();
-      Iterator<DistributedMember> iter = dest.iterator();
-      while (iter.hasNext()) {
-        InternalDistributedMember recip = (InternalDistributedMember) iter.next();
-        memberArgs.put(recip, getArgumentsForMember(recip.getId()));
+      HashMap<InternalDistributedMember, Object> memberArgs = new HashMap<>();
+      for (InternalDistributedMember distributedMember : dest) {
+        memberArgs.put(distributedMember, getArgumentsForMember(distributedMember.getId()));
       }
       Assert.assertTrue(memberArgs.size() == dest.size());
-      MemberFunctionResultWaiter resultReceiver = new MemberFunctionResultWaiter(this.ds, localRC,
-          function, memberArgs, dest, resultSender);
+      MemberFunctionResultWaiter resultReceiver =
+          new MemberFunctionResultWaiter(distributedSystem, localRC,
+              function, memberArgs, dest, resultSender);
 
-      ResultCollector reply = resultReceiver.getFunctionResultFrom(dest, function, this);
-      return reply;
+      return resultReceiver.getFunctionResultFrom(dest, function, this);
     }
     return localRC;
   }
 
   @Override
-  public void validateExecution(final Function function, final Set dest) {
+  public void validateExecution(final Function function,
+      final Set<? extends DistributedMember> dest) {
     final InternalCache cache = GemFireCacheImpl.getInstance();
     if (cache == null) {
       return;
@@ -170,7 +166,7 @@ public class MemberFunctionExecutor extends AbstractExecution {
           throw new UnsupportedOperationException(
               "Client function execution on members is not supported with transaction");
         }
-        DistributedMember funcTarget = (DistributedMember) dest.iterator().next();
+        DistributedMember funcTarget = dest.iterator().next();
         DistributedMember target = cache.getTxManager().getTXState().getTarget();
         if (target == null) {
           cache.getTxManager().getTXState().setTarget(funcTarget);
@@ -249,25 +245,20 @@ public class MemberFunctionExecutor extends AbstractExecution {
   }
 
   @Override
-  public boolean isMemberMappedArgument() {
-    return this.isMemberMappedArgument;
-  }
-
-  @Override
   public Object getArgumentsForMember(String memberId) {
     if (!isMemberMappedArgument) {
-      return this.args;
+      return args;
     } else {
-      return this.memberMappedArg.getArgumentsForMember(memberId);
+      return memberMappedArg.getArgumentsForMember(memberId);
     }
   }
 
   @Override
   public MemberMappedArgument getMemberMappedArgument() {
-    return this.memberMappedArg;
+    return memberMappedArg;
   }
 
   public ServerToClientFunctionResultSender getServerResultSender() {
-    return this.sender;
+    return sender;
   }
 }