You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this plain-text copy.
Posted to commits@storm.apache.org by ag...@apache.org on 2020/06/25 18:14:50 UTC

[storm] branch master updated: STORM-3656 handle worker NPE from Hadoop auto renewal thread

This is an automated email from the ASF dual-hosted git repository.

agresch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new a8b514b  STORM-3656 handle worker NPE from Hadoop auto renewal thread
     new ba48754  Merge pull request #3292 from agresch/agresch_storm_3656
a8b514b is described below

commit a8b514b042bc5ee82c76ed8b3f8afda8118873b5
Author: Aaron Gresch <ag...@yahoo-inc.com>
AuthorDate: Mon Jun 22 16:07:26 2020 -0500

    STORM-3656 handle worker NPE from Hadoop auto renewal thread
---
 .../jvm/org/apache/storm/daemon/worker/Worker.java |  2 +-
 .../storm/messaging/netty/StormServerHandler.java  |  2 +-
 .../storm/security/auth/kerberos/AutoTGT.java      | 39 ++++-----------
 .../src/jvm/org/apache/storm/utils/Utils.java      | 56 +++++++++++++++++++---
 4 files changed, 61 insertions(+), 38 deletions(-)

diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index b4e4c80..675a008 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -146,7 +146,7 @@ public class Worker implements Shutdownable, DaemonCommon {
         String portStr = args[3];
         String workerId = args[4];
         Map<String, Object> conf = ConfigUtils.readStormConfig();
-        Utils.setupDefaultUncaughtExceptionHandler();
+        Utils.setupWorkerUncaughtExceptionHandler();
         StormCommon.validateDistributedMode(conf);
         int supervisorPortInt = Integer.parseInt(supervisorPort);
         Worker worker = new Worker(conf, null, stormId, assignmentId, supervisorPortInt, Integer.parseInt(portStr), workerId);
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java
index 542dd9c..5cc5c05 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java
@@ -63,7 +63,7 @@ public class StormServerHandler extends ChannelInboundHandlerAdapter {
             // Doing nothing (probably due to an oom issue) and hoping Utils.handleUncaughtException will handle it
         }
         try {
-            Utils.handleUncaughtException(cause, ALLOWED_EXCEPTIONS);
+            Utils.handleUncaughtException(cause, ALLOWED_EXCEPTIONS, false);
             ctx.close();
         } catch (Error error) {
             LOG.info("Received error in netty thread.. terminating server...");
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
index f907a23..f7f866a 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
@@ -13,9 +13,7 @@
 package org.apache.storm.security.auth.kerberos;
 
 import com.codahale.metrics.Gauge;
-import java.lang.reflect.Constructor;
 import java.lang.reflect.Method;
-import java.security.Principal;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -210,35 +208,16 @@ public class AutoTGT implements IAutoCredentials, ICredentialsRenewer, IMetricsR
                 return;
             }
 
-            // We are just trying to do the following:
-            //
-            // Configuration conf = new Configuration();
-            // HadoopKerberosName.setConfiguration(conf);
-            // subject.getPrincipals().add(new User(tgt.getClient().toString(), AuthenticationMethod.KERBEROS, null));
-
-            Class<?> confClass = Class.forName("org.apache.hadoop.conf.Configuration");
-            Constructor confCons = confClass.getConstructor();
-            Object conf = confCons.newInstance();
-            Class<?> hknClass = Class.forName("org.apache.hadoop.security.HadoopKerberosName");
-            Method hknSetConf = hknClass.getMethod("setConfiguration", confClass);
-            hknSetConf.invoke(null, conf);
-
-            Class<?> authMethodClass = Class.forName("org.apache.hadoop.security.UserGroupInformation$AuthenticationMethod");
-            Object kerbAuthMethod = null;
-            for (Object authMethod : authMethodClass.getEnumConstants()) {
-                if ("KERBEROS".equals(authMethod.toString())) {
-                    kerbAuthMethod = authMethod;
-                    break;
-                }
-            }
-
-            Class<?> userClass = Class.forName("org.apache.hadoop.security.User");
-            Constructor userCons = userClass.getConstructor(String.class, authMethodClass, LoginContext.class);
-            userCons.setAccessible(true);
-            String name = getTGT(subject).getClient().toString();
-            Object user = userCons.newInstance(name, kerbAuthMethod, null);
-            subject.getPrincipals().add((Principal) user);
+            LOG.info("Invoking Hadoop UserGroupInformation.loginUserFromSubject.");
+            Method login = ugi.getMethod("loginUserFromSubject", Subject.class);
+            login.invoke(null, subject);
 
+            //Refer to STORM-3606 for details
+            LOG.warn("UserGroupInformation.loginUserFromSubject will spawn a TGT renewal thread (\"TGT Renewer for <username>\") "
+                    + "to execute \"kinit -R\" command some time before the current TGT expires. "
+                    + "It will fail because TGT is not in the local TGT cache and the thread will eventually abort. "
+                    + "Exceptions from this TGT renewal thread can be ignored. Note: TGT for the Worker is kept in memory. "
+                    + "Please refer to STORM-3606 for detailed explanations");
         } catch (Exception e) {
             LOG.error("Something went wrong while trying to initialize Hadoop through reflection. This version of hadoop "
                      + "may not be compatible.", e);
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index ff656fa..fec30cc 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -630,11 +630,12 @@ public class Utils {
                 && !((String) conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME)).isEmpty());
     }
 
