You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2017/03/30 04:24:22 UTC

[1/2] apex-core git commit: APEXCORE-683 Apex client should support application packages on HDFS

Repository: apex-core
Updated Branches:
  refs/heads/master 9d6408ea4 -> 9054fd2b9


APEXCORE-683 Apex client should support application packages on HDFS


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/926ecc89
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/926ecc89
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/926ecc89

Branch: refs/heads/master
Commit: 926ecc89aa9083d1b8104f90d1e3bc31dd368bbd
Parents: 04a352b
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Sat Mar 25 09:00:33 2017 -0700
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Tue Mar 28 10:52:21 2017 -0700

----------------------------------------------------------------------
 .../java/com/datatorrent/stram/cli/ApexCli.java | 51 ++++++----
 .../datatorrent/stram/client/AppPackage.java    | 98 ++++++++++++++++----
 .../stram/client/AppPackageTest.java            | 17 +---
 3 files changed, 117 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/926ecc89/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
index dfaae97..77959ab 100644
--- a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
+++ b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
@@ -20,6 +20,7 @@ package com.datatorrent.stram.cli;
 
 import java.io.BufferedReader;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.FileReader;
 import java.io.IOException;
@@ -139,7 +140,6 @@ import jline.console.completer.StringsCompleter;
 import jline.console.history.FileHistory;
 import jline.console.history.History;
 import jline.console.history.MemoryHistory;
-import net.lingala.zip4j.exception.ZipException;
 import sun.misc.Signal;
 import sun.misc.SignalHandler;
 
@@ -460,7 +460,26 @@ public class ApexCli
     }
   }
 
