You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2015/09/28 02:57:39 UTC

[1/3] incubator-apex-core git commit: APEX-159 #resolve StramMiniClusterTest.testOperatorFailureRecovery succeeds with unexpected error condition

Repository: incubator-apex-core
Updated Branches:
  refs/heads/devel-3 0a89c83c4 -> 8e49cfb1a


APEX-159 #resolve StramMiniClusterTest.testOperatorFailureRecovery succeeds with unexpected error condition


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/0fc22a09
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/0fc22a09
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/0fc22a09

Branch: refs/heads/devel-3
Commit: 0fc22a094d5ae67e249f8dbc728c5e6a32eecfbc
Parents: 0a85586
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Sat Sep 26 19:59:02 2015 -0700
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Sat Sep 26 19:59:02 2015 -0700

----------------------------------------------------------------------
 .../stram/LaunchContainerRunnable.java          | 27 +++++++-------------
 .../java/com/datatorrent/stram/StramClient.java | 10 +-------
 .../datatorrent/stram/StramMiniClusterTest.java |  4 +--
 3 files changed, 12 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/0fc22a09/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java b/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java
index 0a6c062..863808a 100644
--- a/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java
+++ b/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java
@@ -118,28 +118,19 @@ public class LaunchContainerRunnable implements Runnable
     LOG.info("CLASSPATH: {}", classPathEnv);
   }
 
+  public static void addFileToLocalResources(final String name, final FileStatus fileStatus, final LocalResourceType type, final Map<String, LocalResource> localResources)
+  {
+    final LocalResource localResource = LocalResource.newInstance(ConverterUtils.getYarnUrlFromPath(fileStatus.getPath()),
+            type, LocalResourceVisibility.APPLICATION, fileStatus.getLen(), fileStatus.getModificationTime());
+    localResources.put(name, localResource);
+  }
+
   public static void addFilesToLocalResources(LocalResourceType type, String commaSeparatedFileNames, Map<String, LocalResource> localResources, FileSystem fs) throws IOException
   {
     String[] files = StringUtils.splitByWholeSeparator(commaSeparatedFileNames, StramClient.LIB_JARS_SEP);
     for (String file : files) {
-      Path dst = new Path(file);
-      // Create a local resource to point to the destination jar path
-      FileStatus destStatus = fs.getFileStatus(dst);
-      LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
-      // Set the type of resource - file or archive
-      amJarRsrc.setType(type);
-      // Set visibility of the resource
-      // Setting to most private option
-      amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
-      // Set the resource to be copied over
-      amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst));
-      // Set timestamp and length of file so that the framework
-      // can do basic sanity checks for the local resource
-      // after it has been copied over to ensure it is the same
-      // resource the client intended to use with the application
-      amJarRsrc.setTimestamp(destStatus.getModificationTime());
-      amJarRsrc.setSize(destStatus.getLen());
-      localResources.put(dst.getName(), amJarRsrc);
+      final Path dst = new Path(file);
+      addFileToLocalResources(dst.getName(), fs.getFileStatus(dst), type, localResources);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/0fc22a09/engine/src/main/java/com/datatorrent/stram/StramClient.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java b/engine/src/main/java/com/datatorrent/stram/StramClient.java
index 7edc628..7abfc82 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramClient.java
@@ -497,15 +497,7 @@ public class StramClient
       outStream = fs.create(launchConfigDst, true);
       conf.writeXml(outStream);
       outStream.close();
-
-      FileStatus topologyFileStatus = fs.getFileStatus(cfgDst);
-      LocalResource topologyRsrc = Records.newRecord(LocalResource.class);
-      topologyRsrc.setType(LocalResourceType.FILE);
-      topologyRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
-      topologyRsrc.setResource(ConverterUtils.getYarnUrlFromURI(cfgDst.toUri()));
-      topologyRsrc.setTimestamp(topologyFileStatus.getModificationTime());
-      topologyRsrc.setSize(topologyFileStatus.getLen());
-      localResources.put(LogicalPlan.SER_FILE_NAME, topologyRsrc);
+      LaunchContainerRunnable.addFileToLocalResources(LogicalPlan.SER_FILE_NAME, fs.getFileStatus(cfgDst), LocalResourceType.FILE, localResources);
 
       // Set local resource info into app master container launch context
       amContainer.setLocalResources(localResources);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/0fc22a09/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java b/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
