Posted to commits@geode.apache.org by js...@apache.org on 2017/05/15 21:57:00 UTC

geode git commit: GEODE-2912: Hot deploy for functions in deployed Jars

Repository: geode
Updated Branches:
  refs/heads/develop 614031725 -> f18e1d294


GEODE-2912: Hot deploy for functions in deployed Jars


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/f18e1d29
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/f18e1d29
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/f18e1d29

Branch: refs/heads/develop
Commit: f18e1d29473e357ddbc8891acb126ce3c1970606
Parents: 6140317
Author: Jared Stewart <js...@pivotal.io>
Authored: Thu May 11 14:16:59 2017 -0700
Committer: Jared Stewart <js...@pivotal.io>
Committed: Mon May 15 14:56:02 2017 -0700

----------------------------------------------------------------------
 .../org/apache/geode/internal/DeployedJar.java  | 72 +++++++++++++-------
 .../org/apache/geode/internal/JarDeployer.java  | 22 ++++--
 .../DeployCommandRedeployDUnitTest.java         | 67 ++++++++++++++++++
 3 files changed, 129 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
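
In short: before this change, redeploying a jar unregistered every function from the old
version of the jar as soon as the new version was recorded, and the new version's functions
were only registered afterwards, so executions of a function present in both versions could
fail during the redeploy window. With this commit the new jar's functions are registered
first, and the old jar's cleanUp(newVersion) unregisters only the functions that the new
version no longer contains. A simplified before/after sketch of the redeploy path in
JarDeployer (locking and the class loader rebuild are omitted; see the full diff below):

    // Before (simplified): all of the old jar's functions were unregistered up front.
    DeployedJar oldJar = this.deployedJars.put(newJar.getJarName(), newJar);
    if (oldJar != null) {
      oldJar.cleanUp();                        // unregisters every function from the old jar
    }
    // ... only later ...
    newJar.loadClassesAndRegisterFunctions();  // functions are unavailable until this point

    // After (simplified): register first, then drop only what actually disappeared.
    newJar.registerFunctions();
    if (oldJar != null) {
      oldJar.cleanUp(newJar);                  // unregisters only functions absent from newJar
    }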


http://git-wip-us.apache.org/repos/asf/geode/blob/f18e1d29/geode-core/src/main/java/org/apache/geode/internal/DeployedJar.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/DeployedJar.java b/geode-core/src/main/java/org/apache/geode/internal/DeployedJar.java
index 820eb85..037ef9e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/DeployedJar.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/DeployedJar.java
@@ -14,6 +14,19 @@
  */
 package org.apache.geode.internal;
 
+import io.github.lukehutch.fastclasspathscanner.FastClasspathScanner;
+import io.github.lukehutch.fastclasspathscanner.scanner.ScanResult;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.Declarable;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.FunctionService;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.pdx.internal.TypeRegistry;
+import org.apache.logging.log4j.Logger;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
@@ -34,22 +47,11 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
+import java.util.function.Predicate;
 import java.util.jar.JarEntry;
 import java.util.jar.JarInputStream;
 import java.util.regex.Pattern;
-
-import io.github.lukehutch.fastclasspathscanner.FastClasspathScanner;
-import io.github.lukehutch.fastclasspathscanner.scanner.ScanResult;
-import org.apache.logging.log4j.Logger;
-
-import org.apache.geode.cache.CacheClosedException;
-import org.apache.geode.cache.CacheFactory;
-import org.apache.geode.cache.Declarable;
-import org.apache.geode.cache.execute.Function;
-import org.apache.geode.cache.execute.FunctionService;
-import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.pdx.internal.TypeRegistry;
+import java.util.stream.Stream;
 
 /**
  * ClassLoader for a single JAR file.
@@ -156,15 +158,10 @@ public class DeployedJar {
   }
 
   /**
-   * Scan the JAR file and attempt to load all classes and register any function classes found.
+   * Scan the JAR file and attempt to register any function classes found.
    */
-  // This method will process the contents of the JAR file as stored in this.jarByteContent
-  // instead of reading from the original JAR file. This is done because we can't open up
-  // the original file and then close it without releasing the shared lock that was obtained
-  // in the constructor. Once this method is finished, all classes will have been loaded and
-  // there will no longer be a need to hang on to the original contents so they will be
-  // discarded.
-  synchronized void loadClassesAndRegisterFunctions() throws ClassNotFoundException {
+
+  public synchronized void registerFunctions() throws ClassNotFoundException {
     final boolean isDebugEnabled = logger.isDebugEnabled();
     if (isDebugEnabled) {
       logger.debug("Registering functions with DeployedJar: {}", this);
@@ -227,12 +224,28 @@ public class DeployedJar {
     }
   }
 
-  synchronized void cleanUp() {
-    for (Function function : this.registeredFunctions) {
-      FunctionService.unregisterFunction(function.getId());
+  /**
+   * Unregisters all functions from this jar if it was undeployed (i.e. newVersion == null), or all
+   * functions not present in the new version if it was redeployed.
+   *
+   * @param newVersion The new version of this jar that was deployed, or null if this jar was
+   *        undeployed.
+   */
+  protected synchronized void cleanUp(DeployedJar newVersion) {
+    Stream<String> oldFunctions = this.registeredFunctions.stream().map(Function::getId);
+
+    Stream<String> removedFunctions;
+    if (newVersion == null) {
+      removedFunctions = oldFunctions;
+    } else {
+      Predicate<String> isRemoved =
+          (String oldFunctionId) -> !newVersion.hasFunctionWithId(oldFunctionId);
+
+      removedFunctions = oldFunctions.filter(isRemoved);
     }
-    this.registeredFunctions.clear();
 
+    removedFunctions.forEach(FunctionService::unregisterFunction);
+    this.registeredFunctions.clear();
     try {
       TypeRegistry typeRegistry = ((InternalCache) CacheFactory.getAnyInstance()).getPdxRegistry();
       if (typeRegistry != null) {
@@ -396,6 +409,15 @@ public class DeployedJar {
     return null;
   }
 
+  private boolean hasFunctionWithId(String id) {
+    if (CollectionUtils.isEmpty(this.registeredFunctions)) {
+      return false;
+    }
+
+    return this.registeredFunctions.stream().map(Function::getId)
+        .anyMatch(functionId -> functionId.equals(id));
+  }
+
   @Override
   public int hashCode() {
     final int prime = 31;

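The heart of the DeployedJar change is how cleanUp(DeployedJar) decides what to unregister:
the ids of the functions registered by the old jar are filtered through hasFunctionWithId()
on the new jar, so only the functions that disappeared in the redeploy are removed from
FunctionService. A self-contained illustration of that filtering, with plain strings standing
in for function ids (the ids below are made up for the example):

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class RemovedFunctionsExample {
      public static void main(String[] args) {
        List<String> oldIds = Arrays.asList("FunctionA", "FunctionB", "FunctionC");
        List<String> newIds = Arrays.asList("FunctionA", "FunctionC"); // FunctionB was dropped

        // Same shape as cleanUp(): keep only the ids the new version no longer provides.
        List<String> removed = oldIds.stream()
            .filter(id -> !newIds.contains(id))
            .collect(Collectors.toList());

        System.out.println(removed); // prints [FunctionB]; these would be unregistered
      }
    }
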
http://git-wip-us.apache.org/repos/asf/geode/blob/f18e1d29/geode-core/src/main/java/org/apache/geode/internal/JarDeployer.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/JarDeployer.java b/geode-core/src/main/java/org/apache/geode/internal/JarDeployer.java
index df3f10b..c21fb9d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/JarDeployer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/JarDeployer.java
@@ -33,6 +33,7 @@ import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -459,23 +460,30 @@ public class JarDeployer implements Serializable {
       throws ClassNotFoundException {
     lock.lock();
     try {
+      Map<DeployedJar, DeployedJar> newVersionToOldVersion = new HashMap<>();
+
       for (DeployedJar deployedJar : deployedJars) {
         if (deployedJar != null) {
           logger.info("Registering new version of jar: {}", deployedJar);
           DeployedJar oldJar = this.deployedJars.put(deployedJar.getJarName(), deployedJar);
-          if (oldJar != null) {
-            oldJar.cleanUp();
-          }
+          newVersionToOldVersion.put(deployedJar, oldJar);
         }
       }
 
       ClassPathLoader.getLatest().rebuildClassLoaderForDeployedJars();
 
-      for (DeployedJar deployedJar : deployedJars) {
-        if (deployedJar != null) {
-          deployedJar.loadClassesAndRegisterFunctions();
+      // Finally, unregister functions that were removed
+      for (Map.Entry<DeployedJar, DeployedJar> entry : newVersionToOldVersion.entrySet()) {
+        DeployedJar newjar = entry.getKey();
+        DeployedJar oldJar = entry.getValue();
+
+        newjar.registerFunctions();
+
+        if (oldJar != null) {
+          oldJar.cleanUp(newjar);
         }
       }
+
     } finally {
       lock.unlock();
     }
@@ -583,7 +591,7 @@ public class JarDeployer implements Serializable {
 
       ClassPathLoader.getLatest().rebuildClassLoaderForDeployedJars();
 
-      deployedJar.cleanUp();
+      deployedJar.cleanUp(null);
 
       deleteAllVersionsOfJar(jarName);
       return deployedJar.getFileCanonicalPath();

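Note the two call sites of cleanUp() in this file: the redeploy path passes the new version,
while the undeploy path near the bottom of the diff passes null. Inside DeployedJar a null
argument makes the removed set equal to everything the jar registered, so an undeploy still
unregisters all of the jar's functions; only a redeploy narrows the set. Roughly:

    oldJar.cleanUp(newJar);      // redeploy: unregisters only ids that newJar no longer has
    deployedJar.cleanUp(null);   // undeploy: no new version, every function is unregistered
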
http://git-wip-us.apache.org/repos/asf/geode/blob/f18e1d29/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/DeployCommandRedeployDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/DeployCommandRedeployDUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/DeployCommandRedeployDUnitTest.java
index 6972477..785915a 100644
--- a/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/DeployCommandRedeployDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/DeployCommandRedeployDUnitTest.java
@@ -15,6 +15,7 @@
 package org.apache.geode.management.internal.cli.commands;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.fail;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.geode.cache.execute.Execution;
@@ -27,6 +28,7 @@ import org.apache.geode.test.dunit.rules.GfshShellConnectionRule;
 import org.apache.geode.test.dunit.rules.LocatorServerStartupRule;
 import org.apache.geode.test.dunit.rules.MemberVM;
 import org.apache.geode.test.junit.categories.DistributedTest;
+import org.awaitility.Awaitility;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -36,6 +38,12 @@ import java.io.File;
 import java.io.Serializable;
 import java.net.URL;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 @Category(DistributedTest.class)
 public class DeployCommandRedeployDUnitTest implements Serializable {
@@ -102,6 +110,23 @@ public class DeployCommandRedeployDUnitTest implements Serializable {
     server.invoke(() -> assertThatFunctionHasVersion(FUNCTION_B, VERSION2));
   }
 
+  @Test
+  public void hotDeployShouldNotResultInAnyFailedFunctionExecutions() throws Exception {
+    gfshConnector.executeAndVerifyCommand("deploy --jar=" + jarAVersion1.getCanonicalPath());
+    server.invoke(() -> assertThatCanLoad(JAR_NAME_A, FUNCTION_A));
+    server.invoke(() -> assertThatFunctionHasVersion(FUNCTION_A, VERSION1));
+
+    server.invoke(() -> LoopingFunctionExecutor.startExecuting(FUNCTION_A));
+    server.invoke(() -> LoopingFunctionExecutor.waitForExecutions(100));
+
+    gfshConnector.executeAndVerifyCommand("deploy --jar=" + jarAVersion2.getCanonicalPath());
+    server.invoke(() -> assertThatCanLoad(JAR_NAME_A, FUNCTION_A));
+    server.invoke(() -> assertThatFunctionHasVersion(FUNCTION_A, VERSION2));
+
+    server.invoke(() -> LoopingFunctionExecutor.waitForExecutions(100));
+    server.invoke(LoopingFunctionExecutor::stopExecutionAndThrowAnyException);
+  }
+
   // Note that jar A is a Declarable Function, while jar B is only a Function.
   // Also, the function for jar A resides in the default package, whereas jar B specifies a package.
   // This ensures that this test has identical coverage to some tests that it replaced.
@@ -151,4 +176,46 @@ public class DeployCommandRedeployDUnitTest implements Serializable {
     assertThat(ClassPathLoader.getLatest().getJarDeployer().findDeployedJar(jarName)).isNotNull();
     assertThat(ClassPathLoader.getLatest().forName(className)).isNotNull();
   }
+
+  private static class LoopingFunctionExecutor implements Serializable {
+    private static final AtomicInteger COUNT_OF_EXECUTIONS = new AtomicInteger();
+    private static final AtomicReference<Exception> EXCEPTION = new AtomicReference<>();
+    private static final ExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadExecutor();
+
+    public static void startExecuting(String functionId) {
+      EXECUTOR_SERVICE.submit(() -> {
+        GemFireCacheImpl gemFireCache = GemFireCacheImpl.getInstance();
+        DistributedSystem distributedSystem = gemFireCache.getDistributedSystem();
+
+        while (!Thread.currentThread().isInterrupted()) {
+          try {
+            COUNT_OF_EXECUTIONS.incrementAndGet();
+
+            FunctionService.onMember(distributedSystem.getDistributedMember()).execute(functionId)
+                .getResult();
+          } catch (Exception e) {
+            EXCEPTION.set(e);
+          }
+        }
+      });
+    }
+
+    public static void waitForExecutions(int numberOfExecutions) {
+      int initialCount = COUNT_OF_EXECUTIONS.get();
+      int countToWaitFor = initialCount + numberOfExecutions;
+      Callable<Boolean> doneWaiting = () -> COUNT_OF_EXECUTIONS.get() >= countToWaitFor;
+
+      Awaitility.await().atMost(3, TimeUnit.MINUTES).until(doneWaiting);
+    }
+
+    public static void stopExecutionAndThrowAnyException() throws Exception {
+      EXECUTOR_SERVICE.shutdownNow();
+      Exception e = EXCEPTION.get();
+      if (e != null) {
+        throw e;
+      }
+    }
+
+
+  }
 }
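
The new test drives the scenario end to end: LoopingFunctionExecutor keeps executing
FUNCTION_A on a background thread, the test waits for 100 executions, redeploys version 2 of
the jar, waits for another 100 executions, and finally rethrows any exception the executor
recorded. The waiting uses Awaitility's polling API, the same pattern as waitForExecutions()
above; a minimal standalone version of that pattern (the class and counter names here are
made up for the example, and Awaitility is assumed to be on the test classpath, as it already
is for this test):

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    import org.awaitility.Awaitility;

    public class PollingWaitExample {
      private static final AtomicInteger EXECUTIONS = new AtomicInteger();

      static void waitForMoreExecutions(int howMany) {
        int target = EXECUTIONS.get() + howMany;
        // Polls until the counter reaches the target, failing after 3 minutes.
        Awaitility.await().atMost(3, TimeUnit.MINUTES).until(() -> EXECUTIONS.get() >= target);
      }
    }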