You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2017/03/13 16:22:33 UTC
nifi git commit: NIFI-3520 Updated nifi-hdfs-processors POM to depend
directly on hadoop-client - Removed NAR dependency on
nifi-hadoop-libraries-nar from nifi-hadoop-nar so that hadoop-client
dependencies will be included directly in nifi-hadoop-nar - Added
RequiresInstanceClassLoading annotation to AbstractHadoopProcessor and
HiveConnectionPool
Repository: nifi
Updated Branches:
refs/heads/master 9bb31a70d -> a61f35305
NIFI-3520 Updated nifi-hdfs-processors POM to depend directly on hadoop-client
- Removed NAR dependency on nifi-hadoop-libraries-nar from nifi-hadoop-nar so that hadoop-client dependencies will be included directly in nifi-hadoop-nar
- Added RequiresInstanceClassLoading annotation to AbstractHadoopProcessor and HiveConnectionPool
- UGI relogins are now performed using doAs
- Added debug-level logging for UGI relogins in KerberosTicketRenewer and AbstractHadoopProcessor
This closes #1539.
Signed-off-by: Bryan Bende <bb...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a61f3530
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a61f3530
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a61f3530
Branch: refs/heads/master
Commit: a61f353051cf065f053d7f186fb32b137986e13d
Parents: 9bb31a7
Author: Jeff Storck <jt...@gmail.com>
Authored: Thu Feb 23 20:03:40 2017 -0500
Committer: Bryan Bende <bb...@apache.org>
Committed: Mon Mar 13 12:21:49 2017 -0400
----------------------------------------------------------------------
.../org/apache/nifi/hadoop/KerberosTicketRenewer.java | 12 +++++++++++-
.../nifi-hadoop-bundle/nifi-hadoop-nar/pom.xml | 2 +-
.../nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml | 8 +-------
.../nifi/processors/hadoop/AbstractHadoopProcessor.java | 11 ++++++++++-
.../java/org/apache/nifi/processors/hadoop/PutHDFS.java | 2 --
.../org/apache/nifi/dbcp/hive/HiveConnectionPool.java | 2 ++
6 files changed, 25 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/a61f3530/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java b/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java
index d451535..bf922fe 100644
--- a/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java
+++ b/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java
@@ -20,6 +20,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.logging.ComponentLog;
import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
/**
* Periodically attempts to renew the Kerberos user's ticket for the given UGI.
@@ -58,11 +59,20 @@ public class KerberosTicketRenewer implements Runnable {
try {
logger.debug("Invoking renewal attempt for Kerberos ticket");
// While we run this "frequently", the Hadoop implementation will only perform the login at 80% of ticket lifetime.
- ugi.checkTGTAndReloginFromKeytab();
+ ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
+ ugi.checkTGTAndReloginFromKeytab();
+ return null;
+ });
} catch (IOException e) {
logger.error("Failed to renew Kerberos ticket", e);
+ } catch (InterruptedException e) {
+ logger.error("Interrupted while attempting to renew Kerberos ticket", e);
+ Thread.currentThread().interrupt();
+ return;
}
+ logger.debug("current UGI {}", new Object[]{ugi});
+
// Wait for a bit before checking again.
try {
Thread.sleep(renewalPeriod);
http://git-wip-us.apache.org/repos/asf/nifi/blob/a61f3530/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hadoop-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hadoop-nar/pom.xml b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hadoop-nar/pom.xml
index c307128..5e2d3d3 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hadoop-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hadoop-nar/pom.xml
@@ -28,7 +28,7 @@
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
- <artifactId>nifi-hadoop-libraries-nar</artifactId>
+ <artifactId>nifi-standard-services-api-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/nifi/blob/a61f3530/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
index be55e22..3002bcc 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
@@ -46,19 +46,13 @@
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <scope>provided</scope>
+ <artifactId>hadoop-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-distributed-cache-client-service-api</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/nifi/blob/a61f3530/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
index 8d31a7a..7a8c34c 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.io.compress.Lz4Codec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
@@ -64,6 +65,7 @@ import java.util.concurrent.atomic.AtomicReference;
/**
* This is a base class that is helpful when building processors interacting with HDFS.
*/
+@RequiresInstanceClassLoading
public abstract class AbstractHadoopProcessor extends AbstractProcessor {
/**
* Compression Type Enum
@@ -299,6 +301,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
fs = getFileSystemAsUser(config, ugi);
}
}
+ getLogger().debug("resetHDFSResources UGI {}", new Object[]{ugi});
final Path workingDir = fs.getWorkingDirectory();
getLogger().info("Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}",
@@ -455,7 +458,10 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
getLogger().info("Kerberos ticket age exceeds threshold [{} seconds] " +
"attempting to renew ticket for user {}", new Object[]{
kerberosReloginThreshold, ugi.getUserName()});
- ugi.checkTGTAndReloginFromKeytab();
+ ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
+ ugi.checkTGTAndReloginFromKeytab();
+ return null;
+ });
lastKerberosReloginTime = System.currentTimeMillis() / 1000;
getLogger().info("Kerberos relogin successful or ticket still valid");
} catch (IOException e) {
@@ -463,6 +469,9 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
// meaning dfs operations would fail
getLogger().error("Kerberos relogin failed", e);
throw new ProcessException("Unable to renew kerberos ticket", e);
+ } catch (InterruptedException e) {
+ getLogger().error("Interrupted while attempting Kerberos relogin", e);
+ throw new ProcessException("Unable to renew kerberos ticket", e);
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a61f3530/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 7425b97..6232936 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -27,7 +27,6 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.Restricted;
-import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -68,7 +67,6 @@ import java.util.concurrent.TimeUnit;
/**
* This processor copies FlowFiles to HDFS.
*/
-@RequiresInstanceClassLoading
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"hadoop", "HDFS", "put", "copy", "filesystem", "restricted"})
@CapabilityDescription("Write FlowFile data to Hadoop Distributed File System (HDFS)")
http://git-wip-us.apache.org/repos/asf/nifi/blob/a61f3530/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
index 1c6ee32..5d80610 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
@@ -22,6 +22,7 @@ import org.apache.commons.dbcp.BasicDataSource;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.jdbc.HiveDriver;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
@@ -59,6 +60,7 @@ import org.apache.nifi.controller.ControllerServiceInitializationContext;
* Implementation for Database Connection Pooling Service used for Apache Hive
* connections. Apache DBCP is used for connection pooling functionality.
*/
+@RequiresInstanceClassLoading
@Tags({"hive", "dbcp", "jdbc", "database", "connection", "pooling", "store"})
@CapabilityDescription("Provides Database Connection Pooling Service for Apache Hive. Connections can be asked from pool and returned after usage.")
public class HiveConnectionPool extends AbstractControllerService implements HiveDBCPService {