You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2015/11/03 03:27:07 UTC
[06/50] [abbrv] incubator-apex-core git commit: APEX-159 #resolve
StramMiniClusterTest.testOperatorFailureRecovery succeeds with unexpected
error condition
APEX-159 #resolve StramMiniClusterTest.testOperatorFailureRecovery succeeds with unexpected error condition
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/0fc22a09
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/0fc22a09
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/0fc22a09
Branch: refs/heads/master
Commit: 0fc22a094d5ae67e249f8dbc728c5e6a32eecfbc
Parents: 0a85586
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Sat Sep 26 19:59:02 2015 -0700
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Sat Sep 26 19:59:02 2015 -0700
----------------------------------------------------------------------
.../stram/LaunchContainerRunnable.java | 27 +++++++-------------
.../java/com/datatorrent/stram/StramClient.java | 10 +-------
.../datatorrent/stram/StramMiniClusterTest.java | 4 +--
3 files changed, 12 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/0fc22a09/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java b/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java
index 0a6c062..863808a 100644
--- a/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java
+++ b/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java
@@ -118,28 +118,19 @@ public class LaunchContainerRunnable implements Runnable
LOG.info("CLASSPATH: {}", classPathEnv);
}
+ public static void addFileToLocalResources(final String name, final FileStatus fileStatus, final LocalResourceType type, final Map<String, LocalResource> localResources)
+ {
+ final LocalResource localResource = LocalResource.newInstance(ConverterUtils.getYarnUrlFromPath(fileStatus.getPath()),
+ type, LocalResourceVisibility.APPLICATION, fileStatus.getLen(), fileStatus.getModificationTime());
+ localResources.put(name, localResource);
+ }
+
public static void addFilesToLocalResources(LocalResourceType type, String commaSeparatedFileNames, Map<String, LocalResource> localResources, FileSystem fs) throws IOException
{
String[] files = StringUtils.splitByWholeSeparator(commaSeparatedFileNames, StramClient.LIB_JARS_SEP);
for (String file : files) {
- Path dst = new Path(file);
- // Create a local resource to point to the destination jar path
- FileStatus destStatus = fs.getFileStatus(dst);
- LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
- // Set the type of resource - file or archive
- amJarRsrc.setType(type);
- // Set visibility of the resource
- // Setting to most private option
- amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
- // Set the resource to be copied over
- amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst));
- // Set timestamp and length of file so that the framework
- // can do basic sanity checks for the local resource
- // after it has been copied over to ensure it is the same
- // resource the client intended to use with the application
- amJarRsrc.setTimestamp(destStatus.getModificationTime());
- amJarRsrc.setSize(destStatus.getLen());
- localResources.put(dst.getName(), amJarRsrc);
+ final Path dst = new Path(file);
+ addFileToLocalResources(dst.getName(), fs.getFileStatus(dst), type, localResources);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/0fc22a09/engine/src/main/java/com/datatorrent/stram/StramClient.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java b/engine/src/main/java/com/datatorrent/stram/StramClient.java
index 7edc628..7abfc82 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramClient.java
@@ -497,15 +497,7 @@ public class StramClient
outStream = fs.create(launchConfigDst, true);
conf.writeXml(outStream);
outStream.close();
-
- FileStatus topologyFileStatus = fs.getFileStatus(cfgDst);
- LocalResource topologyRsrc = Records.newRecord(LocalResource.class);
- topologyRsrc.setType(LocalResourceType.FILE);
- topologyRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
- topologyRsrc.setResource(ConverterUtils.getYarnUrlFromURI(cfgDst.toUri()));
- topologyRsrc.setTimestamp(topologyFileStatus.getModificationTime());
- topologyRsrc.setSize(topologyFileStatus.getLen());
- localResources.put(LogicalPlan.SER_FILE_NAME, topologyRsrc);
+ LaunchContainerRunnable.addFileToLocalResources(LogicalPlan.SER_FILE_NAME, fs.getFileStatus(cfgDst), LocalResourceType.FILE, localResources);
// Set local resource info into app master container launch context
amContainer.setLocalResources(localResources);
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/0fc22a09/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java b/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
index 493156b..5d37f76 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
@@ -364,8 +364,8 @@ public class StramMiniClusterTest
{
LogicalPlan dag = new LogicalPlan();
- dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
- AsyncFSStorageAgent agent = new AsyncFSStorageAgent(testMeta.dir, null);
+ dag.setAttribute(LogicalPlan.APPLICATION_PATH, "file:" + System.getProperty("user.dir") + '/' + testMeta.dir);
+ AsyncFSStorageAgent agent = new AsyncFSStorageAgent(dag.getAttributes().get(LogicalPlan.APPLICATION_PATH), null);
agent.setSyncCheckpoint(true);
dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);
FailingOperator badOperator = dag.addOperator("badOperator", FailingOperator.class);