You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2017/03/13 16:22:33 UTC

nifi git commit: NIFI-3520 Updated nifi-hdfs-processors POM to depend directly on hadoop-client - Removed NAR dependency on nifi-hadoop-libraries-nar from nifi-hadoop-nar so that hadoop-client dependencies will be included directly in nifi-hadoop-nar - A

Repository: nifi
Updated Branches:
  refs/heads/master 9bb31a70d -> a61f35305


NIFI-3520 Updated nifi-hdfs-processors POM to depend directly on hadoop-client
- Removed NAR dependency on nifi-hadoop-libraries-nar from nifi-hadoop-nar so that hadoop-client dependencies will be included directly in nifi-hadoop-nar
- Added RequiresInstanceClassLoading annotation to AbstractHadoopProcessor and HiveConnectionPool
- UGI relogins are now performed using doAs
- Added debug-level logging for UGI relogins in KerberosTicketRenewer and AbstractHadoopProcessor

This closes #1539.

Signed-off-by: Bryan Bende <bb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a61f3530
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a61f3530
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a61f3530

Branch: refs/heads/master
Commit: a61f353051cf065f053d7f186fb32b137986e13d
Parents: 9bb31a7
Author: Jeff Storck <jt...@gmail.com>
Authored: Thu Feb 23 20:03:40 2017 -0500
Committer: Bryan Bende <bb...@apache.org>
Committed: Mon Mar 13 12:21:49 2017 -0400

----------------------------------------------------------------------
 .../org/apache/nifi/hadoop/KerberosTicketRenewer.java   | 12 +++++++++++-
 .../nifi-hadoop-bundle/nifi-hadoop-nar/pom.xml          |  2 +-
 .../nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml     |  8 +-------
 .../nifi/processors/hadoop/AbstractHadoopProcessor.java | 11 ++++++++++-
 .../java/org/apache/nifi/processors/hadoop/PutHDFS.java |  2 --
 .../org/apache/nifi/dbcp/hive/HiveConnectionPool.java   |  2 ++
 6 files changed, 25 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/a61f3530/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java b/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java
index d451535..bf922fe 100644
--- a/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java
+++ b/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java
@@ -20,6 +20,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.nifi.logging.ComponentLog;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 
 /**
  * Periodically attempts to renew the Kerberos user's ticket for the given UGI.
@@ -58,11 +59,20 @@ public class KerberosTicketRenewer implements Runnable {
             try {
                 logger.debug("Invoking renewal attempt for Kerberos ticket");
                 // While we run this "frequently", the Hadoop implementation will only perform the login at 80% of ticket lifetime.
-                ugi.checkTGTAndReloginFromKeytab();
+                ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
+                    ugi.checkTGTAndReloginFromKeytab();
+                    return null;
+                });
             } catch (IOException e) {
                 logger.error("Failed to renew Kerberos ticket", e);
+            } catch (InterruptedException e) {
+                logger.error("Interrupted while attempting to renew Kerberos ticket", e);
+                Thread.currentThread().interrupt();
+                return;
             }
 
+            logger.debug("current UGI {}", new Object[]{ugi});
+
             // Wait for a bit before checking again.
             try {
                 Thread.sleep(renewalPeriod);

http://git-wip-us.apache.org/repos/asf/nifi/blob/a61f3530/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hadoop-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hadoop-nar/pom.xml b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hadoop-nar/pom.xml
index c307128..5e2d3d3 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hadoop-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hadoop-nar/pom.xml
@@ -28,7 +28,7 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-hadoop-libraries-nar</artifactId>
+            <artifactId>nifi-standard-services-api-nar</artifactId>
             <type>nar</type>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/a61f3530/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
index be55e22..3002bcc 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
@@ -46,19 +46,13 @@
         </dependency>
         <dependency> 
             <groupId>org.apache.hadoop</groupId> 
-            <artifactId>hadoop-common</artifactId> 
-            <scope>provided</scope> 
+            <artifactId>hadoop-client</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-distributed-cache-client-service-api</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.apache.hadoop</groupId> 
-            <artifactId>hadoop-hdfs</artifactId> 
-            <scope>provided</scope> 
-        </dependency>
-        <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-mock</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/nifi/blob/a61f3530/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
index 8d31a7a..7a8c34c 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.io.compress.Lz4Codec;
 import org.apache.hadoop.io.compress.SnappyCodec;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -64,6 +65,7 @@ import java.util.concurrent.atomic.AtomicReference;
 /**
  * This is a base class that is helpful when building processors interacting with HDFS.
  */
+@RequiresInstanceClassLoading
 public abstract class AbstractHadoopProcessor extends AbstractProcessor {
     /**
      * Compression Type Enum
@@ -299,6 +301,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
                 fs = getFileSystemAsUser(config, ugi);
             }
         }
+        getLogger().debug("resetHDFSResources UGI {}", new Object[]{ugi});
 
         final Path workingDir = fs.getWorkingDirectory();
         getLogger().info("Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}",
@@ -455,7 +458,10 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
             getLogger().info("Kerberos ticket age exceeds threshold [{} seconds] " +
                 "attempting to renew ticket for user {}", new Object[]{
               kerberosReloginThreshold, ugi.getUserName()});
-            ugi.checkTGTAndReloginFromKeytab();
+            ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
+                ugi.checkTGTAndReloginFromKeytab();
+                return null;
+            });
             lastKerberosReloginTime = System.currentTimeMillis() / 1000;
             getLogger().info("Kerberos relogin successful or ticket still valid");
         } catch (IOException e) {
@@ -463,6 +469,9 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
             // meaning dfs operations would fail
             getLogger().error("Kerberos relogin failed", e);
             throw new ProcessException("Unable to renew kerberos ticket", e);
+        } catch (InterruptedException e) {
+            getLogger().error("Interrupted while attempting Kerberos relogin", e);
+            throw new ProcessException("Unable to renew kerberos ticket", e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/a61f3530/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 7425b97..6232936 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -27,7 +27,6 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.ReadsAttribute;
 import org.apache.nifi.annotation.behavior.Restricted;
-import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -68,7 +67,6 @@ import java.util.concurrent.TimeUnit;
 /**
  * This processor copies FlowFiles to HDFS.
  */
-@RequiresInstanceClassLoading
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"hadoop", "HDFS", "put", "copy", "filesystem", "restricted"})
 @CapabilityDescription("Write FlowFile data to Hadoop Distributed File System (HDFS)")

http://git-wip-us.apache.org/repos/asf/nifi/blob/a61f3530/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
index 1c6ee32..5d80610 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
@@ -22,6 +22,7 @@ import org.apache.commons.dbcp.BasicDataSource;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hive.jdbc.HiveDriver;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnDisabled;
@@ -59,6 +60,7 @@ import org.apache.nifi.controller.ControllerServiceInitializationContext;
  * Implementation for Database Connection Pooling Service used for Apache Hive
  * connections. Apache DBCP is used for connection pooling functionality.
  */
+@RequiresInstanceClassLoading
 @Tags({"hive", "dbcp", "jdbc", "database", "connection", "pooling", "store"})
 @CapabilityDescription("Provides Database Connection Pooling Service for Apache Hive. Connections can be asked from pool and returned after usage.")
 public class HiveConnectionPool extends AbstractControllerService implements HiveDBCPService {