You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sh...@apache.org on 2014/09/10 08:43:36 UTC

git commit: FALCON-677 Feed data and stats path defaults to /tmp/. Contributed by Suhas Vasu

Repository: incubator-falcon
Updated Branches:
  refs/heads/master 2e3eebdff -> 4bdfac484


FALCON-677 Feed data and stats path defaults to /tmp/. Contributed by Suhas Vasu


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/4bdfac48
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/4bdfac48
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/4bdfac48

Branch: refs/heads/master
Commit: 4bdfac48434b48bd410bc8ac8c60617032be14a0
Parents: 2e3eebd
Author: Shwetha GS <sh...@inmobi.com>
Authored: Wed Sep 10 12:13:25 2014 +0530
Committer: Shwetha GS <sh...@inmobi.com>
Committed: Wed Sep 10 12:13:25 2014 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../org/apache/falcon/entity/FeedHelper.java    |  2 +-
 .../apache/falcon/entity/FileSystemStorage.java | 32 +++++++++++---------
 .../falcon/entity/parser/FeedEntityParser.java  | 32 +++++++++++++++++---
 .../falcon/entity/FileSystemStorageTest.java    |  2 +-
 .../ProcessExecutionCoordinatorBuilder.java     | 12 ++++++++
 .../OozieProcessWorkflowBuilderTest.java        |  4 +--
 7 files changed, 61 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/4bdfac48/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7c98113..483dee2 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -77,6 +77,8 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
+   FALCON-677 Feed data and stats path defaults to /tmp/. (Suhas Vasu via Shwetha GS)
+
    FALCON-590 Update to ACLs added to process is not handled
    (Venkatesh Seetharam)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/4bdfac48/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index 323188d..4174135 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -194,7 +194,7 @@ public final class FeedHelper {
         return getStorageType(feed, feedCluster);
     }
 
