You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/08/08 19:43:41 UTC

[2/9] git commit: FALCON-462 Add ACL for process entity. Contributed by Venkatesh Seetharam

FALCON-462 Add ACL for process entity. Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/909888f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/909888f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/909888f9

Branch: refs/heads/master
Commit: 909888f9b6703ac4daffeac1605eb78408ccf586
Parents: b183060
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Fri Aug 8 10:13:05 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Fri Aug 8 10:13:05 2014 -0700

----------------------------------------------------------------------
 client/src/main/resources/jaxb-binding.xjb      |  4 +
 client/src/main/resources/process-0.1.xsd       | 15 ++++
 .../entity/parser/ProcessEntityParser.java      | 57 +++++++-----
 .../entity/parser/ProcessEntityParserTest.java  | 95 ++++++++++++++++++++
 .../resources/config/process/process-table.xml  |  3 +
 5 files changed, 154 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/909888f9/client/src/main/resources/jaxb-binding.xjb
----------------------------------------------------------------------
diff --git a/client/src/main/resources/jaxb-binding.xjb b/client/src/main/resources/jaxb-binding.xjb
index e51ccb7..b7d5359 100644
--- a/client/src/main/resources/jaxb-binding.xjb
+++ b/client/src/main/resources/jaxb-binding.xjb
@@ -40,6 +40,10 @@
         <inheritance:extends>org.apache.falcon.entity.v0.Entity</inheritance:extends>
     </jaxb:bindings>
 
+    <jaxb:bindings schemaLocation="process-0.1.xsd" node="//xs:complexType[@name='ACL']">
+        <inheritance:extends>org.apache.falcon.entity.v0.AccessControlList</inheritance:extends>
+    </jaxb:bindings>
+
     <jaxb:globalBindings>
         <xjc:simple/>
     </jaxb:globalBindings>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/909888f9/client/src/main/resources/process-0.1.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/process-0.1.xsd b/client/src/main/resources/process-0.1.xsd
index 0d0f0e9..b1bd426 100644
--- a/client/src/main/resources/process-0.1.xsd
+++ b/client/src/main/resources/process-0.1.xsd
@@ -155,6 +155,7 @@
                     </xs:documentation>
                 </xs:annotation>
             </xs:element>
+            <xs:element type="ACL" name="ACL" minOccurs="0"/>
         </xs:sequence>
         <xs:attribute type="IDENTIFIER" name="name" use="required"/>
     </xs:complexType>
@@ -352,4 +353,18 @@
             <xs:pattern value="(\w+=[^,]+)?([,]?[ ]*[\w]+=[^,]+)*"/>
         </xs:restriction>
     </xs:simpleType>
+
+    <xs:complexType name="ACL">
+        <xs:annotation>
+            <xs:documentation>
+                Access control list for this process.
+                owner is the Owner of this entity.
+                group is the one which has access to read - not used at this time.
+                permission is not enforced at this time
+            </xs:documentation>
+        </xs:annotation>
+        <xs:attribute type="xs:string" name="owner"/>
+        <xs:attribute type="xs:string" name="group"/>
+        <xs:attribute type="xs:string" name="permission"/>
+    </xs:complexType>
 </xs:schema>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/909888f9/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
index 837b86a..7da86a7 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
@@ -18,14 +18,6 @@
 
 package org.apache.falcon.entity.parser;
 
-import java.net.ConnectException;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.TimeZone;
-
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
@@ -35,6 +27,7 @@ import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.process.ACL;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Inputs;
 import org.apache.falcon.entity.v0.process.LateInput;
@@ -46,6 +39,14 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import java.io.IOException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimeZone;
+
 /**
  * Concrete Parser which has XML parsing and validation logic for Process XML.
  */
@@ -96,6 +97,7 @@ public class ProcessEntityParser extends EntityParser<Process> {
         }
         validateDatasetName(process.getInputs(), process.getOutputs());
         validateLateInputs(process);
+        validateACL(process);
     }
 
     /**
@@ -106,8 +108,8 @@ public class ProcessEntityParser extends EntityParser<Process> {
      * @throws FalconException
      */
     private void validateHDFSPaths(Process process, String clusterName) throws FalconException {
-        org.apache.falcon.entity.v0.cluster.Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER,
-                clusterName);
+        org.apache.falcon.entity.v0.cluster.Cluster cluster =
+                ConfigurationStore.get().get(EntityType.CLUSTER, clusterName);
 
         if (!EntityUtil.responsibleFor(cluster.getColo())) {
             return;
@@ -115,7 +117,7 @@ public class ProcessEntityParser extends EntityParser<Process> {
 
         String workflowPath = process.getWorkflow().getPath();
         String libPath = process.getWorkflow().getLib();
-        String nameNode = getNameNode(cluster, clusterName);
+        String nameNode = getNameNode(cluster);
         try {
             Configuration configuration = ClusterHelper.getConfiguration(cluster);
             FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(configuration);
@@ -127,21 +129,16 @@ public class ProcessEntityParser extends EntityParser<Process> {
             if (libPath != null && !fs.exists(new Path(libPath))) {
                 throw new ValidationException("Lib path: " + libPath + " does not exists in HDFS: " + nameNode);
             }
-        } catch (ValidationException e) {
-            throw new ValidationException(e);
-        } catch (ConnectException e) {
-            throw new ValidationException(
-                    "Unable to connect to Namenode: " + nameNode + " referenced in cluster: " + clusterName);
-        } catch (Exception e) {
-            throw new FalconException(e);
+        } catch (IOException e) {
+            throw new FalconException("Error validating workflow path " + workflowPath, e);
         }
     }
 
-    private String getNameNode(Cluster cluster, String clusterName) throws ValidationException {
+    private String getNameNode(Cluster cluster) throws ValidationException {
         // cluster should never be null as it is validated while submitting feeds.
         if (new Path(ClusterHelper.getStorageUrl(cluster)).toUri().getScheme() == null) {
             throw new ValidationException(
-                    "Cannot get valid nameNode scheme from write interface of cluster: " + clusterName);
+                    "Cannot get valid nameNode scheme from write interface of cluster: " + cluster.getName());
         }
         return ClusterHelper.getStorageUrl(cluster);
     }
@@ -226,4 +223,24 @@ public class ProcessEntityParser extends EntityParser<Process> {
                     + input.getName());
         }
     }
