You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/11/19 02:26:43 UTC

[apex-core] branch master updated: APEXCORE-767 Set parent classloader in StramAppLauncher loadDependencies

This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-core.git


The following commit(s) were added to refs/heads/master by this push:
     new b3911c3  APEXCORE-767 Set parent classloader in StramAppLauncher loadDependencies
b3911c3 is described below

commit b3911c3515951685bc47ff77b2bf22dadb18f1b3
Author: Florian Schmidt <fl...@icloud.com>
AuthorDate: Wed Oct 11 16:34:17 2017 -0700

    APEXCORE-767 Set parent classloader in StramAppLauncher loadDependencies
    
    StramAppLauncher.loadDependencies is called multiple times when starting
    an application via the apex-cli with the -local option. In each of the
    calls to loadDependencies, the contextClassLoader of the current thread
    would be replaced with a new instance of URLClassLoader (which has no
    parent class loader set).
    
    This can lead to issues, e.g. when one acquires the current
    contextClassLoader, loads a class with it and tries to cast it to a
    class which was loaded with a previous version of the contextClassLoader,
    resulting in a ClassCastException.
    
    An example of this bug can be seen in APEXMALHAR-2511
    
    The changes in this commit fix this by setting the parent class loader
    of each new instance of URLClassLoader to the current
    contextClassLoader.
---
 .../java/com/datatorrent/stram/cli/ApexCli.java    |   9 +-
 .../com/datatorrent/stram/client/AppPackage.java   |   7 +-
 .../datatorrent/stram/client/StramAppLauncher.java |  34 ++++-
 .../apache/apex/engine/YarnAppLauncherImpl.java    |   1 +
 .../com/datatorrent/stram/client/AsyncTester.java  |  56 +++++++++
 .../stram/client/StramAppLauncherTest.java         | 137 +++++++++++++++++++++
 6 files changed, 237 insertions(+), 7 deletions(-)

diff --git a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
index 2451b10..a4ef3f5 100644
--- a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
+++ b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
@@ -2151,7 +2151,7 @@ public class ApexCli
         } else {
           System.err.println("No application specified.");
         }
-
+        submitApp.resetContextClassLoader();
       } finally {
         IOUtils.closeQuietly(cp);
       }
@@ -2911,8 +2911,10 @@ public class ApexCli
           submitApp.loadDependencies();
           List<AppFactory> matchingAppFactories = getMatchingAppFactories(submitApp, appName, commandLineInfo.exactMatch);
           if (matchingAppFactories == null || matchingAppFactories.isEmpty()) {
+            submitApp.resetContextClassLoader();
             throw new CliException("No application in jar file matches '" + appName + "'");
           } else if (matchingAppFactories.size() > 1) {
+            submitApp.resetContextClassLoader();
             throw new CliException("More than one application in jar file match '" + appName + "'");
           } else {
             Map<String, Object> map = new HashMap<>();
@@ -2940,6 +2942,7 @@ public class ApexCli
               }
             }
             printJson(map);
+            submitApp.resetContextClassLoader();
           }
         } else {
           if (filename.endsWith(".json")) {
@@ -2971,6 +2974,7 @@ public class ApexCli
               appList.add(m);
             }
             printJson(appList, "applications");
+            submitApp.resetContextClassLoader();
           }
         }
       } else {
@@ -3200,8 +3204,10 @@ public class ApexCli
         submitApp.loadDependencies();
         List<AppFactory> matchingAppFactories = getMatchingAppFactories(submitApp, appName, true);
         if (matchingAppFactories == null || matchingAppFactories.isEmpty()) {
+          submitApp.resetContextClassLoader();
           throw new CliException("No application in jar file matches '" + appName + "'");
         } else if (matchingAppFactories.size() > 1) {
+          submitApp.resetContextClassLoader();
           throw new CliException("More than one application in jar file match '" + appName + "'");
         } else {
           AppFactory appFactory = matchingAppFactories.get(0);
@@ -3211,6 +3217,7 @@ public class ApexCli
             file.createNewFile();
           }
           LogicalPlanSerializer.convertToProperties(logicalPlan).save(file);
+          submitApp.resetContextClassLoader();
         }
       } else {
         if (currentApp == null) {
diff --git a/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java b/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java
index a606b06..5e03438 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java
@@ -465,8 +465,9 @@ public class AppPackage implements Closeable
 
       if (entry.getName().endsWith(".jar") && !skipJars) {
         appJars.add(entry.getName());
+        StramAppLauncher stramAppLauncher = null;
         try {
-          StramAppLauncher stramAppLauncher = new StramAppLauncher(entry, config);
+          stramAppLauncher = new StramAppLauncher(entry, config);
           stramAppLauncher.loadDependencies();
           List<AppFactory> appFactories = stramAppLauncher.getBundledTopologies();
           for (AppFactory appFactory : appFactories) {
@@ -486,6 +487,10 @@ public class AppPackage implements Closeable
           }
         } catch (Exception ex) {
           LOG.error("Caught exception trying to process {}", entry.getName(), ex);
+        } finally {
+          if (stramAppLauncher != null) {
+            stramAppLauncher.resetContextClassLoader();
+          }
         }
       }
     }
diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
index d4f0170..2019f48 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
@@ -106,6 +106,9 @@ public class StramAppLauncher
   private LinkedHashSet<File> deployJars;
   private final StringWriter mvnBuildClasspathOutput = new StringWriter();
 
+  private ClassLoader initialClassLoader;
+  private Thread loaderThread;
+
   public interface AppFactory
   {
     LogicalPlan createApp(LogicalPlanConfiguration conf);
@@ -220,7 +223,6 @@ public class StramAppLauncher
     }
   }
 