-    public static void handleUncaughtException(Throwable t) {
-        handleUncaughtException(t, defaultAllowedExceptions);
-    }
-
-    public static void handleUncaughtException(Throwable t, Set<Class<?>> allowedExceptions) {
+    /**
+     * Handles uncaught exceptions.
+     *
+     * @param worker true if this is for handling worker exceptions
+     */
+    public static void handleUncaughtException(Throwable t, Set<Class<?>> allowedExceptions, boolean worker) {
         if (t != null) {
             if (t instanceof OutOfMemoryError) {
                 try {
@@ -651,10 +652,38 @@ public class Utils {
             return;
         }
 
+        if (worker && isAllowedWorkerException(t)) {
+            LOG.info("Swallowing {} {}", t.getClass(), t);
+            return;
+        }
+
         //Running in daemon mode, we would pass Error to calling thread.
         throw new Error(t);
     }
 
+    public static void handleUncaughtException(Throwable t) {
+        handleUncaughtException(t, defaultAllowedExceptions, false);
+    }
+
+    public static void handleWorkerUncaughtException(Throwable t) {
+        handleUncaughtException(t, defaultAllowedExceptions, true);
+    }
+
+    // Hadoop UserGroupInformation can launch an autorenewal thread that can cause a NullPointerException
+    // for workers.  See STORM-3606 for an explanation.
+    private static boolean isAllowedWorkerException(Throwable t) {
+        if (t instanceof NullPointerException) {
+            StackTraceElement[] stackTrace = t.getStackTrace();
+            for (StackTraceElement trace : stackTrace) {
+                if (trace.getClassName().startsWith("org.apache.hadoop.security.UserGroupInformation")
+                        && trace.getMethodName().equals("run")) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
     public static byte[] thriftSerialize(TBase t) {
         try {
             TSerializer ser = threadSer.get();
@@ -1020,11 +1049,26 @@ public class Utils {
             }
         };
     }
-    
+
+    public static UncaughtExceptionHandler createWorkerUncaughtExceptionHandler() {
+        return (thread, thrown) -> {
+            try {
+                handleWorkerUncaughtException(thrown);
+            } catch (Error err) {
+                LOG.error("Received error in thread {}.. terminating worker...", thread.getName(), err);
+                Runtime.getRuntime().exit(-2);
+            }
+        };
+    }
+
     public static void setupDefaultUncaughtExceptionHandler() {
         Thread.setDefaultUncaughtExceptionHandler(createDefaultUncaughtExceptionHandler());
     }
 
+    public static void setupWorkerUncaughtExceptionHandler() {
+        Thread.setDefaultUncaughtExceptionHandler(createWorkerUncaughtExceptionHandler());
+    }
+
     /**
      * parses the arguments to extract jvm heap memory size in MB.
      *