You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/05/25 14:02:37 UTC
[kafka] branch 1.1 updated: MINOR: Use reflection for signal
handler and do not enable it for IBM JDK (#5047)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push:
new 091f96f MINOR: Use reflection for signal handler and do not enable it for IBM JDK (#5047)
091f96f is described below
commit 091f96ffb0905251481444cfc14435c67c4e3608
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Fri May 25 07:00:56 2018 -0700
MINOR: Use reflection for signal handler and do not enable it for IBM JDK (#5047)
The Signal classes are not available in the compile classpath
if --release is used so we use reflection as a workaround.
As part of that moved the code to Java and added a simple
unit test.
Also disabled the signal handler if the IBM JDK is being used
due to KAFKA-6918.
Manually tested shutdown via ctrl+c and verified that
the message is printed.
---
.../kafka/common/utils/LoggingSignalHandler.java | 101 +++++++++++++++++++++
.../common/utils/LoggingSignalHandlerTest.java | 28 ++++++
core/src/main/scala/kafka/Kafka.scala | 35 ++-----
3 files changed, 138 insertions(+), 26 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/LoggingSignalHandler.java b/clients/src/main/java/org/apache/kafka/common/utils/LoggingSignalHandler.java
new file mode 100644
index 0000000..fbe6736
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/LoggingSignalHandler.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class LoggingSignalHandler {
+
+ private static final Logger log = LoggerFactory.getLogger(LoggingSignalHandler.class);
+
+ private final Constructor<?> signalConstructor;
+ private final Class<?> signalHandlerClass;
+ private final Class<?> signalClass;
+ private final Method signalHandleMethod;
+ private final Method signalGetNameMethod;
+ private final Method signalHandlerHandleMethod;
+
+ /**
+ * Create an instance of this class.
+ *
+ * @throws ReflectiveOperationException if the underlying API has changed in an incompatible manner.
+ */
+ public LoggingSignalHandler() throws ReflectiveOperationException {
+ signalClass = Class.forName("sun.misc.Signal");
+ signalConstructor = signalClass.getConstructor(String.class);
+ signalHandlerClass = Class.forName("sun.misc.SignalHandler");
+ signalHandlerHandleMethod = signalHandlerClass.getMethod("handle", signalClass);
+ signalHandleMethod = signalClass.getMethod("handle", signalClass, signalHandlerClass);
+ signalGetNameMethod = signalClass.getMethod("getName");
+ }
+
+ /**
+ * Register signal handler to log termination due to SIGTERM, SIGHUP and SIGINT (control-c). This method
+ * does not currently work on Windows.
+ *
+ * @implNote sun.misc.Signal and sun.misc.SignalHandler are described as "not encapsulated" in
+ * http://openjdk.java.net/jeps/260. However, they are not available in the compile classpath if the `--release`
+ * flag is used. As a workaround, we rely on reflection.
+ */
+ public void register() throws ReflectiveOperationException {
+ Map<String, Object> jvmSignalHandlers = new ConcurrentHashMap<>();
+ register("TERM", jvmSignalHandlers);
+ register("INT", jvmSignalHandlers);
+ register("HUP", jvmSignalHandlers);
+ }
+
+ private Object createSignalHandler(final Map<String, Object> jvmSignalHandlers) {
+ InvocationHandler invocationHandler = new InvocationHandler() {
+
+ private String getName(Object signal) throws ReflectiveOperationException {
+ return (String) signalGetNameMethod.invoke(signal);
+ }
+
+ private void handle(Object signalHandler, Object signal) throws ReflectiveOperationException {
+ signalHandlerHandleMethod.invoke(signalHandler, signal);
+ }
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+ Object signal = args[0];
+ log.info("Terminating process due to signal {}", signal);
+ Object handler = jvmSignalHandlers.get(getName(signal));
+ if (handler != null)
+ handle(handler, signal);
+ return null;
+ }
+ };
+ return Proxy.newProxyInstance(Utils.getContextOrKafkaClassLoader(), new Class[] {signalHandlerClass},
+ invocationHandler);
+ }
+
+ private void register(String signalName, final Map<String, Object> jvmSignalHandlers) throws ReflectiveOperationException {
+ Object signal = signalConstructor.newInstance(signalName);
+ Object signalHandler = createSignalHandler(jvmSignalHandlers);
+ Object oldHandler = signalHandleMethod.invoke(null, signal, signalHandler);
+ if (oldHandler != null)
+ jvmSignalHandlers.put(signalName, oldHandler);
+ }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/LoggingSignalHandlerTest.java b/clients/src/test/java/org/apache/kafka/common/utils/LoggingSignalHandlerTest.java
new file mode 100644
index 0000000..468e10e
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/utils/LoggingSignalHandlerTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import org.junit.Test;
+
+public class LoggingSignalHandlerTest {
+
+ @Test
+ public void testRegister() throws ReflectiveOperationException {
+ new LoggingSignalHandler().register();
+ }
+
+}
diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala
index 25a7216..c47aa52 100755
--- a/core/src/main/scala/kafka/Kafka.scala
+++ b/core/src/main/scala/kafka/Kafka.scala
@@ -18,14 +18,12 @@
package kafka
import java.util.Properties
-import java.util.concurrent.ConcurrentHashMap
-import sun.misc.{Signal, SignalHandler}
import joptsimple.OptionParser
import kafka.utils.Implicits._
import kafka.server.{KafkaServer, KafkaServerStartable}
import kafka.utils.{CommandLineUtils, Exit, Logging}
-import org.apache.kafka.common.utils.{OperatingSystem, Utils}
+import org.apache.kafka.common.utils.{Java, LoggingSignalHandler, OperatingSystem, Utils}
import scala.collection.JavaConverters._
@@ -55,34 +53,19 @@ object Kafka extends Logging {
props
}
- private def registerLoggingSignalHandler(): Unit = {
- val jvmSignalHandlers = new ConcurrentHashMap[String, SignalHandler]().asScala
- val handler = new SignalHandler() {
- override def handle(signal: Signal) {
- info(s"Terminating process due to signal $signal")
- jvmSignalHandlers.get(signal.getName).foreach(_.handle(signal))
- }
- }
- def registerHandler(signalName: String) {
- val oldHandler = Signal.handle(new Signal(signalName), handler)
- if (oldHandler != null)
- jvmSignalHandlers.put(signalName, oldHandler)
- }
-
- if (!OperatingSystem.IS_WINDOWS) {
- registerHandler("TERM")
- registerHandler("INT")
- registerHandler("HUP")
- }
- }
-
def main(args: Array[String]): Unit = {
try {
val serverProps = getPropsFromArgs(args)
val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
- // register signal handler to log termination due to SIGTERM, SIGHUP and SIGINT (control-c)
- registerLoggingSignalHandler()
+ try {
+ if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk)
+ new LoggingSignalHandler().register()
+ } catch {
+ case e: ReflectiveOperationException =>
+ warn("Failed to register optional signal handler that logs a message when the process is terminated " +
+ s"by a signal. Reason for registration failure is: $e", e)
+ }
// attach shutdown handler to catch terminating signals as well as normal termination
Runtime.getRuntime().addShutdownHook(new Thread("kafka-shutdown-hook") {
--
To stop receiving notification emails like this one, please contact
ijuma@apache.org.