+
+    /**
+     * Validate ACL if authorization is enabled.
+     *
+     * @param process process entity
+     * @throws ValidationException
+     */
+    private void validateACL(Process process) throws FalconException {
+        if (!isAuthorizationEnabled()) {
+            return;
+        }
+
+        // Validate the entity owner is logged-in, authenticated user if authorization is enabled
+        ACL processACL = process.getACL();
+        if (processACL == null) {
+            throw new ValidationException("Process ACL cannot be empty for:  " + process.getName());
+        }
+
+        validateOwner(processACL.getOwner());
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/909888f9/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
index cd6c713..93c34d2 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
@@ -28,6 +28,8 @@ import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Cluster;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.util.StartupProperties;
 import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -39,6 +41,7 @@ import javax.xml.bind.JAXBException;
 import javax.xml.bind.Marshaller;
 import javax.xml.bind.Unmarshaller;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.List;
@@ -330,4 +333,96 @@ public class ProcessEntityParserTest extends AbstractTestBase {
         parser.validate(process);
         Assert.fail("An exception should have been thrown since Input partitions are not supported for table storage");
     }
+
+    @Test
+    public void testValidateACLWithNoACLAndAuthorizationDisabled() throws Exception {
+        InputStream stream = this.getClass().getResourceAsStream(PROCESS_XML);
+
+        Process process = parser.parse(stream);
+        Assert.assertNotNull(process);
+        Assert.assertNull(process.getACL());
+
+        parser.validate(process);
+    }
+
+    @Test
+    public void testValidateACLWithACLAndAuthorizationDisabled() throws Exception {
+        InputStream stream = this.getClass().getResourceAsStream("/config/process/process-table.xml");
+
+        Process process = parser.parse(stream);
+        Assert.assertNotNull(process);
+        Assert.assertNotNull(process.getACL());
+        Assert.assertNotNull(process.getACL().getOwner());
+        Assert.assertNotNull(process.getACL().getGroup());
+        Assert.assertNotNull(process.getACL().getPermission());
+
+        parser.validate(process);
+    }
+
+    @Test (expectedExceptions = ValidationException.class)
+    public void testValidateACLWithNoACLAndAuthorizationEnabled() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
+        Assert.assertTrue(Boolean.valueOf(
+                StartupProperties.get().getProperty("falcon.security.authorization.enabled")));
+        CurrentUser.authenticate("falcon");
+
+        try {
+            InputStream stream = this.getClass().getResourceAsStream(PROCESS_XML);
+
+            Process process = parser.parse(stream);
+            Assert.assertNotNull(process);
+            Assert.assertNull(process.getACL());
+
+            parser.validate(process);
+            Assert.fail("Validation exception should have been thrown for empty ACL");
+        } finally {
+            StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false");
+        }
+    }
+
+    @Test
+    public void testValidateACLAuthorizationEnabled() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
+        Assert.assertTrue(Boolean.valueOf(
+                StartupProperties.get().getProperty("falcon.security.authorization.enabled")));
+        CurrentUser.authenticate("falcon");
+
+        try {
+            InputStream stream = this.getClass().getResourceAsStream("/config/process/process-table.xml");
+
+            Process process = parser.parseAndValidate(stream);
+            Assert.assertNotNull(process);
+            Assert.assertNotNull(process.getACL());
+            Assert.assertNotNull(process.getACL().getOwner());
+            Assert.assertNotNull(process.getACL().getGroup());
+            Assert.assertNotNull(process.getACL().getPermission());
+        } finally {
+            StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false");
+        }
+    }
+
+    @Test (expectedExceptions = ValidationException.class)
+    public void testValidateACLAuthorizationEnabledBadOwner() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
+        Assert.assertTrue(Boolean.valueOf(
+                StartupProperties.get().getProperty("falcon.security.authorization.enabled")));
+        CurrentUser.authenticate("blah");
+
+        try {
+            InputStream stream = this.getClass().getResourceAsStream("/config/process/process-table.xml");
+
+            Process process = parser.parse(stream);
+
+            Assert.assertNotNull(process);
+            Assert.assertNotNull(process.getACL());
+            Assert.assertNotNull(process.getACL().getOwner());
+            Assert.assertNotNull(process.getACL().getGroup());
+            Assert.assertNotNull(process.getACL().getPermission());
+
+            parser.validate(process);
+            Assert.fail("Validation exception should have been thrown for invalid owner");
+        } finally {
+            StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false");
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/909888f9/common/src/test/resources/config/process/process-table.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/process/process-table.xml b/common/src/test/resources/config/process/process-table.xml
index 9408973..bcaf28e 100644
--- a/common/src/test/resources/config/process/process-table.xml
+++ b/common/src/test/resources/config/process/process-table.xml
@@ -43,4 +43,7 @@
     <workflow engine="oozie" path="/falcon/test/workflow"/>
 
     <retry policy="periodic" delay="minutes(10)" attempts="3"/>
+
+    <!-- ACL -->
+    <ACL owner="falcon" group="falcon" permission="*"/>
 </process>