You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by da...@apache.org on 2016/08/16 01:33:44 UTC
apex-core git commit: APEXCORE-495 Config packages now support *apps*
Repository: apex-core
Updated Branches:
refs/heads/master 130ce6ba8 -> a42c67bc2
APEXCORE-495 Config packages now support *apps*
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/a42c67bc
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/a42c67bc
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/a42c67bc
Branch: refs/heads/master
Commit: a42c67bc213d7b58c6924426cb07d71fe701ce1b
Parents: 130ce6b
Author: sandeshh <sa...@gmail.com>
Authored: Mon Aug 1 18:27:29 2016 -0700
Committer: sandeshh <sa...@gmail.com>
Committed: Mon Aug 15 18:05:40 2016 -0700
----------------------------------------------------------------------
.../main/resources/archetype-resources/pom.xml | 1 +
.../src/assemble/confPackage.xml | 4 +
.../java/com/datatorrent/stram/cli/ApexCli.java | 61 ++++++++++++-
.../datatorrent/stram/client/AppPackage.java | 29 +++---
.../datatorrent/stram/client/ConfigPackage.java | 40 +++++++++
.../stram/client/StramClientUtils.java | 25 ++++++
.../com/datatorrent/stram/cli/ApexCliTest.java | 94 ++++++++++++++++----
.../testConfigPackageSrc/app/testApp.json | 53 +++++++++++
8 files changed, 270 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/a42c67bc/apex-conf-archetype/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/apex-conf-archetype/src/main/resources/archetype-resources/pom.xml b/apex-conf-archetype/src/main/resources/archetype-resources/pom.xml
index 4645c77..81e403c 100644
--- a/apex-conf-archetype/src/main/resources/archetype-resources/pom.xml
+++ b/apex-conf-archetype/src/main/resources/archetype-resources/pom.xml
@@ -25,6 +25,7 @@
<apex.apppackage.maxversion>1.9999.9999</apex.apppackage.maxversion>
<apex.appconf.classpath>classpath/*</apex.appconf.classpath>
<apex.appconf.files>files/*</apex.appconf.files>
+ <apex.appconf.app>app/*</apex.appconf.app>
</properties>
<build>
http://git-wip-us.apache.org/repos/asf/apex-core/blob/a42c67bc/apex-conf-archetype/src/main/resources/archetype-resources/src/assemble/confPackage.xml
----------------------------------------------------------------------
diff --git a/apex-conf-archetype/src/main/resources/archetype-resources/src/assemble/confPackage.xml b/apex-conf-archetype/src/main/resources/archetype-resources/src/assemble/confPackage.xml
index 4d5d7c7..a03dce9 100644
--- a/apex-conf-archetype/src/main/resources/archetype-resources/src/assemble/confPackage.xml
+++ b/apex-conf-archetype/src/main/resources/archetype-resources/src/assemble/confPackage.xml
@@ -19,6 +19,10 @@
<directory>${basedir}/src/main/resources/files</directory>
<outputDirectory>/files</outputDirectory>
</fileSet>
+ <fileSet>
+ <directory>${basedir}/src/main/resources/app</directory>
+ <outputDirectory>/app</outputDirectory>
+ </fileSet>
</fileSets>
</assembly>
http://git-wip-us.apache.org/repos/asf/apex-core/blob/a42c67bc/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
index 4e1f201..91d29bd 100644
--- a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
+++ b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
@@ -185,6 +185,9 @@ public class ApexCli
private String kerberosPrincipal;
private String kerberosKeyTab;
+ private static String CONFIG_EXCLUSIVE = "exclusive";
+ private static String CONFIG_INCLUSIVE = "inclusive";
+
private static class FileLineReader extends ConsoleReader
{
private final BufferedReader br;
@@ -978,7 +981,7 @@ public class ApexCli
return null;
}
- private static class CliException extends RuntimeException
+ static class CliException extends RuntimeException
{
private static final long serialVersionUID = 1L;
@@ -3453,7 +3456,7 @@ public class ApexCli
matchAppName = commandLineInfo.args[1];
}
- List<AppInfo> applications = new ArrayList<>(ap.getApplications());
+ List<AppInfo> applications = getAppsFromPackageAndConfig(ap, cp, commandLineInfo.useConfigApps);
if (matchAppName != null) {
Iterator<AppInfo> it = applications.iterator();
@@ -3639,7 +3642,9 @@ public class ApexCli
DTConfiguration getLaunchAppPackageProperties(AppPackage ap, ConfigPackage cp, LaunchCommandLineInfo commandLineInfo, String appName) throws Exception
{
DTConfiguration launchProperties = new DTConfiguration();
- List<AppInfo> applications = ap.getApplications();
+
+ List<AppInfo> applications = getAppsFromPackageAndConfig(ap, cp, commandLineInfo.useConfigApps);
+
AppInfo selectedApp = null;
for (AppInfo app : applications) {
if (app.name.equals(appName)) {
@@ -3715,6 +3720,52 @@ public class ApexCli
return launchProperties;
}
+ private List<AppInfo> getAppsFromPackageAndConfig(AppPackage ap, ConfigPackage cp, String configApps)
+ {
+ if (cp == null || configApps == null || !(configApps.equals(CONFIG_INCLUSIVE) || configApps.equals(CONFIG_EXCLUSIVE))) {
+ return ap.getApplications();
+ }
+
+ File src = new File(cp.tempDirectory(), "app");
+ File dest = new File(ap.tempDirectory(), "app");
+
+ if (!src.exists()) {
+ return ap.getApplications();
+ }
+
+ if (configApps.equals(CONFIG_EXCLUSIVE)) {
+
+ for (File file : dest.listFiles()) {
+
+ if (file.getName().endsWith(".json")) {
+ FileUtils.deleteQuietly(new File(dest, file.getName()));
+ }
+ }
+ } else {
+ for (File file : src.listFiles()) {
+ FileUtils.deleteQuietly(new File(dest, file.getName()));
+ }
+ }
+
+ for (File file : src.listFiles()) {
+ try {
+ FileUtils.moveFileToDirectory(file, dest, true);
+ } catch (IOException e) {
+ LOG.warn("Application from the config file {} failed while processing.", file.getName());
+ }
+ }
+
+ try {
+ FileUtils.deleteDirectory(src);
+ } catch (IOException e) {
+ LOG.warn("Failed to delete the Config Apps folder");
+ }
+
+ ap.processAppDirectory(configApps.equals(CONFIG_EXCLUSIVE));
+
+ return ap.getApplications();
+ }
+
private class GetAppPackageOperatorsCommand implements Command
{
@Override
@@ -3850,6 +3901,7 @@ public class ApexCli
final Option exactMatch = add(new Option("exactMatch", "Only consider applications with exact app name"));
final Option queue = add(OptionBuilder.withArgName("queue name").hasArg().withDescription("Specify the queue to launch the application").create("queue"));
final Option force = add(new Option("force", "Force launch the application. Do not check for compatibility"));
+ final Option useConfigApps = add(OptionBuilder.withArgName("inclusive or exclusive").hasArg().withDescription("\"inclusive\" - merge the apps in config and app package. \"exclusive\" - only show config package apps.").create("useConfigApps"));
private Option add(Option opt)
{
@@ -3890,6 +3942,8 @@ public class ApexCli
result.origAppId = line.getOptionValue(LAUNCH_OPTIONS.originalAppID.getOpt());
result.exactMatch = line.hasOption("exactMatch");
result.force = line.hasOption("force");
+ result.useConfigApps = line.getOptionValue(LAUNCH_OPTIONS.useConfigApps.getOpt());
+
return result;
}
@@ -3908,6 +3962,7 @@ public class ApexCli
boolean exactMatch;
boolean force;
String[] args;
+ String useConfigApps;
}
@SuppressWarnings("static-access")
http://git-wip-us.apache.org/repos/asf/apex-core/blob/a42c67bc/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java b/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java
index 863b535..a260fc0 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java
@@ -49,6 +49,7 @@ import net.lingala.zip4j.core.ZipFile;
import net.lingala.zip4j.exception.ZipException;
import net.lingala.zip4j.model.ZipParameters;
+
/**
* <p>
* AppPackage class.</p>
@@ -159,7 +160,7 @@ public class AppPackage extends JarFile
classPath.addAll(Arrays.asList(StringUtils.split(classPathString, " ")));
extractToDirectory(directory, file);
if (processAppDirectory) {
- processAppDirectory(new File(directory, "app"));
+ processAppDirectory(false);
}
File confDirectory = new File(directory, "conf");
if (confDirectory.exists()) {
@@ -318,8 +319,11 @@ public class AppPackage extends JarFile
return Collections.unmodifiableMap(defaultProperties);
}
- private void processAppDirectory(File dir)
+ public void processAppDirectory(boolean skipJars)
{
+ File dir = new File(directory, "app");
+ applications.clear();
+
Configuration config = new Configuration();
List<String> absClassPath = new ArrayList<>(classPath);
@@ -333,7 +337,7 @@ public class AppPackage extends JarFile
File[] files = dir.listFiles();
for (File entry : files) {
- if (entry.getName().endsWith(".jar")) {
+ if (entry.getName().endsWith(".jar") && !skipJars) {
appJars.add(entry.getName());
try {
StramAppLauncher stramAppLauncher = new StramAppLauncher(entry, config);
@@ -371,23 +375,12 @@ public class AppPackage extends JarFile
for (File entry : files) {
if (entry.getName().endsWith(".json")) {
appJsonFiles.add(entry.getName());
- try {
- AppFactory appFactory = new StramAppLauncher.JsonFileAppFactory(entry);
- StramAppLauncher stramAppLauncher = new StramAppLauncher(entry.getName(), config);
- stramAppLauncher.loadDependencies();
- AppInfo appInfo = new AppInfo(appFactory.getName(), entry.getName(), "json");
- appInfo.displayName = appFactory.getDisplayName();
- try {
- appInfo.dag = appFactory.createApp(stramAppLauncher.getLogicalPlanConfiguration());
- appInfo.dag.validate();
- } catch (Exception ex) {
- appInfo.error = ex.getMessage();
- appInfo.errorStackTrace = ExceptionUtils.getStackTrace(ex);
- }
+ AppInfo appInfo = StramClientUtils.jsonFileToAppInfo(entry, config);
+
+ if (appInfo != null) {
applications.add(appInfo);
- } catch (Exception ex) {
- LOG.error("Caught exceptions trying to process {}", entry.getName(), ex);
}
+
} else if (entry.getName().endsWith(".properties")) {
appPropertiesFiles.add(entry.getName());
try {
http://git-wip-us.apache.org/repos/asf/apex-core/blob/a42c67bc/engine/src/main/java/com/datatorrent/stram/client/ConfigPackage.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/client/ConfigPackage.java b/engine/src/main/java/com/datatorrent/stram/client/ConfigPackage.java
index f81ba67..0d833a4 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/ConfigPackage.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/ConfigPackage.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
import net.lingala.zip4j.core.ZipFile;
import net.lingala.zip4j.exception.ZipException;
@@ -71,6 +72,7 @@ public class ConfigPackage extends JarFile implements Closeable
private final Map<String, String> properties = new TreeMap<>();
private final Map<String, Map<String, String>> appProperties = new TreeMap<>();
+ private final List<AppPackage.AppInfo> applications = new ArrayList<>();
/**
* Creates an Config Package object.
@@ -114,6 +116,12 @@ public class ConfigPackage extends JarFile implements Closeable
directory = newDirectory.getAbsolutePath();
zipFile.extractAll(directory);
processPropertiesXml();
+ processAppDirectory(new File(directory, "app"));
+ }
+
+ public List<AppPackage.AppInfo> getApplications()
+ {
+ return Collections.unmodifiableList(applications);
}
public String tempDirectory()
@@ -177,6 +185,38 @@ public class ConfigPackage extends JarFile implements Closeable
}
}
+ private void processAppDirectory(File dir)
+ {
+ if (!dir.exists()) {
+ return;
+ }
+
+ Configuration config = new Configuration();
+
+ List<String> absClassPath = new ArrayList<>(classPath);
+ for (int i = 0; i < absClassPath.size(); i++) {
+ String path = absClassPath.get(i);
+ if (!path.startsWith("/")) {
+ absClassPath.set(i, directory + "/" + path);
+ }
+ }
+
+ config.set(StramAppLauncher.LIBJARS_CONF_KEY_NAME, StringUtils.join(absClassPath, ','));
+
+ File[] files = dir.listFiles();
+
+ for (File entry : files) {
+ if (entry.getName().endsWith(".json")) {
+
+ AppPackage.AppInfo appInfo = StramClientUtils.jsonFileToAppInfo(entry, config);
+
+ if (appInfo != null) {
+ applications.add(appInfo);
+ }
+ }
+ }
+ }
+
private void processPropertiesXml()
{
File dir = new File(directory, "META-INF");
http://git-wip-us.apache.org/repos/asf/apex-core/blob/a42c67bc/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
index 85d6b0c..fc60961 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
@@ -51,6 +51,7 @@ import org.slf4j.LoggerFactory;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
@@ -843,4 +844,28 @@ public class StramClientUtils
return result;
}
+ public static AppPackage.AppInfo jsonFileToAppInfo(File file, Configuration config)
+ {
+ AppPackage.AppInfo appInfo = null;
+
+ try {
+ StramAppLauncher.AppFactory appFactory = new StramAppLauncher.JsonFileAppFactory(file);
+ StramAppLauncher stramAppLauncher = new StramAppLauncher(file.getName(), config);
+ stramAppLauncher.loadDependencies();
+ appInfo = new AppPackage.AppInfo(appFactory.getName(), file.getName(), "json");
+ appInfo.displayName = appFactory.getDisplayName();
+ try {
+ appInfo.dag = appFactory.createApp(stramAppLauncher.getLogicalPlanConfiguration());
+ appInfo.dag.validate();
+ } catch (Exception ex) {
+ appInfo.error = ex.getMessage();
+ appInfo.errorStackTrace = ExceptionUtils.getStackTrace(ex);
+ }
+ } catch (Exception ex) {
+ LOG.error("Caught exceptions trying to process {}", file.getName(), ex);
+ }
+
+ return appInfo;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/a42c67bc/engine/src/test/java/com/datatorrent/stram/cli/ApexCliTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/cli/ApexCliTest.java b/engine/src/test/java/com/datatorrent/stram/cli/ApexCliTest.java
index f26463d..c049e5b 100644
--- a/engine/src/test/java/com/datatorrent/stram/cli/ApexCliTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/cli/ApexCliTest.java
@@ -22,8 +22,10 @@ import java.io.File;
import java.util.HashMap;
import java.util.Map;
+import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -35,7 +37,7 @@ import com.datatorrent.stram.client.ConfigPackage;
import com.datatorrent.stram.client.DTConfiguration;
import com.datatorrent.stram.support.StramTestSupport;
-import static com.datatorrent.stram.support.StramTestSupport.setEnv;
+import jline.console.ConsoleReader;
/**
*
@@ -52,33 +54,31 @@ public class ApexCliTest
private static File appFile;
private static File configFile;
-
- private static AppPackage ap;
- private static ConfigPackage cp;
+ private AppPackage ap;
+ private ConfigPackage cp;
static TemporaryFolder testFolder = new TemporaryFolder();
- static ApexCli cli = new ApexCli();
+ ApexCli cli;
static Map<String, String> env = new HashMap<String, String>();
static String userHome;
@BeforeClass
- public static void starting()
+ public static void createPackages()
{
+ userHome = System.getProperty("user.home");
+ String newHome = System.getProperty("user.dir") + "/target";
try {
- userHome = System.getProperty("user.home");
- String newHome = System.getProperty("user.dir") + "/target";
+
FileUtils.forceMkdir(new File(newHome + "/.dt"));
FileUtils.copyFile(new File(System.getProperty("user.dir") + "/src/test/resources/testAppPackage/dt-site.xml"), new File(newHome + "/.dt/dt-site.xml"));
env.put("HOME", newHome);
- setEnv(env);
-
- cli.init();
+ StramTestSupport.setEnv(env);
// Set up jar file to use with constructor
testFolder.create();
+
appFile = StramTestSupport.createAppPackageFile();
configFile = StramTestSupport.createConfigPackageFile(new File(testFolder.getRoot(), configJarPath));
- ap = new AppPackage(appFile, true);
- cp = new ConfigPackage(configFile);
+
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -88,9 +88,7 @@ public class ApexCliTest
public static void finished()
{
try {
- env.put("HOME", userHome);
- setEnv(env);
-
+
StramTestSupport.removeAppPackageFile();
FileUtils.forceDelete(configFile);
testFolder.delete();
@@ -99,6 +97,29 @@ public class ApexCliTest
}
}
+ @Before
+ public void startingTest()
+ {
+ try {
+
+ cli = new ApexCli();
+ cli.init();
+
+ ap = new AppPackage(appFile, true);
+ cp = new ConfigPackage(configFile);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @After
+ public void finishedTest()
+ {
+ ap = null;
+ cp = null;
+ cli = null;
+ }
+
@Test
public void testLaunchAppPackagePropertyPrecedence() throws Exception
{
@@ -128,6 +149,9 @@ public class ApexCliTest
{
ApexCli.LaunchCommandLineInfo commandLineInfo = ApexCli
.getLaunchCommandLineInfo(new String[]{"-exactMatch", "-conf", configFile.getAbsolutePath(), appFile.getAbsolutePath(), "MyFirstApplication"});
+
+ commandLineInfo.args = new String[] {"MyFirstApplication"};
+
String[] args = cli.getLaunchAppPackageArgs(ap, cp, commandLineInfo, null);
commandLineInfo = ApexCli.getLaunchCommandLineInfo(args);
StringBuilder sb = new StringBuilder();
@@ -180,4 +204,42 @@ public class ApexCliTest
Assert.assertEquals("app-default", props.get("dt.test.5"));
Assert.assertEquals("package-default", props.get("dt.test.6"));
}
+
+ @Test
+ public void testAppFromOnlyConfigPackage() throws Exception
+ {
+ ApexCli.LaunchCommandLineInfo commandLineInfo = ApexCli
+ .getLaunchCommandLineInfo(new String[]{"-conf", configFile.getAbsolutePath(), appFile.getAbsolutePath(), "-useConfigApps", "exclusive"});
+
+ ApexCli apexCli = new ApexCli();
+ apexCli.init();
+
+ Assert.assertEquals("configApps", "exclusive", commandLineInfo.useConfigApps);
+
+ apexCli.getLaunchAppPackageArgs(ap, cp, commandLineInfo, new ConsoleReader());
+
+ Assert.assertEquals(ap.getApplications().size(), 1);
+ Assert.assertEquals(ap.getApplications().get(0).displayName, "testApp");
+ Assert.assertEquals(ap.getApplications().get(0).type, "json");
+ }
+
+ @Test
+ public void testMergeAppFromConfigAndAppPackage() throws Exception
+ {
+ ApexCli.LaunchCommandLineInfo commandLineInfo = ApexCli
+ .getLaunchCommandLineInfo(new String[]{"-conf", configFile.getAbsolutePath(), appFile.getAbsolutePath(), "-useConfigApps", "inclusive"});
+
+ Assert.assertEquals("configApps", "inclusive", commandLineInfo.useConfigApps);
+
+ ApexCli apexCli = new ApexCli();
+ apexCli.init();
+
+ try {
+ apexCli.getLaunchAppPackageArgs(ap, cp, commandLineInfo, new ConsoleReader());
+ } catch (ApexCli.CliException cliException) {
+ return;
+ }
+
+ Assert.fail("Cli failed throw multiple apps exception.");
+ }
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/a42c67bc/engine/src/test/resources/testConfigPackage/testConfigPackageSrc/app/testApp.json
----------------------------------------------------------------------
diff --git a/engine/src/test/resources/testConfigPackage/testConfigPackageSrc/app/testApp.json b/engine/src/test/resources/testConfigPackage/testConfigPackageSrc/app/testApp.json
new file mode 100644
index 0000000..2ef4871
--- /dev/null
+++ b/engine/src/test/resources/testConfigPackage/testConfigPackageSrc/app/testApp.json
@@ -0,0 +1,53 @@
+{
+ "displayName":"testApp",
+ "attributes":{
+
+ },
+ "operators":[
+ {
+ "name":"Operator 1",
+ "attributes":{
+
+ },
+ "class":"org.apache.apex.test.DevNull",
+ "ports":[
+ {
+ "name":"inputPort",
+ "attributes":{
+
+ }
+ }
+ ]
+ },
+ {
+ "name":"Operator 2",
+ "attributes":{
+
+ },
+ "class":"org.apache.apex.test.RandomGen",
+ "ports":[
+ {
+ "name":"out",
+ "attributes":{
+
+ }
+ }
+ ]
+ }
+ ],
+ "streams":[
+ {
+ "name":"Stream 1",
+ "sinks":[
+ {
+ "operatorName":"Operator 1",
+ "portName":"inputPort"
+ }
+ ],
+ "source":{
+ "operatorName":"Operator 2",
+ "portName":"out"
+ }
+ }
+ ]
+}