You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/11/19 02:26:43 UTC
[apex-core] branch master updated: APEXCORE-767 Set parent
classloader in StramAppLauncher loadDependencies
This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-core.git
The following commit(s) were added to refs/heads/master by this push:
new b3911c3 APEXCORE-767 Set parent classloader in StramAppLauncher loadDependencies
b3911c3 is described below
commit b3911c3515951685bc47ff77b2bf22dadb18f1b3
Author: Florian Schmidt <fl...@icloud.com>
AuthorDate: Wed Oct 11 16:34:17 2017 -0700
APEXCORE-767 Set parent classloader in StramAppLauncher loadDependencies
StramAppLauncher.loadDependencies is called multiple times when starting
an application via the apex-cli with the -local option. In each of the
calls to loadDependencies, the contextClassLoader of the current thread
would be replaced with a new instance of URLClassLoader (created without an
explicit parent, so it is not chained to the previous contextClassLoader).
This can lead to issues, e.g. when one acquires the current
contextClassLoader, loads a class with it and tries to cast it to a
class which was loaded with a previous version of the contextClassLoader,
resulting in a ClassCastException.
An example of this bug can be seen in APEXMALHAR-2511
The changes in this commit fix this by setting the parent class loader
of each new instance of URLClassLoader to the current
contextClassLoader.
---
.../java/com/datatorrent/stram/cli/ApexCli.java | 9 +-
.../com/datatorrent/stram/client/AppPackage.java | 7 +-
.../datatorrent/stram/client/StramAppLauncher.java | 34 ++++-
.../apache/apex/engine/YarnAppLauncherImpl.java | 1 +
.../com/datatorrent/stram/client/AsyncTester.java | 56 +++++++++
.../stram/client/StramAppLauncherTest.java | 137 +++++++++++++++++++++
6 files changed, 237 insertions(+), 7 deletions(-)
diff --git a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
index 2451b10..a4ef3f5 100644
--- a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
+++ b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
@@ -2151,7 +2151,7 @@ public class ApexCli
} else {
System.err.println("No application specified.");
}
-
+ submitApp.resetContextClassLoader();
} finally {
IOUtils.closeQuietly(cp);
}
@@ -2911,8 +2911,10 @@ public class ApexCli
submitApp.loadDependencies();
List<AppFactory> matchingAppFactories = getMatchingAppFactories(submitApp, appName, commandLineInfo.exactMatch);
if (matchingAppFactories == null || matchingAppFactories.isEmpty()) {
+ submitApp.resetContextClassLoader();
throw new CliException("No application in jar file matches '" + appName + "'");
} else if (matchingAppFactories.size() > 1) {
+ submitApp.resetContextClassLoader();
throw new CliException("More than one application in jar file match '" + appName + "'");
} else {
Map<String, Object> map = new HashMap<>();
@@ -2940,6 +2942,7 @@ public class ApexCli
}
}
printJson(map);
+ submitApp.resetContextClassLoader();
}
} else {
if (filename.endsWith(".json")) {
@@ -2971,6 +2974,7 @@ public class ApexCli
appList.add(m);
}
printJson(appList, "applications");
+ submitApp.resetContextClassLoader();
}
}
} else {
@@ -3200,8 +3204,10 @@ public class ApexCli
submitApp.loadDependencies();
List<AppFactory> matchingAppFactories = getMatchingAppFactories(submitApp, appName, true);
if (matchingAppFactories == null || matchingAppFactories.isEmpty()) {
+ submitApp.resetContextClassLoader();
throw new CliException("No application in jar file matches '" + appName + "'");
} else if (matchingAppFactories.size() > 1) {
+ submitApp.resetContextClassLoader();
throw new CliException("More than one application in jar file match '" + appName + "'");
} else {
AppFactory appFactory = matchingAppFactories.get(0);
@@ -3211,6 +3217,7 @@ public class ApexCli
file.createNewFile();
}
LogicalPlanSerializer.convertToProperties(logicalPlan).save(file);
+ submitApp.resetContextClassLoader();
}
} else {
if (currentApp == null) {
diff --git a/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java b/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java
index a606b06..5e03438 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java
@@ -465,8 +465,9 @@ public class AppPackage implements Closeable
if (entry.getName().endsWith(".jar") && !skipJars) {
appJars.add(entry.getName());
+ StramAppLauncher stramAppLauncher = null;
try {
- StramAppLauncher stramAppLauncher = new StramAppLauncher(entry, config);
+ stramAppLauncher = new StramAppLauncher(entry, config);
stramAppLauncher.loadDependencies();
List<AppFactory> appFactories = stramAppLauncher.getBundledTopologies();
for (AppFactory appFactory : appFactories) {
@@ -486,6 +487,10 @@ public class AppPackage implements Closeable
}
} catch (Exception ex) {
LOG.error("Caught exception trying to process {}", entry.getName(), ex);
+ } finally {
+ if (stramAppLauncher != null) {
+ stramAppLauncher.resetContextClassLoader();
+ }
}
}
}
diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
index d4f0170..2019f48 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
@@ -106,6 +106,9 @@ public class StramAppLauncher
private LinkedHashSet<File> deployJars;
private final StringWriter mvnBuildClasspathOutput = new StringWriter();
+ private ClassLoader initialClassLoader;
+ private Thread loaderThread;
+
public interface AppFactory
{
LogicalPlan createApp(LogicalPlanConfiguration conf);
@@ -220,7 +223,6 @@ public class StramAppLauncher
}
}
-
public StramAppLauncher(File appJarFile, Configuration conf) throws Exception
{
this.jarFile = appJarFile;
@@ -535,10 +537,32 @@ public class StramAppLauncher
public URLClassLoader loadDependencies()
{
- URLClassLoader cl = URLClassLoader.newInstance(launchDependencies.toArray(new URL[launchDependencies.size()]));
- Thread.currentThread().setContextClassLoader(cl);
- StringCodecs.check();
- return cl;
+ if (this.loaderThread == null && this.initialClassLoader == null) {
+ this.loaderThread = Thread.currentThread();
+ this.initialClassLoader = Thread.currentThread().getContextClassLoader();
+ }
+
+ if (Thread.currentThread() != this.loaderThread) {
+ throw new RuntimeException("Calls to loadDependencies can only be made on the same thread that loadDependencies was called on for the first time");
+ } else {
+ URL[] dependencies = launchDependencies.toArray(new URL[launchDependencies.size()]);
+
+ ClassLoader currentContextClassLoader = Thread.currentThread().getContextClassLoader();
+ URLClassLoader cl = URLClassLoader.newInstance(dependencies, currentContextClassLoader);
+ Thread.currentThread().setContextClassLoader(cl);
+
+ StringCodecs.check();
+ return cl;
+ }
+ }
+
+ public void resetContextClassLoader()
+ {
+ if (Thread.currentThread() != this.loaderThread) {
+ throw new RuntimeException("Calls to resetContextClassLoader can only be made on the same thread that loadDependencies was called on for the first time");
+ }
+
+ Thread.currentThread().setContextClassLoader(initialClassLoader);
}
private void setTokenRefreshCredentials(LogicalPlan dag, Configuration conf) throws IOException
diff --git a/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java b/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java
index 9a69b08..7b43ab5 100644
--- a/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java
+++ b/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java
@@ -80,6 +80,7 @@ public class YarnAppLauncherImpl extends YarnAppLauncher<YarnAppLauncherImpl.Yar
}
};
ApplicationId appId = appLauncher.launchApp(appFactory);
+ appLauncher.resetContextClassLoader();
return new YarnAppHandleImpl(appId, conf);
} catch (Exception ex) {
throw new LauncherException(ex);
diff --git a/engine/src/test/java/com/datatorrent/stram/client/AsyncTester.java b/engine/src/test/java/com/datatorrent/stram/client/AsyncTester.java
new file mode 100644
index 0000000..e8d692e
--- /dev/null
+++ b/engine/src/test/java/com/datatorrent/stram/client/AsyncTester.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram.client;
+
+// See https://stackoverflow.com/a/2596530
+public class AsyncTester
+{
+ private Thread thread;
+ private volatile AssertionError error;
+
+ public AsyncTester(final Runnable runnable)
+ {
+ thread = new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try {
+ runnable.run();
+ } catch (AssertionError e) {
+ error = e;
+ }
+ }
+ });
+ }
+
+ public AsyncTester start()
+ {
+ thread.start();
+ return this;
+ }
+
+ public void test() throws AssertionError, InterruptedException
+ {
+ thread.join();
+ if (error != null) {
+ throw error;
+ }
+ }
+}
diff --git a/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java b/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java
index 2069bab..e31a6dd 100644
--- a/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java
@@ -20,6 +20,8 @@ package com.datatorrent.stram.client;
import java.io.File;
import java.io.IOException;
+import java.net.URL;
+import java.util.LinkedHashSet;
import org.junit.Assert;
import org.junit.Rule;
@@ -55,6 +57,141 @@ public class StramAppLauncherTest
@PrepareForTest({StramAppLauncher.class})
@PowerMockIgnore({"javax.xml.*", "org.w3c.*", "org.apache.hadoop.*", "org.apache.log4j.*"})
+ public static class LoadDependenciesTest
+ {
+
+ @Rule
+ public PowerMockRule rule = new PowerMockRule();
+
+ @Rule
+ public TestWatcher setup = new TestWatcher()
+ {
+ @Override
+ protected void starting(Description description)
+ {
+ super.starting(description);
+ suppress(method(StramAppLauncher.class, "init"));
+ }
+ };
+
+ @Test
+ public void testLoadDependenciesSetsParentClassLoader() throws Exception
+ {
+ // Setup
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.newInstance(conf);
+ StramAppLauncher appLauncher = new StramAppLauncher(fs, conf);
+
+ Whitebox.setInternalState(appLauncher, "launchDependencies", new LinkedHashSet<URL>());
+
+ // Get initial contextClassLoader
+ ClassLoader initialClassLoader = Thread.currentThread().getContextClassLoader();
+
+ appLauncher.loadDependencies();
+
+ // Make sure that new contextClassLoader has initialClassLoader as parent
+ ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader();
+
+ Assert.assertSame(initialClassLoader, currentClassLoader.getParent());
+ }
+
+ @Test
+ public void testResetContextClassLoaderResetsToInitialClassLoader() throws Exception
+ {
+ // Setup
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.newInstance(conf);
+ StramAppLauncher appLauncher = new StramAppLauncher(fs, conf);
+
+ Whitebox.setInternalState(appLauncher, "launchDependencies", new LinkedHashSet<URL>());
+
+ // Get initial contextClassLoader
+ ClassLoader initialClassLoader = Thread.currentThread().getContextClassLoader();
+
+ appLauncher.loadDependencies();
+ appLauncher.resetContextClassLoader();
+
+ ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader();
+ Assert.assertSame(initialClassLoader, currentClassLoader);
+ }
+
+ @Test
+ public void testResetContextClassloaderOnlyOnInitialThread() throws Exception
+ {
+ // Setup
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.newInstance(conf);
+ final StramAppLauncher appLauncher = new StramAppLauncher(fs, conf);
+ Whitebox.setInternalState(appLauncher, "launchDependencies", new LinkedHashSet<URL>());
+
+ new AsyncTester(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try {
+ appLauncher.loadDependencies();
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+ }).start().test();
+
+ new AsyncTester(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try {
+ appLauncher.resetContextClassLoader();
+ Assert.fail("An exception should be thrown");
+ } catch (RuntimeException e) {
+ // catch as expected
+ }
+ }
+ }).start().test();
+ }
+
+ @Test
+ public void testLoadDependenciesOnlyOnInitialThread() throws Exception
+ {
+ // Setup
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.newInstance(conf);
+ final StramAppLauncher appLauncher = new StramAppLauncher(fs, conf);
+ Whitebox.setInternalState(appLauncher, "launchDependencies", new LinkedHashSet<URL>());
+
+ new AsyncTester(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try {
+ appLauncher.loadDependencies();
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+ }).start().test();
+
+ new AsyncTester(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try {
+ appLauncher.loadDependencies();
+ Assert.fail("An exception should be thrown");
+ } catch (RuntimeException e) {
+ // catch as expected
+ }
+ }
+ }).start().test();
+ }
+ }
+
+ @PrepareForTest({StramAppLauncher.class})
+ @PowerMockIgnore({"javax.xml.*", "org.w3c.*", "org.apache.hadoop.*", "org.apache.log4j.*"})
public static class RefreshTokenTests
{
File workspace;
--
To stop receiving notification emails like this one, please contact
['"commits@apex.apache.org" <co...@apex.apache.org>'].