-  AppPackage newAppPackageInstance(File f) throws IOException, ZipException
+  AppPackage newAppPackageInstance(URI uri, boolean suppressOutput) throws IOException
+  {
+    PrintStream outputStream = suppressOutput ? suppressOutput() : null;
+    try {
+      final String scheme = uri.getScheme();
+      if (scheme == null || scheme.equals("file")) {
+        return new AppPackage(new FileInputStream(new File(expandFileName(uri.getPath(), true))), true);
+      } else {
+        try (FileSystem fs = FileSystem.newInstance(uri, conf)) {
+          return new AppPackage(fs.open(new Path(uri.getPath())), true);
+        }
+      }
+    } finally {
+      if (outputStream != null) {
+        restoreOutput(outputStream);
+      }
+    }
+  }
+
+  AppPackage newAppPackageInstance(File f) throws IOException
   {
     PrintStream outputStream = suppressOutput();
     try {
@@ -609,7 +628,7 @@ public class ApexCli
         "Connect to an app"));
     globalCommands.put("launch", new OptionsCommandSpec(new LaunchCommand(),
         new Arg[]{},
-        new Arg[]{new FileArg("jar-file/json-file/properties-file/app-package-file"), new Arg("matching-app-name")},
+        new Arg[]{new FileArg("jar-file/json-file/properties-file/app-package-file-path/app-package-file-uri"), new Arg("matching-app-name")},
         "Launch an app", LAUNCH_OPTIONS.options));
     globalCommands.put("shutdown-app", new CommandSpec(new ShutdownAppCommand(),
         new Arg[]{new Arg("app-id")},
@@ -673,17 +692,17 @@ public class ApexCli
         new Arg[]{new FileArg("parameter-name")},
         "Get the configuration parameter"));
     globalCommands.put("get-app-package-info", new OptionsCommandSpec(new GetAppPackageInfoCommand(),
-        new Arg[]{new FileArg("app-package-file")},
+        new Arg[]{new FileArg("app-package-file-path/app-package-file-uri")},
         new Arg[]{new Arg("-withDescription")},
         "Get info on the app package file",
         GET_APP_PACKAGE_INFO_OPTIONS));
     globalCommands.put("get-app-package-operators", new OptionsCommandSpec(new GetAppPackageOperatorsCommand(),
-        new Arg[]{new FileArg("app-package-file")},
+        new Arg[]{new FileArg("app-package-file-path/app-package-file-uri")},
         new Arg[]{new Arg("search-term")},
         "Get operators within the given app package",
         GET_OPERATOR_CLASSES_OPTIONS.options));
     globalCommands.put("get-app-package-operator-properties", new CommandSpec(new GetAppPackageOperatorPropertiesCommand(),
-        new Arg[]{new FileArg("app-package-file"), new Arg("operator-class")},
+        new Arg[]{new FileArg("app-package-file-path/app-package-file-uri"), new Arg("operator-class")},
         null,
         "Get operator properties within the given app package"));
     globalCommands.put("list-default-app-attributes", new CommandSpec(new ListDefaultAttributesCommand(AttributesType.APPLICATION),
@@ -771,7 +790,7 @@ public class ApexCli
         "Begin Logical Plan Change"));
     connectedCommands.put("show-logical-plan", new OptionsCommandSpec(new ShowLogicalPlanCommand(),
         null,
-        new Arg[]{new FileArg("jar-file/app-package-file"), new Arg("class-name")},
+        new Arg[]{new FileArg("jar-file/app-package-file-path/app-package-file-uri"), new Arg("class-name")},
         "Show logical plan of an app class",
         getShowLogicalPlanCommandLineOptions()));
     connectedCommands.put("dump-properties-file", new CommandSpec(new DumpPropertiesFileCommand(),
@@ -1944,7 +1963,7 @@ public class ApexCli
             // see if it's an app package
             AppPackage ap = null;
             try {
-              ap = newAppPackageInstance(new File(fileName));
+              ap = newAppPackageInstance(new URI(fileName), true);
             } catch (Exception ex) {
               // It's not an app package
               if (requiredAppPackageName != null) {
@@ -2842,18 +2861,15 @@ public class ApexCli
       }
 
       if (commandLineInfo.args.length > 0) {
-        String filename = expandFileName(commandLineInfo.args[0], true);
-
         // see if the first argument is actually an app package
-        try {
-          AppPackage ap = new AppPackage(new File(filename));
-          ap.close();
+        try (AppPackage ap = newAppPackageInstance(new URI(commandLineInfo.args[0]), false)) {
           new ShowLogicalPlanAppPackageCommand().execute(args, reader);
           return;
         } catch (Exception ex) {
           // fall through
         }
 
+        String filename = expandFileName(commandLineInfo.args[0], true);
         if (commandLineInfo.args.length >= 2) {
           String appName = commandLineInfo.args[1];
           StramAppLauncher submitApp = getStramAppLauncher(filename, config, commandLineInfo.ignorePom);
@@ -2938,8 +2954,7 @@ public class ApexCli
     @Override
     public void execute(String[] args, ConsoleReader reader) throws Exception
     {
-      String jarfile = expandFileName(args[1], true);
-      try (AppPackage ap = newAppPackageInstance(new File(jarfile))) {
+      try (AppPackage ap = newAppPackageInstance(new URI(args[1]), true)) {
         List<AppInfo> applications = ap.getApplications();
 
         if (args.length >= 3) {
@@ -3499,7 +3514,7 @@ public class ApexCli
       String[] tmpArgs = new String[args.length - 2];
       System.arraycopy(args, 2, tmpArgs, 0, args.length - 2);
       GetAppPackageInfoCommandLineInfo commandLineInfo = getGetAppPackageInfoCommandLineInfo(tmpArgs);
-      try (AppPackage ap = newAppPackageInstance(new File(expandFileName(args[1], true)))) {
+      try (AppPackage ap = newAppPackageInstance(new URI(args[1]), true)) {
         JSONSerializationProvider jomp = new JSONSerializationProvider();
         jomp.addSerializer(PropertyInfo.class,
             new AppPackage.PropertyInfoSerializer(commandLineInfo.provideDescription));
@@ -3877,7 +3892,7 @@ public class ApexCli
       String[] tmpArgs = new String[args.length - 1];
       System.arraycopy(args, 1, tmpArgs, 0, args.length - 1);
       GetOperatorClassesCommandLineInfo commandLineInfo = getGetOperatorClassesCommandLineInfo(tmpArgs);
-      try (AppPackage ap = newAppPackageInstance(new File(expandFileName(commandLineInfo.args[0], true)))) {
+      try (AppPackage ap = newAppPackageInstance(new URI(commandLineInfo.args[0]), true)) {
         List<String> newArgs = new ArrayList<>();
         List<String> jars = new ArrayList<>();
         for (String jar : ap.getAppJars()) {
@@ -3907,7 +3922,7 @@ public class ApexCli
     @Override
     public void execute(String[] args, ConsoleReader reader) throws Exception
     {
-      try (AppPackage ap = newAppPackageInstance(new File(expandFileName(args[1], true)))) {
+      try (AppPackage ap = newAppPackageInstance(new URI(args[1]), true)) {
         List<String> newArgs = new ArrayList<>();
         List<String> jars = new ArrayList<>();
         for (String jar : ap.getAppJars()) {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/926ecc89/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java b/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java
index 238b646..a4d0364 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java
@@ -18,8 +18,12 @@
  */
 package com.datatorrent.stram.client;
 
+import java.io.Closeable;
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -31,7 +35,9 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.jar.Attributes;
+import java.util.jar.JarEntry;
 import java.util.jar.JarFile;
+import java.util.jar.JarInputStream;
 import java.util.jar.Manifest;
 
 import org.codehaus.jackson.JsonGenerator;
@@ -42,6 +48,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -60,7 +67,7 @@ import net.lingala.zip4j.model.ZipParameters;
  *
  * @since 1.0.3
  */
-public class AppPackage extends JarFile
+public class AppPackage implements Closeable
 {
   public static final String ATTRIBUTE_DT_ENGINE_VERSION = "DT-Engine-Version";
   public static final String ATTRIBUTE_DT_APP_PACKAGE_NAME = "DT-App-Package-Name";
@@ -160,9 +167,14 @@ public class AppPackage extends JarFile
     }
   }
 
-  public AppPackage(File file) throws IOException, ZipException
+  public AppPackage(File file) throws IOException
   {
-    this(file, false);
+    this(new FileInputStream(file));
+  }
+
+  public AppPackage(InputStream input) throws IOException
+  {
+    this(input, false);
   }
 
   /**
@@ -178,11 +190,29 @@ public class AppPackage extends JarFile
    * @param contentFolder  the folder that the app package will be extracted to
    * @param processAppDirectory
    * @throws java.io.IOException
-   * @throws net.lingala.zip4j.exception.ZipException
    */
-  public AppPackage(File file, File contentFolder, boolean processAppDirectory) throws IOException, ZipException
+  public AppPackage(File file, File contentFolder, boolean processAppDirectory) throws IOException
+  {
+    this(new FileInputStream(file), contentFolder, processAppDirectory);
+  }
+
+  /**
+   * Creates an App Package object.
+   *
+   * If app directory is to be processed, there may be resource leak in the class loader. Only pass true for short-lived
+   * applications
+   *
+   * If contentFolder is not null, it will try to create the contentFolder, file will be retained on disk after App Package is closed
+   * If contentFolder is null, temp folder will be created and will be cleaned on close()
+   *
+   * @param input
+   * @param contentFolder  the folder that the app package will be extracted to
+   * @param processAppDirectory
+   * @throws java.io.IOException
+   */
+  public AppPackage(InputStream input, File contentFolder, boolean processAppDirectory) throws IOException
   {
-    super(file);
+    final JarInputStream jarInputStream = new JarInputStream(input);
 
     if (contentFolder != null) {
       FileUtils.forceMkdir(contentFolder);
@@ -193,7 +223,7 @@ public class AppPackage extends JarFile
     }
     directory = contentFolder;
 
-    Manifest manifest = getManifest();
+    Manifest manifest = jarInputStream.getManifest();
     if (manifest == null) {
       throw new IOException("Not a valid app package. MANIFEST.MF is not present.");
     }
@@ -209,7 +239,7 @@ public class AppPackage extends JarFile
       throw new IOException("Not a valid app package.  App Package Name or Version or Class-Path is missing from MANIFEST.MF");
     }
     classPath.addAll(Arrays.asList(StringUtils.split(classPathString, " ")));
-    extractToDirectory(directory, file);
+    extractToDirectory(directory, jarInputStream);
 
     File confDirectory = new File(directory, "conf");
     if (confDirectory.exists()) {
@@ -251,23 +281,56 @@ public class AppPackage extends JarFile
    * @param file
    * @param processAppDirectory
    * @throws java.io.IOException
-   * @throws net.lingala.zip4j.exception.ZipException
    */
-  public AppPackage(File file, boolean processAppDirectory) throws IOException, ZipException
+  public AppPackage(File file, boolean processAppDirectory) throws IOException
+  {
+    this(new FileInputStream(file), processAppDirectory);
+  }
+
+  /**
+   * Creates an App Package object.
+   *
+   * If app directory is to be processed, there may be resource leak in the class loader. Only pass true for short-lived
+   * applications
+   *
+   * Files in app package will be extracted to tmp folder and will be cleaned on close()
+   * The close() method could be explicitly called or implicitly called by GC finalize()
+   *
+   * @param input
+   * @param processAppDirectory
+   * @throws java.io.IOException
+   */
+  public AppPackage(InputStream input, boolean processAppDirectory) throws IOException
   {
-    this(file, null, processAppDirectory);
+    this(input, null, processAppDirectory);
   }
 
-  public static void extractToDirectory(File directory, File appPackageFile) throws ZipException
+  public static void extractToDirectory(File directory, File appPackageFile) throws IOException
   {
-    ZipFile zipFile = new ZipFile(appPackageFile);
+    extractToDirectory(directory, new JarInputStream(new FileInputStream(appPackageFile)));
+  }
 
-    if (zipFile.isEncrypted()) {
-      throw new ZipException("Encrypted app package not supported yet");
+  private static void extractToDirectory(File directory, JarInputStream input) throws IOException
+  {
+    File manifestFile = new File(directory, JarFile.MANIFEST_NAME);
+    manifestFile.getParentFile().mkdirs();
+    try (FileOutputStream output = new FileOutputStream(manifestFile)) {
+      input.getManifest().write(output);
     }
 
-    directory.mkdirs();
-    zipFile.extractAll(directory.getAbsolutePath());
+    JarEntry entry = input.getNextJarEntry();
+    while (entry != null) {
+      File newFile = new File(directory, entry.getName());
+      if (entry.isDirectory()) {
+        newFile.mkdirs();
+      } else {
+        try (FileOutputStream output = new FileOutputStream(newFile)) {
+          IOUtils.copy(input, output);
+        }
+      }
+      input.closeEntry();
+      entry = input.getNextJarEntry();
+    }
   }
 
   public static void createAppPackageFile(File fileToBeCreated, File directory) throws ZipException
@@ -286,7 +349,6 @@ public class AppPackage extends JarFile
   @Override
   public void close() throws IOException
   {
-    super.close();
     if (cleanOnClose) {
       cleanContent();
     }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/926ecc89/engine/src/test/java/com/datatorrent/stram/client/AppPackageTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/client/AppPackageTest.java b/engine/src/test/java/com/datatorrent/stram/client/AppPackageTest.java
index 20550a9..f7bfa12 100644
--- a/engine/src/test/java/com/datatorrent/stram/client/AppPackageTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/client/AppPackageTest.java
@@ -41,8 +41,6 @@ import com.datatorrent.stram.client.AppPackage.PropertyInfo;
 import com.datatorrent.stram.support.StramTestSupport;
 import com.datatorrent.stram.util.JSONSerializationProvider;
 
-import net.lingala.zip4j.exception.ZipException;
-
 /**
  *
  */
@@ -57,24 +55,17 @@ public class AppPackageTest
   String appPackageDir = "src/test/resources/testAppPackage/mydtapp";
 
   @BeforeClass
-  public static void starting()
+  public static void starting() throws IOException, JSONException
   {
+    File file = StramTestSupport.createAppPackageFile();
+    // Set up test instance
+    ap = new AppPackage(file, true);
     try {
-      File file = StramTestSupport.createAppPackageFile();
-      // Set up test instance
-      ap = new AppPackage(file, true);
       // set up another instance
       File testfolder = new File("target/testapp");
       yap = new AppPackage(file, testfolder, false);
       jomp = new JSONSerializationProvider();
       json = new JSONObject(jomp.getContext(null).writeValueAsString(ap));
-
-    } catch (ZipException e) {
-      throw new RuntimeException(e);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    } catch (JSONException e) {
-      throw new RuntimeException(e);
     } finally {
       IOUtils.closeQuietly(ap);
       IOUtils.closeQuietly(yap);


[2/2] apex-core git commit: Merge branch 'APEXCORE-683' of github.com:vrozov/apex-core

Posted by pr...@apache.org.
Merge branch 'APEXCORE-683' of github.com:vrozov/apex-core


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/9054fd2b
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/9054fd2b
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/9054fd2b

Branch: refs/heads/master
Commit: 9054fd2b9775116aaa790b3946600b84acf41b06
Parents: 9d6408e 926ecc8
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Wed Mar 29 20:45:11 2017 -0700
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Wed Mar 29 20:45:11 2017 -0700

----------------------------------------------------------------------
 .../java/com/datatorrent/stram/cli/ApexCli.java | 51 ++++++----
 .../datatorrent/stram/client/AppPackage.java    | 98 ++++++++++++++++----
 .../stram/client/AppPackageTest.java            | 17 +---
 3 files changed, 117 insertions(+), 49 deletions(-)
----------------------------------------------------------------------