You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by jb...@apache.org on 2019/08/14 21:28:22 UTC

[geode] branch spike/slow-functions created (now b90ce6f)

This is an automated email from the ASF dual-hosted git repository.

jbarrett pushed a change to branch spike/slow-functions
in repository https://gitbox.apache.org/repos/asf/geode.git.


      at b90ce6f  Cache system property statically.

This branch includes the following new commits:

     new 6396f38  Cleanup
     new b90ce6f  Cache system property statically.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[geode] 01/02: Cleanup

Posted by jb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jbarrett pushed a commit to branch spike/slow-functions
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 6396f3850099a6beb70ab6c4409b76fdb74b78d6
Author: Jacob Barrett <jb...@pivotal.io>
AuthorDate: Wed Aug 14 14:27:41 2019 -0700

    Cleanup
---
 .../internal/cache/control/HeapMemoryMonitor.java  |  7 +-
 .../internal/cache/execute/AbstractExecution.java  | 30 ++------
 .../cache/execute/MemberFunctionExecutor.java      | 87 ++++++++++------------
 3 files changed, 51 insertions(+), 73 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java
index 29f0025..109dd8d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java
@@ -675,7 +675,8 @@ public class HeapMemoryMonitor implements NotificationListener, MemoryMonitor {
     });
   }
 
