Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/06/22 07:08:41 UTC

[GitHub] [nifi] simonbence opened a new pull request #5175: NIFI-8717 Refactoring PutHDFS

simonbence opened a new pull request #5175:
URL: https://github.com/apache/nifi/pull/5175


   <!--
     Licensed to the Apache Software Foundation (ASF) under one or more
     contributor license agreements.  See the NOTICE file distributed with
     this work for additional information regarding copyright ownership.
     The ASF licenses this file to You under the Apache License, Version 2.0
     (the "License"); you may not use this file except in compliance with
     the License.  You may obtain a copy of the License at
         http://www.apache.org/licenses/LICENSE-2.0
     Unless required by applicable law or agreed to in writing, software
     distributed under the License is distributed on an "AS IS" BASIS,
     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     See the License for the specific language governing permissions and
     limitations under the License.
   -->
   Thank you for submitting a contribution to Apache NiFi.
   
   [NIFI-8717](https://issues.apache.org/jira/browse/NIFI-8717)
   
   This is a small proposal to decouple the NiFi-related and HDFS-related logic, which will make the implementation more flexible in the long run.
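   
   For illustration only (this sketch is not part of the PR's diff): one way a concrete processor could plug into the extracted base class is by overriding the protected hooks introduced in `AbstractPutHDFS`. The `ExamplePutHDFS` class name and the `Block Size` / `Remote Owner` property descriptors below are assumptions made for the example, and a real subclass would also need to implement any remaining abstract members of the base class.
   
   ```java
   import org.apache.nifi.components.PropertyDescriptor;
   import org.apache.nifi.flowfile.FlowFile;
   import org.apache.nifi.processor.DataUnit;
   import org.apache.nifi.processor.ProcessContext;
   import org.apache.nifi.processor.ProcessSession;
   import org.apache.nifi.processor.util.StandardValidators;
   import org.apache.nifi.processors.hadoop.AbstractPutHDFS;
   
   // Hypothetical subclass: NiFi-facing configuration stays here, while the
   // generic HDFS write logic lives in AbstractPutHDFS.
   public class ExamplePutHDFS extends AbstractPutHDFS {
   
       // Assumed property descriptors, defined only for the sake of the example.
       static final PropertyDescriptor BLOCK_SIZE = new PropertyDescriptor.Builder()
               .name("Block Size")
               .description("Size of each block as written to HDFS; the directory default is used when unset")
               .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
               .build();
   
       static final PropertyDescriptor REMOTE_OWNER = new PropertyDescriptor.Builder()
               .name("Remote Owner")
               .description("Owner to set on the written file; requires NiFi to run with the privilege to change owner")
               .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
               .build();
   
       @Override
       protected long getBlockSize(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) {
           // Prefer an explicitly configured block size; otherwise fall back to the base class behaviour.
           final Double configured = context.getProperty(BLOCK_SIZE).asDataSize(DataUnit.B);
           return configured != null ? configured.longValue() : super.getBlockSize(context, session, flowFile);
       }
   
       @Override
       protected String getOwner(final ProcessContext context, final FlowFile flowFile) {
           // A non-null return value tells the base class to change the owner of the uploaded file.
           return context.getProperty(REMOTE_OWNER).getValue();
       }
   }
   ```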
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
        in the commit message?
   
   - [ ] Does your PR title start with **NIFI-XXXX** where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically `main`)?
   
   - [ ] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._
   
   ### For code changes:
   - [ ] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
   - [ ] Have you written or updated unit tests to verify your changes?
   - [ ] Have you verified that the full build is successful on JDK 8?
   - [ ] Have you verified that the full build is successful on JDK 11?
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
   - [ ] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`?
   - [ ] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`?
   - [ ] If adding new Properties, have you added `.displayName` in addition to .name (programmatic access) for each of the new properties?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] markap14 commented on pull request #5175: NIFI-8717 Refactoring PutHDFS

Posted by GitBox <gi...@apache.org>.
markap14 commented on pull request #5175:
URL: https://github.com/apache/nifi/pull/5175#issuecomment-866174949


   Thanks @simonbence, the changes seem reasonable to me. +1, merged to main!




[GitHub] [nifi] markap14 merged pull request #5175: NIFI-8717 Refactoring PutHDFS

Posted by GitBox <gi...@apache.org>.
markap14 merged pull request #5175:
URL: https://github.com/apache/nifi/pull/5175


   




[GitHub] [nifi] markap14 commented on a change in pull request #5175: NIFI-8717 Refactoring PutHDFS

Posted by GitBox <gi...@apache.org>.
markap14 commented on a change in pull request #5175:
URL: https://github.com/apache/nifi/pull/5175#discussion_r656283356



##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFS.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import com.google.common.base.Throwables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsCreateModes;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.StopWatch;
+import org.ietf.jgss.GSSException;
+
+import java.io.BufferedInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.security.PrivilegedAction;
+import java.util.EnumSet;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+
+public abstract class AbstractPutHDFS extends AbstractHadoopProcessor {
+    protected static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
+    protected static final int BUFFER_SIZE_DEFAULT = 4096;
+
+    protected static final String REPLACE_RESOLUTION = "replace";
+    protected static final String IGNORE_RESOLUTION = "ignore";
+    protected static final String FAIL_RESOLUTION = "fail";
+    protected static final String APPEND_RESOLUTION = "append";
+
+    protected static final AllowableValue REPLACE_RESOLUTION_AV = new AllowableValue(REPLACE_RESOLUTION,
+            REPLACE_RESOLUTION, "Replaces the existing file if any.");
+    protected static final AllowableValue IGNORE_RESOLUTION_AV = new AllowableValue(IGNORE_RESOLUTION, IGNORE_RESOLUTION,
+            "Ignores the flow file and routes it to success.");
+    protected static final AllowableValue FAIL_RESOLUTION_AV = new AllowableValue(FAIL_RESOLUTION, FAIL_RESOLUTION,
+            "Penalizes the flow file and routes it to failure.");
+    protected static final AllowableValue APPEND_RESOLUTION_AV = new AllowableValue(APPEND_RESOLUTION, APPEND_RESOLUTION,
+            "Appends to the existing file if any, creates a new file otherwise.");
+
+    protected static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
+            .name("Conflict Resolution Strategy")
+            .description("Indicates what should happen when a file with the same name already exists in the output directory")
+            .required(true)
+            .defaultValue(FAIL_RESOLUTION_AV.getValue())
+            .allowableValues(REPLACE_RESOLUTION_AV, IGNORE_RESOLUTION_AV, FAIL_RESOLUTION_AV, APPEND_RESOLUTION_AV)
+            .build();
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final FileSystem hdfs = getFileSystem();
+        final Configuration configuration = getConfiguration();
+        final UserGroupInformation ugi = getUserGroupInformation();
+
+        if (configuration == null || hdfs == null || ugi == null) {
+            getLogger().error("HDFS not configured properly");
+            session.transfer(flowFile, getFailureRelationship());
+            context.yield();
+            return;
+        }
+
+        ugi.doAs(new PrivilegedAction<Object>() {
+            @Override
+            public Object run() {
+                Path tempDotCopyFile = null;
+                FlowFile putFlowFile = flowFile;
+                try {
+                    final String dirValue = context.getProperty(DIRECTORY).evaluateAttributeExpressions(putFlowFile).getValue();
+                    final Path configuredRootDirPath = new Path(dirValue);
+
+                    final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();
+                    final long blockSize = getBlockSize(context, session, putFlowFile);
+                    final int bufferSize = getBufferSize(context, session, putFlowFile);
+                    final short replication = getReplication(context, session, putFlowFile);
+
+                    final CompressionCodec codec = getCompressionCodec(context, configuration);
+
+                    final String filename = codec != null
+                            ? putFlowFile.getAttribute(CoreAttributes.FILENAME.key()) + codec.getDefaultExtension()
+                            : putFlowFile.getAttribute(CoreAttributes.FILENAME.key());
+
+                    final Path tempCopyFile = new Path(configuredRootDirPath, "." + filename);
+                    final Path copyFile = new Path(configuredRootDirPath, filename);
+
+                    // Create destination directory if it does not exist
+                    try {
+                        if (!hdfs.getFileStatus(configuredRootDirPath).isDirectory()) {
+                            throw new IOException(configuredRootDirPath.toString() + " already exists and is not a directory");
+                        }
+                    } catch (FileNotFoundException fe) {
+                        if (!hdfs.mkdirs(configuredRootDirPath)) {
+                            throw new IOException(configuredRootDirPath.toString() + " could not be created");
+                        }
+                        changeOwner(context, hdfs, configuredRootDirPath, flowFile);
+                    }
+
+                    final boolean destinationExists = hdfs.exists(copyFile);
+
+                    // If destination file already exists, resolve that based on processor configuration
+                    if (destinationExists) {
+                        switch (conflictResponse) {
+                            case REPLACE_RESOLUTION:
+                                if (hdfs.delete(copyFile, false)) {
+                                    getLogger().info("deleted {} in order to replace with the contents of {}",
+                                            new Object[]{copyFile, putFlowFile});
+                                }
+                                break;
+                            case IGNORE_RESOLUTION:
+                                session.transfer(putFlowFile, getSuccessRelationship());
+                                getLogger().info("transferring {} to success because file with same name already exists",
+                                        new Object[]{putFlowFile});
+                                return null;
+                            case FAIL_RESOLUTION:
+                                session.transfer(session.penalize(putFlowFile), getFailureRelationship());
+                                getLogger().warn("penalizing {} and routing to failure because file with same name already exists",
+                                        new Object[]{putFlowFile});
+                                return null;
+                            default:
+                                break;
+                        }
+                    }
+
+                    // Write FlowFile to temp file on HDFS
+                    final StopWatch stopWatch = new StopWatch(true);
+                    session.read(putFlowFile, new InputStreamCallback() {
+
+                        @Override
+                        public void process(InputStream in) throws IOException {
+                            OutputStream fos = null;
+                            Path createdFile = null;
+                            try {
+                                if (conflictResponse.equals(APPEND_RESOLUTION) && destinationExists) {
+                                    fos = hdfs.append(copyFile, bufferSize);
+                                } else {
+                                    final EnumSet<CreateFlag> cflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
+
+                                    if (shouldIgnoreLocality(context, session)) {
+                                        cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
+                                    }
+
+                                    fos = hdfs.create(tempCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(),
+                                            FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize,
+                                            null, null);
+                                }
+
+                                if (codec != null) {
+                                    fos = codec.createOutputStream(fos);
+                                }
+                                createdFile = tempCopyFile;
+                                BufferedInputStream bis = new BufferedInputStream(in);
+                                StreamUtils.copy(bis, fos);
+                                bis = null;
+                                fos.flush();
+                            } finally {
+                                try {
+                                    if (fos != null) {
+                                        fos.close();
+                                    }
+                                } catch (Throwable t) {
+                                    // when talking to remote HDFS clusters, we don't notice problems until fos.close()
+                                    if (createdFile != null) {
+                                        try {
+                                            hdfs.delete(createdFile, false);
+                                        } catch (Throwable ignore) {
+                                        }
+                                    }
+                                    throw t;
+                                }
+                                fos = null;
+                            }
+                        }
+
+                    });
+                    stopWatch.stop();
+                    final String dataRate = stopWatch.calculateDataRate(putFlowFile.getSize());
+                    final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+                    tempDotCopyFile = tempCopyFile;
+
+                    if (!conflictResponse.equals(APPEND_RESOLUTION)
+                            || (conflictResponse.equals(APPEND_RESOLUTION) && !destinationExists)) {
+                        boolean renamed = false;
+                        for (int i = 0; i < 10; i++) { // try to rename multiple times.
+                            if (hdfs.rename(tempCopyFile, copyFile)) {
+                                renamed = true;
+                                break;// rename was successful
+                            }
+                            Thread.sleep(200L);// try waiting to let whatever might cause rename failure to resolve
+                        }
+                        if (!renamed) {
+                            hdfs.delete(tempCopyFile, false);
+                            throw new ProcessException("Copied file to HDFS but could not rename dot file " + tempCopyFile
+                                    + " to its final filename");
+                        }
+
+                        changeOwner(context, hdfs, copyFile, flowFile);
+                    }
+
+                    getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}",
+                            new Object[]{putFlowFile, copyFile, millis, dataRate});
+
+                    final String newFilename = copyFile.getName();
+                    final String hdfsPath = copyFile.getParent().toString();
+                    putFlowFile = session.putAttribute(putFlowFile, CoreAttributes.FILENAME.key(), newFilename);
+                    putFlowFile = session.putAttribute(putFlowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
+                    final Path qualifiedPath = copyFile.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
+                    session.getProvenanceReporter().send(putFlowFile, qualifiedPath.toString());
+
+                    session.transfer(putFlowFile, getSuccessRelationship());
+
+                } catch (final IOException e) {
+                    Optional<GSSException> causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
+                    if (causeOptional.isPresent()) {
+                        getLogger().warn("An error occurred while connecting to HDFS. "
+                                        + "Rolling back session, and penalizing flow file {}",
+                                new Object[] {putFlowFile.getAttribute(CoreAttributes.UUID.key()), causeOptional.get()});
+                        session.rollback(true);
+                    } else {
+                        getLogger().error("Failed to access HDFS due to {}", new Object[]{e});
+                        session.transfer(putFlowFile, getFailureRelationship());
+                    }
+                } catch (final Throwable t) {
+                    if (tempDotCopyFile != null) {
+                        try {
+                            hdfs.delete(tempDotCopyFile, false);
+                        } catch (Exception e) {
+                            getLogger().error("Unable to remove temporary file {} due to {}", new Object[]{tempDotCopyFile, e});
+                        }
+                    }
+                    getLogger().error("Failed to write to HDFS due to {}", new Object[]{t});
+                    session.transfer(session.penalize(putFlowFile), getFailureRelationship());
+                    context.yield();
+                }
+
+                return null;
+            }
+        });
+    }
+
+    /**
+     * Returns the expected block size. Note: implementations may override this.
+     */
+    protected long getBlockSize(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) {
+        final String dirValue = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
+        final Path configuredRootDirPath = new Path(dirValue);
+
+        try {
+            return getFileSystem().getFileStatus(configuredRootDirPath).getBlockSize();
+        } catch (final IOException e) {
+            getLogger().warn("Error while acquiring file status for {}. Falling back to the default block size.", configuredRootDirPath, e);
+            return getFileSystem().getDefaultBlockSize(configuredRootDirPath);
+        }
+    }
+
+    /**
+     * Returns the expected buffer size. Note: implementations may override this.
+     */
+    protected int getBufferSize(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) {
+        return getConfiguration().getInt(BUFFER_SIZE_KEY, BUFFER_SIZE_DEFAULT);
+    }
+
+    /**
+     * Returns the expected replication factor. Note: implementations may override this.
+     */
+    protected short getReplication(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) {
+        final String dirValue = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
+        final Path configuredRootDirPath = new Path(dirValue);
+        return getFileSystem().getDefaultReplication(configuredRootDirPath);
+    }
+
+    /**
+     * Returns whether the file system should ignore data locality. Note: implementations may override this.
+     */
+    protected boolean shouldIgnoreLocality(final ProcessContext context, final ProcessSession session) {
+        return false;
+    }
+
+    /**
+     * If this returns a non-null value, the owner of the uploaded file is changed to this value after it is written. This
+     * only works if NiFi is running as a user that has the privilege to change the owner. Note: implementations may override this.
+     */
+    protected String getOwner(final ProcessContext context, final FlowFile flowFile) {
+        return null;
+    }
+
+    /**
+     * If this returns a non-null value, the group of the uploaded file is changed to this value after it is written. This
+     * only works if NiFi is running as a user that has the privilege to change the group. Note: implementations may override this.
+     */
+    protected String getGroup(final ProcessContext context, final FlowFile flowFile) {

Review comment:
       These methods all have 'default' implementations but are then overridden in the existing concrete implementation. Probably makes more sense to just make them abstract?
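
   For illustration (this is only a sketch of the reviewer's suggestion, not the committed change), the no-op defaults could be replaced with abstract declarations so that each concrete processor has to state its behaviour explicitly:

   ```java
   // Sketch of the suggested alternative: drop the 'default' bodies and declare the
   // hooks abstract, forcing concrete processors such as PutHDFS to implement them.
   public abstract class AbstractPutHDFS extends AbstractHadoopProcessor {

       // ... onTrigger and the property definitions stay as in the diff above ...

       protected abstract long getBlockSize(ProcessContext context, ProcessSession session, FlowFile flowFile);

       protected abstract int getBufferSize(ProcessContext context, ProcessSession session, FlowFile flowFile);

       protected abstract short getReplication(ProcessContext context, ProcessSession session, FlowFile flowFile);

       protected abstract boolean shouldIgnoreLocality(ProcessContext context, ProcessSession session);

       /**
        * If this returns a non-null value, the owner of the uploaded file is changed to this value after it is written.
        */
       protected abstract String getOwner(ProcessContext context, FlowFile flowFile);

       /**
        * If this returns a non-null value, the group of the uploaded file is changed to this value after it is written.
        */
       protected abstract String getGroup(ProcessContext context, FlowFile flowFile);
   }
   ```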



