You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original page) is not preserved in this plain-text version.
Posted to commits@nifi.apache.org by ex...@apache.org on 2021/10/05 19:43:18 UTC

[nifi] branch main updated: NIFI-9235 - Log conflicts between umask and ACL in PutHDFS

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new d6805ab  NIFI-9235 - Log conflicts between umask and ACL in PutHDFS
d6805ab is described below

commit d6805abf9bc9d32c138f12fdc33e39da8e1c5fff
Author: Paul Grey <gr...@yahoo.com>
AuthorDate: Tue Sep 21 16:17:24 2021 -0400

    NIFI-9235 - Log conflicts between umask and ACL in PutHDFS
    
    This closes #5409
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
 .../nifi-hdfs-processors/pom.xml                   |   5 +
 .../org/apache/nifi/processors/hadoop/PutHDFS.java |  49 +++++++++-
 .../apache/nifi/processors/hadoop/PutHDFSTest.java | 103 ++++++++++++++++++++-
 3 files changed, 154 insertions(+), 3 deletions(-)

diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
index 3d8911d..cff199a 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
@@ -79,6 +79,11 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>com.github.ben-manes.caffeine</groupId>
+            <artifactId>caffeine</artifactId>
+            <version>2.9.2</version>
+        </dependency>
+        <dependency>
             <groupId>commons-io</groupId>
             <artifactId>commons-io</artifactId>
             <version>2.10.0</version>
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 0a32930..462033d 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -16,11 +16,15 @@
  */
 package org.apache.nifi.processors.hadoop;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import com.google.common.base.Throwables;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AclEntryScope;
+import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsCreateModes;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -35,6 +39,8 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
@@ -58,7 +64,9 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.UncheckedIOException;
 import java.security.PrivilegedAction;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -93,6 +101,10 @@ public class PutHDFS extends AbstractHadoopProcessor {
     protected static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
     protected static final int BUFFER_SIZE_DEFAULT = 4096;
 
+    // state
+
+    private Cache<Path, AclStatus> aclCache;
+
     // relationships
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
@@ -152,7 +164,8 @@ public class PutHDFS extends AbstractHadoopProcessor {
             .description(
                    "A umask represented as an octal number which determines the permissions of files written to HDFS. " +
                            "This overrides the Hadoop property \"fs.permissions.umask-mode\".  " +
-                           "If this property and \"fs.permissions.umask-mode\" are undefined, the Hadoop default \"022\" will be used.")
+                           "If this property and \"fs.permissions.umask-mode\" are undefined, the Hadoop default \"022\" will be used.  "+
+                           "If the PutHDFS target folder has a default ACL defined, the umask property is ignored by HDFS.")
             .addValidator(HadoopValidators.UMASK_VALIDATOR)
             .build();
 
@@ -229,6 +242,19 @@ public class PutHDFS extends AbstractHadoopProcessor {
         FsPermission.setUMask(config, new FsPermission(dfsUmask));
     }
 
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        aclCache = Caffeine.newBuilder()
+                .maximumSize(20L)
+                .expireAfterWrite(Duration.ofHours(1))
+                .build();
+    }
+
+    @OnStopped
+    public void onStopped() {
+        aclCache.invalidateAll();
+    }
+
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
         final FlowFile flowFile = session.get();
@@ -254,7 +280,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
                 FlowFile putFlowFile = flowFile;
                 try {
                     final Path dirPath = getNormalizedPath(context, DIRECTORY, putFlowFile);
-
+                    checkAclStatus(getAclStatus(dirPath));
                     final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();
                     final long blockSize = getBlockSize(context, session, putFlowFile, dirPath);
                     final int bufferSize = getBufferSize(context, session, putFlowFile);
@@ -423,6 +449,25 @@ public class PutHDFS extends AbstractHadoopProcessor {
 
                 return null;
             }
+
+            private void checkAclStatus(final AclStatus aclStatus) throws IOException {
+                final boolean isDefaultACL = aclStatus.getEntries().stream().anyMatch(
+                        aclEntry -> AclEntryScope.DEFAULT.equals(aclEntry.getScope()));
+                final boolean isSetUmask = context.getProperty(UMASK).isSet();
+                if (isDefaultACL && isSetUmask) {
+                    throw new IOException("PutHDFS umask setting is ignored by HDFS when HDFS default ACL is set.");
+                }
+            }
+
+            private AclStatus getAclStatus(final Path dirPath) {
+                return aclCache.get(dirPath, fn -> {
+                    try {
+                        return hdfs.getAclStatus(dirPath);
+                    } catch (IOException e) {
+                        throw new UncheckedIOException(String.format("Unable to query ACL for directory [%s]", dirPath), e);
+                    }
+                });
+            }
         });
     }
 
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
index 945e24e..abe288d 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.util.Progressable;
@@ -43,6 +45,7 @@ import org.ietf.jgss.GSSException;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import javax.security.sasl.SaslException;
 import java.io.ByteArrayOutputStream;
@@ -51,6 +54,8 @@ import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -61,6 +66,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 public class PutHDFSTest {
 
@@ -468,6 +476,89 @@ public class PutHDFSTest {
             fileSystem.getFileStatus(new Path("target/test-classes/randombytes-1")).getPermission());
     }
 
