You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by aj...@apache.org on 2015/06/15 06:17:38 UTC

falcon git commit: FALCON-1114 Oozie findBundles lists a directory and tries to match with the bundle's appPath. Contributed by Pallavi Rao.

Repository: falcon
Updated Branches:
  refs/heads/master dfcdb31c3 -> 7ffe1a33b


FALCON-1114 Oozie findBundles lists a directory and tries to match with the bundle's appPath. Contributed by Pallavi Rao.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/7ffe1a33
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/7ffe1a33
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/7ffe1a33

Branch: refs/heads/master
Commit: 7ffe1a33b6f3e8d4005e1bee89e9f91d7db33e03
Parents: dfcdb31
Author: Ajay Yadava <aj...@gmail.com>
Authored: Mon Jun 15 09:41:08 2015 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Mon Jun 15 09:41:08 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 ++
 .../org/apache/falcon/entity/EntityUtil.java    | 30 ++++++----------
 .../apache/falcon/entity/EntityUtilTest.java    | 36 ++++++++++++++++++++
 .../workflow/engine/OozieWorkflowEngine.java    | 13 +------
 4 files changed, 50 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/7ffe1a33/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c0bbb78..e0c4333 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,9 @@ Trunk (Unreleased)
     FALCON-1039 Add instance dependency API in falcon(Ajay Yadava)
 
   IMPROVEMENTS
+    FALCON-1114 Oozie findBundles lists a directory and tries to match with the bundle's appPath
+    (Pallavi Rao via Ajay Yadava)
+    
     FALCON-1207 Falcon checkstyle allows wildcard imports(Pallavi Rao via Ajay Yadava)
     
     FALCON-1147 Allow _ in the names for name value pair(Sowmya Ramesh via Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/7ffe1a33/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index 7ebf39e..f4f266a 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -45,14 +45,11 @@ import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.resource.EntityList;
 import org.apache.falcon.util.DeploymentUtil;
 import org.apache.falcon.util.RuntimeProperties;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.text.DateFormat;
@@ -606,27 +603,20 @@ public final class EntityUtil {
             md5(clusterView) + "_" + String.valueOf(System.currentTimeMillis()));
     }
 
-    //Returns all staging paths for the entity
-    public static FileStatus[] getAllStagingPaths(org.apache.falcon.entity.v0.cluster.Cluster cluster,
-                                                  Entity entity) throws FalconException {
-        Path basePath = getBaseStagingPath(cluster, entity);
-        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
-                ClusterHelper.getConfiguration(cluster));
+    // True only if path exists, is under the cluster's staging location and contains the entity's <type>/<name>.
+    public static boolean isStagingPath(Cluster cluster,
+                                        Entity entity, Path path) throws FalconException {
+        String basePath = new Path(ClusterHelper.getLocation(cluster, ClusterLocationType.STAGING)
+                .getPath()).toUri().getPath();
         try {
-            return fs.listStatus(basePath, new PathFilter() {
-                @Override
-                public boolean accept(Path path) {
-                    return !path.getName().equals("logs");
-                }
-            });
-
-        } catch (FileNotFoundException e) {
-            LOG.info("Staging path " + basePath + " doesn't exist, entity is not scheduled");
-            //Staging path doesn't exist if entity is not scheduled
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                    ClusterHelper.getConfiguration(cluster));
+            String pathString = path.toUri().getPath();
+            String entityPath = entity.getEntityType().name().toLowerCase() + "/" + entity.getName();
+            return fs.exists(path) && pathString.startsWith(basePath) && pathString.contains(entityPath);
         } catch (IOException e) {
             throw new FalconException(e);
         }
-        return null;
     }
 
     public static LateProcess getLateProcess(Entity entity)

http://git-wip-us.apache.org/repos/asf/falcon/blob/7ffe1a33/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java b/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java
index bfdb9f8..cfdc84d 100644
--- a/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java
@@ -20,6 +20,9 @@ package org.apache.falcon.entity;
 
 import org.apache.falcon.Pair;
 import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.parser.ClusterEntityParser;