-  protected Set<DistributedMember> getHeapCriticalMembersFrom(Set<DistributedMember> members) {
+  protected Set<DistributedMember> getHeapCriticalMembersFrom(
+      Set<? extends DistributedMember> members) {
     Set<DistributedMember> criticalMembers = getCriticalMembers();
     criticalMembers.retainAll(members);
     return criticalMembers;
@@ -694,7 +695,7 @@ public class HeapMemoryMonitor implements NotificationListener, MemoryMonitor {
     checkForLowMemory(function, targetMembers);
   }
 
-  public void checkForLowMemory(Function function, Set<DistributedMember> dest) {
+  public void checkForLowMemory(Function function, Set<? extends DistributedMember> dest) {
     LowMemoryException exception = createLowMemoryIfNeeded(function, dest);
     if (exception != null) {
       throw exception;
@@ -708,7 +709,7 @@ public class HeapMemoryMonitor implements NotificationListener, MemoryMonitor {
   }
 
   public LowMemoryException createLowMemoryIfNeeded(Function function,
-      Set<DistributedMember> memberSet) {
+      Set<? extends DistributedMember> memberSet) {
     if (function.optimizeForWrite()
         && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
       Set<DistributedMember> criticalMembersFrom = getHeapCriticalMembersFrom(memberSet);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/AbstractExecution.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/AbstractExecution.java
index b5f55cc..bafd555 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/AbstractExecution.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/AbstractExecution.java
@@ -16,7 +16,6 @@
 package org.apache.geode.internal.cache.execute;
 
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -37,6 +36,7 @@ import org.apache.geode.cache.execute.FunctionService;
 import org.apache.geode.cache.execute.ResultCollector;
 import org.apache.geode.cache.execute.ResultSender;
 import org.apache.geode.cache.query.QueryInvalidException;
+import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.internal.ClusterDistributionManager;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.DistributionManager;
@@ -68,8 +68,6 @@ public abstract class AbstractExecution implements InternalExecution {
 
   protected Set filter = new HashSet();
 
-  protected boolean hasRoutingObjects;
-
   protected volatile boolean isReExecute = false;
 
   volatile boolean isClientServerMode = false;
@@ -189,10 +187,6 @@ public abstract class AbstractExecution implements InternalExecution {
     this.isReExecute = isReExecute;
   }
 
-  public boolean isMemberMappedArgument() {
-    return isMemberMappedArgument;
-  }
-
   public Object getArgumentsForMember(String memberId) {
     if (!isMemberMappedArgument) {
       return args;
@@ -249,15 +243,6 @@ public abstract class AbstractExecution implements InternalExecution {
     return isFnSerializationReqd;
   }
 
-  public Collection<InternalDistributedMember> getExecutionNodes() {
-    return executionNodes;
-  }
-
-  public void setRequireExecutionNodes(ExecutionNodesListener listener) {
-    executionNodes = Collections.emptySet();
-    executionNodesListener = listener;
-  }
-
   public void setExecutionNodes(Set<InternalDistributedMember> nodes) {
     if (executionNodes != null) {
       executionNodes = nodes;
@@ -348,7 +333,7 @@ public abstract class AbstractExecution implements InternalExecution {
       } else {
         functionException = new FunctionException(fite);
       }
-      handleException(functionException, fn, cx, sender, dm);
+      handleException(functionException, fn, sender, dm);
     } catch (BucketMovedException bme) {
       FunctionException functionException;
       if (fn.isHA()) {
@@ -357,13 +342,13 @@ public abstract class AbstractExecution implements InternalExecution {
       } else {
         functionException = new FunctionException(bme);
       }
-      handleException(functionException, fn, cx, sender, dm);
+      handleException(functionException, fn, sender, dm);
     } catch (VirtualMachineError e) {
       SystemFailure.initiateFailure(e);
       throw e;
     } catch (Throwable t) {
       SystemFailure.checkFailure();
-      handleException(t, fn, cx, sender, dm);
+      handleException(t, fn, sender, dm);
     }
   }
 
@@ -446,7 +431,8 @@ public abstract class AbstractExecution implements InternalExecution {
    * @throws TransactionException if more than one nodes are targeted within a transaction
    * @throws LowMemoryException if the set contains a heap critical member
    */
-  public abstract void validateExecution(Function function, Set targetMembers);
+  public abstract void validateExecution(Function function,
+      Set<? extends DistributedMember> targetMembers);
 
   public LocalResultCollector<?, ?> getLocalResultCollector(Function function,
       final ResultCollector<?, ?> rc) {
@@ -482,7 +468,7 @@ public abstract class AbstractExecution implements InternalExecution {
   }
 
   private void handleException(Throwable functionException, final Function fn,
-      final FunctionContext cx, final ResultSender sender, DistributionManager dm) {
+      final ResultSender sender, DistributionManager dm) {
     FunctionStats stats = FunctionStats.getFunctionStats(fn.getId(), dm.getSystem());
 
     if (logger.isDebugEnabled()) {
@@ -515,7 +501,7 @@ public abstract class AbstractExecution implements InternalExecution {
    *
    * @return timeout in milliseconds.
    */
-  protected int getTimeoutMs() {
+  int getTimeoutMs() {
     return timeoutMs;
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java
index a18c560..573c40c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java
@@ -17,7 +17,6 @@ package org.apache.geode.internal.cache.execute;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.Set;
 
 import org.apache.geode.cache.TransactionDataNotColocatedException;
@@ -38,67 +37,66 @@ import org.apache.geode.internal.cache.InternalCache;
 
 public class MemberFunctionExecutor extends AbstractExecution {
 
-  protected InternalDistributedSystem ds;
+  protected InternalDistributedSystem distributedSystem;
 
-  protected Set members;
+  protected Set<InternalDistributedMember> members;
 
   private ServerToClientFunctionResultSender sender;
 
-  public MemberFunctionExecutor(DistributedSystem s) {
-    this.ds = (InternalDistributedSystem) s;
-    this.members = this.ds.getDistributionManager().getNormalDistributionManagerIds();
+  MemberFunctionExecutor(DistributedSystem distributedSystem) {
+    this(distributedSystem, ((InternalDistributedSystem) distributedSystem).getDistributionManager()
+        .getNormalDistributionManagerIds());
   }
 
-  public MemberFunctionExecutor(DistributedSystem s, DistributedMember m) {
-    this.ds = (InternalDistributedSystem) s;
-    this.members = Collections.singleton(m);
+  MemberFunctionExecutor(DistributedSystem distributedSystem, DistributedMember distributedMember) {
+    this(distributedSystem, Collections.singleton((InternalDistributedMember) distributedMember));
   }
 
-  public MemberFunctionExecutor(DistributedSystem s, Set m) {
-    this.ds = (InternalDistributedSystem) s;
-    this.members = m;
+  MemberFunctionExecutor(DistributedSystem distributedSystem,
+      Set<? extends DistributedMember> members) {
+    this.distributedSystem = (InternalDistributedSystem) distributedSystem;
+    this.members = (Set<InternalDistributedMember>) members;
   }
 
-  public MemberFunctionExecutor(DistributedSystem s, Set m,
+  public MemberFunctionExecutor(DistributedSystem distributedSystem,
+      Set<? extends DistributedMember> members,
       ServerToClientFunctionResultSender sender) {
-    this(s, m);
+    this(distributedSystem, members);
     this.sender = sender;
   }
 
   private MemberFunctionExecutor(MemberFunctionExecutor memFunctionExecutor) {
     super(memFunctionExecutor);
-    this.ds = memFunctionExecutor.ds;
-    this.members = new HashSet();
-    this.members.addAll(memFunctionExecutor.members);
-    this.sender = memFunctionExecutor.sender;
+    distributedSystem = memFunctionExecutor.distributedSystem;
+    members = new HashSet<>(memFunctionExecutor.members);
+    sender = memFunctionExecutor.sender;
   }
 
   private MemberFunctionExecutor(MemberFunctionExecutor memberFunctionExecutor,
       MemberMappedArgument argument) {
     this(memberFunctionExecutor);
 
-    this.memberMappedArg = argument;
-    this.isMemberMappedArgument = true;
+    memberMappedArg = argument;
+    isMemberMappedArgument = true;
   }
 
   private MemberFunctionExecutor(MemberFunctionExecutor memberFunctionExecutor,
       ResultCollector rs) {
     this(memberFunctionExecutor);
 
-    this.rc = rs;
+    rc = rs;
   }
 
   private MemberFunctionExecutor(MemberFunctionExecutor memberFunctionExecutor, Object arguments) {
     this(memberFunctionExecutor);
 
-    this.args = arguments;
+    args = arguments;
   }
 
-  @SuppressWarnings("unchecked")
   private ResultCollector executeFunction(final Function function,
       ResultCollector resultCollector) {
-    final DistributionManager dm = this.ds.getDistributionManager();
-    final Set dest = new HashSet(this.members);
+    final DistributionManager dm = distributedSystem.getDistributionManager();
+    final Set<InternalDistributedMember> dest = new HashSet<>(members);
     if (dest.isEmpty()) {
       throw new FunctionException(
           String.format("No member found for executing function : %s.",
@@ -108,7 +106,7 @@ public class MemberFunctionExecutor extends AbstractExecution {
     setExecutionNodes(dest);
 
     final InternalDistributedMember localVM =
-        this.ds.getDistributionManager().getDistributionManagerId();
+        distributedSystem.getDistributionManager().getDistributionManagerId();
     final LocalResultCollector<?, ?> localRC = getLocalResultCollector(function, resultCollector);
     boolean remoteOnly = false;
     boolean localOnly = false;
@@ -129,7 +127,7 @@ public class MemberFunctionExecutor extends AbstractExecution {
       boolean isTx = false;
       InternalCache cache = GemFireCacheImpl.getInstance();
       if (cache != null) {
-        isTx = cache.getTxManager().getTXState() == null ? false : true;
+        isTx = cache.getTxManager().getTXState() != null;
       }
       final FunctionContext context = new FunctionContextImpl(cache, function.getId(),
           getArgumentsForMember(localVM.getId()), resultSender);
@@ -137,25 +135,23 @@ public class MemberFunctionExecutor extends AbstractExecution {
     }
 
     if (!dest.isEmpty()) {
-      HashMap<InternalDistributedMember, Object> memberArgs =
-          new HashMap<InternalDistributedMember, Object>();
-      Iterator<DistributedMember> iter = dest.iterator();
-      while (iter.hasNext()) {
-        InternalDistributedMember recip = (InternalDistributedMember) iter.next();
-        memberArgs.put(recip, getArgumentsForMember(recip.getId()));
+      HashMap<InternalDistributedMember, Object> memberArgs = new HashMap<>();
+      for (InternalDistributedMember distributedMember : dest) {
+        memberArgs.put(distributedMember, getArgumentsForMember(distributedMember.getId()));
       }
       Assert.assertTrue(memberArgs.size() == dest.size());
-      MemberFunctionResultWaiter resultReceiver = new MemberFunctionResultWaiter(this.ds, localRC,
-          function, memberArgs, dest, resultSender);
+      MemberFunctionResultWaiter resultReceiver =
+          new MemberFunctionResultWaiter(distributedSystem, localRC,
+              function, memberArgs, dest, resultSender);
 
-      ResultCollector reply = resultReceiver.getFunctionResultFrom(dest, function, this);
-      return reply;
+      return resultReceiver.getFunctionResultFrom(dest, function, this);
     }
     return localRC;
   }
 
   @Override
-  public void validateExecution(final Function function, final Set dest) {
+  public void validateExecution(final Function function,
+      final Set<? extends DistributedMember> dest) {
     final InternalCache cache = GemFireCacheImpl.getInstance();
     if (cache == null) {
       return;
@@ -170,7 +166,7 @@ public class MemberFunctionExecutor extends AbstractExecution {
           throw new UnsupportedOperationException(
               "Client function execution on members is not supported with transaction");
         }
-        DistributedMember funcTarget = (DistributedMember) dest.iterator().next();
+        DistributedMember funcTarget = dest.iterator().next();
         DistributedMember target = cache.getTxManager().getTXState().getTarget();
         if (target == null) {
           cache.getTxManager().getTXState().setTarget(funcTarget);
@@ -249,25 +245,20 @@ public class MemberFunctionExecutor extends AbstractExecution {
   }
 
   @Override
-  public boolean isMemberMappedArgument() {
-    return this.isMemberMappedArgument;
-  }
-
-  @Override
   public Object getArgumentsForMember(String memberId) {
     if (!isMemberMappedArgument) {
-      return this.args;
+      return args;
     } else {
-      return this.memberMappedArg.getArgumentsForMember(memberId);
+      return memberMappedArg.getArgumentsForMember(memberId);
     }
   }
 
   @Override
   public MemberMappedArgument getMemberMappedArgument() {
-    return this.memberMappedArg;
+    return memberMappedArg;
   }
 
   public ServerToClientFunctionResultSender getServerResultSender() {
-    return this.sender;
+    return sender;
   }
 }


[geode] 02/02: Cache system property statically.

Posted by jb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jbarrett pushed a commit to branch spike/slow-functions
in repository https://gitbox.apache.org/repos/asf/geode.git

commit b90ce6f1e328ca8b551eace7b7491a02f2565242
Author: Jacob Barrett <jb...@pivotal.io>
AuthorDate: Wed Aug 14 14:28:03 2019 -0700

    Cache system property statically.
---
 .../apache/geode/internal/cache/execute/AbstractExecution.java | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/AbstractExecution.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/AbstractExecution.java
index bafd555..d98ec8a 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/AbstractExecution.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/AbstractExecution.java
@@ -51,12 +51,13 @@ import org.apache.geode.internal.logging.LogService;
  *
  */
 public abstract class AbstractExecution implements InternalExecution {
+  private static final Logger logger = LogService.getLogger();
 
   public static final int DEFAULT_CLIENT_FUNCTION_TIMEOUT = 0;
   private static final String CLIENT_FUNCTION_TIMEOUT_SYSTEM_PROPERTY =
       DistributionConfig.GEMFIRE_PREFIX + "CLIENT_FUNCTION_TIMEOUT";
-
-  private static final Logger logger = LogService.getLogger();
+  private static final Integer timeoutMsSystemProperty =
+      Integer.getInteger(CLIENT_FUNCTION_TIMEOUT_SYSTEM_PROPERTY, DEFAULT_CLIENT_FUNCTION_TIMEOUT);
 
   boolean isMemberMappedArgument;
 
@@ -158,9 +159,8 @@ public abstract class AbstractExecution implements InternalExecution {
   }
 
   protected AbstractExecution() {
-    final int timeoutMs = Integer.getInteger(CLIENT_FUNCTION_TIMEOUT_SYSTEM_PROPERTY,
-        DEFAULT_CLIENT_FUNCTION_TIMEOUT);
-    this.timeoutMs = timeoutMs >= 0 ? timeoutMs : DEFAULT_CLIENT_FUNCTION_TIMEOUT;
+    timeoutMs =
+        timeoutMsSystemProperty >= 0 ? timeoutMsSystemProperty : DEFAULT_CLIENT_FUNCTION_TIMEOUT;
   }
 
   protected AbstractExecution(AbstractExecution ae) {