You are viewing a plain text version of this content. The canonical link for it (a hyperlink in the original page) was not preserved in this plain-text version.
Posted to commits@falcon.apache.org by sh...@apache.org on 2014/09/10 08:43:36 UTC
git commit: FALCON-677 Feed data and stats path defaults to /tmp/. Contributed by Suhas Vasu
Repository: incubator-falcon
Updated Branches:
refs/heads/master 2e3eebdff -> 4bdfac484
FALCON-677 Feed data and stats path defaults to /tmp/. Contributed by Suhas Vasu
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/4bdfac48
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/4bdfac48
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/4bdfac48
Branch: refs/heads/master
Commit: 4bdfac48434b48bd410bc8ac8c60617032be14a0
Parents: 2e3eebd
Author: Shwetha GS <sh...@inmobi.com>
Authored: Wed Sep 10 12:13:25 2014 +0530
Committer: Shwetha GS <sh...@inmobi.com>
Committed: Wed Sep 10 12:13:25 2014 +0530
----------------------------------------------------------------------
CHANGES.txt | 2 ++
.../org/apache/falcon/entity/FeedHelper.java | 2 +-
.../apache/falcon/entity/FileSystemStorage.java | 32 +++++++++++---------
.../falcon/entity/parser/FeedEntityParser.java | 32 +++++++++++++++++---
.../falcon/entity/FileSystemStorageTest.java | 2 +-
.../ProcessExecutionCoordinatorBuilder.java | 12 ++++++++
.../OozieProcessWorkflowBuilderTest.java | 4 +--
7 files changed, 61 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/4bdfac48/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7c98113..483dee2 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -77,6 +77,8 @@ Trunk (Unreleased)
OPTIMIZATIONS
BUG FIXES
+ FALCON-677 Feed data and stats path defaults to /tmp/. (Suhas Vasu via Shwetha GS)
+
FALCON-590 Update to ACLs added to process is not handled
(Venkatesh Seetharam)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/4bdfac48/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index 323188d..4174135 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -194,7 +194,7 @@ public final class FeedHelper {
return getStorageType(feed, feedCluster);
}
- protected static List<Location> getLocations(Cluster cluster, Feed feed) {
+ public static List<Location> getLocations(Cluster cluster, Feed feed) {
// check if locations are overridden in cluster
final Locations clusterLocations = cluster.getLocations();
if (clusterLocations != null
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/4bdfac48/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
index 4eb3d60..58506ad 100644
--- a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
@@ -18,6 +18,7 @@
package org.apache.falcon.entity;
+import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.common.FeedDataPath;
import org.apache.falcon.entity.v0.AccessControlList;
@@ -167,8 +168,12 @@ public class FileSystemStorage implements Storage {
@Override
public String getUriTemplate(LocationType locationType) {
+ return getUriTemplate(locationType, locations);
+ }
+
+ public String getUriTemplate(LocationType locationType, List<Location> locationList) {
Location locationForType = null;
- for (Location location : locations) {
+ for (Location location : locationList) {
if (location.getType() == locationType) {
locationForType = location;
break;
@@ -176,7 +181,7 @@ public class FileSystemStorage implements Storage {
}
if (locationForType == null) {
- return "/tmp";
+ return null;
}
// normalize the path so trailing and double '/' are removed
@@ -216,27 +221,24 @@ public class FileSystemStorage implements Storage {
final List<Location> fsStorageLocations = fsStorage.getLocations();
return getLocations().size() == fsStorageLocations.size()
- && getLocation(getLocations(), LocationType.DATA).getPath().equals(
- getLocation(fsStorageLocations, LocationType.DATA).getPath())
- && getLocation(getLocations(), LocationType.META).getPath().equals(
- getLocation(fsStorageLocations, LocationType.META).getPath())
- && getLocation(getLocations(), LocationType.STATS).getPath().equals(
- getLocation(fsStorageLocations, LocationType.STATS).getPath())
- && getLocation(getLocations(), LocationType.TMP).getPath().equals(
- getLocation(fsStorageLocations, LocationType.TMP).getPath());
+ && StringUtils.equals(getUriTemplate(LocationType.DATA, getLocations()),
+ getUriTemplate(LocationType.DATA, fsStorageLocations))
+ && StringUtils.equals(getUriTemplate(LocationType.STATS, getLocations()),
+ getUriTemplate(LocationType.STATS, fsStorageLocations))
+ && StringUtils.equals(getUriTemplate(LocationType.META, getLocations()),
+ getUriTemplate(LocationType.META, fsStorageLocations))
+ && StringUtils.equals(getUriTemplate(LocationType.TMP, getLocations()),
+ getUriTemplate(LocationType.TMP, fsStorageLocations));
}
- private static Location getLocation(List<Location> locations, LocationType type) {
+ public static Location getLocation(List<Location> locations, LocationType type) {
for (Location loc : locations) {
if (loc.getType() == type) {
return loc;
}
}
- Location loc = new Location();
- loc.setPath("/tmp");
- loc.setType(type);
- return loc;
+ return null;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/4bdfac48/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
index 8fd56e1..34b764b 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
@@ -21,11 +21,7 @@ package org.apache.falcon.entity.parser;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.catalog.CatalogServiceFactory;
-import org.apache.falcon.entity.CatalogStorage;
-import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.*;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityGraph;
@@ -35,6 +31,7 @@ import org.apache.falcon.entity.v0.feed.Cluster;
import org.apache.falcon.entity.v0.feed.ClusterType;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.feed.Location;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Process;
@@ -50,6 +47,7 @@ import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.TimeZone;
+import java.util.List;
/**
* Parser that parses feed entity definition.
@@ -81,6 +79,7 @@ public class FeedEntityParser extends EntityParser<Feed> {
}
validateFeedStorage(feed);
+ validateFeedPath(feed);
validateFeedPartitionExpression(feed);
validateFeedGroups(feed);
validateACL(feed);
@@ -432,4 +431,27 @@ public class FeedEntityParser extends EntityParser<Feed> {
}
}
}
+
+ /**
+ * Validate if FileSystem based feed contains location type data.
+ *
+ * @param feed Feed entity
+ * @throws FalconException
+ */
+ private void validateFeedPath(Feed feed) throws FalconException {
+ if (FeedHelper.getStorageType(feed) == Storage.TYPE.TABLE) {
+ return;
+ }
+
+ for (Cluster cluster : feed.getClusters().getClusters()) {
+ List<Location> locations = FeedHelper.getLocations(cluster, feed);
+ Location dataLocation = FileSystemStorage.getLocation(locations, LocationType.DATA);
+
+ if (dataLocation == null) {
+ throw new ValidationException(feed.getName() + " is a FileSystem based feed "
+ + "but it doesn't contain location type - data in cluster " + cluster.getName().toString());
+ }
+
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/4bdfac48/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
index a78c678..4bb7772 100644
--- a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
@@ -70,7 +70,7 @@ public class FileSystemStorageTest {
+ "/stats/YYYY/feed1/mmHH/dd/MM/${YEAR}-${MONTH}-${DAY}/more/${YEAR}";
FileSystemStorage storage = new FileSystemStorage(feedBasePath);
- Assert.assertEquals(storage.getUriTemplate(), feedBasePath + "#TMP=/tmp");
+ Assert.assertEquals(storage.getUriTemplate(), feedBasePath);
Assert.assertEquals("hdfs://localhost:8020", storage.getStorageUrl());
Assert.assertEquals("hdfs://localhost:8020/data/YYYY/feed1/mmHH/dd/MM/${YEAR}-${MONTH}-${DAY}/more/${YEAR}",
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/4bdfac48/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
index 1fa6758..7a87919 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
@@ -175,6 +175,9 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<
}
SYNCDATASET syncdataset = createDataSet(feed, cluster, storage, input.getName(), LocationType.DATA);
+ if (syncdataset == null) {
+ return;
+ }
coord.getDatasets().getDatasetOrAsyncDataset().add(syncdataset);
DATAIN datain = createDataIn(input);
@@ -218,6 +221,9 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<
syncdataset.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
String uriTemplate = storage.getUriTemplate(locationType);
+ if (uriTemplate == null) {
+ return null;
+ }
if (storage.getType() == Storage.TYPE.TABLE) {
uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie requires this!!!
}
@@ -274,6 +280,9 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<
Storage storage = FeedHelper.createStorage(cluster, feed);
SYNCDATASET syncdataset = createDataSet(feed, cluster, storage, output.getName(), LocationType.DATA);
+ if (syncdataset == null) {
+ return;
+ }
coord.getDatasets().getDatasetOrAsyncDataset().add(syncdataset);
DATAOUT dataout = createDataOut(output);
@@ -320,6 +329,9 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<
String type = locType.name().toLowerCase();
SYNCDATASET dataset = createDataSet(feed, cluster, storage, name + type, locType);
+ if (dataset == null) {
+ return;
+ }
coord.getDatasets().getDatasetOrAsyncDataset().add(dataset);
DATAOUT dataout = new DATAOUT();
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/4bdfac48/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
index 23c01a9..d6f9b54 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
@@ -162,8 +162,6 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
coord.getOutputEvents().getDataOut().get(1).getName());
assertEquals(process.getOutputs().getOutputs().get(0).getName() + "meta",
coord.getOutputEvents().getDataOut().get(2).getName());
- assertEquals(process.getOutputs().getOutputs().get(0).getName() + "tmp",
- coord.getOutputEvents().getDataOut().get(3).getName());
assertEquals(process.getOutputs().getOutputs().get(0).getName(),
coord.getOutputEvents().getDataOut().get(0).getName());
@@ -172,7 +170,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
assertEquals(process.getOutputs().getOutputs().get(0).getName(),
coord.getOutputEvents().getDataOut().get(0).getDataset());
- assertEquals(6, coord.getDatasets().getDatasetOrAsyncDataset().size());
+ assertEquals(5, coord.getDatasets().getDatasetOrAsyncDataset().size());
ConfigurationStore store = ConfigurationStore.get();
Feed feed = store.get(EntityType.FEED, process.getInputs().getInputs().get(0).getFeed());