You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ba...@apache.org on 2016/07/19 20:21:54 UTC
falcon git commit: FALCON-2071 Falcon Spark SQL failing with Yarn Client Mode
Repository: falcon
Updated Branches:
refs/heads/master 292dfed34 -> 73339264d
FALCON-2071 Falcon Spark SQL failing with Yarn Client Mode
Author: peeyush b <pb...@hortonworks.com>
Reviewers: "Praveen Adlakha <ad...@gmail.com>, Balu Vellanki <ba...@apache.org>, Venkat Ranganathan <ve...@hortonworks.com>"
Closes #220 from peeyushb/FALCON-2071
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/73339264
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/73339264
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/73339264
Branch: refs/heads/master
Commit: 73339264d6d8cd627aee0be53007a7bd4e4956ac
Parents: 292dfed
Author: peeyush b <pb...@hortonworks.com>
Authored: Tue Jul 19 13:21:51 2016 -0700
Committer: bvellanki <bv...@hortonworks.com>
Committed: Tue Jul 19 13:21:51 2016 -0700
----------------------------------------------------------------------
.../process/SparkProcessWorkflowBuilder.java | 34 +++++++++-----------
.../OozieProcessWorkflowBuilderTest.java | 6 ++--
2 files changed, 20 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/73339264/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java
index 5f4fafa..51db75d 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java
@@ -30,12 +30,10 @@ import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.oozie.spark.CONFIGURATION.Property;
import org.apache.falcon.oozie.workflow.ACTION;
import org.apache.falcon.oozie.workflow.CONFIGURATION;
import org.apache.falcon.util.OozieUtils;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import javax.xml.bind.JAXBElement;
@@ -59,7 +57,7 @@ public class SparkProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder
org.apache.falcon.oozie.spark.ACTION sparkAction = actionJaxbElement.getValue();
String sparkMasterURL = entity.getSparkAttributes().getMaster();
- String sparkFilePath = entity.getSparkAttributes().getJar();
+ Path sparkJarFilePath = new Path(entity.getSparkAttributes().getJar());
String sparkJobName = entity.getSparkAttributes().getName();
String sparkOpts = entity.getSparkAttributes().getSparkOpts();
String sparkClassName = entity.getSparkAttributes().getClazz();
@@ -94,18 +92,28 @@ public class SparkProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder
addOutputFeedsAsArgument(argList, cluster);
addInputFeedsAsArgument(argList, cluster);
- sparkAction.setJar(addUri(sparkFilePath, cluster));
-
- setSparkLibFileToWorkflowLib(sparkFilePath, entity);
+ // In Oozie spark action, value for jar is either Java jar file path or Python file path.
+ validateSparkJarFilePath(sparkJarFilePath);
+ sparkAction.setJar(sparkJarFilePath.getName());
+ setSparkLibFileToWorkflowLib(sparkJarFilePath.toString(), entity);
propagateEntityProperties(sparkAction);
OozieUtils.marshalSparkAction(action, actionJaxbElement);
return action;
}
- private void setSparkLibFileToWorkflowLib(String sparkFile, Process entity) {
+ private void setSparkLibFileToWorkflowLib(String sparkJarFilePath, Process entity) {
if (StringUtils.isEmpty(entity.getWorkflow().getLib())) {
- entity.getWorkflow().setLib(sparkFile);
+ entity.getWorkflow().setLib(sparkJarFilePath);
+ } else {
+ String workflowLib = entity.getWorkflow().getLib() + "," + sparkJarFilePath;
+ entity.getWorkflow().setLib(workflowLib);
+ }
+ }
+
+ private void validateSparkJarFilePath(Path sparkJarFilePath) throws FalconException {
+ if (!sparkJarFilePath.isAbsolute()) {
+ throw new FalconException("Spark jar file path must be absolute:"+sparkJarFilePath);
}
}
@@ -188,16 +196,6 @@ public class SparkProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder
}
}
- private String addUri(String jarFile, Cluster cluster) throws FalconException {
- FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
- ClusterHelper.getConfiguration(cluster));
- Path jarFilePath = new Path(jarFile);
- if (jarFilePath.isAbsoluteAndSchemeAuthorityNull()) {
- return fs.makeQualified(jarFilePath).toString();
- }
- return jarFile;
- }
-
private String getClusterEntitySparkMaster(Cluster cluster) {
return ClusterHelper.getSparkMasterEndPoint(cluster);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/73339264/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
index 30ff537..a692d0c 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
@@ -372,6 +372,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
testParentWorkflow(process, parentWorkflow);
+ assertEquals(process.getWorkflow().getLib(), "/resources/action/lib/falcon-examples.jar");
ACTION sparkNode = getAction(parentWorkflow, "user-action");
@@ -380,7 +381,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
org.apache.falcon.oozie.spark.ACTION sparkAction = actionJaxbElement.getValue();
assertEquals(sparkAction.getMaster(), "local");
- assertEquals(sparkAction.getJar(), "jail://testCluster:00/resources/action/lib/falcon-examples.jar");
+ assertEquals(sparkAction.getJar(), "falcon-examples.jar");
Assert.assertTrue(Storage.TYPE.TABLE == ProcessHelper.getStorageType(cluster, process));
List<String> argsList = sparkAction.getArg();
@@ -430,6 +431,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
testParentWorkflow(process, parentWorkflow);
+ assertEquals(process.getWorkflow().getLib(), "/resources/action/lib/spark-wordcount.jar");
ACTION sparkNode = getAction(parentWorkflow, "user-action");
@@ -437,7 +439,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
OozieUtils.unMarshalSparkAction(sparkNode);
org.apache.falcon.oozie.spark.ACTION sparkAction = actionJaxbElement.getValue();
assertEquals(sparkAction.getMaster(), "local");
- assertEquals(sparkAction.getJar(), "jail://testCluster:00/resources/action/lib/spark-wordcount.jar");
+ assertEquals(sparkAction.getJar(), "spark-wordcount.jar");
List<String> argsList = sparkAction.getArg();
Input input = process.getInputs().getInputs().get(0);
Output output = process.getOutputs().getOutputs().get(0);