You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/05/25 14:02:37 UTC

[kafka] branch 1.1 updated: MINOR: Use reflection for signal handler and do not enable it for IBM JDK (#5047)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 091f96f  MINOR: Use reflection for signal handler and do not enable it for IBM JDK (#5047)
091f96f is described below

commit 091f96ffb0905251481444cfc14435c67c4e3608
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Fri May 25 07:00:56 2018 -0700

    MINOR: Use reflection for signal handler and do not enable it for IBM JDK (#5047)
    
    The Signal classes are not available in the compile classpath
    if --release is used so we use reflection as a workaround.
    As part of that moved the code to Java and added a simple
    unit test.
    
    Also disabled the signal handler if the IBM JDK is being used
    due to KAFKA-6918.
    
    Manually tested shutdown via ctrl+c and verified that
    the message is printed.
---
 .../kafka/common/utils/LoggingSignalHandler.java   | 101 +++++++++++++++++++++
 .../common/utils/LoggingSignalHandlerTest.java     |  28 ++++++
 core/src/main/scala/kafka/Kafka.scala              |  35 ++-----
 3 files changed, 138 insertions(+), 26 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/utils/LoggingSignalHandler.java b/clients/src/main/java/org/apache/kafka/common/utils/LoggingSignalHandler.java
new file mode 100644
index 0000000..fbe6736
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/LoggingSignalHandler.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class LoggingSignalHandler {
+
+    private static final Logger log = LoggerFactory.getLogger(LoggingSignalHandler.class);
+
+    private final Constructor<?> signalConstructor;
+    private final Class<?> signalHandlerClass;
+    private final Class<?> signalClass;
+    private final Method signalHandleMethod;
+    private final Method signalGetNameMethod;
+    private final Method signalHandlerHandleMethod;
+
+    /**
+     * Create an instance of this class.
+     *
+     * @throws ReflectiveOperationException if the underlying API has changed in an incompatible manner.
+     */
+    public LoggingSignalHandler() throws ReflectiveOperationException {
+        signalClass = Class.forName("sun.misc.Signal");
+        signalConstructor = signalClass.getConstructor(String.class);
+        signalHandlerClass = Class.forName("sun.misc.SignalHandler");
+        signalHandlerHandleMethod = signalHandlerClass.getMethod("handle", signalClass);
+        signalHandleMethod = signalClass.getMethod("handle", signalClass, signalHandlerClass);
+        signalGetNameMethod = signalClass.getMethod("getName");
+    }
+
+    /**
+     * Register signal handler to log termination due to SIGTERM, SIGHUP and SIGINT (control-c). This method
+     * does not currently work on Windows.
+     *
+     * @implNote sun.misc.Signal and sun.misc.SignalHandler are described as "not encapsulated" in
+     * http://openjdk.java.net/jeps/260. However, they are not available in the compile classpath if the `--release`
+     * flag is used. As a workaround, we rely on reflection.
+     */
+    public void register() throws ReflectiveOperationException {
+        Map<String, Object> jvmSignalHandlers = new ConcurrentHashMap<>();
+        register("TERM", jvmSignalHandlers);
+        register("INT", jvmSignalHandlers);
+        register("HUP", jvmSignalHandlers);
+    }
+
+    private Object createSignalHandler(final Map<String, Object> jvmSignalHandlers) {
+        InvocationHandler invocationHandler = new InvocationHandler() {
+
+            private String getName(Object signal) throws ReflectiveOperationException {
+                return (String) signalGetNameMethod.invoke(signal);
+            }
+
+            private void handle(Object signalHandler, Object signal) throws ReflectiveOperationException {
+                signalHandlerHandleMethod.invoke(signalHandler, signal);
+            }
+
+            @Override
+            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+                Object signal = args[0];
+                log.info("Terminating process due to signal {}", signal);
+                Object handler = jvmSignalHandlers.get(getName(signal));
+                if (handler != null)
+                    handle(handler, signal);
+                return null;
+            }
+        };
+        return Proxy.newProxyInstance(Utils.getContextOrKafkaClassLoader(), new Class[] {signalHandlerClass},
+                invocationHandler);
+    }
+
+    private void register(String signalName, final Map<String, Object> jvmSignalHandlers) throws ReflectiveOperationException {
+        Object signal = signalConstructor.newInstance(signalName);
+        Object signalHandler = createSignalHandler(jvmSignalHandlers);
+        Object oldHandler = signalHandleMethod.invoke(null, signal, signalHandler);
+        if (oldHandler != null)
+            jvmSignalHandlers.put(signalName, oldHandler);
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/LoggingSignalHandlerTest.java b/clients/src/test/java/org/apache/kafka/common/utils/LoggingSignalHandlerTest.java
new file mode 100644
index 0000000..468e10e
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/utils/LoggingSignalHandlerTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import org.junit.Test;
+
+public class LoggingSignalHandlerTest {
+
+    @Test
+    public void testRegister() throws ReflectiveOperationException {
+        new LoggingSignalHandler().register();
+    }
+
+}
diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala
index 25a7216..c47aa52 100755
--- a/core/src/main/scala/kafka/Kafka.scala
+++ b/core/src/main/scala/kafka/Kafka.scala
@@ -18,14 +18,12 @@
 package kafka
 
 import java.util.Properties
-import java.util.concurrent.ConcurrentHashMap
 
-import sun.misc.{Signal, SignalHandler}
 import joptsimple.OptionParser
 import kafka.utils.Implicits._
 import kafka.server.{KafkaServer, KafkaServerStartable}
 import kafka.utils.{CommandLineUtils, Exit, Logging}
-import org.apache.kafka.common.utils.{OperatingSystem, Utils}
+import org.apache.kafka.common.utils.{Java, LoggingSignalHandler, OperatingSystem, Utils}
 
 import scala.collection.JavaConverters._
 
@@ -55,34 +53,19 @@ object Kafka extends Logging {
     props
   }
 
-  private def registerLoggingSignalHandler(): Unit = {
-    val jvmSignalHandlers = new ConcurrentHashMap[String, SignalHandler]().asScala
-    val handler = new SignalHandler() {
-      override def handle(signal: Signal) {
-        info(s"Terminating process due to signal $signal")
-        jvmSignalHandlers.get(signal.getName).foreach(_.handle(signal))
-      }
-    }
-    def registerHandler(signalName: String) {
-      val oldHandler = Signal.handle(new Signal(signalName), handler)
-      if (oldHandler != null)
-        jvmSignalHandlers.put(signalName, oldHandler)
-    }
-
-    if (!OperatingSystem.IS_WINDOWS) {
-      registerHandler("TERM")
-      registerHandler("INT")
-      registerHandler("HUP")
-    }
-  }
-
   def main(args: Array[String]): Unit = {
     try {
       val serverProps = getPropsFromArgs(args)
       val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
 
-      // register signal handler to log termination due to SIGTERM, SIGHUP and SIGINT (control-c)
-      registerLoggingSignalHandler()
+      try {
+        if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk)
+          new LoggingSignalHandler().register()
+      } catch {
+        case e: ReflectiveOperationException =>
+          warn("Failed to register optional signal handler that logs a message when the process is terminated " +
+            s"by a signal. Reason for registration failure is: $e", e)
+      }
 
       // attach shutdown handler to catch terminating signals as well as normal termination
       Runtime.getRuntime().addShutdownHook(new Thread("kafka-shutdown-hook") {

-- 
To stop receiving notification emails like this one, please contact
ijuma@apache.org.