You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2018/11/16 23:58:57 UTC

[kafka] branch trunk updated: KAFKA-7402: Implement KIP-376 AutoCloseable additions

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9646602  KAFKA-7402: Implement KIP-376 AutoCloseable additions
9646602 is described below

commit 9646602d6832ad0a5f2e9b65af5df1a80a571691
Author: Yishun Guan <gy...@gmail.com>
AuthorDate: Fri Nov 16 15:58:47 2018 -0800

    KAFKA-7402: Implement KIP-376 AutoCloseable additions
---
 .../java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java     | 2 +-
 .../apache/kafka/clients/consumer/internals/AbstractCoordinator.java    | 2 +-
 .../src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java  | 2 +-
 clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java | 2 +-
 clients/src/main/java/org/apache/kafka/common/network/Selector.java     | 2 +-
 .../main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java  | 2 +-
 .../internals/expiring/ExpiringCredentialRefreshingLogin.java           | 2 +-
 .../src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java  | 2 +-
 .../src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java | 2 +-
 .../main/java/org/apache/kafka/connect/transforms/TimestampRouter.java  | 2 +-
 streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java        | 2 +-
 .../org/apache/kafka/streams/processor/internals/RecordCollector.java   | 2 +-
 tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java      | 2 +-
 13 files changed, 13 insertions(+), 13 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
index 763fe51..6af4705 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
@@ -40,7 +40,7 @@ import java.util.Map;
  * <p>
  * Implement {@link org.apache.kafka.common.ClusterResourceListener} to receive cluster metadata once it's available. Please see the class documentation for ClusterResourceListener for more information.
  */
-public interface ConsumerInterceptor<K, V> extends Configurable {
+public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {
 
     /**
      * This is called just before the records are returned by
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 335e0f2..fb710f1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -933,7 +933,7 @@ public abstract class AbstractCoordinator implements Closeable {
         }
     }
 
-    private class HeartbeatThread extends KafkaThread {
+    private class HeartbeatThread extends KafkaThread implements AutoCloseable {
         private boolean enabled = false;
         private boolean closed = false;
         private AtomicReference<RuntimeException> failed = new AtomicReference<>(null);
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
index 995bdaa..55d6b25 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
@@ -25,7 +25,7 @@ import org.apache.kafka.common.Configurable;
  * <p>
  * Implement {@link org.apache.kafka.common.ClusterResourceListener} to receive cluster metadata once it's available. Please see the class documentation for ClusterResourceListener for more information.
  */
-public interface MetricsReporter extends Configurable {
+public interface MetricsReporter extends Configurable, AutoCloseable {
 
     /**
      * This is called when the reporter is first registered to initially register all existing metrics
diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index 47b1375..3bca276 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -63,7 +63,7 @@ import java.util.function.Supplier;
  * to memory pressure or other reasons</li>
  * </ul>
  */
-public class KafkaChannel {
+public class KafkaChannel implements AutoCloseable {
     private static final long MIN_REAUTH_INTERVAL_ONE_SECOND_NANOS = 1000 * 1000 * 1000;
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index b960fcb..843d46d 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -983,7 +983,7 @@ public class Selector implements Selectable, AutoCloseable {
         return deque == null ? 0 : deque.size();
     }
 
-    private class SelectorMetrics {
+    private class SelectorMetrics implements AutoCloseable {
         private final Metrics metrics;
         private final String metricGrpPrefix;
         private final Map<String, String> metricTags;
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 0cc2cec..7512c82 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -37,7 +37,7 @@ import static org.apache.kafka.common.utils.Utils.wrapNullable;
  * and the builder is closed (e.g. the Producer), it's important to call `closeForRecordAppends` when the former happens.
  * This will release resources like compression buffers that can be relatively large (64 KB for LZ4).
  */
-public class MemoryRecordsBuilder {
+public class MemoryRecordsBuilder implements AutoCloseable {
     private static final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f;
     private static final DataOutputStream CLOSED_STREAM = new DataOutputStream(new OutputStream() {
         @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java
index 546b158..f6bb264 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java
@@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory;
  * server when the login is a type that has a limited lifetime/will expire. The
  * credentials for the login must implement {@link ExpiringCredential}.
  */
-public abstract class ExpiringCredentialRefreshingLogin {
+public abstract class ExpiringCredentialRefreshingLogin implements AutoCloseable {
     /**
      * Class that can be overridden for testing
      */
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
index d1e97b2..cf82f86 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
@@ -232,7 +232,7 @@ public class ConnectMetrics {
      * the {@link Metrics} class, so that the sensor names are made to be unique (based on the group name)
      * and so the sensors are removed when this group is {@link #close() closed}.
      */
-    public class MetricGroup {
+    public class MetricGroup implements AutoCloseable {
         private final MetricGroupId groupId;
         private final Set<String> sensorNames = new HashSet<>();
         private final String sensorPrefix;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
index b7fe74f..e5b990f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
@@ -232,7 +232,7 @@ public class WorkerConnector {
                        '}';
     }
 
-    class ConnectorMetricsGroup implements ConnectorStatus.Listener {
+    class ConnectorMetricsGroup implements ConnectorStatus.Listener, AutoCloseable {
         /**
          * Use {@link AbstractStatus.State} since it has all of the states we want,
          * unlike {@link WorkerConnector.State}.
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java
index a0343b8..78384df 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java
@@ -28,7 +28,7 @@ import java.util.TimeZone;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-public class TimestampRouter<R extends ConnectRecord<R>> implements Transformation<R> {
+public class TimestampRouter<R extends ConnectRecord<R>> implements Transformation<R>, AutoCloseable {
 
     private static final Pattern TOPIC = Pattern.compile("${topic}", Pattern.LITERAL);
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 819732a..bbda11d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -123,7 +123,7 @@ import static org.apache.kafka.common.utils.Utils.getPort;
  * @see org.apache.kafka.streams.Topology
  */
 @InterfaceStability.Evolving
-public class KafkaStreams {
+public class KafkaStreams implements AutoCloseable {
 
     private static final String JMX_PREFIX = "kafka.streams";
     private static final int DEFAULT_CLOSE_TIMEOUT = 0;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
index 09de11d..bbfb049 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
@@ -24,7 +24,7 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
 
 import java.util.Map;
 
-public interface RecordCollector {
+public interface RecordCollector extends AutoCloseable {
 
     <K, V> void send(final String topic,
                      final K key,
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
index fafa9e6..f0a991f 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
@@ -56,7 +56,7 @@ import static net.sourceforge.argparse4j.impl.Arguments.store;
  * If logging is left enabled, log output on stdout can be easily ignored by checking
  * whether a given line is valid JSON.
  */
-public class VerifiableProducer {
+public class VerifiableProducer implements AutoCloseable {
 
     private final ObjectMapper mapper = new ObjectMapper();
     private final String topic;