You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2017/03/30 04:24:22 UTC
[1/2] apex-core git commit: APEXCORE-683 Apex client should support
application packages on HDFS
Repository: apex-core
Updated Branches:
refs/heads/master 9d6408ea4 -> 9054fd2b9
APEXCORE-683 Apex client should support application packages on HDFS
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/926ecc89
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/926ecc89
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/926ecc89
Branch: refs/heads/master
Commit: 926ecc89aa9083d1b8104f90d1e3bc31dd368bbd
Parents: 04a352b
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Sat Mar 25 09:00:33 2017 -0700
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Tue Mar 28 10:52:21 2017 -0700
----------------------------------------------------------------------
.../java/com/datatorrent/stram/cli/ApexCli.java | 51 ++++++----
.../datatorrent/stram/client/AppPackage.java | 98 ++++++++++++++++----
.../stram/client/AppPackageTest.java | 17 +---
3 files changed, 117 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/926ecc89/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
index dfaae97..77959ab 100644
--- a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
+++ b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
@@ -20,6 +20,7 @@ package com.datatorrent.stram.cli;
import java.io.BufferedReader;
import java.io.File;
+import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
@@ -139,7 +140,6 @@ import jline.console.completer.StringsCompleter;
import jline.console.history.FileHistory;
import jline.console.history.History;
import jline.console.history.MemoryHistory;
-import net.lingala.zip4j.exception.ZipException;
import sun.misc.Signal;
import sun.misc.SignalHandler;
@@ -460,7 +460,26 @@ public class ApexCli
}
}
- AppPackage newAppPackageInstance(File f) throws IOException, ZipException
+ AppPackage newAppPackageInstance(URI uri, boolean suppressOutput) throws IOException
+ {
+ PrintStream outputStream = suppressOutput ? suppressOutput() : null;
+ try {
+ final String scheme = uri.getScheme();
+ if (scheme == null || scheme.equals("file")) {
+ return new AppPackage(new FileInputStream(new File(expandFileName(uri.getPath(), true))), true);
+ } else {
+ try (FileSystem fs = FileSystem.newInstance(uri, conf)) {
+ return new AppPackage(fs.open(new Path(uri.getPath())), true);
+ }
+ }
+ } finally {
+ if (outputStream != null) {
+ restoreOutput(outputStream);
+ }
+ }
+ }
+
+ AppPackage newAppPackageInstance(File f) throws IOException
{
PrintStream outputStream = suppressOutput();
try {
@@ -609,7 +628,7 @@ public class ApexCli
"Connect to an app"));
globalCommands.put("launch", new OptionsCommandSpec(new LaunchCommand(),
new Arg[]{},
- new Arg[]{new FileArg("jar-file/json-file/properties-file/app-package-file"), new Arg("matching-app-name")},
+ new Arg[]{new FileArg("jar-file/json-file/properties-file/app-package-file-path/app-package-file-uri"), new Arg("matching-app-name")},
"Launch an app", LAUNCH_OPTIONS.options));
globalCommands.put("shutdown-app", new CommandSpec(new ShutdownAppCommand(),
new Arg[]{new Arg("app-id")},
@@ -673,17 +692,17 @@ public class ApexCli
new Arg[]{new FileArg("parameter-name")},
"Get the configuration parameter"));
globalCommands.put("get-app-package-info", new OptionsCommandSpec(new GetAppPackageInfoCommand(),
- new Arg[]{new FileArg("app-package-file")},
+ new Arg[]{new FileArg("app-package-file-path/app-package-file-uri")},
new Arg[]{new Arg("-withDescription")},
"Get info on the app package file",
GET_APP_PACKAGE_INFO_OPTIONS));
globalCommands.put("get-app-package-operators", new OptionsCommandSpec(new GetAppPackageOperatorsCommand(),
- new Arg[]{new FileArg("app-package-file")},
+ new Arg[]{new FileArg("app-package-file-path/app-package-file-uri")},
new Arg[]{new Arg("search-term")},
"Get operators within the given app package",
GET_OPERATOR_CLASSES_OPTIONS.options));
globalCommands.put("get-app-package-operator-properties", new CommandSpec(new GetAppPackageOperatorPropertiesCommand(),
- new Arg[]{new FileArg("app-package-file"), new Arg("operator-class")},
+ new Arg[]{new FileArg("app-package-file-path/app-package-file-uri"), new Arg("operator-class")},
null,
"Get operator properties within the given app package"));
globalCommands.put("list-default-app-attributes", new CommandSpec(new ListDefaultAttributesCommand(AttributesType.APPLICATION),
@@ -771,7 +790,7 @@ public class ApexCli
"Begin Logical Plan Change"));
connectedCommands.put("show-logical-plan", new OptionsCommandSpec(new ShowLogicalPlanCommand(),
null,
- new Arg[]{new FileArg("jar-file/app-package-file"), new Arg("class-name")},
+ new Arg[]{new FileArg("jar-file/app-package-file-path/app-package-file-uri"), new Arg("class-name")},
"Show logical plan of an app class",
getShowLogicalPlanCommandLineOptions()));
connectedCommands.put("dump-properties-file", new CommandSpec(new DumpPropertiesFileCommand(),
@@ -1944,7 +1963,7 @@ public class ApexCli
// see if it's an app package
AppPackage ap = null;
try {
- ap = newAppPackageInstance(new File(fileName));
+ ap = newAppPackageInstance(new URI(fileName), true);
} catch (Exception ex) {
// It's not an app package
if (requiredAppPackageName != null) {
@@ -2842,18 +2861,15 @@ public class ApexCli
}
if (commandLineInfo.args.length > 0) {
- String filename = expandFileName(commandLineInfo.args[0], true);
-
// see if the first argument is actually an app package
- try {
- AppPackage ap = new AppPackage(new File(filename));
- ap.close();
+ try (AppPackage ap = newAppPackageInstance(new URI(commandLineInfo.args[0]), false)) {
new ShowLogicalPlanAppPackageCommand().execute(args, reader);
return;
} catch (Exception ex) {
// fall through
}
+ String filename = expandFileName(commandLineInfo.args[0], true);
if (commandLineInfo.args.length >= 2) {
String appName = commandLineInfo.args[1];
StramAppLauncher submitApp = getStramAppLauncher(filename, config, commandLineInfo.ignorePom);
@@ -2938,8 +2954,7 @@ public class ApexCli
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
- String jarfile = expandFileName(args[1], true);
- try (AppPackage ap = newAppPackageInstance(new File(jarfile))) {
+ try (AppPackage ap = newAppPackageInstance(new URI(args[1]), true)) {
List<AppInfo> applications = ap.getApplications();
if (args.length >= 3) {
@@ -3499,7 +3514,7 @@ public class ApexCli
String[] tmpArgs = new String[args.length - 2];
System.arraycopy(args, 2, tmpArgs, 0, args.length - 2);
GetAppPackageInfoCommandLineInfo commandLineInfo = getGetAppPackageInfoCommandLineInfo(tmpArgs);
- try (AppPackage ap = newAppPackageInstance(new File(expandFileName(args[1], true)))) {
+ try (AppPackage ap = newAppPackageInstance(new URI(args[1]), true)) {
JSONSerializationProvider jomp = new JSONSerializationProvider();
jomp.addSerializer(PropertyInfo.class,
new AppPackage.PropertyInfoSerializer(commandLineInfo.provideDescription));
@@ -3877,7 +3892,7 @@ public class ApexCli
String[] tmpArgs = new String[args.length - 1];
System.arraycopy(args, 1, tmpArgs, 0, args.length - 1);
GetOperatorClassesCommandLineInfo commandLineInfo = getGetOperatorClassesCommandLineInfo(tmpArgs);
- try (AppPackage ap = newAppPackageInstance(new File(expandFileName(commandLineInfo.args[0], true)))) {
+ try (AppPackage ap = newAppPackageInstance(new URI(commandLineInfo.args[0]), true)) {
List<String> newArgs = new ArrayList<>();
List<String> jars = new ArrayList<>();
for (String jar : ap.getAppJars()) {
@@ -3907,7 +3922,7 @@ public class ApexCli
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
- try (AppPackage ap = newAppPackageInstance(new File(expandFileName(args[1], true)))) {
+ try (AppPackage ap = newAppPackageInstance(new URI(args[1]), true)) {
List<String> newArgs = new ArrayList<>();
List<String> jars = new ArrayList<>();
for (String jar : ap.getAppJars()) {
http://git-wip-us.apache.org/repos/asf/apex-core/blob/926ecc89/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java b/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java
index 238b646..a4d0364 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java
@@ -18,8 +18,12 @@
*/
package com.datatorrent.stram.client;
+import java.io.Closeable;
import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
@@ -31,7 +35,9 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.jar.Attributes;
+import java.util.jar.JarEntry;
import java.util.jar.JarFile;
+import java.util.jar.JarInputStream;
import java.util.jar.Manifest;
import org.codehaus.jackson.JsonGenerator;
@@ -42,6 +48,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
@@ -60,7 +67,7 @@ import net.lingala.zip4j.model.ZipParameters;
*
* @since 1.0.3
*/
-public class AppPackage extends JarFile
+public class AppPackage implements Closeable
{
public static final String ATTRIBUTE_DT_ENGINE_VERSION = "DT-Engine-Version";
public static final String ATTRIBUTE_DT_APP_PACKAGE_NAME = "DT-App-Package-Name";
@@ -160,9 +167,14 @@ public class AppPackage extends JarFile
}
}
- public AppPackage(File file) throws IOException, ZipException
+ public AppPackage(File file) throws IOException
{
- this(file, false);
+ this(new FileInputStream(file));
+ }
+
+ public AppPackage(InputStream input) throws IOException
+ {
+ this(input, false);
}
/**
@@ -178,11 +190,29 @@ public class AppPackage extends JarFile
* @param contentFolder the folder that the app package will be extracted to
* @param processAppDirectory
* @throws java.io.IOException
- * @throws net.lingala.zip4j.exception.ZipException
*/
- public AppPackage(File file, File contentFolder, boolean processAppDirectory) throws IOException, ZipException
+ public AppPackage(File file, File contentFolder, boolean processAppDirectory) throws IOException
+ {
+ this(new FileInputStream(file), contentFolder, processAppDirectory);
+ }
+
+ /**
+ * Creates an App Package object.
+ *
+ * If app directory is to be processed, there may be resource leak in the class loader. Only pass true for short-lived
+ * applications
+ *
+ * If contentFolder is not null, it will try to create the contentFolder, file will be retained on disk after App Package is closed
+ * If contentFolder is null, temp folder will be created and will be cleaned on close()
+ *
+ * @param input
+ * @param contentFolder the folder that the app package will be extracted to
+ * @param processAppDirectory
+ * @throws java.io.IOException
+ */
+ public AppPackage(InputStream input, File contentFolder, boolean processAppDirectory) throws IOException
{
- super(file);
+ final JarInputStream jarInputStream = new JarInputStream(input);
if (contentFolder != null) {
FileUtils.forceMkdir(contentFolder);
@@ -193,7 +223,7 @@ public class AppPackage extends JarFile
}
directory = contentFolder;
- Manifest manifest = getManifest();
+ Manifest manifest = jarInputStream.getManifest();
if (manifest == null) {
throw new IOException("Not a valid app package. MANIFEST.MF is not present.");
}
@@ -209,7 +239,7 @@ public class AppPackage extends JarFile
throw new IOException("Not a valid app package. App Package Name or Version or Class-Path is missing from MANIFEST.MF");
}
classPath.addAll(Arrays.asList(StringUtils.split(classPathString, " ")));
- extractToDirectory(directory, file);
+ extractToDirectory(directory, jarInputStream);
File confDirectory = new File(directory, "conf");
if (confDirectory.exists()) {
@@ -251,23 +281,56 @@ public class AppPackage extends JarFile
* @param file
* @param processAppDirectory
* @throws java.io.IOException
- * @throws net.lingala.zip4j.exception.ZipException
*/
- public AppPackage(File file, boolean processAppDirectory) throws IOException, ZipException
+ public AppPackage(File file, boolean processAppDirectory) throws IOException
+ {
+ this(new FileInputStream(file), processAppDirectory);
+ }
+
+ /**
+ * Creates an App Package object.
+ *
+ * If app directory is to be processed, there may be resource leak in the class loader. Only pass true for short-lived
+ * applications
+ *
+ * Files in app package will be extracted to tmp folder and will be cleaned on close()
+ * The close() method could be explicitly called or implicitly called by GC finalize()
+ *
+ * @param input
+ * @param processAppDirectory
+ * @throws java.io.IOException
+ */
+ public AppPackage(InputStream input, boolean processAppDirectory) throws IOException
{
- this(file, null, processAppDirectory);
+ this(input, null, processAppDirectory);
}
- public static void extractToDirectory(File directory, File appPackageFile) throws ZipException
+ public static void extractToDirectory(File directory, File appPackageFile) throws IOException
{
- ZipFile zipFile = new ZipFile(appPackageFile);
+ extractToDirectory(directory, new JarInputStream(new FileInputStream(appPackageFile)));
+ }
- if (zipFile.isEncrypted()) {
- throw new ZipException("Encrypted app package not supported yet");
+ private static void extractToDirectory(File directory, JarInputStream input) throws IOException
+ {
+ File manifestFile = new File(directory, JarFile.MANIFEST_NAME);
+ manifestFile.getParentFile().mkdirs();
+ try (FileOutputStream output = new FileOutputStream(manifestFile)) {
+ input.getManifest().write(output);
}
- directory.mkdirs();
- zipFile.extractAll(directory.getAbsolutePath());
+ JarEntry entry = input.getNextJarEntry();
+ while (entry != null) {
+ File newFile = new File(directory, entry.getName());
+ if (entry.isDirectory()) {
+ newFile.mkdirs();
+ } else {
+ try (FileOutputStream output = new FileOutputStream(newFile)) {
+ IOUtils.copy(input, output);
+ }
+ }
+ input.closeEntry();
+ entry = input.getNextJarEntry();
+ }
}
public static void createAppPackageFile(File fileToBeCreated, File directory) throws ZipException
@@ -286,7 +349,6 @@ public class AppPackage extends JarFile
@Override
public void close() throws IOException
{
- super.close();
if (cleanOnClose) {
cleanContent();
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/926ecc89/engine/src/test/java/com/datatorrent/stram/client/AppPackageTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/client/AppPackageTest.java b/engine/src/test/java/com/datatorrent/stram/client/AppPackageTest.java
index 20550a9..f7bfa12 100644
--- a/engine/src/test/java/com/datatorrent/stram/client/AppPackageTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/client/AppPackageTest.java
@@ -41,8 +41,6 @@ import com.datatorrent.stram.client.AppPackage.PropertyInfo;
import com.datatorrent.stram.support.StramTestSupport;
import com.datatorrent.stram.util.JSONSerializationProvider;
-import net.lingala.zip4j.exception.ZipException;
-
/**
*
*/
@@ -57,24 +55,17 @@ public class AppPackageTest
String appPackageDir = "src/test/resources/testAppPackage/mydtapp";
@BeforeClass
- public static void starting()
+ public static void starting() throws IOException, JSONException
{
+ File file = StramTestSupport.createAppPackageFile();
+ // Set up test instance
+ ap = new AppPackage(file, true);
try {
- File file = StramTestSupport.createAppPackageFile();
- // Set up test instance
- ap = new AppPackage(file, true);
// set up another instance
File testfolder = new File("target/testapp");
yap = new AppPackage(file, testfolder, false);
jomp = new JSONSerializationProvider();
json = new JSONObject(jomp.getContext(null).writeValueAsString(ap));
-
- } catch (ZipException e) {
- throw new RuntimeException(e);
- } catch (IOException e) {
- throw new RuntimeException(e);
- } catch (JSONException e) {
- throw new RuntimeException(e);
} finally {
IOUtils.closeQuietly(ap);
IOUtils.closeQuietly(yap);
[2/2] apex-core git commit: Merge branch 'APEXCORE-683' of
github.com:vrozov/apex-core
Posted by pr...@apache.org.
Merge branch 'APEXCORE-683' of github.com:vrozov/apex-core
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/9054fd2b
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/9054fd2b
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/9054fd2b
Branch: refs/heads/master
Commit: 9054fd2b9775116aaa790b3946600b84acf41b06
Parents: 9d6408e 926ecc8
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Wed Mar 29 20:45:11 2017 -0700
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Wed Mar 29 20:45:11 2017 -0700
----------------------------------------------------------------------
.../java/com/datatorrent/stram/cli/ApexCli.java | 51 ++++++----
.../datatorrent/stram/client/AppPackage.java | 98 ++++++++++++++++----
.../stram/client/AppPackageTest.java | 17 +---
3 files changed, 117 insertions(+), 49 deletions(-)
----------------------------------------------------------------------