You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by su...@apache.org on 2015/03/19 07:56:30 UTC
falcon git commit: FALCON-910 Better error messages when creating
cluster's directories. Contributed by karan kumar
Repository: falcon
Updated Branches:
refs/heads/master d4800401a -> bc3d23bd0
FALCON-910 Better error messages when creating cluster's directories. Contributed by karan kumar
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/bc3d23bd
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/bc3d23bd
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/bc3d23bd
Branch: refs/heads/master
Commit: bc3d23bd05155092838273337ab6c84d1bcfd78f
Parents: d480040
Author: Suhas Vasu <su...@inmobi.com>
Authored: Thu Mar 19 12:26:13 2015 +0530
Committer: Suhas Vasu <su...@inmobi.com>
Committed: Thu Mar 19 12:26:13 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 3 +
client/src/main/resources/cluster-0.1.xsd | 14 +-
.../org/apache/falcon/entity/ClusterHelper.java | 37 ++++-
.../org/apache/falcon/entity/EntityUtil.java | 3 +-
.../entity/parser/ClusterEntityParser.java | 149 +++++++++++++------
.../entity/parser/ClusterEntityParserTest.java | 105 +++++++++++--
docs/src/site/twiki/EntitySpecification.twiki | 13 +-
.../falcon/regression/core/bundle/Bundle.java | 3 +-
.../falcon/regression/core/util/BundleUtil.java | 5 +-
.../apache/falcon/oozie/OozieBundleBuilder.java | 4 +-
.../OozieOrchestrationWorkflowBuilder.java | 3 +-
.../service/SharedLibraryHostingService.java | 5 +-
.../feed/OozieFeedWorkflowBuilderTest.java | 5 +-
.../falcon/oozie/process/AbstractTestBase.java | 9 +-
.../OozieProcessWorkflowBuilderTest.java | 4 +-
.../entity/filesystem/embedded-cluster.xml | 6 +-
.../entity/filesystem/standalone-cluster.xml | 6 +-
.../filesystem/standalone-target-cluster.xml | 6 +-
.../falcon/cluster/util/EmbeddedCluster.java | 5 +-
.../lifecycle/FileSystemFeedReplicationIT.java | 3 +-
.../TableStorageFeedReplicationIT.java | 3 +-
.../org/apache/falcon/process/PigProcessIT.java | 3 +-
.../falcon/process/TableStorageProcessIT.java | 3 +-
.../org/apache/falcon/resource/TestContext.java | 43 +++---
.../validation/ClusterEntityValidationIT.java | 40 +++--
25 files changed, 350 insertions(+), 130 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b7a14f2..96cca70 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,9 @@ Trunk (Unreleased)
FALCON-822 Add reverse look up API (Ajay Yadava via Suhas Vasu)
IMPROVEMENTS
+ FALCON-910 Better error messages when creating cluster's directories
+ (karan kumar via Suhas Vasu)
+
FALCON-1042 Misleading mesage received while performing touch operation
on scheduled entity (Suhas Vasu)
http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/client/src/main/resources/cluster-0.1.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/cluster-0.1.xsd b/client/src/main/resources/cluster-0.1.xsd
index 6fd9de7..dd2c171 100644
--- a/client/src/main/resources/cluster-0.1.xsd
+++ b/client/src/main/resources/cluster-0.1.xsd
@@ -128,8 +128,9 @@
<xs:annotation>
<xs:documentation>
Location has the name and the path.
- name: is the type of locations like
- staging, temp and working.
+ name: is the type of locations which can be
+ staging, temp and working only.
+ staging is a mandatory type.
path: the hdfs path for each location.
Falcon would use the location to do intermediate
processing of entities in hdfs and hence Falcon
@@ -137,7 +138,7 @@
locations.
</xs:documentation>
</xs:annotation>
- <xs:attribute type="IDENTIFIER" name="name" use="required"/>
+ <xs:attribute type="cluster-location-type" name="name" use="required"/>
<xs:attribute type="xs:string" name="path" use="required"/>
</xs:complexType>
<xs:complexType name="interfaces">
@@ -200,4 +201,11 @@
<xs:attribute type="xs:string" name="group"/>
<xs:attribute type="xs:string" name="permission" default="*"/>
</xs:complexType>
+ <xs:simpleType name="cluster-location-type">
+ <xs:restriction base="xs:string">
+ <xs:enumeration value="staging"/>
+ <xs:enumeration value="working"/>
+ <xs:enumeration value="temp"/>
+ </xs:restriction>
+ </xs:simpleType>
</xs:schema>
http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
index 6a363b6..49d408f 100644
--- a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
@@ -22,6 +22,7 @@ import org.apache.falcon.FalconException;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
import org.apache.falcon.entity.v0.cluster.Interface;
import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.falcon.entity.v0.cluster.Location;
@@ -39,6 +40,7 @@ import java.util.Map;
*/
public final class ClusterHelper {
public static final String DEFAULT_BROKER_IMPL_CLASS = "org.apache.activemq.ActiveMQConnectionFactory";
+ public static final String WORKINGDIR = "working";
private ClusterHelper() {
}
@@ -120,15 +122,44 @@ public final class ClusterHelper {
return normalizedPath.substring(0, normalizedPath.length() - 1);
}
- public static String getLocation(Cluster cluster, String locationKey) {
+ public static Location getLocation(Cluster cluster, ClusterLocationType clusterLocationType) {
for (Location loc : cluster.getLocations().getLocations()) {
- if (loc.getName().equals(locationKey)) {
- return loc.getPath();
+ if (loc.getName().equals(clusterLocationType)) {
+ return loc;
+ }
+ }
+ //Mocking the working location FALCON-910
+ if (clusterLocationType.equals(ClusterLocationType.WORKING)) {
+ Location staging = getLocation(cluster, ClusterLocationType.STAGING);
+ if (staging != null) {
+ Location working = new Location();
+ working.setName(ClusterLocationType.WORKING);
+ working.setPath(staging.getPath().charAt(staging.getPath().length() - 1) == '/'
+ ?
+ staging.getPath().concat(WORKINGDIR)
+ :
+ staging.getPath().concat("/").concat(WORKINGDIR));
+ return working;
}
}
return null;
}
+ /**
+ * Parses the cluster object and checks whether a working location is defined.
+ *
+ * @param cluster
+ * @return
+ */
+ public static boolean checkWorkingLocationExists(Cluster cluster) {
+ for (Location loc : cluster.getLocations().getLocations()) {
+ if (loc.getName().equals(ClusterLocationType.WORKING)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
public static String getPropertyValue(Cluster cluster, String propName) {
if (cluster.getProperties() != null) {
for (Property prop : cluster.getProperties().getProperties()) {
http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index c553d1e..3656ae4 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -32,6 +32,7 @@ import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
import org.apache.falcon.entity.v0.feed.ClusterType;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.*;
@@ -574,7 +575,7 @@ public final class EntityUtil {
//Each entity update creates a new staging path
//Base staging path is the base path for all staging dirs
public static Path getBaseStagingPath(org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity) {
- return new Path(ClusterHelper.getLocation(cluster, "staging"),
+ return new Path(ClusterHelper.getLocation(cluster, ClusterLocationType.STAGING).getPath(),
"falcon/workflows/" + entity.getEntityType().name().toLowerCase() + "/" + entity.getName());
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
index 87b61a4..2af8c16 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
@@ -26,6 +26,7 @@ import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.ACL;
import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
import org.apache.falcon.entity.v0.cluster.Interface;
import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.falcon.entity.v0.cluster.Location;
@@ -154,8 +155,8 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
protected void validateMessagingInterface(Cluster cluster) throws ValidationException {
final String messagingUrl = ClusterHelper.getMessageBrokerUrl(cluster);
- final String implementation = StartupProperties.get().getProperty(
- "broker.impl.class", "org.apache.activemq.ActiveMQConnectionFactory");
+ final String implementation = StartupProperties.get().getProperty("broker.impl.class",
+ "org.apache.activemq.ActiveMQConnectionFactory");
LOG.info("Validating messaging interface: {}, implementation: {}", messagingUrl, implementation);
try {
@@ -243,58 +244,120 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
try {
fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
} catch (FalconException e) {
- throw new ValidationException(
- "Unable to get file system handle for cluster " + cluster.getName(), e);
+ throw new ValidationException("Unable to get file system handle for cluster " + cluster.getName(), e);
}
- for (Location location : cluster.getLocations().getLocations()) {
- final String locationName = location.getName();
- if (locationName.equals("temp")) {
- continue;
- }
+ Location stagingLocation = ClusterHelper.getLocation(cluster, ClusterLocationType.STAGING);
- try {
- checkPathOwnerAndPermission(cluster.getName(), location.getPath(), fs,
- "staging".equals(locationName)
- ? HadoopClientFactory.ALL_PERMISSION
- : HadoopClientFactory.READ_EXECUTE_PERMISSION);
- } catch (IOException e) {
- throw new ValidationException("Unable to validate the location of name: "
- + location.getName() + " with path:" + location.getPath()
- + " for cluster " + cluster.getName(), e);
+ if (stagingLocation == null) {
+ throw new ValidationException(
+ "Unable to find the mandatory location of name: " + ClusterLocationType.STAGING.value()
+ + " for cluster " + cluster.getName());
+ } else {
+
+ checkPathOwnerAndPermission(cluster.getName(), stagingLocation.getPath(), fs,
+ HadoopClientFactory.READ_EXECUTE_PERMISSION, false);
+
+ if (!ClusterHelper.checkWorkingLocationExists(cluster)) {
+ //Creating location type of working in the sub dir of staging dir with perms 755. FALCON-910
+
+ Path workingDirPath = new Path(stagingLocation.getPath(), ClusterHelper.WORKINGDIR);
+ try {
+ if (!fs.exists(workingDirPath)) { //Checking if the staging dir has the working dir to be created
+ HadoopClientFactory.mkdirs(fs, workingDirPath, HadoopClientFactory.READ_EXECUTE_PERMISSION);
+ } else {
+ if (fs.isDirectory(workingDirPath)) {
+ FsPermission workingPerms = fs.getFileStatus(workingDirPath).getPermission();
+ if (!workingPerms.equals(HadoopClientFactory.READ_EXECUTE_PERMISSION)) { //perms check
+ throw new ValidationException(
+ "Falcon needs subdir " + ClusterHelper.WORKINGDIR + " inside staging dir:"
+ + stagingLocation.getPath()
+ + " when staging location not specified with "
+ + HadoopClientFactory.READ_EXECUTE_PERMISSION.toString() + " got "
+ + workingPerms.toString());
+ }
+ } else {
+ throw new ValidationException(
+ "Falcon needs subdir " + ClusterHelper.WORKINGDIR + " inside staging dir:"
+ + stagingLocation.getPath()
+ + " when staging location not specified. Got a file at " + workingDirPath
+ .toString());
+ }
+ }
+ } catch (IOException e) {
+ throw new ValidationException(
+ "Unable to create path for " + workingDirPath.toString() + " with path: "
+ + workingDirPath.toString() + " for cluster " + cluster.getName(), e);
+ }
+ } else {
+ Location workingLocation = ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING);
+ if (stagingLocation.getPath().equals(workingLocation.getPath())) {
+ throw new ValidationException(
+ "Location with name: " + stagingLocation.getName().value() + " and " + workingLocation
+ .getName().value() + " cannot of same path: " + stagingLocation.getPath()
+ + " for cluster :" + cluster.getName());
+ } else {
+
+ checkPathOwnerAndPermission(cluster.getName(), workingLocation.getPath(), fs,
+ HadoopClientFactory.READ_EXECUTE_PERMISSION, true);
+
+ }
}
+
}
+
}
private void checkPathOwnerAndPermission(String clusterName, String location, FileSystem fs,
- FsPermission expectedPermission)
- throws IOException, ValidationException {
+ FsPermission expectedPermission, Boolean exactPerms) throws ValidationException {
Path locationPath = new Path(location);
- if (!fs.exists(locationPath)) {
- throw new ValidationException("Location " + location
- + " for cluster " + clusterName + " must exist.");
- }
-
- // falcon owns this path on each cluster
- final String loginUser = UserGroupInformation.getLoginUser().getShortUserName();
- FileStatus fileStatus = fs.getFileStatus(locationPath);
- final String locationOwner = fileStatus.getOwner();
- if (!locationOwner.equals(loginUser)) {
- LOG.error("Location {} has owner {}, should be the process user {}",
- locationPath, locationOwner, loginUser);
- throw new ValidationException("Path [" + locationPath + "] has owner [" + locationOwner
- + "], should be the process user " + loginUser);
- }
+ try {
+ if (!fs.exists(locationPath)) {
+ throw new ValidationException("Location " + location + " for cluster " + clusterName + " must exist.");
+ }
- if (fileStatus.getPermission().toShort() != expectedPermission.toShort()) {
- LOG.error("Location {} has permissions {}, should be {}",
- locationPath, fileStatus.getPermission(), expectedPermission);
- throw new ValidationException("Path " + locationPath + " has permissions: "
- + fileStatus.getPermission() + ", should be " + expectedPermission);
+ // falcon owns this path on each cluster
+ final String loginUser = UserGroupInformation.getLoginUser().getShortUserName();
+ FileStatus fileStatus = fs.getFileStatus(locationPath);
+ final String locationOwner = fileStatus.getOwner();
+ if (!locationOwner.equals(loginUser)) {
+ LOG.error("Location {} has owner {}, should be the process user {}", locationPath, locationOwner,
+ loginUser);
+ throw new ValidationException(
+ "Path [" + locationPath + "] has owner [" + locationOwner + "], should be the process user "
+ + loginUser);
+ }
+ String errorMessage = "Path " + locationPath + " has permissions: " + fileStatus.getPermission().toString()
+ + ", should be " + expectedPermission;
+ if (exactPerms) {
+ if (fileStatus.getPermission().toShort() != expectedPermission.toShort()) {
+ LOG.error(errorMessage);
+ throw new ValidationException(errorMessage);
+ }
+ } else {
+ if (expectedPermission.getUserAction().ordinal() > fileStatus.getPermission().getUserAction()
+ .ordinal()) {
+ LOG.error(errorMessage + " at least");
+ throw new ValidationException(errorMessage + " at least");
+ }
+ if (expectedPermission.getGroupAction().ordinal() > fileStatus.getPermission().getGroupAction()
+ .ordinal()) {
+ LOG.error(errorMessage + " at least");
+ throw new ValidationException(errorMessage + " at least");
+ }
+ if (expectedPermission.getOtherAction().ordinal() > fileStatus.getPermission().getOtherAction()
+ .ordinal()) {
+ LOG.error(errorMessage + " at least");
+ throw new ValidationException(errorMessage + " at least");
+ }
+ }
+ // try to list to see if the user is able to write to this folder
+ fs.listStatus(locationPath);
+ } catch (IOException e) {
+ throw new ValidationException(
+ "Unable to validate the location with path: " + location + " for cluster:" + clusterName
+ + " due to transient failures ", e);
}
-
- // try to list to see if the user is able to write to this folder
- fs.listStatus(locationPath);
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
index a807e80..5085b24 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
@@ -25,12 +25,15 @@ import org.apache.falcon.entity.AbstractTestBase;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
import org.apache.falcon.entity.v0.cluster.Interface;
import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.falcon.entity.v0.cluster.Location;
import org.apache.falcon.entity.v0.cluster.Locations;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -80,7 +83,8 @@ public class ClusterEntityParserTest extends AbstractTestBase {
Assert.assertEquals(workflow.getEndpoint(), "http://localhost:11000/oozie/");
Assert.assertEquals(workflow.getVersion(), "4.0");
- Assert.assertEquals(ClusterHelper.getLocation(cluster, "staging"), "/projects/falcon/staging");
+ Assert.assertEquals(ClusterHelper.getLocation(cluster, ClusterLocationType.STAGING).getPath(),
+ "/projects/falcon/staging");
StringWriter stringWriter = new StringWriter();
Marshaller marshaller = EntityType.CLUSTER.getMarshaller();
@@ -91,7 +95,8 @@ public class ClusterEntityParserTest extends AbstractTestBase {
Assert.assertEquals(catalog.getEndpoint(), "http://localhost:48080/templeton/v1");
Assert.assertEquals(catalog.getVersion(), "0.11.0");
- Assert.assertEquals(ClusterHelper.getLocation(cluster, "staging"), "/projects/falcon/staging");
+ Assert.assertEquals(ClusterHelper.getLocation(cluster, ClusterLocationType.STAGING).getPath(),
+ "/projects/falcon/staging");
}
@Test
@@ -195,19 +200,24 @@ public class ClusterEntityParserTest extends AbstractTestBase {
}
/**
- * A lightweight unit test for a cluster where location paths are not instantiated .
+ * A lightweight unit test for a cluster where location type staging is missing.
* Extensive tests are found in ClusterEntityValidationIT.
*
* @throws ValidationException
*/
- @Test(expectedExceptions = ValidationException.class)
- public void testClusterWithoutLocationsPaths() throws ValidationException {
- ClusterEntityParser clusterEntityParser = Mockito.spy(
- (ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER));
- Cluster cluster = this.dfsCluster.getCluster();
+ @Test(expectedExceptions = ValidationException.class) public void testClusterWithoutStaging() throws Exception {
+ ClusterEntityParser clusterEntityParser = Mockito
+ .spy((ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER));
+ Cluster cluster = (Cluster) this.dfsCluster.getCluster().copy();
Mockito.doNothing().when(clusterEntityParser).validateWorkflowInterface(cluster);
Mockito.doNothing().when(clusterEntityParser).validateMessagingInterface(cluster);
Mockito.doNothing().when(clusterEntityParser).validateRegistryInterface(cluster);
+ Location location = new Location();
+ location.setName(ClusterLocationType.WORKING);
+ location.setPath("/apps/non/existent/path");
+ Locations locations = new Locations();
+ locations.getLocations().add(location);
+ cluster.setLocations(locations);
clusterEntityParser.validate(cluster);
Assert.fail("Should have thrown a validation exception");
}
@@ -219,12 +229,12 @@ public class ClusterEntityParserTest extends AbstractTestBase {
* @throws ValidationException
*/
@Test(expectedExceptions = ValidationException.class)
- public void testClusterWithInvalidLocationsPaths() throws ValidationException {
- ClusterEntityParser clusterEntityParser = Mockito.spy(
- (ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER));
- Cluster cluster = this.dfsCluster.getCluster();
+ public void testClusterWithInvalidLocationsPaths() throws Exception {
+ ClusterEntityParser clusterEntityParser = Mockito
+ .spy((ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER));
+ Cluster cluster = (Cluster)this.dfsCluster.getCluster().copy();
Location location = new Location();
- location.setName("TestName");
+ location.setName(ClusterLocationType.STAGING);
location.setPath("/apps/non/existent/path");
Locations locations = new Locations();
locations.getLocations().add(location);
@@ -235,14 +245,79 @@ public class ClusterEntityParserTest extends AbstractTestBase {
try {
clusterEntityParser.validate(cluster);
} catch (ValidationException e) {
- String errorMessage = "Location " + location.getPath() + " for cluster "
- + cluster.getName() + " must exist.";
+ String errorMessage =
+ "Location " + location.getPath() + " for cluster " + cluster.getName() + " must exist.";
Assert.assertEquals(e.getMessage(), errorMessage);
throw e;
}
Assert.fail("Should have thrown a validation exception");
}
+ /**
+ * A lightweight unit test for a cluster where the staging and working location paths are the same.
+ * Extensive tests are found in ClusterEntityValidationIT.
+ *
+ * @throws ValidationException
+ */
+ @Test(expectedExceptions = ValidationException.class)
+ public void testClusterWithSameWorkingAndStaging() throws Exception {
+ ClusterEntityParser clusterEntityParser = Mockito
+ .spy((ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER));
+ Cluster cluster = (Cluster) this.dfsCluster.getCluster().copy();
+ cluster.getLocations().getLocations().get(1).setPath("/projects/falcon/staging");
+ Mockito.doNothing().when(clusterEntityParser).validateWorkflowInterface(cluster);
+ Mockito.doNothing().when(clusterEntityParser).validateMessagingInterface(cluster);
+ Mockito.doNothing().when(clusterEntityParser).validateRegistryInterface(cluster);
+ clusterEntityParser.validate(cluster);
+ Assert.fail("Should have thrown a validation exception");
+ }
+
+ /**
+ * A lightweight unit test for a cluster where location type working is missing.
+ * It should automatically get generated
+ * Extensive tests are found in ClusterEntityValidationIT.
+ */
+ @Test public void testClusterWithOnlyStaging() throws Exception {
+ ClusterEntityParser clusterEntityParser = Mockito
+ .spy((ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER));
+ Cluster cluster = (Cluster) this.dfsCluster.getCluster().copy();
+ cluster.getLocations().getLocations().remove(1);
+ Mockito.doNothing().when(clusterEntityParser).validateWorkflowInterface(cluster);
+ Mockito.doNothing().when(clusterEntityParser).validateMessagingInterface(cluster);
+ Mockito.doNothing().when(clusterEntityParser).validateRegistryInterface(cluster);
+ clusterEntityParser.validate(cluster);
+ String workingDirPath = cluster.getLocations().getLocations().get(0).getPath() + "/working";
+ Assert.assertEquals(ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath(), workingDirPath);
+ FileStatus workingDirStatus = this.dfsCluster.getFileSystem().getFileLinkStatus(new Path(workingDirPath));
+ Assert.assertTrue(workingDirStatus.isDirectory());
+ Assert.assertEquals(workingDirStatus.getPermission(), HadoopClientFactory.READ_EXECUTE_PERMISSION);
+
+ }
+
+ /**
+ * A lightweight unit test for a cluster where location working is not there and staging
+ * has a subdir which will be used by cluster as working.
+ * Checking for wrong perms of this subdir
+ * Extensive tests are found in ClusterEntityValidationIT.
+ *
+ * @throws ValidationException
+ */
+ @Test(expectedExceptions = ValidationException.class)
+ public void testClusterWithSubdirInStaging() throws Exception {
+ ClusterEntityParser clusterEntityParser = Mockito
+ .spy((ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER));
+ Cluster cluster = (Cluster) this.dfsCluster.getCluster().copy();
+ cluster.getLocations().getLocations().get(1).setPath("/projects/falcon/staging");
+ cluster.getLocations().getLocations().remove(1);
+ HadoopClientFactory.mkdirs(this.dfsCluster.getFileSystem(),
+ new Path(ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath()),
+ HadoopClientFactory.ALL_PERMISSION);
+ Mockito.doNothing().when(clusterEntityParser).validateWorkflowInterface(cluster);
+ Mockito.doNothing().when(clusterEntityParser).validateMessagingInterface(cluster);
+ Mockito.doNothing().when(clusterEntityParser).validateRegistryInterface(cluster);
+ clusterEntityParser.validate(cluster);
+ Assert.fail("Should have thrown a validation exception");
+ }
@BeforeClass
public void init() throws Exception {
http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/docs/src/site/twiki/EntitySpecification.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/EntitySpecification.twiki b/docs/src/site/twiki/EntitySpecification.twiki
index 66fcd2f..86df6d4 100644
--- a/docs/src/site/twiki/EntitySpecification.twiki
+++ b/docs/src/site/twiki/EntitySpecification.twiki
@@ -63,16 +63,17 @@ A messaging interface specifies the interface for sending feed availability mess
A cluster has a list of locations defined:
<verbatim>
<location name="staging" path="/projects/falcon/staging" />
-<location name="working" path="/projects/falcon/working" />
+<location name="working" path="/projects/falcon/working" /> <!--optional-->
</verbatim>
-Location has the name and the path, name is the type of locations like staging, temp and working.
-and path is the hdfs path for each location.
+Location has a name and a path; name is the type of location. Allowed values of name are staging, temp and working.
+Path is the hdfs path for each location.
Falcon would use the location to do intermediate processing of entities in hdfs and hence Falcon
should have read/write/execute permission on these locations.
These locations MUST be created prior to submitting a cluster entity to Falcon.
-*staging* must have 777 permissions and the parent dirs must have execute permissions so multiple
-users can write to this location
-*working* must have 755 permissions and the parent dirs must have execute permissions so multiple
+*staging* should have at least 755 permissions and is a mandatory location. The parent dirs must have execute permissions so multiple
+users can write to this location. *working* must have 755 permissions and is an optional location.
+If *working* is not specified, falcon creates a sub directory in the *staging* location with 755 perms.
+The parent dir for *working* must have execute permissions so multiple
users can read from this location
---+++ ACL
http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java
index 0dfd895..876ffa4 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java
@@ -21,6 +21,7 @@ package org.apache.falcon.regression.core.bundle;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.Frequency.TimeUnit;
+import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
import org.apache.falcon.entity.v0.cluster.Interface;
import org.apache.falcon.entity.v0.cluster.Interfaces;
import org.apache.falcon.entity.v0.cluster.Interfacetype;
@@ -685,7 +686,7 @@ public class Bundle {
public void setCLusterWorkingPath(String clusterData, String path) {
ClusterMerlin c = new ClusterMerlin(clusterData);
for (int i = 0; i < c.getLocations().getLocations().size(); i++) {
- if (c.getLocations().getLocations().get(i).getName().contains("working")) {
+ if (c.getLocations().getLocations().get(i).getName().equals(ClusterLocationType.WORKING)) {
c.getLocations().getLocations().get(i).setPath(path);
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java
index 4218815..c53d927 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java
@@ -21,6 +21,7 @@ package org.apache.falcon.regression.core.util;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
import org.apache.falcon.entity.v0.cluster.Location;
import org.apache.falcon.entity.v0.cluster.Property;
import org.apache.falcon.regression.Entities.ClusterMerlin;
@@ -115,11 +116,11 @@ public final class BundleUtil {
//set staging and working locations
clusterMerlin.getLocations().getLocations().clear();
final Location staging = new Location();
- staging.setName("staging");
+ staging.setName(ClusterLocationType.STAGING);
staging.setPath(MerlinConstants.STAGING_LOCATION);
clusterMerlin.getLocations().getLocations().add(staging);
final Location working = new Location();
- working.setName("working");
+ working.setName(ClusterLocationType.WORKING);
working.setPath(MerlinConstants.WORKING_LOCATION);
clusterMerlin.getLocations().getLocations().add(working);
final String protectionPropName = "hadoop.rpc.protection";
http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
index de11df6..03063f4 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
@@ -24,6 +24,7 @@ import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.oozie.bundle.BUNDLEAPP;
import org.apache.falcon.oozie.bundle.CONFIGURATION;
@@ -122,7 +123,8 @@ public abstract class OozieBundleBuilder<T extends Entity> extends OozieEntityBu
properties.setProperty(OozieClient.USER_NAME, CurrentUser.getUser());
properties.setProperty(OozieClient.USE_SYSTEM_LIBPATH, "true");
- properties.setProperty("falcon.libpath", ClusterHelper.getLocation(cluster, "working") + "/lib");
+ properties.setProperty("falcon.libpath",
+ ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath() + "/lib");
if (EntityUtil.isTableStorageType(cluster, entity)) {
Tag tag = EntityUtil.getWorkflowNameTag(coordName, entity);
http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
index 3186c4a..49f9e07 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
@@ -26,6 +26,7 @@ import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.hadoop.HadoopClientFactory;
@@ -209,7 +210,7 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
}
protected void addLibExtensionsToWorkflow(Cluster cluster, WORKFLOWAPP wf, Tag tag) throws FalconException {
- String libext = ClusterHelper.getLocation(cluster, "working") + "/libext";
+ String libext = ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath() + "/libext";
FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
ClusterHelper.getConfiguration(cluster));
try {
http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java b/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
index 9567c5f..ba87a62 100644
--- a/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
+++ b/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
@@ -25,6 +25,7 @@ import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.util.StartupProperties;
@@ -69,8 +70,8 @@ public class SharedLibraryHostingService implements ConfigurationChangeListener
};
private void addLibsTo(Cluster cluster) throws FalconException {
- Path lib = new Path(ClusterHelper.getLocation(cluster, "working"), "lib");
- Path libext = new Path(ClusterHelper.getLocation(cluster, "working"), "libext");
+ Path lib = new Path(ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath(), "lib");
+ Path libext = new Path(ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath(), "libext");
try {
FileSystem fs = HadoopClientFactory.get().createFalconFileSystem(
ClusterHelper.getConfiguration(cluster));
http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
index 723f909..48449d4 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
@@ -31,6 +31,7 @@ import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.hadoop.HadoopClientFactory;
@@ -714,14 +715,14 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
}
private void verifyClusterLocationsUMask(Cluster aCluster, FileSystem fs) throws IOException {
- String stagingLocation = ClusterHelper.getLocation(aCluster, "staging");
+ String stagingLocation = ClusterHelper.getLocation(aCluster, ClusterLocationType.STAGING).getPath();
Path stagingPath = new Path(stagingLocation);
if (fs.exists(stagingPath)) {
FileStatus fileStatus = fs.getFileStatus(stagingPath);
Assert.assertEquals(fileStatus.getPermission().toShort(), 511);
}
- String workingLocation = ClusterHelper.getLocation(aCluster, "working");
+ String workingLocation = ClusterHelper.getLocation(aCluster, ClusterLocationType.WORKING).getPath();
Path workingPath = new Path(workingLocation);
if (fs.exists(workingPath)) {
FileStatus fileStatus = fs.getFileStatus(workingPath);
http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
index b549cfb..6488682 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
@@ -26,6 +26,7 @@ import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Process;
@@ -75,10 +76,10 @@ public class AbstractTestBase {
if (writeEndpoint != null) {
ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(writeEndpoint);
FileSystem fs = new Path(writeEndpoint).getFileSystem(EmbeddedCluster.newConfiguration());
- fs.create(
- new Path(ClusterHelper.getLocation(cluster, "working"), "libext/FEED/retention/ext.jar")).close();
- fs.create(
- new Path(ClusterHelper.getLocation(cluster, "working"), "libext/FEED/replication/ext.jar")).close();
+ fs.create(new Path(ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath(),
+ "libext/FEED/retention/ext.jar")).close();
+ fs.create(new Path(ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath(),
+ "libext/FEED/replication/ext.jar")).close();
}
return cluster;
http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
index ef21f4d..545beb1 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
@@ -32,6 +32,7 @@ import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.LocationType;
@@ -123,7 +124,8 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(hdfsUrl);
ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY).setEndpoint("thrift://localhost:49083");
fs = new Path(hdfsUrl).getFileSystem(EmbeddedCluster.newConfiguration());
- fs.create(new Path(ClusterHelper.getLocation(cluster, "working"), "libext/PROCESS/ext.jar")).close();
+ fs.create(new Path(ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath(),
+ "libext/PROCESS/ext.jar")).close();
Process process = store.get(EntityType.PROCESS, "clicksummary");
Path wfpath = new Path(process.getWorkflow().getPath());
http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/src/main/examples/entity/filesystem/embedded-cluster.xml
----------------------------------------------------------------------
diff --git a/src/main/examples/entity/filesystem/embedded-cluster.xml b/src/main/examples/entity/filesystem/embedded-cluster.xml
index effcbd5..c505066 100644
--- a/src/main/examples/entity/filesystem/embedded-cluster.xml
+++ b/src/main/examples/entity/filesystem/embedded-cluster.xml
@@ -42,9 +42,9 @@
<interface type="messaging" endpoint="tcp://localhost:61616?daemon=true" version="5.4.3"/>
</interfaces>
<locations>
- <location name="staging" path="/projects/falcon/staging"/>
- <location name="temp" path="/projects/falcon/tmp"/>
- <location name="working" path="/projects/falcon/working"/>
+ <location name="staging" path="/projects/falcon/staging"/> <!--mandatory-->
+ <location name="temp" path="/projects/falcon/tmp"/> <!--optional-->
+ <location name="working" path="/projects/falcon/working"/> <!--optional-->
</locations>
<properties>
</properties>
http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/src/main/examples/entity/filesystem/standalone-cluster.xml
----------------------------------------------------------------------
diff --git a/src/main/examples/entity/filesystem/standalone-cluster.xml b/src/main/examples/entity/filesystem/standalone-cluster.xml
index 6fe4df3..4f8a5fc 100644
--- a/src/main/examples/entity/filesystem/standalone-cluster.xml
+++ b/src/main/examples/entity/filesystem/standalone-cluster.xml
@@ -34,9 +34,9 @@
<interface type="messaging" endpoint="tcp://localhost:61616?daemon=true" version="5.4.3"/>
</interfaces>
<locations>
- <location name="staging" path="/projects/falcon/staging"/>
- <location name="temp" path="/projects/falcon/tmp"/>
- <location name="working" path="/projects/falcon/working"/>
+ <location name="staging" path="/projects/falcon/staging"/> <!--mandatory-->
+ <location name="temp" path="/projects/falcon/tmp"/> <!--optional-->
+ <location name="working" path="/projects/falcon/working"/> <!--optional-->
</locations>
<properties>
</properties>
http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/src/main/examples/entity/filesystem/standalone-target-cluster.xml
----------------------------------------------------------------------
diff --git a/src/main/examples/entity/filesystem/standalone-target-cluster.xml b/src/main/examples/entity/filesystem/standalone-target-cluster.xml
index 442449d..cb5c4c6 100644
--- a/src/main/examples/entity/filesystem/standalone-target-cluster.xml
+++ b/src/main/examples/entity/filesystem/standalone-target-cluster.xml
@@ -34,9 +34,9 @@
<interface type="messaging" endpoint="tcp://localhost:61616?daemon=true" version="5.4.3"/>
</interfaces>
<locations>
- <location name="staging" path="/projects/falcon/staging-target"/>
- <location name="temp" path="/projects/falcon/tmp-target"/>
- <location name="working" path="/projects/falcon/working-target"/>
+ <location name="staging" path="/projects/falcon/staging-target"/> <!--mandatory-->
+ <location name="temp" path="/projects/falcon/tmp-target"/> <!--optional-->
+ <location name="working" path="/projects/falcon/working-target"/> <!--optional-->
</locations>
<properties>
</properties>
http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java
----------------------------------------------------------------------
diff --git a/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java b/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java
index c67b8fc..f410b21 100644
--- a/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java
+++ b/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java
@@ -19,6 +19,7 @@
package org.apache.falcon.cluster.util;
import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
import org.apache.falcon.entity.v0.cluster.Interface;
import org.apache.falcon.entity.v0.cluster.Interfaces;
import org.apache.falcon.entity.v0.cluster.Interfacetype;
@@ -121,12 +122,12 @@ public class EmbeddedCluster {
clusterEntity.setInterfaces(interfaces);
Location location = new Location();
- location.setName("staging");
+ location.setName(ClusterLocationType.STAGING);
location.setPath("/projects/falcon/staging");
Locations locs = new Locations();
locs.getLocations().add(location);
location = new Location();
- location.setName("working");
+ location.setName(ClusterLocationType.WORKING);
location.setPath("/projects/falcon/working");
locs.getLocations().add(location);
clusterEntity.setLocations(locs);
http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java b/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java
index 9383026..d7f36da 100644
--- a/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java
+++ b/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java
@@ -20,6 +20,7 @@ package org.apache.falcon.lifecycle;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.InstancesResult;
import org.apache.falcon.resource.TestContext;
@@ -96,7 +97,7 @@ public class FileSystemFeedReplicationIT {
private void copyLibsToHDFS(Cluster cluster) throws IOException {
// set up kahadb to be sent as part of workflows
StartupProperties.get().setProperty("libext.paths", "./target/libext");
- String libext = ClusterHelper.getLocation(cluster, "working") + "/libext";
+ String libext = ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath() + "/libext";
String targetStorageUrl = ClusterHelper.getStorageUrl(cluster);
FSUtils.copyOozieShareLibsToHDFS("./target/libext", targetStorageUrl + libext);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedReplicationIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedReplicationIT.java b/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedReplicationIT.java
index 55610b7..13bc480 100644
--- a/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedReplicationIT.java
+++ b/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedReplicationIT.java
@@ -20,6 +20,7 @@ package org.apache.falcon.lifecycle;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.InstancesResult;
@@ -112,7 +113,7 @@ public class TableStorageFeedReplicationIT {
private void copyLibsToHDFS(Cluster cluster) throws IOException {
// set up kahadb to be sent as part of workflows
StartupProperties.get().setProperty("libext.paths", "./target/libext");
- String libext = ClusterHelper.getLocation(cluster, "working") + "/libext";
+ String libext = ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath() + "/libext";
String targetStorageUrl = ClusterHelper.getStorageUrl(cluster);
FSUtils.copyOozieShareLibsToHDFS("./target/libext", targetStorageUrl + libext);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java b/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java
index edbe32a..575b870 100644
--- a/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java
+++ b/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java
@@ -20,6 +20,7 @@ package org.apache.falcon.process;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.InstancesResult;
import org.apache.falcon.resource.TestContext;
@@ -80,7 +81,7 @@ public class PigProcessIT {
private void copyLibsToHDFS(Cluster cluster, String storageUrl) throws IOException {
// set up kahadb to be sent as part of workflows
StartupProperties.get().setProperty("libext.paths", "./target/libext");
- String libext = ClusterHelper.getLocation(cluster, "working") + "/libext";
+ String libext = ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath() + "/libext";
FSUtils.copyOozieShareLibsToHDFS("./target/libext", storageUrl + libext);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/webapp/src/test/java/org/apache/falcon/process/TableStorageProcessIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/process/TableStorageProcessIT.java b/webapp/src/test/java/org/apache/falcon/process/TableStorageProcessIT.java
index 9cb6850..2e882ea 100644
--- a/webapp/src/test/java/org/apache/falcon/process/TableStorageProcessIT.java
+++ b/webapp/src/test/java/org/apache/falcon/process/TableStorageProcessIT.java
@@ -20,6 +20,7 @@ package org.apache.falcon.process;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.InstancesResult;
@@ -100,7 +101,7 @@ public class TableStorageProcessIT {
private void copyLibsToHDFS(Cluster cluster, String storageUrl) throws IOException {
// set up kahadb to be sent as part of workflows
StartupProperties.get().setProperty("libext.paths", "./target/libext");
- String libext = ClusterHelper.getLocation(cluster, "working") + "/libext";
+ String libext = ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath() + "/libext";
FSUtils.copyOozieShareLibsToHDFS("./target/libext", storageUrl + libext);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
index 593079a..7b227b3 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
@@ -38,6 +38,7 @@ import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.security.CurrentUser;
@@ -315,34 +316,39 @@ public class TestContext {
return submitFileToFalcon(entityType, tmpFile);
}
- public static void deleteClusterLocations(Cluster clusterEntity,
- FileSystem fs) throws IOException {
- String stagingLocation = ClusterHelper.getLocation(clusterEntity, "staging");
+ public static void deleteClusterLocations(Cluster clusterEntity, FileSystem fs) throws IOException {
+ String stagingLocation = ClusterHelper.getLocation(clusterEntity, ClusterLocationType.STAGING).getPath();
Path stagingPath = new Path(stagingLocation);
if (fs.exists(stagingPath)) {
fs.delete(stagingPath, true);
}
- String workingLocation = ClusterHelper.getLocation(clusterEntity, "working");
+ String workingLocation = ClusterHelper.getLocation(clusterEntity, ClusterLocationType.WORKING).getPath();
Path workingPath = new Path(workingLocation);
if (fs.exists(workingPath)) {
fs.delete(workingPath, true);
}
}
- public static void createClusterLocations(Cluster clusterEntity,
- FileSystem fs) throws IOException {
- String stagingLocation = ClusterHelper.getLocation(clusterEntity, "staging");
+ public static void createClusterLocations(Cluster clusterEntity, FileSystem fs) throws IOException {
+ createClusterLocations(clusterEntity, fs, true);
+ }
+
+ public static void createClusterLocations(Cluster clusterEntity, FileSystem fs, boolean withWorking)
+ throws IOException {
+ String stagingLocation = ClusterHelper.getLocation(clusterEntity, ClusterLocationType.STAGING).getPath();
Path stagingPath = new Path(stagingLocation);
- if (!fs.exists(stagingPath)) {
- HadoopClientFactory.mkdirs(fs, stagingPath, HadoopClientFactory.ALL_PERMISSION);
+ if (fs.exists(stagingPath)) {
+ fs.delete(stagingPath, true);
}
-
- String workingLocation = ClusterHelper.getLocation(clusterEntity, "working");
- Path workingPath = new Path(workingLocation);
- if (!fs.exists(workingPath)) {
- HadoopClientFactory
- .mkdirs(fs, workingPath, HadoopClientFactory.READ_EXECUTE_PERMISSION);
+ HadoopClientFactory.mkdirs(fs, stagingPath, HadoopClientFactory.ALL_PERMISSION);
+ if (withWorking) {
+ String workingLocation = ClusterHelper.getLocation(clusterEntity, ClusterLocationType.WORKING).getPath();
+ Path workingPath = new Path(workingLocation);
+ if (fs.exists(workingPath)) {
+ fs.delete(workingPath, true);
+ }
+ HadoopClientFactory.mkdirs(fs, workingPath, HadoopClientFactory.READ_EXECUTE_PERMISSION);
}
}
@@ -503,12 +509,11 @@ public class TestContext {
initClusterLocations(cluster, fs);
}
- private static void initClusterLocations(EmbeddedCluster cluster,
- FileSystem fs) throws Exception {
- String stagingPath = ClusterHelper.getLocation(cluster.getCluster(), "staging");
+ private static void initClusterLocations(EmbeddedCluster cluster, FileSystem fs) throws Exception {
+ String stagingPath = ClusterHelper.getLocation(cluster.getCluster(), ClusterLocationType.STAGING).getPath();
mkdir(fs, new Path(stagingPath), HadoopClientFactory.ALL_PERMISSION);
- String workingPath = ClusterHelper.getLocation(cluster.getCluster(), "working");
+ String workingPath = ClusterHelper.getLocation(cluster.getCluster(), ClusterLocationType.WORKING).getPath();
mkdir(fs, new Path(workingPath), HadoopClientFactory.READ_EXECUTE_PERMISSION);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/bc3d23bd/webapp/src/test/java/org/apache/falcon/validation/ClusterEntityValidationIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/validation/ClusterEntityValidationIT.java b/webapp/src/test/java/org/apache/falcon/validation/ClusterEntityValidationIT.java
index cbbf90a..431d334 100644
--- a/webapp/src/test/java/org/apache/falcon/validation/ClusterEntityValidationIT.java
+++ b/webapp/src/test/java/org/apache/falcon/validation/ClusterEntityValidationIT.java
@@ -26,8 +26,10 @@ import org.apache.falcon.entity.parser.ValidationException;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.ACL;
import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
import org.apache.falcon.entity.v0.cluster.Interface;
import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.resource.TestContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -156,32 +158,48 @@ public class ClusterEntityValidationIT {
parser.validate(cluster);
}
- @Test (expectedExceptions = ValidationException.class)
+ @Test
+ public void testValidateClusterLocationsWithoutWorking() throws Exception {
+ overlay = context.getUniqueOverlay();
+ String filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
+ InputStream stream = new FileInputStream(filePath);
+ Cluster clusterEntity = (Cluster) EntityType.CLUSTER.getUnmarshaller().unmarshal(stream);
+ clusterEntity.getLocations().getLocations().remove(2);
+ FileSystem clusterFileSystem = FileSystem.get(ClusterHelper.getConfiguration(cluster));
+ TestContext.createClusterLocations(clusterEntity, clusterFileSystem, false);
+ parser.validate(clusterEntity);
+ String expectedPath =
+ ClusterHelper.getLocation(clusterEntity, ClusterLocationType.STAGING).getPath() + "/working";
+ Assert.assertEquals(ClusterHelper.getLocation(clusterEntity, ClusterLocationType.WORKING).getPath(),
+ expectedPath);
+ Assert.assertTrue(clusterFileSystem.getFileLinkStatus(new Path(expectedPath)).isDirectory());
+ Assert.assertEquals(clusterFileSystem.getFileLinkStatus(new Path(expectedPath)).getPermission(),
+ HadoopClientFactory.READ_EXECUTE_PERMISSION);
+ }
+
+ @Test(expectedExceptions = ValidationException.class)
public void testValidateClusterLocationsThatDontExist() throws Exception {
TestContext.deleteClusterLocations(cluster, fs);
parser.validate(cluster);
Assert.fail("Should have thrown a validation exception");
}
- @Test (expectedExceptions = ValidationException.class)
+ @Test(expectedExceptions = ValidationException.class)
public void testValidateClusterLocationsThatExistWithBadOwner() throws Exception {
- TestContext.deleteClusterLocations(cluster, fs);
createClusterLocationsBadPermissions(cluster);
parser.validate(cluster);
Assert.fail("Should have thrown a validation exception");
}
private void createClusterLocationsBadPermissions(Cluster clusterEntity) throws IOException {
- String stagingLocation = ClusterHelper.getLocation(clusterEntity, "staging");
+ FileSystem clusterFileSystem = FileSystem.get(ClusterHelper.getConfiguration(clusterEntity));
+ TestContext.deleteClusterLocations(clusterEntity, clusterFileSystem);
+ String stagingLocation = ClusterHelper.getLocation(clusterEntity, ClusterLocationType.STAGING).getPath();
Path stagingPath = new Path(stagingLocation);
- if (!fs.exists(stagingPath)) {
- FileSystem.mkdirs(fs, stagingPath, OWNER_ONLY_PERMISSION);
- }
+ FileSystem.mkdirs(clusterFileSystem, stagingPath, OWNER_ONLY_PERMISSION);
- String workingLocation = ClusterHelper.getLocation(clusterEntity, "working");
+ String workingLocation = ClusterHelper.getLocation(clusterEntity, ClusterLocationType.WORKING).getPath();
Path workingPath = new Path(workingLocation);
- if (!fs.exists(workingPath)) {
- FileSystem.mkdirs(fs, stagingPath, OWNER_ONLY_PERMISSION);
- }
+ FileSystem.mkdirs(clusterFileSystem, stagingPath, OWNER_ONLY_PERMISSION);
}
}