index 493156b..5d37f76 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
@@ -364,8 +364,8 @@ public class StramMiniClusterTest
   {
 
     LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
-    AsyncFSStorageAgent agent = new AsyncFSStorageAgent(testMeta.dir, null);
+    dag.setAttribute(LogicalPlan.APPLICATION_PATH, "file:" + System.getProperty("user.dir") + '/' + testMeta.dir);
+    AsyncFSStorageAgent agent = new AsyncFSStorageAgent(dag.getAttributes().get(LogicalPlan.APPLICATION_PATH), null);
     agent.setSyncCheckpoint(true);
     dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);
     FailingOperator badOperator = dag.addOperator("badOperator", FailingOperator.class);


[2/3] incubator-apex-core git commit: APEX-159 #resolve Fix application path in mini cluster test.

Posted by vr...@apache.org.
APEX-159 #resolve Fix application path in mini cluster test.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/fb53705d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/fb53705d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/fb53705d

Branch: refs/heads/devel-3
Commit: fb53705d928f18087675a1bfb8b45c9580129682
Parents: 0a89c83
Author: Thomas Weise <th...@datatorrent.com>
Authored: Sun Sep 27 16:08:00 2015 -0700
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Sun Sep 27 16:08:00 2015 -0700

----------------------------------------------------------------------
 .../java/com/datatorrent/stram/StramMiniClusterTest.java | 11 ++---------
 1 file changed, 2 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fb53705d/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java b/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
index 493156b..f0fd325 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
@@ -51,7 +51,6 @@ import com.sun.jersey.api.client.WebResource;
 import com.datatorrent.api.*;
 import com.datatorrent.api.Context.OperatorContext;
 
-import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.stram.client.StramClientUtils;
 import com.datatorrent.stram.client.StramClientUtils.YarnClientHelper;
 import com.datatorrent.stram.engine.GenericTestOperator;
@@ -206,9 +205,6 @@ public class StramMiniClusterTest
     LogicalPlanConfiguration tb = new LogicalPlanConfiguration(conf);
     tb.addFromProperties(dagProps, null);
     LogicalPlan dag = createDAG(tb);
-    AsyncFSStorageAgent agent = new AsyncFSStorageAgent(testMeta.dir, null);
-    agent.setSyncCheckpoint(true);
-    dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);
     Configuration yarnConf = new Configuration(yarnCluster.getConfig());
     StramClient client = new StramClient(yarnConf, dag);
     try {
@@ -231,7 +227,7 @@ public class StramMiniClusterTest
   private LogicalPlan createDAG(LogicalPlanConfiguration lpc) throws Exception
   {
     LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(LogicalPlan.APPLICATION_PATH, "file:" + System.getProperty("user.dir") + "/" + testMeta.dir);
+    dag.setAttribute(LogicalPlan.APPLICATION_PATH, new File(testMeta.dir).toURI().toString());
     lpc.prepareDAG(dag,null,"testApp");
     dag.validate();
     Assert.assertEquals("", Integer.valueOf(128), dag.getValue(DAG.MASTER_MEMORY_MB));
@@ -364,10 +360,7 @@ public class StramMiniClusterTest
   {
 
     LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
-    AsyncFSStorageAgent agent = new AsyncFSStorageAgent(testMeta.dir, null);
-    agent.setSyncCheckpoint(true);
-    dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);
+    dag.setAttribute(LogicalPlan.APPLICATION_PATH, new File(testMeta.dir).toURI().toString());
     FailingOperator badOperator = dag.addOperator("badOperator", FailingOperator.class);
     dag.getContextAttributes(badOperator).put(OperatorContext.RECOVERY_ATTEMPTS, 1);
 


[3/3] incubator-apex-core git commit: Merge branch 'APEX-159' of https://github.com/vrozov/incubator-apex-core into APEX-159

Posted by vr...@apache.org.
Merge branch 'APEX-159' of https://github.com/vrozov/incubator-apex-core into APEX-159

Conflicts:
	engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/8e49cfb1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/8e49cfb1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/8e49cfb1

Branch: refs/heads/devel-3
Commit: 8e49cfb1a9f1ccd818016ad6f169ebfde0256775
Parents: fb53705 0fc22a0
Author: Thomas Weise <th...@datatorrent.com>
Authored: Sun Sep 27 17:28:45 2015 -0700
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Sun Sep 27 17:28:45 2015 -0700

----------------------------------------------------------------------
 .../stram/LaunchContainerRunnable.java          | 27 +++++++-------------
 .../java/com/datatorrent/stram/StramClient.java | 10 +-------
 2 files changed, 10 insertions(+), 27 deletions(-)
----------------------------------------------------------------------