Posted to commits@storm.apache.org by et...@apache.org on 2020/03/13 15:31:43 UTC

[storm] branch master updated: [STORM-3494] Use UserGroupInformation to login to HDFS only once per process

This is an automated email from the ASF dual-hosted git repository.

ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new d74a712  [STORM-3494] Use UserGroupInformation to login to HDFS only once per process
     new 636ca4c  Merge pull request #3219 from Ethanlm/STORM-3494-2
d74a712 is described below

commit d74a712560a4a61f01261e6d14f80d3a4b720af6
Author: Ethan Li <et...@gmail.com>
AuthorDate: Wed Mar 4 09:39:48 2020 -0600

    [STORM-3494] Use UserGroupInformation to login to HDFS only once per process
---
 docs/distcache-blobstore.md                        |  13 ++
 .../java/org/apache/storm/blobstore/ListHDFS.java  |   4 +-
 .../org/apache/storm/blobstore/MigrateBlobs.java   |   4 +-
 .../apache/storm/hdfs/blobstore/HdfsBlobStore.java |  59 +--------
 .../apache/storm/hdfs/blobstore/BlobStoreTest.java |   6 +-
 storm-client/src/jvm/org/apache/storm/Config.java  |  93 ++++++++++++-
 .../org/apache/storm/utils/HadoopLoginUtil.java    | 145 +++++++++++++++++++++
 .../org/apache/storm/utils/ConfigUtilsTest.java    |  31 +++++
 8 files changed, 286 insertions(+), 69 deletions(-)

diff --git a/docs/distcache-blobstore.md b/docs/distcache-blobstore.md
index 977a6ed..d078bc9 100644
--- a/docs/distcache-blobstore.md
+++ b/docs/distcache-blobstore.md
@@ -315,6 +315,19 @@ The default is 60 seconds, a value of -1 indicates to wait for ever.
 * nimbus.code.sync.freq.secs: Frequency at which the background thread on nimbus syncs code for locally missing blobs. Default is 2 minutes.
 ```
 
+Additionally, if you want to access a secure HDFS blobstore, you also need to set the following configs:
+```
+storm.hdfs.login.keytab or blobstore.hdfs.keytab (deprecated)
+storm.hdfs.login.principal or blobstore.hdfs.principal (deprecated)
+```
+
+For example,
+```
+storm.hdfs.login.keytab: /etc/keytab
+storm.hdfs.login.principal: primary/instance@REALM
+```
+
+
 ## Using the Distributed Cache API, Command Line Interface (CLI)
 
 ### Creating blobs 
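
The same two keys can also be set programmatically on a daemon conf map, which is what the migration tools later in this commit do. A minimal sketch using the placeholder values from the docs example above (the wrapper class is hypothetical):

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.Config;

public class SecureBlobStoreConfSketch {
    public static Map<String, Object> build() {
        Map<String, Object> conf = new HashMap<>();
        // Placeholder values; substitute your real keytab path and Kerberos principal.
        conf.put(Config.STORM_HDFS_LOGIN_KEYTAB, "/etc/keytab");
        conf.put(Config.STORM_HDFS_LOGIN_PRINCIPAL, "primary/instance@REALM");
        return conf;
    }
}
```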
diff --git a/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/ListHDFS.java b/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/ListHDFS.java
index 96e7038..cfa7131 100644
--- a/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/ListHDFS.java
+++ b/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/ListHDFS.java
@@ -49,11 +49,11 @@ public class ListHDFS {
         hdfsConf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, "org.apache.storm.security.auth.DefaultPrincipalToLocal");
         if (args.length >= 2) {
             System.out.println("SETTING HDFS PRINCIPAL!");
-            hdfsConf.put(Config.BLOBSTORE_HDFS_PRINCIPAL, args[1]);
+            hdfsConf.put(Config.STORM_HDFS_LOGIN_PRINCIPAL, args[1]);
         }
         if (args.length >= 3) {
             System.out.println("SETTING HDFS KEYTAB!");
-            hdfsConf.put(Config.BLOBSTORE_HDFS_KEYTAB, args[2]);
+            hdfsConf.put(Config.STORM_HDFS_LOGIN_KEYTAB, args[2]);
         }
         
         /* CREATE THE BLOBSTORES */
diff --git a/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/MigrateBlobs.java b/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/MigrateBlobs.java
index aae81fd..e7a3581 100644
--- a/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/MigrateBlobs.java
+++ b/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/MigrateBlobs.java
@@ -84,11 +84,11 @@ public class MigrateBlobs {
         hdfsConf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, "org.apache.storm.security.auth.DefaultPrincipalToLocal");
         if (args.length >= 3) {
             System.out.println("SETTING HDFS PRINCIPAL!");
-            hdfsConf.put(Config.BLOBSTORE_HDFS_PRINCIPAL, args[2]);
+            hdfsConf.put(Config.STORM_HDFS_LOGIN_PRINCIPAL, args[2]);
         }
         if (args.length >= 4) {
             System.out.println("SETTING HDFS KEYTAB!");
-            hdfsConf.put(Config.BLOBSTORE_HDFS_KEYTAB, args[3]);
+            hdfsConf.put(Config.STORM_HDFS_LOGIN_KEYTAB, args[3]);
         }
         hdfsConf.put(Config.STORM_BLOBSTORE_REPLICATION_FACTOR, 7);
         
diff --git a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
index 5d4884b..4bb562d 100644
--- a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
+++ b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
@@ -26,15 +26,11 @@ import java.io.ByteArrayOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import javax.security.auth.Subject;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.storm.Config;
 import org.apache.storm.blobstore.AtomicOutputStream;
 import org.apache.storm.blobstore.BlobStore;
@@ -48,6 +44,7 @@ import org.apache.storm.generated.ReadableBlobMeta;
 import org.apache.storm.generated.SettableBlobMeta;
 import org.apache.storm.nimbus.ILeaderElector;
 import org.apache.storm.nimbus.NimbusInfo;
+import org.apache.storm.utils.HadoopLoginUtil;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.WrappedKeyAlreadyExistsException;
 import org.apache.storm.utils.WrappedKeyNotFoundException;
@@ -74,10 +71,9 @@ import org.slf4j.LoggerFactory;
  * subject. The blobstore gets the hadoop user and validates permissions for the supervisor.
  */
 public class HdfsBlobStore extends BlobStore {
-    public static final Logger LOG = LoggerFactory.getLogger(HdfsBlobStore.class);
+    private static final Logger LOG = LoggerFactory.getLogger(HdfsBlobStore.class);
     private static final String DATA_PREFIX = "data_";
     private static final String META_PREFIX = "meta_";
-    private static final HashMap<String, Subject> alreadyLoggedInUsers = new HashMap<>();
 
     private BlobStoreAclHandler aclHandler;
     private HdfsBlobStoreImpl hbs;
@@ -85,27 +81,6 @@ public class HdfsBlobStore extends BlobStore {
     private Map<String, Object> conf;
 
     /**
-     * Get the subject from Hadoop so we can use it to validate the acls. There is no direct
-     * interface from UserGroupInformation to get the subject, so do a doAs and get the context.
-     * We could probably run everything in the doAs but for now just grab the subject.
-     */
-    private Subject getHadoopUser() {
-        Subject subj;
-        try {
-            subj = UserGroupInformation.getCurrentUser().doAs(
-                    new PrivilegedAction<Subject>() {
-                        @Override
-                        public Subject run() {
-                            return Subject.getSubject(AccessController.getContext());
-                        }
-                    });
-        } catch (IOException e) {
-            throw new RuntimeException("Error creating subject and logging user in!", e);
-        }
-        return subj;
-    }
-
-    /**
      * If who is null then we want to use the user hadoop says we are.
      * Required for the supervisor to call these routines as it's not
      * logged in as anyone.
@@ -136,36 +111,10 @@ public class HdfsBlobStore extends BlobStore {
             throw new RuntimeException("You must specify a blobstore directory for HDFS to use!");
         }
         LOG.debug("directory is: {}", overrideBase);
-        try {
-            // if a HDFS keytab/principal have been supplied login, otherwise assume they are
-            // logged in already or running insecure HDFS.
-            String principal = Config.getBlobstoreHDFSPrincipal(conf);
-            String keyTab = (String) conf.get(Config.BLOBSTORE_HDFS_KEYTAB);
 
-            if (principal != null && keyTab != null) {
-                String combinedKey = principal + " from " + keyTab;
-                synchronized (alreadyLoggedInUsers) {
-                    localSubject = alreadyLoggedInUsers.get(combinedKey);
-                    if (localSubject == null) {
-                        UserGroupInformation.loginUserFromKeytab(principal, keyTab);
-                        localSubject = getHadoopUser();
-                        alreadyLoggedInUsers.put(combinedKey, localSubject);
-                    }
-                }
-            } else {
-                if (principal == null && keyTab != null) {
-                    throw new RuntimeException("You must specify an HDFS principal to go with the keytab!");
+        // Log in to HDFS
+        localSubject = HadoopLoginUtil.loginHadoop(conf);
 
-                } else {
-                    if (principal != null && keyTab == null) {
-                        throw new RuntimeException("You must specify HDFS keytab go with the principal!");
-                    }
-                }
-                localSubject = getHadoopUser();
-            }
-        } catch (IOException e) {
-            throw new RuntimeException("Error logging in from keytab: " + e.getMessage(), e);
-        }
         aclHandler = new BlobStoreAclHandler(conf);
         Path baseDir = new Path(overrideBase, BASE_BLOBS_DIR_NAME);
         try {
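
With this hunk, prepare() no longer performs its own keytab login and per-user caching; it delegates to the shared HadoopLoginUtil added below, so every HdfsBlobStore in the process reuses a single login. A minimal sketch of the resulting call pattern (the wrapper class and method are hypothetical; the null-check reflects HadoopLoginUtil returning null when Hadoop is absent from the classpath):

```java
import java.util.Map;
import javax.security.auth.Subject;
import org.apache.storm.utils.HadoopLoginUtil;

public class PrepareLoginSketch {
    public static Subject login(Map<String, Object> conf) {
        // At-most-once per JVM process; repeated calls return the cached subject.
        Subject subject = HadoopLoginUtil.loginHadoop(conf);
        if (subject == null) {
            // No Hadoop on the classpath; nothing to validate ACLs against.
        }
        return subject;
    }
}
```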
diff --git a/external/storm-hdfs-blobstore/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java b/external/storm-hdfs-blobstore/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
index 53cca75..9fa1e06 100644
--- a/external/storm-hdfs-blobstore/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
+++ b/external/storm-hdfs-blobstore/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
@@ -19,7 +19,6 @@
 package org.apache.storm.hdfs.blobstore;
 
 import org.apache.storm.hdfs.testing.MiniDFSClusterExtension;
-import org.apache.commons.io.FileUtils;
 import org.apache.storm.Config;
 import org.apache.storm.blobstore.AtomicOutputStream;
 import org.apache.storm.blobstore.BlobStore;
@@ -36,7 +35,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.security.auth.Subject;
-import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
@@ -47,7 +45,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 
 import static org.junit.Assert.*;
 
@@ -153,8 +150,7 @@ public class BlobStoreTest {
         assertEquals(value, readInt(store, who, key));
     }
 
-    private AutoCloseableBlobStoreContainer initHdfs(String dirName)
-        throws Exception {
+    private AutoCloseableBlobStoreContainer initHdfs(String dirName) {
         Map<String, Object> conf = new HashMap<>();
         conf.put(Config.BLOBSTORE_DIR, dirName);
         conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, "org.apache.storm.security.auth.DefaultPrincipalToLocal");
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index 7a1f689..fb2904f 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -53,6 +53,8 @@ import org.apache.storm.validation.ConfigValidationAnnotations.IsStringOrStringL
 import org.apache.storm.validation.ConfigValidationAnnotations.IsType;
 import org.apache.storm.validation.ConfigValidationAnnotations.NotNull;
 import org.apache.storm.validation.ConfigValidationAnnotations.Password;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Topology configs are specified as a plain old map. This class provides a convenient way to create a topology config map by providing
@@ -67,6 +69,8 @@ import org.apache.storm.validation.ConfigValidationAnnotations.Password;
  */
 public class Config extends HashMap<String, Object> {
 
+    private static final Logger LOG = LoggerFactory.getLogger(Config.class);
+
     /**
      * The serializer class for ListDelegate (tuple payload). The default serializer will be ListDelegateSerializer
      */
@@ -1149,14 +1153,18 @@ public class Config extends HashMap<String, Object> {
      * principal for nimbus/supervisor to use to access secure hdfs for the blobstore.
      * The format is generally "primary/instance@REALM", where "instance" field is optional.
      * If the instance field of the principal is the string "_HOST", it will
-     * be replaced with the host name of the server the daemon is running on (by calling
-     * {@link #getBlobstoreHDFSPrincipal(Map conf)} method).
+     * be replaced with the host name of the server the daemon is running on
+     * (by calling {@link #getBlobstoreHDFSPrincipal(Map conf)} method).
+     * @deprecated Use {@link Config#STORM_HDFS_LOGIN_PRINCIPAL} instead.
      */
+    @Deprecated
     @IsString
     public static final String BLOBSTORE_HDFS_PRINCIPAL = "blobstore.hdfs.principal";
     /**
      * keytab for nimbus/supervisor to use to access secure hdfs for the blobstore.
+     * @deprecated Use {@link Config#STORM_HDFS_LOGIN_KEYTAB} instead.
      */
+    @Deprecated
     @IsString
     public static final String BLOBSTORE_HDFS_KEYTAB = "blobstore.hdfs.keytab";
     /**
@@ -1166,6 +1174,21 @@ public class Config extends HashMap<String, Object> {
     @IsInteger
     public static final String STORM_BLOBSTORE_REPLICATION_FACTOR = "storm.blobstore.replication.factor";
     /**
+     * The principal for nimbus/supervisor to use to access secure hdfs.
+     * The format is generally "primary/instance@REALM", where "instance" field is optional.
+     * If the instance field of the principal is the string "_HOST", it will
+     * be replaced with the host name of the server the daemon is running on
+     * (by calling {@link #getHdfsPrincipal} method).
+     */
+    @IsString
+    public static final String STORM_HDFS_LOGIN_PRINCIPAL = "storm.hdfs.login.principal";
+
+    /**
+     * The keytab for nimbus/supervisor to use to access secure hdfs.
+     */
+    @IsString
+    public static final String STORM_HDFS_LOGIN_KEYTAB = "storm.hdfs.login.keytab";
+    /**
      * The hostname the supervisors/workers should report to nimbus. If unset, Storm will get the hostname to report by calling
      * <code>InetAddress.getLocalHost().getCanonicalHostName()</code>.
      *
@@ -1972,9 +1995,7 @@ public class Config extends HashMap<String, Object> {
 
     private static final String HOSTNAME_PATTERN = "_HOST";
 
-    @SuppressWarnings("checkstyle:AbbreviationAsWordInName")
-    public static String getBlobstoreHDFSPrincipal(Map conf) throws UnknownHostException {
-        String principal = (String) conf.get(Config.BLOBSTORE_HDFS_PRINCIPAL);
+    private static String substituteHostnameInPrincipal(String principal) throws UnknownHostException {
         if (principal != null) {
             String[] components = principal.split("[/@]");
             if (components.length == 3 && components[1].equals(HOSTNAME_PATTERN)) {
@@ -1983,4 +2004,66 @@ public class Config extends HashMap<String, Object> {
         }
         return principal;
     }
+
+    @Deprecated
+    @SuppressWarnings("checkstyle:AbbreviationAsWordInName")
+    public static String getBlobstoreHDFSPrincipal(Map conf) throws UnknownHostException {
+        return getHdfsPrincipal(conf);
+    }
+
+    /**
+     * Get the hdfs principal, with a "_HOST" instance field substituted with the local hostname.
+     * @param conf the storm Configuration
+     * @return the principal
+     * @throws UnknownHostException if the local hostname cannot be resolved
+     */
+    public static String getHdfsPrincipal(Map<String, Object> conf) throws UnknownHostException {
+        String ret;
+
+        String blobstorePrincipal = (String) conf.get(Config.BLOBSTORE_HDFS_PRINCIPAL);
+        String hdfsPrincipal = (String) conf.get(Config.STORM_HDFS_LOGIN_PRINCIPAL);
+        if (blobstorePrincipal == null && hdfsPrincipal == null) {
+            return null;
+        } else if (blobstorePrincipal == null) {
+            ret = hdfsPrincipal;
+        } else if (hdfsPrincipal == null) {
+            LOG.warn("{} is used as the hdfs principal. Please use {} instead",
+                Config.BLOBSTORE_HDFS_PRINCIPAL, Config.STORM_HDFS_LOGIN_PRINCIPAL);
+            ret = blobstorePrincipal;
+        } else {
+            // both keys are set; prefer the new one
+            LOG.warn("Both {} and {} are set. Use {} only.",
+                Config.BLOBSTORE_HDFS_PRINCIPAL, Config.STORM_HDFS_LOGIN_PRINCIPAL, Config.STORM_HDFS_LOGIN_PRINCIPAL);
+            ret = hdfsPrincipal;
+        }
+        return substituteHostnameInPrincipal(ret);
+    }
+
+    /**
+     * Get the hdfs keytab.
+     * @param conf the storm Configuration
+     * @return the keytab
+     */
+    public static String getHdfsKeytab(Map<String, Object> conf) {
+        String ret;
+
+        String blobstoreKeyTab = (String) conf.get(Config.BLOBSTORE_HDFS_KEYTAB);
+        String hdfsKeyTab = (String) conf.get(Config.STORM_HDFS_LOGIN_KEYTAB);
+        if (blobstoreKeyTab == null && hdfsKeyTab == null) {
+            return null;
+        } else if (blobstoreKeyTab == null) {
+            ret = hdfsKeyTab;
+        } else if (hdfsKeyTab == null) {
+            LOG.warn("{} is used as the hdfs keytab. Please use {} instead",
+                Config.BLOBSTORE_HDFS_KEYTAB, Config.STORM_HDFS_LOGIN_KEYTAB);
+            ret = blobstoreKeyTab;
+        } else {
+            // both keys are set; prefer the new one
+            LOG.warn("Both {} and {} are set. Use {} only.",
+                Config.BLOBSTORE_HDFS_KEYTAB, Config.STORM_HDFS_LOGIN_KEYTAB, Config.STORM_HDFS_LOGIN_KEYTAB);
+            ret = hdfsKeyTab;
+        }
+        return ret;
+    }
+
 }
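
A short sketch of the precedence these getters implement, using hypothetical values: when both the deprecated and the new key are set, the new storm.hdfs.login.* key wins and a warning is logged; the deprecated key alone still works but warns.

```java
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.Config;

public class HdfsPrincipalPrecedenceSketch {
    public static void main(String[] args) throws UnknownHostException {
        Map<String, Object> conf = new HashMap<>();
        conf.put(Config.BLOBSTORE_HDFS_PRINCIPAL, "old/_HOST@REALM");   // deprecated key
        conf.put(Config.STORM_HDFS_LOGIN_PRINCIPAL, "new/_HOST@REALM"); // preferred key

        // Both keys set: the new key wins (a warning is logged) and
        // "_HOST" is replaced with the local hostname.
        System.out.println(Config.getHdfsPrincipal(conf)); // new/<local hostname>@REALM

        // Neither keytab key is set, so the getter returns null.
        System.out.println(Config.getHdfsKeytab(conf));    // null
    }
}
```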
diff --git a/storm-client/src/jvm/org/apache/storm/utils/HadoopLoginUtil.java b/storm-client/src/jvm/org/apache/storm/utils/HadoopLoginUtil.java
new file mode 100644
index 0000000..de8b417
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/utils/HadoopLoginUtil.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.utils;
+
+import java.lang.reflect.Method;
+import java.net.UnknownHostException;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Map;
+import javax.security.auth.Subject;
+import org.apache.storm.Config;
+import org.apache.storm.shade.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * UserGroupInformation#loginUserFromKeytab(String, String) changes the static fields of UserGroupInformation,
+ * especially the current logged-in user, and UserGroupInformation itself is not thread-safe,
+ * so calling it multiple times in one JVM process can introduce bugs.
+ * HadoopLoginUtil.loginHadoop guarantees at-most-once login in a JVM process.
+ * This should only be used on the daemon side.
+ */
+public class HadoopLoginUtil {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HadoopLoginUtil.class);
+
+    private static Subject loginSubject = null;
+    private static boolean firstTimeLogin = true;
+
+    /**
+     * Log in if an HDFS keytab/principal has been supplied;
+     * otherwise, assume the process is already logged in or running on insecure HDFS.
+     * This also guarantees that login happens at most once per process.
+     * @param conf the daemon conf
+     * @return the logged in subject or null
+     */
+    public static Subject loginHadoop(Map<String, Object> conf) {
+        if (firstTimeLogin) {
+            synchronized (HadoopLoginUtil.class) {
+                if (firstTimeLogin) {
+                    String principal;
+                    try {
+                        principal = Config.getHdfsPrincipal(conf);
+                    } catch (UnknownHostException e) {
+                        throw new IllegalArgumentException("Failed to get principal", e);
+                    }
+                    String keyTab = Config.getHdfsKeytab(conf);
+                    if (principal != null && keyTab != null) {
+                        loginFromKeytab(principal, keyTab);
+                    } else {
+                        if (principal == null && keyTab != null) {
+                            throw new IllegalArgumentException("HDFS principal is null while keytab is present");
+                        } else {
+                            if (principal != null && keyTab == null) {
+                                throw new IllegalArgumentException("HDFS keytab is null while principal is present");
+                            }
+                        }
+                    }
+
+                    loginSubject = getHadoopUser();
+                    firstTimeLogin = false;
+                } else {
+                    LOG.debug("Already logged in to Hadoop");
+                }
+            }
+        } else {
+            LOG.debug("Already logged in to Hadoop");
+        }
+        LOG.debug("The subject is: {}", loginSubject);
+        return loginSubject;
+    }
+
+    //The Hadoop UserGroupInformation class name
+    private static final String HADOOP_USER_GROUP_INFORMATION_CLASS = "org.apache.hadoop.security.UserGroupInformation";
+
+    private static void loginFromKeytab(String principal, String keyTab) {
+        Preconditions.checkNotNull(principal);
+        Preconditions.checkNotNull(keyTab);
+
+        /* The following code is essentially:
+         UserGroupInformation.loginUserFromKeytab(principal, keyTab);
+         */
+
+        Class<?> ugi;
+        try {
+            ugi = Class.forName(HADOOP_USER_GROUP_INFORMATION_CLASS);
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException("Hadoop is not in the classpath", e);
+        }
+        try {
+            Method login = ugi.getMethod("loginUserFromKeytab", String.class, String.class);
+            login.invoke(null, principal, keyTab);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to login hadoop user from Keytab!", e);
+        }
+
+        LOG.info("Successfully login to Hadoop using keytab as {}", principal);
+    }
+
+    /**
+     * Get the subject from Hadoop. There is no direct interface from UserGroupInformation
+     * to get the subject, so do a doAs and get the context.
+     */
+    private static Subject getHadoopUser() {
+        /* The following code is essentially:
+        Subject sub = UserGroupInformation.getCurrentUser().doAs(
+            (PrivilegedAction<Subject>) () -> Subject.getSubject(AccessController.getContext()));
+        */
+
+        Class<?> ugiClass;
+        try {
+            ugiClass = Class.forName(HADOOP_USER_GROUP_INFORMATION_CLASS);
+        } catch (ClassNotFoundException e) {
+            LOG.info("Hadoop was not found on the class path", e);
+            return null;
+        }
+
+        try {
+            Method currentUserMethod = ugiClass.getMethod("getCurrentUser");
+            Method doAsMethod = ugiClass.getMethod("doAs", PrivilegedAction.class);
+            Object ugi = currentUserMethod.invoke(null);
+            return (Subject) doAsMethod.invoke(ugi,
+                (PrivilegedAction<Subject>) () -> Subject.getSubject(AccessController.getContext()));
+        } catch (Exception e) {
+            throw new RuntimeException("Error getting hadoop user!", e);
+        }
+    }
+}
+
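
One detail worth noting: loginHadoop guards the login with a check-lock-recheck on the static firstTimeLogin flag. A standalone sketch of that at-most-once idiom (the flag here is marked volatile, which the classic double-checked-locking recipe requires for safe publication to threads that skip the lock; that qualifier is an addition in this sketch, not part of the patch):

```java
// Minimal sketch of the at-most-once idiom used by HadoopLoginUtil.loginHadoop.
public final class AtMostOnceSketch {
    private static volatile boolean done = false; // volatile: safe publication to unlocked readers
    private static String result;

    public static String runOnce() {
        if (!done) {                               // fast path: skip the lock after the first run
            synchronized (AtMostOnceSketch.class) {
                if (!done) {                       // recheck under the lock to avoid a second run
                    result = expensiveLogin();
                    done = true;                   // flip the flag only after result is set
                }
            }
        }
        return result;
    }

    private static String expensiveLogin() {
        return "subject";
    }
}
```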
diff --git a/storm-client/test/jvm/org/apache/storm/utils/ConfigUtilsTest.java b/storm-client/test/jvm/org/apache/storm/utils/ConfigUtilsTest.java
index 78e0ce9..0f6dd7c 100644
--- a/storm-client/test/jvm/org/apache/storm/utils/ConfigUtilsTest.java
+++ b/storm-client/test/jvm/org/apache/storm/utils/ConfigUtilsTest.java
@@ -91,6 +91,7 @@ public class ConfigUtilsTest {
         Assert.assertEquals(expectedValue, ConfigUtils.getValueAsList(key, map));
     }
 
+    @Deprecated
     @Test
     public void getBlobstoreHDFSPrincipal() throws UnknownHostException {
         Map<String, Object> conf = mockMap(Config.BLOBSTORE_HDFS_PRINCIPAL, "primary/_HOST@EXAMPLE.COM");
@@ -120,4 +121,34 @@ public class ConfigUtilsTest {
         conf.put(Config.BLOBSTORE_HDFS_PRINCIPAL, principal);
         Assert.assertEquals(Config.getBlobstoreHDFSPrincipal(conf), principal);
     }
+
+    @Test
+    public void getHdfsPrincipal() throws UnknownHostException {
+        Map<String, Object> conf = mockMap(Config.STORM_HDFS_LOGIN_PRINCIPAL, "primary/_HOST@EXAMPLE.COM");
+        Assert.assertEquals(Config.getHdfsPrincipal(conf), "primary/" +  Utils.localHostname() + "@EXAMPLE.COM");
+
+        String principal = "primary/_HOST_HOST@EXAMPLE.COM";
+        conf.put(Config.STORM_HDFS_LOGIN_PRINCIPAL, principal);
+        Assert.assertEquals(Config.getHdfsPrincipal(conf), principal);
+
+        principal = "primary/_HOST2@EXAMPLE.COM";
+        conf.put(Config.STORM_HDFS_LOGIN_PRINCIPAL, principal);
+        Assert.assertEquals(Config.getHdfsPrincipal(conf), principal);
+
+        principal = "_HOST/instance@EXAMPLE.COM";
+        conf.put(Config.STORM_HDFS_LOGIN_PRINCIPAL, principal);
+        Assert.assertEquals(Config.getHdfsPrincipal(conf), principal);
+
+        principal = "primary/instance@_HOST.COM";
+        conf.put(Config.STORM_HDFS_LOGIN_PRINCIPAL, principal);
+        Assert.assertEquals(Config.getHdfsPrincipal(conf), principal);
+
+        principal = "_HOST@EXAMPLE.COM";
+        conf.put(Config.STORM_HDFS_LOGIN_PRINCIPAL, principal);
+        Assert.assertEquals(Config.getHdfsPrincipal(conf), principal);
+
+        principal = "primary/instance@EXAMPLE.COM";
+        conf.put(Config.STORM_HDFS_LOGIN_PRINCIPAL, principal);
+        Assert.assertEquals(Config.getHdfsPrincipal(conf), principal);
+    }
 }