You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2018/11/16 23:58:57 UTC
[kafka] branch trunk updated: KAFKA-7402: Implement KIP-376
AutoCloseable additions
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9646602 KAFKA-7402: Implement KIP-376 AutoCloseable additions
9646602 is described below
commit 9646602d6832ad0a5f2e9b65af5df1a80a571691
Author: Yishun Guan <gy...@gmail.com>
AuthorDate: Fri Nov 16 15:58:47 2018 -0800
KAFKA-7402: Implement KIP-376 AutoCloseable additions
---
.../java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java | 2 +-
.../apache/kafka/clients/consumer/internals/AbstractCoordinator.java | 2 +-
.../src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java | 2 +-
clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java | 2 +-
clients/src/main/java/org/apache/kafka/common/network/Selector.java | 2 +-
.../main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java | 2 +-
.../internals/expiring/ExpiringCredentialRefreshingLogin.java | 2 +-
.../src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java | 2 +-
.../src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java | 2 +-
.../main/java/org/apache/kafka/connect/transforms/TimestampRouter.java | 2 +-
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java | 2 +-
.../org/apache/kafka/streams/processor/internals/RecordCollector.java | 2 +-
tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java | 2 +-
13 files changed, 13 insertions(+), 13 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
index 763fe51..6af4705 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
@@ -40,7 +40,7 @@ import java.util.Map;
* <p>
* Implement {@link org.apache.kafka.common.ClusterResourceListener} to receive cluster metadata once it's available. Please see the class documentation for ClusterResourceListener for more information.
*/
-public interface ConsumerInterceptor<K, V> extends Configurable {
+public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {
/**
* This is called just before the records are returned by
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 335e0f2..fb710f1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -933,7 +933,7 @@ public abstract class AbstractCoordinator implements Closeable {
}
}
- private class HeartbeatThread extends KafkaThread {
+ private class HeartbeatThread extends KafkaThread implements AutoCloseable {
private boolean enabled = false;
private boolean closed = false;
private AtomicReference<RuntimeException> failed = new AtomicReference<>(null);
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
index 995bdaa..55d6b25 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
@@ -25,7 +25,7 @@ import org.apache.kafka.common.Configurable;
* <p>
* Implement {@link org.apache.kafka.common.ClusterResourceListener} to receive cluster metadata once it's available. Please see the class documentation for ClusterResourceListener for more information.
*/
-public interface MetricsReporter extends Configurable {
+public interface MetricsReporter extends Configurable, AutoCloseable {
/**
* This is called when the reporter is first registered to initially register all existing metrics
diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index 47b1375..3bca276 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -63,7 +63,7 @@ import java.util.function.Supplier;
* to memory pressure or other reasons</li>
* </ul>
*/
-public class KafkaChannel {
+public class KafkaChannel implements AutoCloseable {
private static final long MIN_REAUTH_INTERVAL_ONE_SECOND_NANOS = 1000 * 1000 * 1000;
/**
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index b960fcb..843d46d 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -983,7 +983,7 @@ public class Selector implements Selectable, AutoCloseable {
return deque == null ? 0 : deque.size();
}
- private class SelectorMetrics {
+ private class SelectorMetrics implements AutoCloseable {
private final Metrics metrics;
private final String metricGrpPrefix;
private final Map<String, String> metricTags;
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 0cc2cec..7512c82 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -37,7 +37,7 @@ import static org.apache.kafka.common.utils.Utils.wrapNullable;
* and the builder is closed (e.g. the Producer), it's important to call `closeForRecordAppends` when the former happens.
* This will release resources like compression buffers that can be relatively large (64 KB for LZ4).
*/
-public class MemoryRecordsBuilder {
+public class MemoryRecordsBuilder implements AutoCloseable {
private static final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f;
private static final DataOutputStream CLOSED_STREAM = new DataOutputStream(new OutputStream() {
@Override
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java
index 546b158..f6bb264 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java
@@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory;
* server when the login is a type that has a limited lifetime/will expire. The
* credentials for the login must implement {@link ExpiringCredential}.
*/
-public abstract class ExpiringCredentialRefreshingLogin {
+public abstract class ExpiringCredentialRefreshingLogin implements AutoCloseable {
/**
* Class that can be overridden for testing
*/
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
index d1e97b2..cf82f86 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
@@ -232,7 +232,7 @@ public class ConnectMetrics {
* the {@link Metrics} class, so that the sensor names are made to be unique (based on the group name)
* and so the sensors are removed when this group is {@link #close() closed}.
*/
- public class MetricGroup {
+ public class MetricGroup implements AutoCloseable {
private final MetricGroupId groupId;
private final Set<String> sensorNames = new HashSet<>();
private final String sensorPrefix;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
index b7fe74f..e5b990f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
@@ -232,7 +232,7 @@ public class WorkerConnector {
'}';
}
- class ConnectorMetricsGroup implements ConnectorStatus.Listener {
+ class ConnectorMetricsGroup implements ConnectorStatus.Listener, AutoCloseable {
/**
* Use {@link AbstractStatus.State} since it has all of the states we want,
* unlike {@link WorkerConnector.State}.
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java
index a0343b8..78384df 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java
@@ -28,7 +28,7 @@ import java.util.TimeZone;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-public class TimestampRouter<R extends ConnectRecord<R>> implements Transformation<R> {
+public class TimestampRouter<R extends ConnectRecord<R>> implements Transformation<R>, AutoCloseable {
private static final Pattern TOPIC = Pattern.compile("${topic}", Pattern.LITERAL);
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 819732a..bbda11d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -123,7 +123,7 @@ import static org.apache.kafka.common.utils.Utils.getPort;
* @see org.apache.kafka.streams.Topology
*/
@InterfaceStability.Evolving
-public class KafkaStreams {
+public class KafkaStreams implements AutoCloseable {
private static final String JMX_PREFIX = "kafka.streams";
private static final int DEFAULT_CLOSE_TIMEOUT = 0;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
index 09de11d..bbfb049 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
@@ -24,7 +24,7 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
import java.util.Map;
-public interface RecordCollector {
+public interface RecordCollector extends AutoCloseable {
<K, V> void send(final String topic,
final K key,
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
index fafa9e6..f0a991f 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
@@ -56,7 +56,7 @@ import static net.sourceforge.argparse4j.impl.Arguments.store;
* If logging is left enabled, log output on stdout can be easily ignored by checking
* whether a given line is valid JSON.
*/
-public class VerifiableProducer {
+public class VerifiableProducer implements AutoCloseable {
private final ObjectMapper mapper = new ObjectMapper();
private final String topic;