+    /**
+     * Multiple invocations of PutHDFS on the same target directory should query the remote filesystem ACL once, and
+     * use the cached ACL afterwards.
+     */
+    @Test
+    public void testPutHDFSAclCache() {
+        final MockFileSystem fileSystem = Mockito.spy(new MockFileSystem());
+        final Path directory = new Path("/withACL");
+        assertTrue(fileSystem.mkdirs(directory));
+        final String acl = "user::rwx,group::rwx,other::rwx";
+        final String aclDefault = "default:user::rwx,default:group::rwx,default:other::rwx";
+        fileSystem.setAcl(directory, AclEntry.parseAclSpec(String.join(",", acl, aclDefault), true));
+
+        final PutHDFS processor = new TestablePutHDFS(kerberosProperties, fileSystem);
+        final TestRunner runner = TestRunners.newTestRunner(processor);
+        runner.setProperty(PutHDFS.DIRECTORY, directory.toString());
+        runner.setProperty(PutHDFS.UMASK, "077");
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.FILENAME.key(), "empty");
+        runner.enqueue(new byte[16], attributes);
+        runner.run(3);  // fetch data once; hit AclCache twice
+        verify(fileSystem, times(1)).getAclStatus(any(Path.class));
+    }
+
+    /**
+     * When no default ACL is present on the remote directory, usage of {@link PutHDFS#UMASK}
+     * should be ok.
+     */
+    @Test
+    public void testPutFileWithNoDefaultACL() {
+        final List<Boolean> setUmask = Arrays.asList(false, true);
+        for (boolean setUmaskIt : setUmask) {
+            final MockFileSystem fileSystem = new MockFileSystem();
+            final Path directory = new Path("/withNoDACL");
+            assertTrue(fileSystem.mkdirs(directory));
+            final String acl = "user::rwx,group::rwx,other::rwx";
+            fileSystem.setAcl(directory, AclEntry.parseAclSpec(acl, true));
+
+            final PutHDFS processor = new TestablePutHDFS(kerberosProperties, fileSystem);
+            final TestRunner runner = TestRunners.newTestRunner(processor);
+            runner.setProperty(PutHDFS.DIRECTORY, directory.toString());
+            if (setUmaskIt) {
+                runner.setProperty(PutHDFS.UMASK, "077");
+            }
+            final Map<String, String> attributes = new HashMap<>();
+            attributes.put(CoreAttributes.FILENAME.key(), "empty");
+            runner.enqueue(new byte[16], attributes);
+            runner.run();
+            assertEquals(1, runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS).size());
+            assertEquals(0, runner.getFlowFilesForRelationship(PutHDFS.REL_FAILURE).size());
+        }
+    }
+
+    /**
+     * When default ACL is present on the remote directory, usage of {@link PutHDFS#UMASK}
+     * should trigger failure of the flow file.
+     */
+    @Test
+    public void testPutFileWithDefaultACL() {
+        final List<Boolean> setUmask = Arrays.asList(false, true);
+        for (boolean setUmaskIt : setUmask) {
+            final MockFileSystem fileSystem = new MockFileSystem();
+            final Path directory = new Path("/withACL");
+            assertTrue(fileSystem.mkdirs(directory));
+            final String acl = "user::rwx,group::rwx,other::rwx";
+            final String aclDefault = "default:user::rwx,default:group::rwx,default:other::rwx";
+            fileSystem.setAcl(directory, AclEntry.parseAclSpec(String.join(",", acl, aclDefault), true));
+
+            final PutHDFS processor = new TestablePutHDFS(kerberosProperties, fileSystem);
+            final TestRunner runner = TestRunners.newTestRunner(processor);
+            runner.setProperty(PutHDFS.DIRECTORY, directory.toString());
+            if (setUmaskIt) {
+                runner.setProperty(PutHDFS.UMASK, "077");
+            }
+            final Map<String, String> attributes = new HashMap<>();
+            attributes.put(CoreAttributes.FILENAME.key(), "empty");
+            runner.enqueue(new byte[16], attributes);
+            runner.run();
+            assertEquals(setUmaskIt ? 0 : 1, runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS).size());
+            assertEquals(setUmaskIt ? 1 : 0, runner.getFlowFilesForRelationship(PutHDFS.REL_FAILURE).size());
+        }
+    }
+
     @Test
     public void testPutFileWithCloseException() throws IOException {
         mockFileSystem = new MockFileSystem(true);
@@ -522,8 +613,9 @@ public class PutHDFSTest {
         }
     }
 
-    private class MockFileSystem extends FileSystem {
+    private static class MockFileSystem extends FileSystem {
         private final Map<Path, FileStatus> pathToStatus = new HashMap<>();
+        private final Map<Path, List<AclEntry>> pathToAcl = new HashMap<>();
         private final boolean failOnClose;
 
         public MockFileSystem() {
@@ -534,6 +626,15 @@ public class PutHDFSTest {
             this.failOnClose = failOnClose;
         }
 
+        public void setAcl(final Path path, final List<AclEntry> aclSpec) {
+            pathToAcl.put(path, aclSpec);
+        }
+
+        @Override
+        public AclStatus getAclStatus(final Path path) {
+            return new AclStatus.Builder().addEntries(pathToAcl.getOrDefault(path, new ArrayList<>())).build();
+        }
+
         @Override
         public URI getUri() {
             return URI.create("file:///");