You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by js...@apache.org on 2017/05/15 21:57:00 UTC
geode git commit: GEODE-2912: Hot deploy for functions in deployed
Jars
Repository: geode
Updated Branches:
refs/heads/develop 614031725 -> f18e1d294
GEODE-2912: Hot deploy for functions in deployed Jars
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/f18e1d29
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/f18e1d29
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/f18e1d29
Branch: refs/heads/develop
Commit: f18e1d29473e357ddbc8891acb126ce3c1970606
Parents: 6140317
Author: Jared Stewart <js...@pivotal.io>
Authored: Thu May 11 14:16:59 2017 -0700
Committer: Jared Stewart <js...@pivotal.io>
Committed: Mon May 15 14:56:02 2017 -0700
----------------------------------------------------------------------
.../org/apache/geode/internal/DeployedJar.java | 72 +++++++++++++-------
.../org/apache/geode/internal/JarDeployer.java | 22 ++++--
.../DeployCommandRedeployDUnitTest.java | 67 ++++++++++++++++++
3 files changed, 129 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/f18e1d29/geode-core/src/main/java/org/apache/geode/internal/DeployedJar.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/DeployedJar.java b/geode-core/src/main/java/org/apache/geode/internal/DeployedJar.java
index 820eb85..037ef9e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/DeployedJar.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/DeployedJar.java
@@ -14,6 +14,19 @@
*/
package org.apache.geode.internal;
+import io.github.lukehutch.fastclasspathscanner.FastClasspathScanner;
+import io.github.lukehutch.fastclasspathscanner.scanner.ScanResult;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.Declarable;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.FunctionService;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.pdx.internal.TypeRegistry;
+import org.apache.logging.log4j.Logger;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
@@ -34,22 +47,11 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
+import java.util.function.Predicate;
import java.util.jar.JarEntry;
import java.util.jar.JarInputStream;
import java.util.regex.Pattern;
-
-import io.github.lukehutch.fastclasspathscanner.FastClasspathScanner;
-import io.github.lukehutch.fastclasspathscanner.scanner.ScanResult;
-import org.apache.logging.log4j.Logger;
-
-import org.apache.geode.cache.CacheClosedException;
-import org.apache.geode.cache.CacheFactory;
-import org.apache.geode.cache.Declarable;
-import org.apache.geode.cache.execute.Function;
-import org.apache.geode.cache.execute.FunctionService;
-import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.pdx.internal.TypeRegistry;
+import java.util.stream.Stream;
/**
* ClassLoader for a single JAR file.
@@ -156,15 +158,10 @@ public class DeployedJar {
}
/**
- * Scan the JAR file and attempt to load all classes and register any function classes found.
+ * Scan the JAR file and attempt to register any function classes found.
*/
- // This method will process the contents of the JAR file as stored in this.jarByteContent
- // instead of reading from the original JAR file. This is done because we can't open up
- // the original file and then close it without releasing the shared lock that was obtained
- // in the constructor. Once this method is finished, all classes will have been loaded and
- // there will no longer be a need to hang on to the original contents so they will be
- // discarded.
- synchronized void loadClassesAndRegisterFunctions() throws ClassNotFoundException {
+
+ public synchronized void registerFunctions() throws ClassNotFoundException {
final boolean isDebugEnabled = logger.isDebugEnabled();
if (isDebugEnabled) {
logger.debug("Registering functions with DeployedJar: {}", this);
@@ -227,12 +224,28 @@ public class DeployedJar {
}
}
- synchronized void cleanUp() {
- for (Function function : this.registeredFunctions) {
- FunctionService.unregisterFunction(function.getId());
+ /**
+ * Unregisters all functions from this jar if it was undeployed (i.e. newVersion == null), or all
+ * functions not present in the new version if it was redeployed.
+ *
+ * @param newVersion The new version of this jar that was deployed, or null if this jar was
+ * undeployed.
+ */
+ protected synchronized void cleanUp(DeployedJar newVersion) {
+ Stream<String> oldFunctions = this.registeredFunctions.stream().map(Function::getId);
+
+ Stream<String> removedFunctions;
+ if (newVersion == null) {
+ removedFunctions = oldFunctions;
+ } else {
+ Predicate<String> isRemoved =
+ (String oldFunctionId) -> !newVersion.hasFunctionWithId(oldFunctionId);
+
+ removedFunctions = oldFunctions.filter(isRemoved);
}
- this.registeredFunctions.clear();
+ removedFunctions.forEach(FunctionService::unregisterFunction);
+ this.registeredFunctions.clear();
try {
TypeRegistry typeRegistry = ((InternalCache) CacheFactory.getAnyInstance()).getPdxRegistry();
if (typeRegistry != null) {
@@ -396,6 +409,15 @@ public class DeployedJar {
return null;
}
+ private boolean hasFunctionWithId(String id) {
+ if (CollectionUtils.isEmpty(this.registeredFunctions)) {
+ return false;
+ }
+
+ return this.registeredFunctions.stream().map(Function::getId)
+ .anyMatch(functionId -> functionId.equals(id));
+ }
+
@Override
public int hashCode() {
final int prime = 31;
http://git-wip-us.apache.org/repos/asf/geode/blob/f18e1d29/geode-core/src/main/java/org/apache/geode/internal/JarDeployer.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/JarDeployer.java b/geode-core/src/main/java/org/apache/geode/internal/JarDeployer.java
index df3f10b..c21fb9d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/JarDeployer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/JarDeployer.java
@@ -33,6 +33,7 @@ import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -459,23 +460,30 @@ public class JarDeployer implements Serializable {
throws ClassNotFoundException {
lock.lock();
try {
+ Map<DeployedJar, DeployedJar> newVersionToOldVersion = new HashMap<>();
+
for (DeployedJar deployedJar : deployedJars) {
if (deployedJar != null) {
logger.info("Registering new version of jar: {}", deployedJar);
DeployedJar oldJar = this.deployedJars.put(deployedJar.getJarName(), deployedJar);
- if (oldJar != null) {
- oldJar.cleanUp();
- }
+ newVersionToOldVersion.put(deployedJar, oldJar);
}
}
ClassPathLoader.getLatest().rebuildClassLoaderForDeployedJars();
- for (DeployedJar deployedJar : deployedJars) {
- if (deployedJar != null) {
- deployedJar.loadClassesAndRegisterFunctions();
+ // Finally, unregister functions that were removed
+ for (Map.Entry<DeployedJar, DeployedJar> entry : newVersionToOldVersion.entrySet()) {
+ DeployedJar newjar = entry.getKey();
+ DeployedJar oldJar = entry.getValue();
+
+ newjar.registerFunctions();
+
+ if (oldJar != null) {
+ oldJar.cleanUp(newjar);
}
}
+
} finally {
lock.unlock();
}
@@ -583,7 +591,7 @@ public class JarDeployer implements Serializable {
ClassPathLoader.getLatest().rebuildClassLoaderForDeployedJars();
- deployedJar.cleanUp();
+ deployedJar.cleanUp(null);
deleteAllVersionsOfJar(jarName);
return deployedJar.getFileCanonicalPath();
http://git-wip-us.apache.org/repos/asf/geode/blob/f18e1d29/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/DeployCommandRedeployDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/DeployCommandRedeployDUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/DeployCommandRedeployDUnitTest.java
index 6972477..785915a 100644
--- a/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/DeployCommandRedeployDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/DeployCommandRedeployDUnitTest.java
@@ -15,6 +15,7 @@
package org.apache.geode.management.internal.cli.commands;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.fail;
import org.apache.commons.io.FileUtils;
import org.apache.geode.cache.execute.Execution;
@@ -27,6 +28,7 @@ import org.apache.geode.test.dunit.rules.GfshShellConnectionRule;
import org.apache.geode.test.dunit.rules.LocatorServerStartupRule;
import org.apache.geode.test.dunit.rules.MemberVM;
import org.apache.geode.test.junit.categories.DistributedTest;
+import org.awaitility.Awaitility;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -36,6 +38,12 @@ import java.io.File;
import java.io.Serializable;
import java.net.URL;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
@Category(DistributedTest.class)
public class DeployCommandRedeployDUnitTest implements Serializable {
@@ -102,6 +110,23 @@ public class DeployCommandRedeployDUnitTest implements Serializable {
server.invoke(() -> assertThatFunctionHasVersion(FUNCTION_B, VERSION2));
}
+ @Test
+ public void hotDeployShouldNotResultInAnyFailedFunctionExecutions() throws Exception {
+ gfshConnector.executeAndVerifyCommand("deploy --jar=" + jarAVersion1.getCanonicalPath());
+ server.invoke(() -> assertThatCanLoad(JAR_NAME_A, FUNCTION_A));
+ server.invoke(() -> assertThatFunctionHasVersion(FUNCTION_A, VERSION1));
+
+ server.invoke(() -> LoopingFunctionExecutor.startExecuting(FUNCTION_A));
+ server.invoke(() -> LoopingFunctionExecutor.waitForExecutions(100));
+
+ gfshConnector.executeAndVerifyCommand("deploy --jar=" + jarAVersion2.getCanonicalPath());
+ server.invoke(() -> assertThatCanLoad(JAR_NAME_A, FUNCTION_A));
+ server.invoke(() -> assertThatFunctionHasVersion(FUNCTION_A, VERSION2));
+
+ server.invoke(() -> LoopingFunctionExecutor.waitForExecutions(100));
+ server.invoke(LoopingFunctionExecutor::stopExecutionAndThrowAnyException);
+ }
+
// Note that jar A is a Declarable Function, while jar B is only a Function.
// Also, the function for jar A resides in the default package, whereas jar B specifies a package.
// This ensures that this test has identical coverage to some tests that it replaced.
@@ -151,4 +176,46 @@ public class DeployCommandRedeployDUnitTest implements Serializable {
assertThat(ClassPathLoader.getLatest().getJarDeployer().findDeployedJar(jarName)).isNotNull();
assertThat(ClassPathLoader.getLatest().forName(className)).isNotNull();
}
+
+ private static class LoopingFunctionExecutor implements Serializable {
+ private static final AtomicInteger COUNT_OF_EXECUTIONS = new AtomicInteger();
+ private static final AtomicReference<Exception> EXCEPTION = new AtomicReference<>();
+ private static final ExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadExecutor();
+
+ public static void startExecuting(String functionId) {
+ EXECUTOR_SERVICE.submit(() -> {
+ GemFireCacheImpl gemFireCache = GemFireCacheImpl.getInstance();
+ DistributedSystem distributedSystem = gemFireCache.getDistributedSystem();
+
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ COUNT_OF_EXECUTIONS.incrementAndGet();
+
+ FunctionService.onMember(distributedSystem.getDistributedMember()).execute(functionId)
+ .getResult();
+ } catch (Exception e) {
+ EXCEPTION.set(e);
+ }
+ }
+ });
+ }
+
+ public static void waitForExecutions(int numberOfExecutions) {
+ int initialCount = COUNT_OF_EXECUTIONS.get();
+ int countToWaitFor = initialCount + numberOfExecutions;
+ Callable<Boolean> doneWaiting = () -> COUNT_OF_EXECUTIONS.get() >= countToWaitFor;
+
+ Awaitility.await().atMost(3, TimeUnit.MINUTES).until(doneWaiting);
+ }
+
+ public static void stopExecutionAndThrowAnyException() throws Exception {
+ EXECUTOR_SERVICE.shutdownNow();
+ Exception e = EXCEPTION.get();
+ if (e != null) {
+ throw e;
+ }
+ }
+
+
+ }
}