You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by js...@apache.org on 2018/09/27 20:25:26 UTC

[2/2] nifi git commit: NIFI-5557: handling expired ticket by rollback and penalization

NIFI-5557: handling expired ticket by rollback and penalization


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0f55cbfb
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0f55cbfb
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0f55cbfb

Branch: refs/heads/master
Commit: 0f55cbfb9f49087492a333c59b63e146a1444d55
Parents: 2e1005e
Author: Endre Zoltan Kovacs <ek...@hortonworks.com>
Authored: Tue Aug 28 10:47:59 2018 +0200
Committer: Jeff Storck <jt...@gmail.com>
Committed: Thu Sep 27 16:24:31 2018 -0400

----------------------------------------------------------------------
 .../apache/nifi/processors/hadoop/PutHDFS.java  | 33 ++++++++++++++++++++
 1 file changed, 33 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0f55cbfb/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 91fd204..6e71331 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -49,6 +49,9 @@ import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.StopWatch;
+import org.ietf.jgss.GSSException;
+
+import com.google.common.base.Throwables;
 
 import java.io.BufferedInputStream;
 import java.io.FileNotFoundException;
@@ -60,8 +63,11 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
 
 /**
  * This processor copies FlowFiles to HDFS.
@@ -373,6 +379,17 @@ public class PutHDFS extends AbstractHadoopProcessor {
 
                     session.transfer(putFlowFile, REL_SUCCESS);
 
+                } catch (final IOException e) {
+                  Optional<GSSException> causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
+                  if (causeOptional.isPresent()) {
+                    getLogger().warn("An error occurred while connecting to HDFS. "
+                        + "Rolling back session, and penalizing flow file {}",
+                         new Object[] {putFlowFile.getAttribute(CoreAttributes.UUID.key()), causeOptional.get()});
+                    session.rollback(true);
+                  } else {
+                    getLogger().error("Failed to access HDFS due to {}", new Object[]{e});
+                    session.transfer(putFlowFile, REL_FAILURE);
+                  }
                 } catch (final Throwable t) {
                     if (tempDotCopyFile != null) {
                         try {
@@ -391,6 +408,22 @@ public class PutHDFS extends AbstractHadoopProcessor {
         });
     }
 
+
+    /**
+     * Returns an optional with the first throwable in the causal chain that is assignable to the provided cause type,
+     * and satisfies the provided cause predicate, {@link Optional#empty()} otherwise.
+     * @param t The throwable to inspect for the cause.
+     * @param expectedCauseType The throwable type to search for within the causal chain.
+     * @param causePredicate An additional condition that the matched cause must satisfy.
+     * @return an {@link Optional} containing the first matching cause, or {@link Optional#empty()} if none matches.
+     */
+    private <T extends Throwable> Optional<T> findCause(Throwable t, Class<T> expectedCauseType, Predicate<T> causePredicate) {
+       Stream<Throwable> causalChain = Throwables.getCausalChain(t).stream();
+       return causalChain
+               .filter(expectedCauseType::isInstance)
+               .map(expectedCauseType::cast)
+               .filter(causePredicate)
+               .findFirst();
+    }
+
     protected void changeOwner(final ProcessContext context, final FileSystem hdfs, final Path name, final FlowFile flowFile) {
         try {
             // Change owner and group of file if configured to do so