-
   public StramAppLauncher(File appJarFile, Configuration conf) throws Exception
   {
     this.jarFile = appJarFile;
@@ -535,10 +537,32 @@ public class StramAppLauncher
 
   public URLClassLoader loadDependencies()
   {
-    URLClassLoader cl = URLClassLoader.newInstance(launchDependencies.toArray(new URL[launchDependencies.size()]));
-    Thread.currentThread().setContextClassLoader(cl);
-    StringCodecs.check();
-    return cl;
+    if (this.loaderThread == null && this.initialClassLoader == null) {
+      this.loaderThread = Thread.currentThread();
+      this.initialClassLoader = Thread.currentThread().getContextClassLoader();
+    }
+
+    if (Thread.currentThread() != this.loaderThread) {
+      throw new RuntimeException("Calls to loadDependencies can only be made on the same thread that loadDependencies was called on for the first time");
+    } else {
+      URL[] dependencies = launchDependencies.toArray(new URL[launchDependencies.size()]);
+
+      ClassLoader currentContextClassLoader = Thread.currentThread().getContextClassLoader();
+      URLClassLoader cl = URLClassLoader.newInstance(dependencies, currentContextClassLoader);
+      Thread.currentThread().setContextClassLoader(cl);
+
+      StringCodecs.check();
+      return cl;
+    }
+  }
+
+  public void resetContextClassLoader()
+  {
+    if (Thread.currentThread() != this.loaderThread) {
+      throw new RuntimeException("Calls to resetContextClassLoader can only be made on the same thread that loadDependencies was called on for the first time");
+    }
+
+    Thread.currentThread().setContextClassLoader(initialClassLoader);
   }
 
   private void setTokenRefreshCredentials(LogicalPlan dag, Configuration conf) throws IOException
diff --git a/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java b/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java
index 9a69b08..7b43ab5 100644
--- a/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java
+++ b/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java
@@ -80,6 +80,7 @@ public class YarnAppLauncherImpl extends YarnAppLauncher<YarnAppLauncherImpl.Yar
         }
       };
       ApplicationId appId = appLauncher.launchApp(appFactory);
