Posted to commits@kafka.apache.org by jg...@apache.org on 2018/02/23 06:33:40 UTC
[kafka] branch 1.1 updated: KAFKA-6578: Changed the Connect distributed and standalone main method to log all exceptions (#4609)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push:
new d1994f2 KAFKA-6578: Changed the Connect distributed and standalone main method to log all exceptions (#4609)
d1994f2 is described below
commit d1994f2b476fb1d9b4a9be0ab66221970fe03ebd
Author: Randall Hauch <rh...@gmail.com>
AuthorDate: Fri Feb 23 00:29:49 2018 -0600
KAFKA-6578: Changed the Connect distributed and standalone main method to log all exceptions (#4609)
Any exception thrown by calls within a `main()` method is not logged unless explicitly caught and logged. This change simply adds a try-catch block around most of the content of the distributed and standalone `main()` methods.
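For illustration, here is a minimal, self-contained sketch of the pattern this commit applies. The class name and the start() helper are hypothetical stand-ins, not the actual Connect code; the real changes to ConnectDistributed and ConnectStandalone follow in the diff below.

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Hypothetical stand-in for a worker entry point; shows the
    // try-catch-around-main pattern this commit introduces.
    public class LoggingMainSketch {
        private static final Logger log = LoggerFactory.getLogger(LoggingMainSketch.class);

        public static void main(String[] args) {
            try {
                // All startup work happens inside the try block.
                start(args);
            } catch (Throwable t) {
                // Without this catch, a startup exception would reach only the
                // JVM's default handler (stderr) and never the worker's log.
                log.error("Stopping due to error", t);
                System.exit(2);
            }
        }

        private static void start(String[] args) {
            // Hypothetical startup logic that may throw.
        }
    }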
---
 .../kafka/connect/cli/ConnectDistributed.java | 81 ++++++++++---------
 .../kafka/connect/cli/ConnectStandalone.java  | 93 ++++++++++++----------
 2 files changed, 95 insertions(+), 79 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index 8930602..54854fe4 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -27,6 +27,7 @@ import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.rest.RestServer;
 import org.apache.kafka.connect.storage.ConfigBackingStore;
+import org.apache.kafka.connect.storage.Converter;
 import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
 import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
 import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
@@ -57,51 +58,59 @@ public class ConnectDistributed {
             Exit.exit(1);
         }
 
-        Time time = Time.SYSTEM;
-        log.info("Kafka Connect distributed worker initializing ...");
-        long initStart = time.hiResClockMs();
-        WorkerInfo initInfo = new WorkerInfo();
-        initInfo.logAll();
+        try {
+            Time time = Time.SYSTEM;
+            log.info("Kafka Connect distributed worker initializing ...");
+            long initStart = time.hiResClockMs();
+            WorkerInfo initInfo = new WorkerInfo();
+            initInfo.logAll();
 
-        String workerPropsFile = args[0];
-        Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
-                Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap();
+            String workerPropsFile = args[0];
+            Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
+                    Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap();
 
-        log.info("Scanning for plugin classes. This might take a moment ...");
-        Plugins plugins = new Plugins(workerProps);
-        plugins.compareAndSwapWithDelegatingLoader();
-        DistributedConfig config = new DistributedConfig(workerProps);
+            log.info("Scanning for plugin classes. This might take a moment ...");
+            Plugins plugins = new Plugins(workerProps);
+            plugins.compareAndSwapWithDelegatingLoader();
+            DistributedConfig config = new DistributedConfig(workerProps);
 
-        String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
-        log.debug("Kafka cluster ID: {}", kafkaClusterId);
+            String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
+            log.debug("Kafka cluster ID: {}", kafkaClusterId);
 
-        RestServer rest = new RestServer(config);
-        URI advertisedUrl = rest.advertisedUrl();
-        String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
+            RestServer rest = new RestServer(config);
+            URI advertisedUrl = rest.advertisedUrl();
+            String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
 
-        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
-        offsetBackingStore.configure(config);
+            KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+            offsetBackingStore.configure(config);
 
-        Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore);
+            Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore);
 
-        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, worker.getInternalValueConverter());
-        statusBackingStore.configure(config);
+            Converter internalValueConverter = worker.getInternalValueConverter();
+            StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
+            statusBackingStore.configure(config);
 
-        ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(worker.getInternalValueConverter(), config);
+            ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(internalValueConverter, config);
 
-        DistributedHerder herder = new DistributedHerder(config, time, worker,
-                kafkaClusterId, statusBackingStore, configBackingStore,
-                advertisedUrl.toString());
-        final Connect connect = new Connect(herder, rest);
-        log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart);
-        try {
-            connect.start();
-        } catch (Exception e) {
-            log.error("Failed to start Connect", e);
-            connect.stop();
-        }
+            DistributedHerder herder = new DistributedHerder(config, time, worker,
+                    kafkaClusterId, statusBackingStore, configBackingStore,
+                    advertisedUrl.toString());
+            final Connect connect = new Connect(herder, rest);
+            log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart);
+            try {
+                connect.start();
+            } catch (Exception e) {
+                log.error("Failed to start Connect", e);
+                connect.stop();
+                Exit.exit(3);
+            }
 
-        // Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
-        connect.awaitStop();
+            // Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
+            connect.awaitStop();
+
+        } catch (Throwable t) {
+            log.error("Stopping due to error", t);
+            Exit.exit(2);
+        }
     }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
index 413cb46..aba9d9c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
@@ -62,58 +62,65 @@ public class ConnectStandalone {
             Exit.exit(1);
         }
 
-        Time time = Time.SYSTEM;
-        log.info("Kafka Connect standalone worker initializing ...");
-        long initStart = time.hiResClockMs();
-        WorkerInfo initInfo = new WorkerInfo();
-        initInfo.logAll();
+        try {
+            Time time = Time.SYSTEM;
+            log.info("Kafka Connect standalone worker initializing ...");
+            long initStart = time.hiResClockMs();
+            WorkerInfo initInfo = new WorkerInfo();
+            initInfo.logAll();
 
-        String workerPropsFile = args[0];
-        Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
-                Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap();
+            String workerPropsFile = args[0];
+            Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
+                    Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap();
 
-        log.info("Scanning for plugin classes. This might take a moment ...");
-        Plugins plugins = new Plugins(workerProps);
-        plugins.compareAndSwapWithDelegatingLoader();
-        StandaloneConfig config = new StandaloneConfig(workerProps);
+            log.info("Scanning for plugin classes. This might take a moment ...");
+            Plugins plugins = new Plugins(workerProps);
+            plugins.compareAndSwapWithDelegatingLoader();
+            StandaloneConfig config = new StandaloneConfig(workerProps);
 
-        String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
-        log.debug("Kafka cluster ID: {}", kafkaClusterId);
+            String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
+            log.debug("Kafka cluster ID: {}", kafkaClusterId);
 
-        RestServer rest = new RestServer(config);
-        URI advertisedUrl = rest.advertisedUrl();
-        String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
+            RestServer rest = new RestServer(config);
+            URI advertisedUrl = rest.advertisedUrl();
+            String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
 
-        Worker worker = new Worker(workerId, time, plugins, config, new FileOffsetBackingStore());
+            Worker worker = new Worker(workerId, time, plugins, config, new FileOffsetBackingStore());
 
-        Herder herder = new StandaloneHerder(worker, kafkaClusterId);
-        final Connect connect = new Connect(herder, rest);
-        log.info("Kafka Connect standalone worker initialization took {}ms", time.hiResClockMs() - initStart);
+            Herder herder = new StandaloneHerder(worker, kafkaClusterId);
+            final Connect connect = new Connect(herder, rest);
+            log.info("Kafka Connect standalone worker initialization took {}ms", time.hiResClockMs() - initStart);
 
-        try {
-            connect.start();
-            for (final String connectorPropsFile : Arrays.copyOfRange(args, 1, args.length)) {
-                Map<String, String> connectorProps = Utils.propsToStringMap(Utils.loadProps(connectorPropsFile));
-                FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(new Callback<Herder.Created<ConnectorInfo>>() {
-                    @Override
-                    public void onCompletion(Throwable error, Herder.Created<ConnectorInfo> info) {
-                        if (error != null)
-                            log.error("Failed to create job for {}", connectorPropsFile);
-                        else
-                            log.info("Created connector {}", info.result().name());
-                    }
-                });
-                herder.putConnectorConfig(
-                        connectorProps.get(ConnectorConfig.NAME_CONFIG),
-                        connectorProps, false, cb);
-                cb.get();
+            try {
+                connect.start();
+                for (final String connectorPropsFile : Arrays.copyOfRange(args, 1, args.length)) {
+                    Map<String, String> connectorProps = Utils.propsToStringMap(Utils.loadProps(connectorPropsFile));
+                    FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(new Callback<Herder.Created<ConnectorInfo>>() {
+                        @Override
+                        public void onCompletion(Throwable error, Herder.Created<ConnectorInfo> info) {
+                            if (error != null)
+                                log.error("Failed to create job for {}", connectorPropsFile);
+                            else
+                                log.info("Created connector {}", info.result().name());
+                        }
+                    });
+                    herder.putConnectorConfig(
+                            connectorProps.get(ConnectorConfig.NAME_CONFIG),
+                            connectorProps, false, cb);
+                    cb.get();
+                }
+            } catch (Throwable t) {
+                log.error("Stopping after connector error", t);
+                connect.stop();
+                Exit.exit(3);
             }
+
+            // Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
+            connect.awaitStop();
+
         } catch (Throwable t) {
-            log.error("Stopping after connector error", t);
-            connect.stop();
+            log.error("Stopping due to error", t);
+            Exit.exit(2);
         }
-
-        // Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
-        connect.awaitStop();
     }
 }