+import org.apache.falcon.entity.parser.EntityParserFactory;
+import org.apache.falcon.entity.parser.ProcessEntityParser;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.SchemaHelper;
@@ -27,10 +30,14 @@ import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.LateArrival;
 import org.apache.falcon.entity.v0.process.Cluster;
 import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import java.io.InputStream;
 import java.text.DateFormat;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
@@ -288,4 +295,33 @@ public class EntityUtilTest extends AbstractTestBase {
         };
     }
 
+    @Test(dataProvider = "bundlePaths")
+    public void testIsStagingPath(Path path, boolean createPath, boolean expected) throws Exception {
+        ClusterEntityParser parser = (ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER);
+        InputStream stream = this.getClass().getResourceAsStream(CLUSTER_XML);
+        org.apache.falcon.entity.v0.cluster.Cluster cluster = parser.parse(stream);
+
+        ProcessEntityParser processParser = (ProcessEntityParser) EntityParserFactory.getParser(EntityType.PROCESS);
+        stream = this.getClass().getResourceAsStream(PROCESS_XML);
+        Process process = processParser.parse(stream);
+
+        FileSystem fs = HadoopClientFactory.get().
+                createFalconFileSystem(ClusterHelper.getConfiguration(cluster));
+        if (createPath && !fs.exists(path)) {
+            fs.create(path);
+        }
+
+        Assert.assertEquals(EntityUtil.isStagingPath(cluster, process, path), expected);
+    }
+
+    @DataProvider(name = "bundlePaths")
+    public Object[][] getBundlePaths() {
+        return new Object[][] {
+            {new Path("/projects/falcon/staging/ivory/workflows/process/sample/"), true, true},
+            {new Path("/projects/falcon/staging/falcon/workflows/process/sample/"), true, true},
+            {new Path("/projects/abc/falcon/workflows/process/sample/"), true, false},
+            {new Path("/projects/falcon/staging/falcon/workflows/process/test-process/"), false, false},
+            {new Path("/projects/falcon/staging/falcon/workflows/process/test-process/"), true, false},
+        };
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/7ffe1a33/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 02dcb2d..4085b8f 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -48,7 +48,6 @@ import org.apache.falcon.update.UpdateHelper;
 import org.apache.falcon.util.OozieUtils;
 import org.apache.falcon.util.RuntimeProperties;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.oozie.client.BundleJob;
@@ -276,23 +275,13 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     //Return all bundles for the entity in the requested cluster
     private List<BundleJob> findBundles(Entity entity, String clusterName) throws FalconException {
         Cluster cluster = STORE.get(EntityType.CLUSTER, clusterName);
-        FileStatus[] stgPaths = EntityUtil.getAllStagingPaths(cluster, entity);
         List<BundleJob> filteredJobs = new ArrayList<BundleJob>();
-        if (stgPaths == null) {
-            return filteredJobs;
-        }
-
-        List<String> appPaths = new ArrayList<String>();
-        for (FileStatus file : stgPaths) {
-            appPaths.add(file.getPath().toUri().getPath());
-        }
-
         try {
             List<BundleJob> jobs = OozieClientFactory.get(cluster.getName()).getBundleJobsInfo(OozieClient.FILTER_NAME
                 + "=" + EntityUtil.getWorkflowName(entity) + ";", 0, 256);
             if (jobs != null) {
                 for (BundleJob job : jobs) {
-                    if (appPaths.contains(new Path(job.getAppPath()).toUri().getPath())) {
+                    if (EntityUtil.isStagingPath(cluster, entity, new Path(job.getAppPath()))) {
                         //Load bundle as coord info is not returned in getBundleJobsInfo()
                         BundleJob bundle = getBundleInfo(clusterName, job.getId());
                         filteredJobs.add(bundle);