You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@storm.apache.org by ag...@apache.org on 2020/06/25 18:14:50 UTC
[storm] branch master updated: STORM-3656 handle worker NPE from Hadoop auto renewal thread
This is an automated email from the ASF dual-hosted git repository.
agresch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new a8b514b STORM-3656 handle worker NPE from Hadoop auto renewal thread
new ba48754 Merge pull request #3292 from agresch/agresch_storm_3656
a8b514b is described below
commit a8b514b042bc5ee82c76ed8b3f8afda8118873b5
Author: Aaron Gresch <ag...@yahoo-inc.com>
AuthorDate: Mon Jun 22 16:07:26 2020 -0500
STORM-3656 handle worker NPE from Hadoop auto renewal thread
---
.../jvm/org/apache/storm/daemon/worker/Worker.java | 2 +-
.../storm/messaging/netty/StormServerHandler.java | 2 +-
.../storm/security/auth/kerberos/AutoTGT.java | 39 ++++-----------
.../src/jvm/org/apache/storm/utils/Utils.java | 56 +++++++++++++++++++---
4 files changed, 61 insertions(+), 38 deletions(-)
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index b4e4c80..675a008 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -146,7 +146,7 @@ public class Worker implements Shutdownable, DaemonCommon {
String portStr = args[3];
String workerId = args[4];
Map<String, Object> conf = ConfigUtils.readStormConfig();
- Utils.setupDefaultUncaughtExceptionHandler();
+ Utils.setupWorkerUncaughtExceptionHandler();
StormCommon.validateDistributedMode(conf);
int supervisorPortInt = Integer.parseInt(supervisorPort);
Worker worker = new Worker(conf, null, stormId, assignmentId, supervisorPortInt, Integer.parseInt(portStr), workerId);
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java
index 542dd9c..5cc5c05 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java
@@ -63,7 +63,7 @@ public class StormServerHandler extends ChannelInboundHandlerAdapter {
// Doing nothing (probably due to an oom issue) and hoping Utils.handleUncaughtException will handle it
}
try {
- Utils.handleUncaughtException(cause, ALLOWED_EXCEPTIONS);
+ Utils.handleUncaughtException(cause, ALLOWED_EXCEPTIONS, false);
ctx.close();
} catch (Error error) {
LOG.info("Received error in netty thread.. terminating server...");
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
index f907a23..f7f866a 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
@@ -13,9 +13,7 @@
package org.apache.storm.security.auth.kerberos;
import com.codahale.metrics.Gauge;
-import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
-import java.security.Principal;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -210,35 +208,16 @@ public class AutoTGT implements IAutoCredentials, ICredentialsRenewer, IMetricsR
return;
}
- // We are just trying to do the following:
- //
- // Configuration conf = new Configuration();
- // HadoopKerberosName.setConfiguration(conf);
- // subject.getPrincipals().add(new User(tgt.getClient().toString(), AuthenticationMethod.KERBEROS, null));
-
- Class<?> confClass = Class.forName("org.apache.hadoop.conf.Configuration");
- Constructor confCons = confClass.getConstructor();
- Object conf = confCons.newInstance();
- Class<?> hknClass = Class.forName("org.apache.hadoop.security.HadoopKerberosName");
- Method hknSetConf = hknClass.getMethod("setConfiguration", confClass);
- hknSetConf.invoke(null, conf);
-
- Class<?> authMethodClass = Class.forName("org.apache.hadoop.security.UserGroupInformation$AuthenticationMethod");
- Object kerbAuthMethod = null;
- for (Object authMethod : authMethodClass.getEnumConstants()) {
- if ("KERBEROS".equals(authMethod.toString())) {
- kerbAuthMethod = authMethod;
- break;
- }
- }
-
- Class<?> userClass = Class.forName("org.apache.hadoop.security.User");
- Constructor userCons = userClass.getConstructor(String.class, authMethodClass, LoginContext.class);
- userCons.setAccessible(true);
- String name = getTGT(subject).getClient().toString();
- Object user = userCons.newInstance(name, kerbAuthMethod, null);
- subject.getPrincipals().add((Principal) user);
+ LOG.info("Invoking Hadoop UserGroupInformation.loginUserFromSubject.");
+ Method login = ugi.getMethod("loginUserFromSubject", Subject.class);
+ login.invoke(null, subject);
+ //Refer to STORM-3606 for details
+ LOG.warn("UserGroupInformation.loginUserFromSubject will spawn a TGT renewal thread (\"TGT Renewer for <username>\") "
+ + "to execute \"kinit -R\" command some time before the current TGT expires. "
+ + "It will fail because TGT is not in the local TGT cache and the thread will eventually abort. "
+ + "Exceptions from this TGT renewal thread can be ignored. Note: TGT for the Worker is kept in memory. "
+ + "Please refer to STORM-3606 for detailed explanations");
} catch (Exception e) {
LOG.error("Something went wrong while trying to initialize Hadoop through reflection. This version of hadoop "
+ "may not be compatible.", e);
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index ff656fa..fec30cc 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -630,11 +630,12 @@ public class Utils {
&& !((String) conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME)).isEmpty());
}
- public static void handleUncaughtException(Throwable t) {
- handleUncaughtException(t, defaultAllowedExceptions);
- }
-
- public static void handleUncaughtException(Throwable t, Set<Class<?>> allowedExceptions) {
+ /**
+ * Handles uncaught exceptions.
+ *
+ * @param worker true if this is for handling worker exceptions
+ */
+ public static void handleUncaughtException(Throwable t, Set<Class<?>> allowedExceptions, boolean worker) {
if (t != null) {
if (t instanceof OutOfMemoryError) {
try {
@@ -651,10 +652,38 @@ public class Utils {
return;
}
+ if (worker && isAllowedWorkerException(t)) {
+ LOG.info("Swallowing {} {}", t.getClass(), t);
+ return;
+ }
+
//Running in daemon mode, we would pass Error to calling thread.
throw new Error(t);
}
+ public static void handleUncaughtException(Throwable t) {
+ handleUncaughtException(t, defaultAllowedExceptions, false);
+ }
+
+ public static void handleWorkerUncaughtException(Throwable t) {
+ handleUncaughtException(t, defaultAllowedExceptions, true);
+ }
+
+ // Hadoop UserGroupInformation can launch an autorenewal thread that can cause a NullPointerException
+ // for workers. See STORM-3606 for an explanation.
+ private static boolean isAllowedWorkerException(Throwable t) {
+ if (t instanceof NullPointerException) {
+ StackTraceElement[] stackTrace = t.getStackTrace();
+ for (StackTraceElement trace : stackTrace) {
+ if (trace.getClassName().startsWith("org.apache.hadoop.security.UserGroupInformation")
+ && trace.getMethodName().equals("run")) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
public static byte[] thriftSerialize(TBase t) {
try {
TSerializer ser = threadSer.get();
@@ -1020,11 +1049,26 @@ public class Utils {
}
};
}
-
+
+ public static UncaughtExceptionHandler createWorkerUncaughtExceptionHandler() {
+ return (thread, thrown) -> {
+ try {
+ handleWorkerUncaughtException(thrown);
+ } catch (Error err) {
+ LOG.error("Received error in thread {}.. terminating worker...", thread.getName(), err);
+ Runtime.getRuntime().exit(-2);
+ }
+ };
+ }
+
public static void setupDefaultUncaughtExceptionHandler() {
Thread.setDefaultUncaughtExceptionHandler(createDefaultUncaughtExceptionHandler());
}
+ public static void setupWorkerUncaughtExceptionHandler() {
+ Thread.setDefaultUncaughtExceptionHandler(createWorkerUncaughtExceptionHandler());
+ }
+
/**
* parses the arguments to extract jvm heap memory size in MB.
*