You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ba...@apache.org on 2016/07/19 20:21:54 UTC

falcon git commit: FALCON-2071 Falcon Spark SQL failing with Yarn Client Mode

Repository: falcon
Updated Branches:
  refs/heads/master 292dfed34 -> 73339264d


FALCON-2071 Falcon Spark SQL failing with Yarn Client Mode

Author: peeyush b <pb...@hortonworks.com>

Reviewers: "Praveen Adlakha <ad...@gmail.com>, Balu Vellanki <ba...@apache.org>, Venkat Ranganathan <ve...@hortonworks.com>"

Closes #220 from peeyushb/FALCON-2071


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/73339264
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/73339264
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/73339264

Branch: refs/heads/master
Commit: 73339264d6d8cd627aee0be53007a7bd4e4956ac
Parents: 292dfed
Author: peeyush b <pb...@hortonworks.com>
Authored: Tue Jul 19 13:21:51 2016 -0700
Committer: bvellanki <bv...@hortonworks.com>
Committed: Tue Jul 19 13:21:51 2016 -0700

----------------------------------------------------------------------
 .../process/SparkProcessWorkflowBuilder.java    | 34 +++++++++-----------
 .../OozieProcessWorkflowBuilderTest.java        |  6 ++--
 2 files changed, 20 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/73339264/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java
index 5f4fafa..51db75d 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java
@@ -30,12 +30,10 @@ import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.oozie.spark.CONFIGURATION.Property;
 import org.apache.falcon.oozie.workflow.ACTION;
 import org.apache.falcon.oozie.workflow.CONFIGURATION;
 import org.apache.falcon.util.OozieUtils;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import javax.xml.bind.JAXBElement;
@@ -59,7 +57,7 @@ public class SparkProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder
         org.apache.falcon.oozie.spark.ACTION sparkAction = actionJaxbElement.getValue();
 
         String sparkMasterURL = entity.getSparkAttributes().getMaster();
-        String sparkFilePath = entity.getSparkAttributes().getJar();
+        Path sparkJarFilePath = new Path(entity.getSparkAttributes().getJar());
         String sparkJobName = entity.getSparkAttributes().getName();
         String sparkOpts = entity.getSparkAttributes().getSparkOpts();
         String sparkClassName = entity.getSparkAttributes().getClazz();
@@ -94,18 +92,28 @@ public class SparkProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder
         addOutputFeedsAsArgument(argList, cluster);
         addInputFeedsAsArgument(argList, cluster);
 
-        sparkAction.setJar(addUri(sparkFilePath, cluster));
-
-        setSparkLibFileToWorkflowLib(sparkFilePath, entity);
+        // In Oozie spark action, value for jar is either Java jar file path or Python file path.
+        validateSparkJarFilePath(sparkJarFilePath);
+        sparkAction.setJar(sparkJarFilePath.getName());
+        setSparkLibFileToWorkflowLib(sparkJarFilePath.toString(), entity);
         propagateEntityProperties(sparkAction);
 
         OozieUtils.marshalSparkAction(action, actionJaxbElement);
         return action;
     }
 
-    private void setSparkLibFileToWorkflowLib(String sparkFile, Process entity) {
+    private void setSparkLibFileToWorkflowLib(String sparkJarFilePath, Process entity) {
         if (StringUtils.isEmpty(entity.getWorkflow().getLib())) {
-            entity.getWorkflow().setLib(sparkFile);
+            entity.getWorkflow().setLib(sparkJarFilePath);
+        } else {
+            String workflowLib = entity.getWorkflow().getLib() + "," + sparkJarFilePath;
+            entity.getWorkflow().setLib(workflowLib);
+        }
+    }
+
+    private void validateSparkJarFilePath(Path sparkJarFilePath) throws FalconException {
+        if (!sparkJarFilePath.isAbsolute()) {
+            throw new FalconException("Spark jar file path must be absolute:"+sparkJarFilePath);
         }
     }
 
@@ -188,16 +196,6 @@ public class SparkProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder
         }
     }
 
-    private String addUri(String jarFile, Cluster cluster) throws FalconException {
-        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
-                ClusterHelper.getConfiguration(cluster));
-        Path jarFilePath = new Path(jarFile);
-        if (jarFilePath.isAbsoluteAndSchemeAuthorityNull()) {
-            return fs.makeQualified(jarFilePath).toString();
-        }
-        return jarFile;
-    }
-
     private String getClusterEntitySparkMaster(Cluster cluster) {
         return ClusterHelper.getSparkMasterEndPoint(cluster);
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/73339264/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
index 30ff537..a692d0c 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
@@ -372,6 +372,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
         WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
         testParentWorkflow(process, parentWorkflow);
+        assertEquals(process.getWorkflow().getLib(), "/resources/action/lib/falcon-examples.jar");
 
         ACTION sparkNode = getAction(parentWorkflow, "user-action");
 
@@ -380,7 +381,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         org.apache.falcon.oozie.spark.ACTION sparkAction = actionJaxbElement.getValue();
 
         assertEquals(sparkAction.getMaster(), "local");
-        assertEquals(sparkAction.getJar(), "jail://testCluster:00/resources/action/lib/falcon-examples.jar");
+        assertEquals(sparkAction.getJar(), "falcon-examples.jar");
 
         Assert.assertTrue(Storage.TYPE.TABLE == ProcessHelper.getStorageType(cluster, process));
         List<String> argsList = sparkAction.getArg();
@@ -430,6 +431,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
         WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
         testParentWorkflow(process, parentWorkflow);
+        assertEquals(process.getWorkflow().getLib(), "/resources/action/lib/spark-wordcount.jar");
 
         ACTION sparkNode = getAction(parentWorkflow, "user-action");
 
@@ -437,7 +439,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
                 OozieUtils.unMarshalSparkAction(sparkNode);
         org.apache.falcon.oozie.spark.ACTION sparkAction = actionJaxbElement.getValue();
         assertEquals(sparkAction.getMaster(), "local");
-        assertEquals(sparkAction.getJar(), "jail://testCluster:00/resources/action/lib/spark-wordcount.jar");
+        assertEquals(sparkAction.getJar(), "spark-wordcount.jar");
         List<String> argsList = sparkAction.getArg();
         Input input = process.getInputs().getInputs().get(0);
         Output output = process.getOutputs().getOutputs().get(0);