You are viewing a plain text version of this content. (The hyperlink to the canonical version was lost in this plain-text rendering.)
Posted to commits@falcon.apache.org by aj...@apache.org on 2015/06/15 06:17:38 UTC
falcon git commit: FALCON-1114 Oozie findBundles lists a directory and tries to match with the bundle's appPath. Contributed by Pallavi Rao.
Repository: falcon
Updated Branches:
refs/heads/master dfcdb31c3 -> 7ffe1a33b
FALCON-1114 Oozie findBundles lists a directory and tries to match with the bundle's appPath. Contributed by Pallavi Rao.
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/7ffe1a33
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/7ffe1a33
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/7ffe1a33
Branch: refs/heads/master
Commit: 7ffe1a33b6f3e8d4005e1bee89e9f91d7db33e03
Parents: dfcdb31
Author: Ajay Yadava <aj...@gmail.com>
Authored: Mon Jun 15 09:41:08 2015 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Mon Jun 15 09:41:08 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 3 ++
.../org/apache/falcon/entity/EntityUtil.java | 30 ++++++----------
.../apache/falcon/entity/EntityUtilTest.java | 36 ++++++++++++++++++++
.../workflow/engine/OozieWorkflowEngine.java | 13 +------
4 files changed, 50 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/7ffe1a33/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c0bbb78..e0c4333 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,9 @@ Trunk (Unreleased)
FALCON-1039 Add instance dependency API in falcon(Ajay Yadava)
IMPROVEMENTS
+ FALCON-1114 Oozie findBundles lists a directory and tries to match with the bundle's appPath
+ (Pallavi Rao via Ajay Yadava)
+
FALCON-1207 Falcon checkstyle allows wildcard imports(Pallavi Rao via Ajay Yadava)
FALCON-1147 Allow _ in the names for name value pair(Sowmya Ramesh via Ajay Yadava)
http://git-wip-us.apache.org/repos/asf/falcon/blob/7ffe1a33/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index 7ebf39e..f4f266a 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -45,14 +45,11 @@ import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.resource.EntityList;
import org.apache.falcon.util.DeploymentUtil;
import org.apache.falcon.util.RuntimeProperties;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Method;
import java.text.DateFormat;
@@ -606,27 +603,20 @@ public final class EntityUtil {
md5(clusterView) + "_" + String.valueOf(System.currentTimeMillis()));
}
- //Returns all staging paths for the entity
- public static FileStatus[] getAllStagingPaths(org.apache.falcon.entity.v0.cluster.Cluster cluster,
- Entity entity) throws FalconException {
- Path basePath = getBaseStagingPath(cluster, entity);
- FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
- ClusterHelper.getConfiguration(cluster));
+ // Given an entity and a cluster, determines if the supplied path is the staging path for that entity.
+ public static boolean isStagingPath(Cluster cluster,
+ Entity entity, Path path) throws FalconException {
+ String basePath = new Path(ClusterHelper.getLocation(cluster, ClusterLocationType.STAGING)
+ .getPath()).toUri().getPath();
try {
- return fs.listStatus(basePath, new PathFilter() {
- @Override
- public boolean accept(Path path) {
- return !path.getName().equals("logs");
- }
- });
-
- } catch (FileNotFoundException e) {
- LOG.info("Staging path " + basePath + " doesn't exist, entity is not scheduled");
- //Staging path doesn't exist if entity is not scheduled
+ FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+ ClusterHelper.getConfiguration(cluster));
+ String pathString = path.toUri().getPath();
+ String entityPath = entity.getEntityType().name().toLowerCase() + "/" + entity.getName();
+ return fs.exists(path) && pathString.startsWith(basePath) && pathString.contains(entityPath);
} catch (IOException e) {
throw new FalconException(e);
}
- return null;
}
public static LateProcess getLateProcess(Entity entity)
http://git-wip-us.apache.org/repos/asf/falcon/blob/7ffe1a33/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java b/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java
index bfdb9f8..cfdc84d 100644
--- a/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java
@@ -20,6 +20,9 @@ package org.apache.falcon.entity;
import org.apache.falcon.Pair;
import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.parser.ClusterEntityParser;
+import org.apache.falcon.entity.parser.EntityParserFactory;
+import org.apache.falcon.entity.parser.ProcessEntityParser;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.SchemaHelper;
@@ -27,10 +30,14 @@ import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.LateArrival;
import org.apache.falcon.entity.v0.process.Cluster;
import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+import java.io.InputStream;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
@@ -288,4 +295,33 @@ public class EntityUtilTest extends AbstractTestBase {
};
}
+ @Test(dataProvider = "bundlePaths")
+ public void testIsStagingPath(Path path, boolean createPath, boolean expected) throws Exception {
+ ClusterEntityParser parser = (ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER);
+ InputStream stream = this.getClass().getResourceAsStream(CLUSTER_XML);
+ org.apache.falcon.entity.v0.cluster.Cluster cluster = parser.parse(stream);
+
+ ProcessEntityParser processParser = (ProcessEntityParser) EntityParserFactory.getParser(EntityType.PROCESS);
+ stream = this.getClass().getResourceAsStream(PROCESS_XML);
+ Process process = processParser.parse(stream);
+
+ FileSystem fs = HadoopClientFactory.get().
+ createFalconFileSystem(ClusterHelper.getConfiguration(cluster));
+ if (createPath && !fs.exists(path)) {
+ fs.create(path);
+ }
+
+ Assert.assertEquals(EntityUtil.isStagingPath(cluster, process, path), expected);
+ }
+
+ @DataProvider(name = "bundlePaths")
+ public Object[][] getBundlePaths() {
+ return new Object[][] {
+ {new Path("/projects/falcon/staging/ivory/workflows/process/sample/"), true, true},
+ {new Path("/projects/falcon/staging/falcon/workflows/process/sample/"), true, true},
+ {new Path("/projects/abc/falcon/workflows/process/sample/"), true, false},
+ {new Path("/projects/falcon/staging/falcon/workflows/process/test-process/"), false, false},
+ {new Path("/projects/falcon/staging/falcon/workflows/process/test-process/"), true, false},
+ };
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/7ffe1a33/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 02dcb2d..4085b8f 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -48,7 +48,6 @@ import org.apache.falcon.update.UpdateHelper;
import org.apache.falcon.util.OozieUtils;
import org.apache.falcon.util.RuntimeProperties;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.client.BundleJob;
@@ -276,23 +275,13 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
//Return all bundles for the entity in the requested cluster
private List<BundleJob> findBundles(Entity entity, String clusterName) throws FalconException {
Cluster cluster = STORE.get(EntityType.CLUSTER, clusterName);
- FileStatus[] stgPaths = EntityUtil.getAllStagingPaths(cluster, entity);
List<BundleJob> filteredJobs = new ArrayList<BundleJob>();
- if (stgPaths == null) {
- return filteredJobs;
- }
-
- List<String> appPaths = new ArrayList<String>();
- for (FileStatus file : stgPaths) {
- appPaths.add(file.getPath().toUri().getPath());
- }
-
try {
List<BundleJob> jobs = OozieClientFactory.get(cluster.getName()).getBundleJobsInfo(OozieClient.FILTER_NAME
+ "=" + EntityUtil.getWorkflowName(entity) + ";", 0, 256);
if (jobs != null) {
for (BundleJob job : jobs) {
- if (appPaths.contains(new Path(job.getAppPath()).toUri().getPath())) {
+ if (EntityUtil.isStagingPath(cluster, entity, new Path(job.getAppPath()))) {
//Load bundle as coord info is not returned in getBundleJobsInfo()
BundleJob bundle = getBundleInfo(clusterName, job.getId());
filteredJobs.add(bundle);