Posted to commits@nifi.apache.org by mc...@apache.org on 2015/10/30 15:29:45 UTC

[19/50] [abbrv] nifi git commit: [PATCH] NIFI-997 Periodically attempt a kerberos relogin in AbstractHadoopProcessor

[PATCH] NIFI-997 Periodically attempt a kerberos relogin in AbstractHadoopProcessor

 - attempt a relogin based on an interval specified in the processor configuration (see the sketch just below)
 - use Hadoop's UserGroupInformation.checkTGTAndReloginFromKeytab to determine, from the ticket, whether a relogin is necessary, and perform one if needed
 - improve code readability with an HdfsResources object in AbstractHadoopProcessor
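
For readers skimming the change, here is a minimal standalone sketch of the relogin pattern this patch introduces, assuming only hadoop-common on the classpath. The class and field names here are illustrative, not part of the patch; the actual implementation in AbstractHadoopProcessor appears in the diff below.

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.security.UserGroupInformation;

// Illustrative sketch, not the patch's API: remember when the last relogin
// happened and ask Hadoop to renew the TGT once a configured threshold passes.
class KerberosReloginSketch {

    private final UserGroupInformation ugi;
    private final long reloginThresholdSeconds; // e.g. the "4 hours" default, in seconds
    private long lastReloginTimeSeconds;

    KerberosReloginSketch(final UserGroupInformation ugi, final long reloginThresholdSeconds) {
        this.ugi = ugi;
        this.reloginThresholdSeconds = reloginThresholdSeconds;
        this.lastReloginTimeSeconds = nowSeconds();
    }

    private static long nowSeconds() {
        return TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
    }

    boolean isTicketOld() {
        return nowSeconds() - lastReloginTimeSeconds > reloginThresholdSeconds;
    }

    void maybeRelogin() throws IOException {
        if (isTicketOld()) {
            // Hadoop inspects the TGT itself and only relogs in when actually
            // needed, so this call is cheap while the ticket is still valid.
            ugi.checkTGTAndReloginFromKeytab();
            lastReloginTimeSeconds = nowSeconds();
        }
    }
}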

Reviewed and Amended by Tony Kurc (tkurc@apache.org). This closes #97


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f2c4f2d2
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f2c4f2d2
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f2c4f2d2

Branch: refs/heads/NIFI-655
Commit: f2c4f2d2a15f23edd8933f6dbd10210de1700de3
Parents: d63cd6b
Author: ricky <ri...@cloudera.com>
Authored: Sun Oct 25 00:49:58 2015 -0400
Committer: Tony Kurc <tr...@gmail.com>
Committed: Sun Oct 25 00:49:58 2015 -0400

----------------------------------------------------------------------
 .../hadoop/AbstractHadoopProcessor.java         | 92 +++++++++++++++++---
 1 file changed, 79 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/f2c4f2d2/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
index 0102b1f..a67a9fd 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
@@ -25,6 +25,7 @@ import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.net.SocketFactory;
@@ -50,9 +51,9 @@ import org.apache.nifi.components.Validator;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.util.Tuple;
 
 /**
  * This is a base class that is helpful when building processors interacting with HDFS.
@@ -132,15 +133,24 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
             .description("Kerberos keytab associated with the principal. Requires nifi.kerberos.krb5.file to be set " + "in your nifi.properties").addValidator(Validator.VALID)
             .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR).addValidator(KERBEROS_CONFIG_VALIDATOR).build();
 
+    private static final PropertyDescriptor KERBEROS_RELOGIN_PERIOD = new PropertyDescriptor.Builder().name("Kerberos Relogin Period").required(false)
+            .description("Period of time which should pass before attempting a kerberos relogin").defaultValue("4 hours")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
     protected static final List<PropertyDescriptor> properties;
 
     private static final Object RESOURCES_LOCK = new Object();
 
+    private long kerberosReloginThreshold;
+    private long lastKerberosReloginTime;
+
     static {
         List<PropertyDescriptor> props = new ArrayList<>();
         props.add(HADOOP_CONFIGURATION_RESOURCES);
         props.add(KERBEROS_PRINCIPAL);
         props.add(KERBEROS_KEYTAB);
+        props.add(KERBEROS_RELOGIN_PERIOD);
         properties = Collections.unmodifiableList(props);
         try {
             NIFI_PROPERTIES = NiFiProperties.getInstance();
@@ -154,12 +164,12 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
     }
 
     // variables shared by all threads of this processor
-    // Hadoop Configuration and FileSystem
-    private final AtomicReference<Tuple<Configuration, FileSystem>> hdfsResources = new AtomicReference<>();
+    // Hadoop Configuration, Filesystem, and UserGroupInformation (optional)
+    private final AtomicReference<HdfsResources> hdfsResources = new AtomicReference<>();
 
     @Override
     protected void init(ProcessorInitializationContext context) {
-        hdfsResources.set(new Tuple<Configuration, FileSystem>(null, null));
+        hdfsResources.set(new HdfsResources(null, null, null));
     }
 
     @Override
@@ -173,8 +183,13 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
     @OnScheduled
     public final void abstractOnScheduled(ProcessContext context) throws IOException {
         try {
-            Tuple<Configuration, FileSystem> resources = hdfsResources.get();
-            if (resources.getKey() == null || resources.getValue() == null) {
+            // This value will be null when called from ListHDFS, because it overrides all of the default
+            // properties this processor sets. TODO: re-work ListHDFS to utilize Kerberos
+            if (context.getProperty(KERBEROS_RELOGIN_PERIOD).getValue() != null) {
+                kerberosReloginThreshold = context.getProperty(KERBEROS_RELOGIN_PERIOD).asTimePeriod(TimeUnit.SECONDS);
+            }
+            HdfsResources resources = hdfsResources.get();
+            if (resources.getConfiguration() == null) {
                 String configResources = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).getValue();
                 String dir = context.getProperty(DIRECTORY_PROP_NAME).getValue();
                 dir = dir == null ? "/" : dir;
@@ -183,14 +198,14 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
             }
         } catch (IOException ex) {
             getLogger().error("HDFS Configuration error - {}", new Object[] { ex });
-            hdfsResources.set(new Tuple<Configuration, FileSystem>(null, null));
+            hdfsResources.set(new HdfsResources(null, null, null));
             throw ex;
         }
     }
 
     @OnStopped
     public final void abstractOnStopped() {
-        hdfsResources.set(new Tuple<Configuration, FileSystem>(null, null));
+        hdfsResources.set(new HdfsResources(null, null, null));
     }
 
     private static Configuration getConfigurationFromResources(String configResources) throws IOException {
@@ -224,7 +239,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
     /*
      * Reset Hadoop Configuration and FileSystem based on the supplied configuration resources.
      */
-    Tuple<Configuration, FileSystem> resetHDFSResources(String configResources, String dir, ProcessContext context) throws IOException {
+    HdfsResources resetHDFSResources(String configResources, String dir, ProcessContext context) throws IOException {
         // org.apache.hadoop.conf.Configuration saves its current thread context class loader to use for threads that it creates
         // later to do I/O. We need this class loader to be the NarClassLoader instead of the magical
         // NarThreadContextClassLoader.
@@ -244,13 +259,15 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
             // If kerberos is enabled, create the file system as the kerberos principal
             // -- use RESOURCES_LOCK to guarantee UserGroupInformation is accessed by only a single thread at a time
             FileSystem fs = null;
+            UserGroupInformation ugi = null;
             synchronized (RESOURCES_LOCK) {
                 if (config.get("hadoop.security.authentication").equalsIgnoreCase("kerberos")) {
                     String principal = context.getProperty(KERBEROS_PRINCIPAL).getValue();
                     String keyTab = context.getProperty(KERBEROS_KEYTAB).getValue();
                     UserGroupInformation.setConfiguration(config);
-                    UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTab);
+                    ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTab);
                     fs = getFileSystemAsUser(config, ugi);
+                    lastKerberosReloginTime = System.currentTimeMillis() / 1000;
                 } else {
                     config.set("ipc.client.fallback-to-simple-auth-allowed", "true");
                     config.set("hadoop.security.authentication", "simple");
@@ -260,7 +277,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
             config.set(disableCacheName, "true");
             getLogger().info("Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}",
                     new Object[] { fs.getWorkingDirectory(), fs.getDefaultBlockSize(new Path(dir)), fs.getDefaultReplication(new Path(dir)), config.toString() });
-            return new Tuple<>(config, fs);
+            return new HdfsResources(config, fs, ugi);
 
         } finally {
             Thread.currentThread().setContextClassLoader(savedClassLoader);
@@ -392,10 +409,59 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
     }
 
     protected Configuration getConfiguration() {
-        return hdfsResources.get().getKey();
+        return hdfsResources.get().getConfiguration();
     }
 
     protected FileSystem getFileSystem() {
-        return hdfsResources.get().getValue();
+        // if kerberos is enabled, check if the ticket should be renewed before returning the FS
+        if (hdfsResources.get().getUserGroupInformation() != null && isTicketOld()) {
+            tryKerberosRelogin(hdfsResources.get().getUserGroupInformation());
+        }
+        return hdfsResources.get().getFileSystem();
+    }
+
+    protected void tryKerberosRelogin(UserGroupInformation ugi) {
+        try {
+            getLogger().info("Kerberos ticket age exceeds threshold [{} seconds] " +
+                "attempting to renew ticket for user {}", new Object[]{
+              kerberosReloginThreshold, ugi.getUserName()});
+            ugi.checkTGTAndReloginFromKeytab();
+            lastKerberosReloginTime = System.currentTimeMillis() / 1000;
+            getLogger().info("Kerberos relogin successful or ticket still valid");
+        } catch (IOException e) {
+            // Most likely case of this happening is ticket is expired and error getting a new one,
+            // meaning dfs operations would fail
+            getLogger().error("Kerberos relogin failed", e);
+            throw new ProcessException("Unable to renew Kerberos ticket", e);
+        }
+    }
+
+    protected boolean isTicketOld() {
+        return (System.currentTimeMillis() / 1000 - lastKerberosReloginTime) > kerberosReloginThreshold;
+    }
+
+
+    protected static class HdfsResources {
+        private final Configuration configuration;
+        private final FileSystem fileSystem;
+        private final UserGroupInformation userGroupInformation;
+
+        public HdfsResources(Configuration configuration, FileSystem fileSystem, UserGroupInformation userGroupInformation) {
+            this.configuration = configuration;
+            this.fileSystem = fileSystem;
+            this.userGroupInformation = userGroupInformation;
+        }
+
+        public Configuration getConfiguration() {
+            return configuration;
+        }
+
+        public FileSystem getFileSystem() {
+            return fileSystem;
+        }
+
+        public UserGroupInformation getUserGroupInformation() {
+            return userGroupInformation;
+        }
     }
 }
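
For context: subclasses never call tryKerberosRelogin() directly; the age check piggybacks on getFileSystem(), so existing per-trigger code picks up the new behavior unchanged. A hypothetical subclass (illustrative only, not part of this commit) might look like:

import org.apache.hadoop.fs.FileSystem;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.hadoop.AbstractHadoopProcessor;

// Hypothetical subclass, for illustration only.
public class ExampleHdfsProcessor extends AbstractHadoopProcessor {
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        // getFileSystem() now transparently attempts a Kerberos relogin first
        // when the ticket is older than the configured relogin period.
        final FileSystem fs = getFileSystem();
        // ... perform HDFS I/O with fs as usual ...
    }
}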