You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2020/03/13 15:31:43 UTC
[storm] branch master updated: [STORM-3494] Use
UserGroupInformation to login to HDFS only once per process
This is an automated email from the ASF dual-hosted git repository.
ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new d74a712 [STORM-3494] Use UserGroupInformation to login to HDFS only once per process
new 636ca4c Merge pull request #3219 from Ethanlm/STORM-3494-2
d74a712 is described below
commit d74a712560a4a61f01261e6d14f80d3a4b720af6
Author: Ethan Li <et...@gmail.com>
AuthorDate: Wed Mar 4 09:39:48 2020 -0600
[STORM-3494] Use UserGroupInformation to login to HDFS only once per process
---
docs/distcache-blobstore.md | 13 ++
.../java/org/apache/storm/blobstore/ListHDFS.java | 4 +-
.../org/apache/storm/blobstore/MigrateBlobs.java | 4 +-
.../apache/storm/hdfs/blobstore/HdfsBlobStore.java | 59 +--------
.../apache/storm/hdfs/blobstore/BlobStoreTest.java | 6 +-
storm-client/src/jvm/org/apache/storm/Config.java | 93 ++++++++++++-
.../org/apache/storm/utils/HadoopLoginUtil.java | 145 +++++++++++++++++++++
.../org/apache/storm/utils/ConfigUtilsTest.java | 31 +++++
8 files changed, 286 insertions(+), 69 deletions(-)
diff --git a/docs/distcache-blobstore.md b/docs/distcache-blobstore.md
index 977a6ed..d078bc9 100644
--- a/docs/distcache-blobstore.md
+++ b/docs/distcache-blobstore.md
@@ -315,6 +315,19 @@ The default is 60 seconds, a value of -1 indicates to wait for ever.
* nimbus.code.sync.freq.secs: Frequency at which the background thread on nimbus which syncs code for locally missing blobs. Default is 2 minutes.
```
+Additionally, if you want to access a secure HDFS blobstore, you also need to set the following configs.
+```
+storm.hdfs.login.keytab or blobstore.hdfs.keytab (deprecated)
+storm.hdfs.login.principal or blobstore.hdfs.principal (deprecated)
+```
+
+For example,
+```
+storm.hdfs.login.keytab: /etc/keytab
+storm.hdfs.login.principal: primary/instance@REALM
+```
+
+
## Using the Distributed Cache API, Command Line Interface (CLI)
### Creating blobs
diff --git a/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/ListHDFS.java b/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/ListHDFS.java
index 96e7038..cfa7131 100644
--- a/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/ListHDFS.java
+++ b/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/ListHDFS.java
@@ -49,11 +49,11 @@ public class ListHDFS {
hdfsConf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, "org.apache.storm.security.auth.DefaultPrincipalToLocal");
if (args.length >= 2) {
System.out.println("SETTING HDFS PRINCIPAL!");
- hdfsConf.put(Config.BLOBSTORE_HDFS_PRINCIPAL, args[1]);
+ hdfsConf.put(Config.STORM_HDFS_LOGIN_PRINCIPAL, args[1]);
}
if (args.length >= 3) {
System.out.println("SETTING HDFS KEYTAB!");
- hdfsConf.put(Config.BLOBSTORE_HDFS_KEYTAB, args[2]);
+ hdfsConf.put(Config.STORM_HDFS_LOGIN_KEYTAB, args[2]);
}
/* CREATE THE BLOBSTORES */
diff --git a/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/MigrateBlobs.java b/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/MigrateBlobs.java
index aae81fd..e7a3581 100644
--- a/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/MigrateBlobs.java
+++ b/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/MigrateBlobs.java
@@ -84,11 +84,11 @@ public class MigrateBlobs {
hdfsConf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, "org.apache.storm.security.auth.DefaultPrincipalToLocal");
if (args.length >= 3) {
System.out.println("SETTING HDFS PRINCIPAL!");
- hdfsConf.put(Config.BLOBSTORE_HDFS_PRINCIPAL, args[2]);
+ hdfsConf.put(Config.STORM_HDFS_LOGIN_PRINCIPAL, args[2]);
}
if (args.length >= 4) {
System.out.println("SETTING HDFS KEYTAB!");
- hdfsConf.put(Config.BLOBSTORE_HDFS_KEYTAB, args[3]);
+ hdfsConf.put(Config.STORM_HDFS_LOGIN_KEYTAB, args[3]);
}
hdfsConf.put(Config.STORM_BLOBSTORE_REPLICATION_FACTOR, 7);
diff --git a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
index 5d4884b..4bb562d 100644
--- a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
+++ b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
@@ -26,15 +26,11 @@ import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import javax.security.auth.Subject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.storm.Config;
import org.apache.storm.blobstore.AtomicOutputStream;
import org.apache.storm.blobstore.BlobStore;
@@ -48,6 +44,7 @@ import org.apache.storm.generated.ReadableBlobMeta;
import org.apache.storm.generated.SettableBlobMeta;
import org.apache.storm.nimbus.ILeaderElector;
import org.apache.storm.nimbus.NimbusInfo;
+import org.apache.storm.utils.HadoopLoginUtil;
import org.apache.storm.utils.Utils;
import org.apache.storm.utils.WrappedKeyAlreadyExistsException;
import org.apache.storm.utils.WrappedKeyNotFoundException;
@@ -74,10 +71,9 @@ import org.slf4j.LoggerFactory;
* subject. The blobstore gets the hadoop user and validates permissions for the supervisor.
*/
public class HdfsBlobStore extends BlobStore {
- public static final Logger LOG = LoggerFactory.getLogger(HdfsBlobStore.class);
+ private static final Logger LOG = LoggerFactory.getLogger(HdfsBlobStore.class);
private static final String DATA_PREFIX = "data_";
private static final String META_PREFIX = "meta_";
- private static final HashMap<String, Subject> alreadyLoggedInUsers = new HashMap<>();
private BlobStoreAclHandler aclHandler;
private HdfsBlobStoreImpl hbs;
@@ -85,27 +81,6 @@ public class HdfsBlobStore extends BlobStore {
private Map<String, Object> conf;
/**
- * Get the subject from Hadoop so we can use it to validate the acls. There is no direct
- * interface from UserGroupInformation to get the subject, so do a doAs and get the context.
- * We could probably run everything in the doAs but for now just grab the subject.
- */
- private Subject getHadoopUser() {
- Subject subj;
- try {
- subj = UserGroupInformation.getCurrentUser().doAs(
- new PrivilegedAction<Subject>() {
- @Override
- public Subject run() {
- return Subject.getSubject(AccessController.getContext());
- }
- });
- } catch (IOException e) {
- throw new RuntimeException("Error creating subject and logging user in!", e);
- }
- return subj;
- }
-
- /**
* If who is null then we want to use the user hadoop says we are.
* Required for the supervisor to call these routines as its not
* logged in as anyone.
@@ -136,36 +111,10 @@ public class HdfsBlobStore extends BlobStore {
throw new RuntimeException("You must specify a blobstore directory for HDFS to use!");
}
LOG.debug("directory is: {}", overrideBase);
- try {
- // if a HDFS keytab/principal have been supplied login, otherwise assume they are
- // logged in already or running insecure HDFS.
- String principal = Config.getBlobstoreHDFSPrincipal(conf);
- String keyTab = (String) conf.get(Config.BLOBSTORE_HDFS_KEYTAB);
- if (principal != null && keyTab != null) {
- String combinedKey = principal + " from " + keyTab;
- synchronized (alreadyLoggedInUsers) {
- localSubject = alreadyLoggedInUsers.get(combinedKey);
- if (localSubject == null) {
- UserGroupInformation.loginUserFromKeytab(principal, keyTab);
- localSubject = getHadoopUser();
- alreadyLoggedInUsers.put(combinedKey, localSubject);
- }
- }
- } else {
- if (principal == null && keyTab != null) {
- throw new RuntimeException("You must specify an HDFS principal to go with the keytab!");
+ //Login to hdfs
+ localSubject = HadoopLoginUtil.loginHadoop(conf);
- } else {
- if (principal != null && keyTab == null) {
- throw new RuntimeException("You must specify HDFS keytab go with the principal!");
- }
- }
- localSubject = getHadoopUser();
- }
- } catch (IOException e) {
- throw new RuntimeException("Error logging in from keytab: " + e.getMessage(), e);
- }
aclHandler = new BlobStoreAclHandler(conf);
Path baseDir = new Path(overrideBase, BASE_BLOBS_DIR_NAME);
try {
diff --git a/external/storm-hdfs-blobstore/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java b/external/storm-hdfs-blobstore/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
index 53cca75..9fa1e06 100644
--- a/external/storm-hdfs-blobstore/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
+++ b/external/storm-hdfs-blobstore/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
@@ -19,7 +19,6 @@
package org.apache.storm.hdfs.blobstore;
import org.apache.storm.hdfs.testing.MiniDFSClusterExtension;
-import org.apache.commons.io.FileUtils;
import org.apache.storm.Config;
import org.apache.storm.blobstore.AtomicOutputStream;
import org.apache.storm.blobstore.BlobStore;
@@ -36,7 +35,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.security.auth.Subject;
-import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
@@ -47,7 +45,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.UUID;
import static org.junit.Assert.*;
@@ -153,8 +150,7 @@ public class BlobStoreTest {
assertEquals(value, readInt(store, who, key));
}
- private AutoCloseableBlobStoreContainer initHdfs(String dirName)
- throws Exception {
+ private AutoCloseableBlobStoreContainer initHdfs(String dirName) {
Map<String, Object> conf = new HashMap<>();
conf.put(Config.BLOBSTORE_DIR, dirName);
conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, "org.apache.storm.security.auth.DefaultPrincipalToLocal");
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index 7a1f689..fb2904f 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -53,6 +53,8 @@ import org.apache.storm.validation.ConfigValidationAnnotations.IsStringOrStringL
import org.apache.storm.validation.ConfigValidationAnnotations.IsType;
import org.apache.storm.validation.ConfigValidationAnnotations.NotNull;
import org.apache.storm.validation.ConfigValidationAnnotations.Password;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Topology configs are specified as a plain old map. This class provides a convenient way to create a topology config map by providing
@@ -67,6 +69,8 @@ import org.apache.storm.validation.ConfigValidationAnnotations.Password;
*/
public class Config extends HashMap<String, Object> {
+ private static final Logger LOG = LoggerFactory.getLogger(Config.class);
+
/**
* The serializer class for ListDelegate (tuple payload). The default serializer will be ListDelegateSerializer
*/
@@ -1149,14 +1153,18 @@ public class Config extends HashMap<String, Object> {
* principal for nimbus/supervisor to use to access secure hdfs for the blobstore.
* The format is generally "primary/instance@REALM", where "instance" field is optional.
* If the instance field of the principal is the string "_HOST", it will
- * be replaced with the host name of the server the daemon is running on (by calling
- * {@link #getBlobstoreHDFSPrincipal(Map conf)} method).
+ * be replaced with the host name of the server the daemon is running on
+ * (by calling {@link #getBlobstoreHDFSPrincipal(Map conf)} method).
+ * @deprecated Use {@link Config#STORM_HDFS_LOGIN_PRINCIPAL} instead.
*/
+ @Deprecated
@IsString
public static final String BLOBSTORE_HDFS_PRINCIPAL = "blobstore.hdfs.principal";
/**
* keytab for nimbus/supervisor to use to access secure hdfs for the blobstore.
+ * @deprecated Use {@link Config#STORM_HDFS_LOGIN_KEYTAB} instead.
*/
+ @Deprecated
@IsString
public static final String BLOBSTORE_HDFS_KEYTAB = "blobstore.hdfs.keytab";
/**
@@ -1166,6 +1174,21 @@ public class Config extends HashMap<String, Object> {
@IsInteger
public static final String STORM_BLOBSTORE_REPLICATION_FACTOR = "storm.blobstore.replication.factor";
/**
+ * The principal for nimbus/supervisor to use to access secure hdfs.
+ * The format is generally "primary/instance@REALM", where "instance" field is optional.
+ * If the instance field of the principal is the string "_HOST", it will
+ * be replaced with the host name of the server the daemon is running on
+ * (by calling {@link #getHdfsPrincipal} method).
+ */
+ @IsString
+ public static final String STORM_HDFS_LOGIN_PRINCIPAL = "storm.hdfs.login.principal";
+
+ /**
+ * The keytab for nimbus/supervisor to use to access secure hdfs.
+ */
+ @IsString
+ public static final String STORM_HDFS_LOGIN_KEYTAB = "storm.hdfs.login.keytab";
+ /**
* The hostname the supervisors/workers should report to nimbus. If unset, Storm will get the hostname to report by calling
* <code>InetAddress.getLocalHost().getCanonicalHostName()</code>.
*
@@ -1972,9 +1995,7 @@ public class Config extends HashMap<String, Object> {
private static final String HOSTNAME_PATTERN = "_HOST";
- @SuppressWarnings("checkstyle:AbbreviationAsWordInName")
- public static String getBlobstoreHDFSPrincipal(Map conf) throws UnknownHostException {
- String principal = (String) conf.get(Config.BLOBSTORE_HDFS_PRINCIPAL);
+ private static String substituteHostnameInPrincipal(String principal) throws UnknownHostException {
if (principal != null) {
String[] components = principal.split("[/@]");
if (components.length == 3 && components[1].equals(HOSTNAME_PATTERN)) {
@@ -1983,4 +2004,66 @@ public class Config extends HashMap<String, Object> {
}
return principal;
}
+
+ @Deprecated
+ @SuppressWarnings("checkstyle:AbbreviationAsWordInName")
+ public static String getBlobstoreHDFSPrincipal(Map conf) throws UnknownHostException {
+ return getHdfsPrincipal(conf);
+ }
+
+ /**
+ * Get the hostname substituted hdfs principal.
+ * @param conf the storm Configuration
+ * @return the principal
+ * @throws UnknownHostException if the local hostname cannot be resolved
+ */
+ public static String getHdfsPrincipal(Map<String, Object> conf) throws UnknownHostException {
+ String ret;
+
+ String blobstorePrincipal = (String) conf.get(Config.BLOBSTORE_HDFS_PRINCIPAL);
+ String hdfsPrincipal = (String) conf.get(Config.STORM_HDFS_LOGIN_PRINCIPAL);
+ if (blobstorePrincipal == null && hdfsPrincipal == null) {
+ return null;
+ } else if (blobstorePrincipal == null) {
+ ret = hdfsPrincipal;
+ } else if (hdfsPrincipal == null) {
+ LOG.warn("{} is used as the hdfs principal. Please use {} instead",
+ Config.BLOBSTORE_HDFS_PRINCIPAL, Config.STORM_HDFS_LOGIN_PRINCIPAL);
+ ret = blobstorePrincipal;
+ } else {
+ //both not null;
+ LOG.warn("Both {} and {} are set. Use {} only.",
+ Config.BLOBSTORE_HDFS_PRINCIPAL, Config.STORM_HDFS_LOGIN_PRINCIPAL, Config.STORM_HDFS_LOGIN_PRINCIPAL);
+ ret = hdfsPrincipal;
+ }
+ return substituteHostnameInPrincipal(ret);
+ }
+
+ /**
+ * Get the hdfs keytab.
+ * @param conf the storm Configuration
+ * @return the keytab
+ */
+ public static String getHdfsKeytab(Map<String, Object> conf) {
+ String ret;
+
+ String blobstoreKeyTab = (String) conf.get(Config.BLOBSTORE_HDFS_KEYTAB);
+ String hdfsKeyTab = (String) conf.get(Config.STORM_HDFS_LOGIN_KEYTAB);
+ if (blobstoreKeyTab == null && hdfsKeyTab == null) {
+ return null;
+ } else if (blobstoreKeyTab == null) {
+ ret = hdfsKeyTab;
+ } else if (hdfsKeyTab == null) {
+ LOG.warn("{} is used as the hdfs keytab. Please use {} instead",
+ Config.BLOBSTORE_HDFS_KEYTAB, Config.STORM_HDFS_LOGIN_KEYTAB);
+ ret = blobstoreKeyTab;
+ } else {
+ //both not null;
+ LOG.warn("Both {} and {} are set. Use {} only.",
+ Config.BLOBSTORE_HDFS_KEYTAB, Config.STORM_HDFS_LOGIN_KEYTAB, Config.STORM_HDFS_LOGIN_KEYTAB);
+ ret = hdfsKeyTab;
+ }
+ return ret;
+ }
+
}
diff --git a/storm-client/src/jvm/org/apache/storm/utils/HadoopLoginUtil.java b/storm-client/src/jvm/org/apache/storm/utils/HadoopLoginUtil.java
new file mode 100644
index 0000000..de8b417
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/utils/HadoopLoginUtil.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.utils;
+
+import java.lang.reflect.Method;
+import java.net.UnknownHostException;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Map;
+import javax.security.auth.Subject;
+import org.apache.storm.Config;
+import org.apache.storm.shade.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * UserGroupInformation#loginUserFromKeytab(String, String) changes the static fields of UserGroupInformation,
+ * especially the current logged-in user, and UserGroupInformation itself is not thread-safe.
+ * So it could introduce bugs if it is called multiple times in a JVM process.
+ * HadoopLoginUtil.loginHadoop guarantees at-most-once login in a JVM process.
+ * This should only be used on the daemon side.
+ */
+public class HadoopLoginUtil {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HadoopLoginUtil.class);
+
+ private static Subject loginSubject = null;
+ private static boolean firstTimeLogin = true;
+
+ /**
+ * Login if a HDFS keytab/principal have been supplied;
+ * otherwise, assume it's already logged in or running on insecure HDFS.
+ * This also guarantees that login only happens at most once.
+ * @param conf the daemon conf
+ * @return the logged in subject or null
+ */
+ public static Subject loginHadoop(Map<String, Object> conf) {
+ if (firstTimeLogin) {
+ synchronized (HadoopLoginUtil.class) {
+ if (firstTimeLogin) {
+ String principal;
+ try {
+ principal = Config.getHdfsPrincipal(conf);
+ } catch (UnknownHostException e) {
+ throw new IllegalArgumentException("Failed to get principal", e);
+ }
+ String keyTab = Config.getHdfsKeytab(conf);
+ if (principal != null && keyTab != null) {
+ loginFromKeytab(principal, keyTab);
+ } else {
+ if (principal == null && keyTab != null) {
+ throw new IllegalArgumentException("HDFS principal is null while keytab is present");
+ } else {
+ if (principal != null && keyTab == null) {
+ throw new IllegalArgumentException("HDFS keytab is null while principal is present");
+ }
+ }
+ }
+
+ loginSubject = getHadoopUser();
+ firstTimeLogin = false;
+ } else {
+ LOG.debug("Already logged in to Hadoop");
+ }
+ }
+ } else {
+ LOG.debug("Already logged in to Hadoop");
+ }
+ LOG.debug("The subject is: {}", loginSubject);
+ return loginSubject;
+ }
+
+ //The Hadoop UserGroupInformation class name
+ private static final String HADOOP_USER_GROUP_INFORMATION_CLASS = "org.apache.hadoop.security.UserGroupInformation";
+
+ private static void loginFromKeytab(String principal, String keyTab) {
+ Preconditions.checkNotNull(principal);
+ Preconditions.checkNotNull(keyTab);
+
+ /* The following code is essentially:
+ UserGroupInformation.loginUserFromKeytab(principal, keyTab);
+ */
+
+ Class<?> ugi;
+ try {
+ ugi = Class.forName(HADOOP_USER_GROUP_INFORMATION_CLASS);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException("Hadoop is not in the classpath", e);
+ }
+ try {
+ Method login = ugi.getMethod("loginUserFromKeytab", String.class, String.class);
+ login.invoke(null, principal, keyTab);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to login hadoop user from Keytab!", e);
+ }
+
+ LOG.info("Successfully login to Hadoop using keytab as {}", principal);
+ }
+
+ /**
+ * Get the subject from Hadoop. There is no direct interface from UserGroupInformation
+ * to get the subject, so do a doAs and get the context.
+ */
+ private static Subject getHadoopUser() {
+ /* The following code is essentially:
+ Subject sub = UserGroupInformation.getCurrentUser().doAs(
+ (PrivilegedAction<Subject>) () -> Subject.getSubject(AccessController.getContext()));
+ */
+
+ Class<?> ugiClass;
+ try {
+ ugiClass = Class.forName(HADOOP_USER_GROUP_INFORMATION_CLASS);
+ } catch (ClassNotFoundException e) {
+ LOG.info("Hadoop was not found on the class path", e);
+ return null;
+ }
+
+ try {
+ Method currentUserMethod = ugiClass.getMethod("getCurrentUser");
+ Method doAsMethod = ugiClass.getMethod("doAs", PrivilegedAction.class);
+ Object ugi = currentUserMethod.invoke(null);
+ return (Subject) doAsMethod.invoke(ugi,
+ (PrivilegedAction<Subject>) () -> Subject.getSubject(AccessController.getContext()));
+ } catch (Exception e) {
+ throw new RuntimeException("Error getting hadoop user!", e);
+ }
+ }
+}
+
diff --git a/storm-client/test/jvm/org/apache/storm/utils/ConfigUtilsTest.java b/storm-client/test/jvm/org/apache/storm/utils/ConfigUtilsTest.java
index 78e0ce9..0f6dd7c 100644
--- a/storm-client/test/jvm/org/apache/storm/utils/ConfigUtilsTest.java
+++ b/storm-client/test/jvm/org/apache/storm/utils/ConfigUtilsTest.java
@@ -91,6 +91,7 @@ public class ConfigUtilsTest {
Assert.assertEquals(expectedValue, ConfigUtils.getValueAsList(key, map));
}
+ @Deprecated
@Test
public void getBlobstoreHDFSPrincipal() throws UnknownHostException {
Map<String, Object> conf = mockMap(Config.BLOBSTORE_HDFS_PRINCIPAL, "primary/_HOST@EXAMPLE.COM");
@@ -120,4 +121,34 @@ public class ConfigUtilsTest {
conf.put(Config.BLOBSTORE_HDFS_PRINCIPAL, principal);
Assert.assertEquals(Config.getBlobstoreHDFSPrincipal(conf), principal);
}
+
+ @Test
+ public void getHfdsPrincipal() throws UnknownHostException {
+ Map<String, Object> conf = mockMap(Config.STORM_HDFS_LOGIN_PRINCIPAL, "primary/_HOST@EXAMPLE.COM");
+ Assert.assertEquals(Config.getHdfsPrincipal(conf), "primary/" + Utils.localHostname() + "@EXAMPLE.COM");
+
+ String principal = "primary/_HOST_HOST@EXAMPLE.COM";
+ conf.put(Config.STORM_HDFS_LOGIN_PRINCIPAL, principal);
+ Assert.assertEquals(Config.getHdfsPrincipal(conf), principal);
+
+ principal = "primary/_HOST2@EXAMPLE.COM";
+ conf.put(Config.STORM_HDFS_LOGIN_PRINCIPAL, principal);
+ Assert.assertEquals(Config.getHdfsPrincipal(conf), principal);
+
+ principal = "_HOST/instance@EXAMPLE.COM";
+ conf.put(Config.STORM_HDFS_LOGIN_PRINCIPAL, principal);
+ Assert.assertEquals(Config.getHdfsPrincipal(conf), principal);
+
+ principal = "primary/instance@_HOST.COM";
+ conf.put(Config.STORM_HDFS_LOGIN_PRINCIPAL, principal);
+ Assert.assertEquals(Config.getHdfsPrincipal(conf), principal);
+
+ principal = "_HOST@EXAMPLE.COM";
+ conf.put(Config.STORM_HDFS_LOGIN_PRINCIPAL, principal);
+ Assert.assertEquals(Config.getHdfsPrincipal(conf), principal);
+
+ principal = "primary/instance@EXAMPLE.COM";
+ conf.put(Config.STORM_HDFS_LOGIN_PRINCIPAL, principal);
+ Assert.assertEquals(Config.getHdfsPrincipal(conf), principal);
+ }
}