-    protected static List<Location> getLocations(Cluster cluster, Feed feed) {
+    public static List<Location> getLocations(Cluster cluster, Feed feed) {
         // check if locations are overridden in cluster
         final Locations clusterLocations = cluster.getLocations();
         if (clusterLocations != null

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/4bdfac48/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
index 4eb3d60..58506ad 100644
--- a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
@@ -18,6 +18,7 @@
 
 package org.apache.falcon.entity;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.common.FeedDataPath;
 import org.apache.falcon.entity.v0.AccessControlList;
@@ -167,8 +168,12 @@ public class FileSystemStorage implements Storage {
 
     @Override
     public String getUriTemplate(LocationType locationType) {
+        return getUriTemplate(locationType, locations);
+    }
+
+    public String getUriTemplate(LocationType locationType, List<Location> locationList) {
         Location locationForType = null;
-        for (Location location : locations) {
+        for (Location location : locationList) {
             if (location.getType() == locationType) {
                 locationForType = location;
                 break;
@@ -176,7 +181,7 @@ public class FileSystemStorage implements Storage {
         }
 
         if (locationForType == null) {
-            return "/tmp";
+            return null;
         }
 
         // normalize the path so trailing and double '/' are removed
@@ -216,27 +221,24 @@ public class FileSystemStorage implements Storage {
         final List<Location> fsStorageLocations = fsStorage.getLocations();
 
         return getLocations().size() == fsStorageLocations.size()
-               && getLocation(getLocations(), LocationType.DATA).getPath().equals(
-                   getLocation(fsStorageLocations, LocationType.DATA).getPath())
-               && getLocation(getLocations(), LocationType.META).getPath().equals(
-                   getLocation(fsStorageLocations, LocationType.META).getPath())
-               && getLocation(getLocations(), LocationType.STATS).getPath().equals(
-                   getLocation(fsStorageLocations, LocationType.STATS).getPath())
-               && getLocation(getLocations(), LocationType.TMP).getPath().equals(
-                   getLocation(fsStorageLocations, LocationType.TMP).getPath());
+                && StringUtils.equals(getUriTemplate(LocationType.DATA, getLocations()),
+                    getUriTemplate(LocationType.DATA, fsStorageLocations))
+                && StringUtils.equals(getUriTemplate(LocationType.STATS, getLocations()),
+                    getUriTemplate(LocationType.STATS, fsStorageLocations))
+                && StringUtils.equals(getUriTemplate(LocationType.META, getLocations()),
+                    getUriTemplate(LocationType.META, fsStorageLocations))
+                && StringUtils.equals(getUriTemplate(LocationType.TMP, getLocations()),
+                    getUriTemplate(LocationType.TMP, fsStorageLocations));
     }
 
-    private static Location getLocation(List<Location> locations, LocationType type) {
+    public static Location getLocation(List<Location> locations, LocationType type) {
         for (Location loc : locations) {
             if (loc.getType() == type) {
                 return loc;
             }
         }
 
-        Location loc = new Location();
-        loc.setPath("/tmp");
-        loc.setType(type);
-        return loc;
+        return null;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/4bdfac48/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
index 8fd56e1..34b764b 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
@@ -21,11 +21,7 @@ package org.apache.falcon.entity.parser;
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.catalog.CatalogServiceFactory;
-import org.apache.falcon.entity.CatalogStorage;
-import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.*;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityGraph;
@@ -35,6 +31,7 @@ import org.apache.falcon.entity.v0.feed.Cluster;
 import org.apache.falcon.entity.v0.feed.ClusterType;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.feed.Location;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Process;
@@ -50,6 +47,7 @@ import java.util.Date;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.TimeZone;
+import java.util.List;
 
 /**
  * Parser that parses feed entity definition.
@@ -81,6 +79,7 @@ public class FeedEntityParser extends EntityParser<Feed> {
         }
 
         validateFeedStorage(feed);
+        validateFeedPath(feed);
         validateFeedPartitionExpression(feed);
         validateFeedGroups(feed);
         validateACL(feed);
@@ -432,4 +431,27 @@ public class FeedEntityParser extends EntityParser<Feed> {
             }
         }
     }
+
+    /**
+     * Validate if FileSystem based feed contains location type data.
+     *
+     * @param feed Feed entity
+     * @throws FalconException
+     */
+    private void validateFeedPath(Feed feed) throws FalconException {
+        if (FeedHelper.getStorageType(feed) == Storage.TYPE.TABLE) {
+            return;
+        }
+
+        for (Cluster cluster : feed.getClusters().getClusters()) {
+            List<Location> locations = FeedHelper.getLocations(cluster, feed);
+            Location dataLocation = FileSystemStorage.getLocation(locations, LocationType.DATA);
+
+            if (dataLocation == null) {
+                throw new ValidationException(feed.getName() + " is a FileSystem based feed "
+                        + "but it doesn't contain location type - data in cluster " + cluster.getName().toString());
+            }
+
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/4bdfac48/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
index a78c678..4bb7772 100644
--- a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
@@ -70,7 +70,7 @@ public class FileSystemStorageTest {
                 + "/stats/YYYY/feed1/mmHH/dd/MM/${YEAR}-${MONTH}-${DAY}/more/${YEAR}";
 
         FileSystemStorage storage = new FileSystemStorage(feedBasePath);
-        Assert.assertEquals(storage.getUriTemplate(), feedBasePath + "#TMP=/tmp");
+        Assert.assertEquals(storage.getUriTemplate(), feedBasePath);
 
         Assert.assertEquals("hdfs://localhost:8020", storage.getStorageUrl());
         Assert.assertEquals("hdfs://localhost:8020/data/YYYY/feed1/mmHH/dd/MM/${YEAR}-${MONTH}-${DAY}/more/${YEAR}",

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/4bdfac48/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
index 1fa6758..7a87919 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
@@ -175,6 +175,9 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<
                 }
 
                 SYNCDATASET syncdataset = createDataSet(feed, cluster, storage, input.getName(), LocationType.DATA);
+                if (syncdataset == null) {
+                    return;
+                }
                 coord.getDatasets().getDatasetOrAsyncDataset().add(syncdataset);
 
                 DATAIN datain = createDataIn(input);
@@ -218,6 +221,9 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<
         syncdataset.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
 
         String uriTemplate = storage.getUriTemplate(locationType);
+        if (uriTemplate == null) {
+            return null;
+        }
         if (storage.getType() == Storage.TYPE.TABLE) {
             uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie requires this!!!
         }
@@ -274,6 +280,9 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<
             Storage storage = FeedHelper.createStorage(cluster, feed);
 
             SYNCDATASET syncdataset = createDataSet(feed, cluster, storage, output.getName(), LocationType.DATA);
+            if (syncdataset == null) {
+                return;
+            }
             coord.getDatasets().getDatasetOrAsyncDataset().add(syncdataset);
 
             DATAOUT dataout = createDataOut(output);
@@ -320,6 +329,9 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<
         String type = locType.name().toLowerCase();
 
         SYNCDATASET dataset = createDataSet(feed, cluster, storage, name + type, locType);
+        if (dataset == null) {
+            return;
+        }
         coord.getDatasets().getDatasetOrAsyncDataset().add(dataset);
 
         DATAOUT dataout = new DATAOUT();

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/4bdfac48/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
index 23c01a9..d6f9b54 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
@@ -162,8 +162,6 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
                 coord.getOutputEvents().getDataOut().get(1).getName());
         assertEquals(process.getOutputs().getOutputs().get(0).getName() + "meta",
                 coord.getOutputEvents().getDataOut().get(2).getName());
-        assertEquals(process.getOutputs().getOutputs().get(0).getName() + "tmp",
-                coord.getOutputEvents().getDataOut().get(3).getName());
 
         assertEquals(process.getOutputs().getOutputs().get(0).getName(),
                 coord.getOutputEvents().getDataOut().get(0).getName());
@@ -172,7 +170,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         assertEquals(process.getOutputs().getOutputs().get(0).getName(),
                 coord.getOutputEvents().getDataOut().get(0).getDataset());
 
-        assertEquals(6, coord.getDatasets().getDatasetOrAsyncDataset().size());
+        assertEquals(5, coord.getDatasets().getDatasetOrAsyncDataset().size());
 
         ConfigurationStore store = ConfigurationStore.get();
         Feed feed = store.get(EntityType.FEED, process.getInputs().getInputs().get(0).getFeed());