+      appLauncher.resetContextClassLoader();
       return new YarnAppHandleImpl(appId, conf);
     } catch (Exception ex) {
       throw new LauncherException(ex);
diff --git a/engine/src/test/java/com/datatorrent/stram/client/AsyncTester.java b/engine/src/test/java/com/datatorrent/stram/client/AsyncTester.java
new file mode 100644
index 0000000..e8d692e
--- /dev/null
+++ b/engine/src/test/java/com/datatorrent/stram/client/AsyncTester.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram.client;
+
+// See https://stackoverflow.com/a/2596530
+public class AsyncTester
+{
+  private Thread thread;
+  private volatile AssertionError error;
+
+  public AsyncTester(final Runnable runnable)
+  {
+    thread = new Thread(new Runnable()
+    {
+      @Override
+      public void run()
+      {
+        try {
+          runnable.run();
+        } catch (AssertionError e) {
+          error = e;
+        }
+      }
+    });
+  }
+
+  public AsyncTester start()
+  {
+    thread.start();
+    return this;
+  }
+
+  public void test() throws AssertionError, InterruptedException
+  {
+    thread.join();
+    if (error != null) {
+      throw error;
+    }
+  }
+}
diff --git a/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java b/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java
index 2069bab..e31a6dd 100644
--- a/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java
@@ -20,6 +20,8 @@ package com.datatorrent.stram.client;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URL;
+import java.util.LinkedHashSet;
 
 import org.junit.Assert;
 import org.junit.Rule;
@@ -55,6 +57,141 @@ public class StramAppLauncherTest
 
   @PrepareForTest({StramAppLauncher.class})
   @PowerMockIgnore({"javax.xml.*", "org.w3c.*", "org.apache.hadoop.*", "org.apache.log4j.*"})
+  public static class LoadDependenciesTest
+  {
+
+    @Rule
+    public PowerMockRule rule = new PowerMockRule();
+
+    @Rule
+    public TestWatcher setup = new TestWatcher()
+    {
+      @Override
+      protected void starting(Description description)
+      {
+        super.starting(description);
+        suppress(method(StramAppLauncher.class, "init"));
+      }
+    };
+
+    @Test
+    public void testLoadDependenciesSetsParentClassLoader() throws Exception
+    {
+      // Setup
+      Configuration conf = new Configuration();
+      FileSystem fs = FileSystem.newInstance(conf);
+      StramAppLauncher appLauncher = new StramAppLauncher(fs, conf);
+
+      Whitebox.setInternalState(appLauncher, "launchDependencies", new LinkedHashSet<URL>());
+
+      // Get initial contextClassLoader
+      ClassLoader initialClassLoader = Thread.currentThread().getContextClassLoader();
+
+      appLauncher.loadDependencies();
+
+      // Make sure that new contextClassLoader has initialClassLoader as parent
+      ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader();
+
+      Assert.assertSame(initialClassLoader, currentClassLoader.getParent());
+    }
+
+    @Test
+    public void testResetContextClassLoaderResetsToInitialClassLoader() throws Exception
+    {
+      // Setup
+      Configuration conf = new Configuration();
+      FileSystem fs = FileSystem.newInstance(conf);
+      StramAppLauncher appLauncher = new StramAppLauncher(fs, conf);
+
+      Whitebox.setInternalState(appLauncher, "launchDependencies", new LinkedHashSet<URL>());
+
+      // Get initial contextClassLoader
+      ClassLoader initialClassLoader = Thread.currentThread().getContextClassLoader();
+
+      appLauncher.loadDependencies();
+      appLauncher.resetContextClassLoader();
+
+      ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader();
+      Assert.assertSame(initialClassLoader, currentClassLoader);
+    }
+
+    @Test
+    public void testResetContextClassloaderOnlyOnInitialThread() throws Exception
+    {
+      // Setup
+      Configuration conf = new Configuration();
+      FileSystem fs = FileSystem.newInstance(conf);
+      final StramAppLauncher appLauncher = new StramAppLauncher(fs, conf);
+      Whitebox.setInternalState(appLauncher, "launchDependencies", new LinkedHashSet<URL>());
+
+      new AsyncTester(new Runnable()
+      {
+        @Override
+        public void run()
+        {
+          try {
+            appLauncher.loadDependencies();
+          } catch (Exception e) {
+            Assert.fail(e.getMessage());
+          }
+        }
+      }).start().test();
+
+      new AsyncTester(new Runnable()
+      {
+        @Override
+        public void run()
+        {
+          try {
+            appLauncher.resetContextClassLoader();
+            Assert.fail("An exception should be thrown");
+          } catch (RuntimeException e) {
+            // catch as expected
+          }
+        }
+      }).start().test();
+    }
+
+    @Test
+    public void testLoadDependenciesOnlyOnInitialThread() throws Exception
+    {
+      // Setup
+      Configuration conf = new Configuration();
+      FileSystem fs = FileSystem.newInstance(conf);
+      final StramAppLauncher appLauncher = new StramAppLauncher(fs, conf);
+      Whitebox.setInternalState(appLauncher, "launchDependencies", new LinkedHashSet<URL>());
+
+      new AsyncTester(new Runnable()
+      {
+        @Override
+        public void run()
+        {
+          try {
+            appLauncher.loadDependencies();
+          } catch (Exception e) {
+            Assert.fail(e.getMessage());
+          }
+        }
+      }).start().test();
+
+      new AsyncTester(new Runnable()
+      {
+        @Override
+        public void run()
+        {
+          try {
+            appLauncher.loadDependencies();
+            Assert.fail("An exception should be thrown");
+          } catch (RuntimeException e) {
+            // catch as expected
+          }
+        }
+      }).start().test();
+    }
+  }
+
+  @PrepareForTest({StramAppLauncher.class})
+  @PowerMockIgnore({"javax.xml.*", "org.w3c.*", "org.apache.hadoop.*", "org.apache.log4j.*"})
   public static class RefreshTokenTests
   {
     File workspace;

-- 
To stop receiving notification emails like this one, please contact
['"commits@apex.apache.org" <co...@apex.apache.org>'].