Posted to commits@kafka.apache.org by jg...@apache.org on 2017/07/19 17:53:34 UTC

[1/2] kafka git commit: MINOR: Code Cleanup

Repository: kafka
Updated Branches:
  refs/heads/trunk 3bfc073f0 -> f87d58b79


http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java
index 12beef8..475066f 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.data.Time;
 import org.apache.kafka.connect.data.Timestamp;
 import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.After;
 import org.junit.Test;
 
 import java.util.Calendar;
@@ -38,7 +39,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
 public class TimestampConverterTest {
-
     private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
     private static final Calendar EPOCH;
     private static final Calendar TIME;
@@ -48,6 +48,9 @@ public class TimestampConverterTest {
     private static final String STRING_DATE_FMT = "yyyy MM dd HH mm ss SSS z";
     private static final String DATE_PLUS_TIME_STRING;
 
+    private final TimestampConverter<SourceRecord> xformKey = new TimestampConverter.Key<>();
+    private final TimestampConverter<SourceRecord> xformValue = new TimestampConverter.Value<>();
+
     static {
         EPOCH = GregorianCalendar.getInstance(UTC);
         EPOCH.setTimeInMillis(0L);
@@ -73,31 +76,33 @@ public class TimestampConverterTest {
 
     // Configuration
 
+    @After
+    public void teardown() {
+        xformKey.close();
+        xformValue.close();
+    }
+
     @Test(expected = ConfigException.class)
     public void testConfigNoTargetType() {
-        TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
-        xform.configure(Collections.<String, String>emptyMap());
+        xformValue.configure(Collections.<String, String>emptyMap());
     }
 
     @Test(expected = ConfigException.class)
     public void testConfigInvalidTargetType() {
-        TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
-        xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "invalid"));
+        xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "invalid"));
     }
 
     @Test(expected = ConfigException.class)
     public void testConfigMissingFormat() {
-        TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
-        xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "string"));
+        xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "string"));
     }
 
     @Test(expected = ConfigException.class)
     public void testConfigInvalidFormat() {
-        TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
         Map<String, String> config = new HashMap<>();
         config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string");
         config.put(TimestampConverter.FORMAT_CONFIG, "bad-format");
-        xform.configure(config);
+        xformValue.configure(config);
     }
 
 
@@ -105,9 +110,8 @@ public class TimestampConverterTest {
 
     @Test
     public void testSchemalessIdentity() {
-        TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
-        xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
+        xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
 
         assertNull(transformed.valueSchema());
         assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
@@ -115,9 +119,8 @@ public class TimestampConverterTest {
 
     @Test
     public void testSchemalessTimestampToDate() {
-        TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
-        xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Date"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
+        xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Date"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
 
         assertNull(transformed.valueSchema());
         assertEquals(DATE.getTime(), transformed.value());
@@ -125,9 +128,8 @@ public class TimestampConverterTest {
 
     @Test
     public void testSchemalessTimestampToTime() {
-        TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
-        xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Time"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
+        xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Time"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
 
         assertNull(transformed.valueSchema());
         assertEquals(TIME.getTime(), transformed.value());
@@ -135,9 +137,8 @@ public class TimestampConverterTest {
 
     @Test
     public void testSchemalessTimestampToUnix() {
-        TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
-        xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "unix"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
+        xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "unix"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
 
         assertNull(transformed.valueSchema());
         assertEquals(DATE_PLUS_TIME_UNIX, transformed.value());
@@ -145,12 +146,11 @@ public class TimestampConverterTest {
 
     @Test
     public void testSchemalessTimestampToString() {
-        TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
         Map<String, String> config = new HashMap<>();
         config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string");
         config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
-        xform.configure(config);
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
+        xformValue.configure(config);
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
 
         assertNull(transformed.valueSchema());
         assertEquals(DATE_PLUS_TIME_STRING, transformed.value());
@@ -161,9 +161,8 @@ public class TimestampConverterTest {
 
     @Test
     public void testSchemalessDateToTimestamp() {
-        TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
-        xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE.getTime()));
+        xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, DATE.getTime()));
 
         assertNull(transformed.valueSchema());
         // No change expected since the source type is coarser-grained
@@ -172,9 +171,8 @@ public class TimestampConverterTest {
 
     @Test
     public void testSchemalessTimeToTimestamp() {
-        TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
-        xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, TIME.getTime()));
+        xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, TIME.getTime()));
 
         assertNull(transformed.valueSchema());
         // No change expected since the source type is coarser-grained
@@ -183,9 +181,8 @@ public class TimestampConverterTest {
 
     @Test
     public void testSchemalessUnixToTimestamp() {
-        TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
-        xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME_UNIX));
+        xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME_UNIX));
 
         assertNull(transformed.valueSchema());
         assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
@@ -193,12 +190,11 @@ public class TimestampConverterTest {
 
     @Test
     public void testSchemalessStringToTimestamp() {
-        TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
         Map<String, String> config = new HashMap<>();
         config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp");
         config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
-        xform.configure(config);
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME_STRING));
+        xformValue.configure(config);
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME_STRING));
 
         assertNull(transformed.valueSchema());
         assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
@@ -209,9 +205,8 @@ public class TimestampConverterTest {
 
     @Test
     public void testWithSchemaIdentity() {
-        TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
-        xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
+        xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
 
         assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
         assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
@@ -219,9 +214,8 @@ public class TimestampConverterTest {
 
     @Test
     public void testWithSchemaTimestampToDate() {
-        TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
-        xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Date"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
+        xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Date"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
 
         assertEquals(Date.SCHEMA, transformed.valueSchema());
         assertEquals(DATE.getTime(), transformed.value());
@@ -229,9 +223,8 @@ public class TimestampConverterTest {
 
     @Test
     public void testWithSchemaTimestampToTime() {
-        TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
-        xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Time"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
+        xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Time"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
 
         assertEquals(Time.SCHEMA, transformed.valueSchema());
         assertEquals(TIME.getTime(), transformed.value());
@@ -239,9 +232,8 @@ public class TimestampConverterTest {
 
     @Test
     public void testWithSchemaTimestampToUnix() {
-        TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
-        xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "unix"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
+        xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "unix"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
 
         assertEquals(Schema.INT64_SCHEMA, transformed.valueSchema());
         assertEquals(DATE_PLUS_TIME_UNIX, transformed.value());
@@ -249,12 +241,11 @@ public class TimestampConverterTest {
 
     @Test
     public void testWithSchemaTimestampToString() {
-        TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
         Map<String, String> config = new HashMap<>();
         config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string");
         config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
-        xform.configure(config);
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
+        xformValue.configure(config);
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
 
         assertEquals(Schema.STRING_SCHEMA, transformed.valueSchema());
         assertEquals(DATE_PLUS_TIME_STRING, transformed.value());
@@ -265,9 +256,8 @@ public class TimestampConverterTest {
 
     @Test
     public void testWithSchemaDateToTimestamp() {
-        TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
-        xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Date.SCHEMA, DATE.getTime()));
+        xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Date.SCHEMA, DATE.getTime()));
 
         assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
         // No change expected since the source type is coarser-grained
@@ -276,9 +266,8 @@ public class TimestampConverterTest {
 
     @Test
     public void testWithSchemaTimeToTimestamp() {
-        TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
-        xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Time.SCHEMA, TIME.getTime()));
+        xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Time.SCHEMA, TIME.getTime()));
 
         assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
         // No change expected since the source type is coarser-grained
@@ -287,9 +276,8 @@ public class TimestampConverterTest {
 
     @Test
     public void testWithSchemaUnixToTimestamp() {
-        TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
-        xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Schema.INT64_SCHEMA, DATE_PLUS_TIME_UNIX));
+        xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Schema.INT64_SCHEMA, DATE_PLUS_TIME_UNIX));
 
         assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
         assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
@@ -297,12 +285,11 @@ public class TimestampConverterTest {
 
     @Test
     public void testWithSchemaStringToTimestamp() {
-        TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
         Map<String, String> config = new HashMap<>();
         config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp");
         config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
-        xform.configure(config);
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Schema.STRING_SCHEMA, DATE_PLUS_TIME_STRING));
+        xformValue.configure(config);
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Schema.STRING_SCHEMA, DATE_PLUS_TIME_STRING));
 
         assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
         assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
@@ -313,14 +300,13 @@ public class TimestampConverterTest {
 
     @Test
     public void testSchemalessFieldConversion() {
-        TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
         Map<String, String> config = new HashMap<>();
         config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Date");
         config.put(TimestampConverter.FIELD_CONFIG, "ts");
-        xform.configure(config);
+        xformValue.configure(config);
 
         Object value = Collections.singletonMap("ts", DATE_PLUS_TIME.getTime());
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, value));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, value));
 
         assertNull(transformed.valueSchema());
         assertEquals(Collections.singletonMap("ts", DATE.getTime()), transformed.value());
@@ -328,11 +314,10 @@ public class TimestampConverterTest {
 
     @Test
     public void testWithSchemaFieldConversion() {
-        TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
         Map<String, String> config = new HashMap<>();
         config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp");
         config.put(TimestampConverter.FIELD_CONFIG, "ts");
-        xform.configure(config);
+        xformValue.configure(config);
 
         // ts field is a unix timestamp
         Schema structWithTimestampFieldSchema = SchemaBuilder.struct()
@@ -343,7 +328,7 @@ public class TimestampConverterTest {
         original.put("ts", DATE_PLUS_TIME_UNIX);
         original.put("other", "test");
 
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, structWithTimestampFieldSchema, original));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, structWithTimestampFieldSchema, original));
 
         Schema expectedSchema = SchemaBuilder.struct()
                 .field("ts", Timestamp.SCHEMA)
@@ -359,9 +344,8 @@ public class TimestampConverterTest {
 
     @Test
     public void testKey() {
-        TimestampConverter<SourceRecord> xform = new TimestampConverter.Key<>();
-        xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime(), null, null));
+        xformKey.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
+        SourceRecord transformed = xformKey.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime(), null, null));
 
         assertNull(transformed.keySchema());
         assertEquals(DATE_PLUS_TIME.getTime(), transformed.key());
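The same refactor is applied to TimestampRouterTest and ValueToKeyTest below: the transformation under test becomes a field, and a single @After hook releases it. JUnit 4 creates a fresh test-class instance per @Test method, so the field carries no state between tests, and teardown runs whether a test passes or fails. A minimal sketch of that lifecycle, with a hypothetical class name (TimestampConverter lives in the same package as the test, so it needs no import):

    import org.apache.kafka.connect.source.SourceRecord;
    import org.junit.After;
    import org.junit.Test;

    import java.util.Collections;

    public class LifecycleSketchTest {
        // a new instance is created for every @Test method
        private final TimestampConverter<SourceRecord> xformValue = new TimestampConverter.Value<>();

        @After
        public void teardown() {
            // runs after every @Test, pass or fail
            xformValue.close();
        }

        @Test
        public void convertsToUnix() {
            xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "unix"));
            // ... apply and assert as in the tests above ...
        }
    }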

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java
index 595a71c..ba823ba 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.connect.transforms;
 
 import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.After;
 import org.junit.Test;
 
 import java.util.Collections;
@@ -24,10 +25,15 @@ import java.util.Collections;
 import static org.junit.Assert.assertEquals;
 
 public class TimestampRouterTest {
+    private final TimestampRouter<SourceRecord> xform = new TimestampRouter<>();
+
+    @After
+    public void teardown() {
+        xform.close();
+    }
 
     @Test
     public void defaultConfiguration() {
-        final TimestampRouter<SourceRecord> xform = new TimestampRouter<>();
         xform.configure(Collections.<String, Object>emptyMap()); // defaults
         final SourceRecord record = new SourceRecord(
                 null, null,

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java
index 69fb026..e2dfa17 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.After;
 import org.junit.Test;
 
 import java.util.Collections;
@@ -29,10 +30,15 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
 public class ValueToKeyTest {
+    private final ValueToKey<SinkRecord> xform = new ValueToKey<>();
+
+    @After
+    public void teardown() {
+        xform.close();
+    }
 
     @Test
     public void schemaless() {
-        final ValueToKey<SinkRecord> xform = new ValueToKey<>();
         xform.configure(Collections.singletonMap("fields", "a,b"));
 
         final HashMap<String, Integer> value = new HashMap<>();
@@ -53,7 +59,6 @@ public class ValueToKeyTest {
 
     @Test
     public void withSchema() {
-        final ValueToKey<SinkRecord> xform = new ValueToKey<>();
         xform.configure(Collections.singletonMap("fields", "a,b"));
 
         final Schema valueSchema = SchemaBuilder.struct()

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index ca0e916..9637927 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -53,7 +53,6 @@ import org.apache.kafka.streams.processor.TopologyBuilder;
  * @see KGroupedStream
  * @see KStreamBuilder#stream(String...)
  */
-@SuppressWarnings("unused")
 @InterfaceStability.Evolving
 public interface KStream<K, V> {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index b941f78..46769eb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -1039,7 +1039,6 @@ public class KStreamBuilder extends TopologyBuilder {
      * @param queryableStoreName the state store name; If {@code null} this is the equivalent of {@link KStreamBuilder#globalTable(Serde, Serde, String)} ()}
      * @return a {@link GlobalKTable} for the specified topic
      */
-    @SuppressWarnings("unchecked")
     public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
                                                  final Serde<V> valSerde,
                                                  final TimestampExtractor timestampExtractor,
@@ -1083,14 +1082,13 @@ public class KStreamBuilder extends TopologyBuilder {
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a {@link GlobalKTable} for the specified topic
      */
-    @SuppressWarnings("unchecked")
     public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
                                                  final Serde<V> valSerde,
                                                  final String topic,
                                                  final StateStoreSupplier<KeyValueStore> storeSupplier) {
         return doGlobalTable(keySerde, valSerde, null, topic, storeSupplier);
     }
-    
+
     /**
      * Create a {@link GlobalKTable} for the specified topic.
      * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
@@ -1121,7 +1119,6 @@ public class KStreamBuilder extends TopologyBuilder {
      *                           {@link KStreamBuilder#globalTable(Serde, Serde, String)} ()}
      * @return a {@link GlobalKTable} for the specified topic
      */
-    @SuppressWarnings("unchecked")
     public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
                                                  final Serde<V> valSerde,
                                                  final String topic,
@@ -1198,7 +1195,6 @@ public class KStreamBuilder extends TopologyBuilder {
      * @param topic     the topic name; cannot be {@code null}
      * @return a {@link GlobalKTable} for the specified topic
      */
-    @SuppressWarnings("unchecked")
     public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
                                                  final Serde<V> valSerde,
                                                  final String topic) {
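This hunk, and many of the streams-module hunks that follow, drop @SuppressWarnings annotations that no longer suppress anything: the annotated code compiles clean without them, and a stale annotation can mask real warnings introduced later. A hypothetical contrast, not taken from this patch, showing when "unchecked" is and is not earned:

    class SuppressSketch {
        // earned: casting through Object is an unchecked operation
        @SuppressWarnings("unchecked")
        static <V> V cast(final Object o) {
            return (V) o;
        }

        // not earned: no unchecked operation occurs, so the
        // annotation would be dead weight here
        static <V> V identity(final V v) {
            return v;
        }
    }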

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index 6ed3e84..a1b40a3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -92,7 +92,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
     }
 
 
-    @SuppressWarnings("unchecked")
     @Override
     public <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                                             final Windows<W> windows,
@@ -101,7 +100,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
         return reduce(reducer, windows, windowedStore(keySerde, valSerde, windows, getOrCreateName(queryableStoreName, REDUCE_NAME)));
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                                             final Windows<W> windows) {
@@ -152,7 +150,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
                 storeSupplier);
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public <W extends Window, T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                                                   final Aggregator<? super K, ? super V, T> aggregator,
@@ -163,7 +160,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
         return aggregate(initializer, aggregator, windows, windowedStore(keySerde, aggValueSerde, windows, getOrCreateName(queryableStoreName, AGGREGATE_NAME)));
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public <W extends Window, T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                                                   final Aggregator<? super K, ? super V, T> aggregator,
@@ -266,7 +262,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
 
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                                 final Aggregator<? super K, ? super V, T> aggregator,
@@ -309,7 +304,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
         return count(sessionWindows, (String) null);
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows,
                                            final StateStoreSupplier<SessionStore> storeSupplier) {
@@ -350,7 +344,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
                               .sessionWindowed(sessionWindows.maintainMs()).build());
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                          final SessionWindows sessionWindows) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
index d30177c..1b26a5b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
@@ -67,7 +67,6 @@ class KTableKTableJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1,
             this.valueGetter = valueGetter;
         }
 
-        @SuppressWarnings("unchecked")
         @Override
         public void init(ProcessorContext context) {
             super.init(context);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
index 8dc330d..c308a0d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
@@ -57,7 +57,6 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
             this.valueGetter = valueGetter;
         }
 
-        @SuppressWarnings("unchecked")
         @Override
         public void init(ProcessorContext context) {
             super.init(context);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
index 90a9f77..9cee4f3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
@@ -56,7 +56,6 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
             this.valueGetter = valueGetter;
         }
 
-        @SuppressWarnings("unchecked")
         @Override
         public void init(ProcessorContext context) {
             super.init(context);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
index 05ecf40..b43efaa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
@@ -57,7 +57,6 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
             this.valueGetter = valueGetter;
         }
 
-        @SuppressWarnings("unchecked")
         @Override
         public void init(ProcessorContext context) {
             super.init(context);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
index 38beb63..4c2b40f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -49,7 +49,6 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
         this.deserializationExceptionHandler = deserializationExceptionHandler;
     }
 
-    @SuppressWarnings("unchecked")
     public Map<TopicPartition, Long> initialize() {
         final Set<String> storeNames = stateMgr.initialize(processorContext);
         final Map<String, String> storeNameToTopic = topology.storeToChangelogTopic();

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 9dc5640..eb75b14 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1135,7 +1135,6 @@ public class StreamThread extends Thread {
         streamsMetrics.removeAllSensors();
     }
 
-    @SuppressWarnings("ThrowableNotThrown")
     private void shutdownTasksAndState(final boolean cleanRun) {
         log.debug("{} Shutting down all active tasks {}, standby tasks {}, suspended tasks {}, and suspended standby tasks {}",
             logPrefix, activeTasks.keySet(), standbyTasks.keySet(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index 77fb58a..8607472 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -120,9 +120,8 @@ public class AssignmentInfo {
     public static AssignmentInfo decode(ByteBuffer data) {
         // ensure we are at the beginning of the ByteBuffer
         data.rewind();
-        DataInputStream in = new DataInputStream(new ByteBufferInputStream(data));
 
-        try {
+        try (DataInputStream in = new DataInputStream(new ByteBufferInputStream(data))) {
             // Decode version
             int version = in.readInt();
             if (version < 0 || version > CURRENT_VERSION) {
@@ -156,7 +155,6 @@ public class AssignmentInfo {
 
             return new AssignmentInfo(activeTasks, standbyTasks, hostStateToTopicPartitions);
 
-
         } catch (IOException ex) {
             throw new TaskAssignmentException("Failed to decode AssignmentInfo", ex);
         }
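The decode() change above converts a bare DataInputStream into a try-with-resources, so the stream is closed on every exit path, including the one that rethrows the IOException as a TaskAssignmentException. A minimal self-contained sketch of the same idiom — the class name is hypothetical and RuntimeException stands in for TaskAssignmentException to keep it standalone; ByteBufferInputStream is the real Kafka utility used above:

    import org.apache.kafka.common.utils.ByteBufferInputStream;

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;

    class DecodeSketch {
        static int readVersion(final ByteBuffer data) {
            data.rewind();
            // in.close() runs whether readInt() returns or throws
            try (DataInputStream in = new DataInputStream(new ByteBufferInputStream(data))) {
                return in.readInt();
            } catch (final IOException ex) {
                throw new RuntimeException("Failed to decode version", ex);
            }
        }
    }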

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
index d43c613..2a54cb5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
@@ -61,7 +61,6 @@ public final class StateSerdes<K, V> {
      * @param valueSerde    the serde for values; cannot be null
      * @throws IllegalArgumentException if key or value serde is null
      */
-    @SuppressWarnings("unchecked")
     public StateSerdes(final String topic,
                        final Serde<K> keySerde,
                        final Serde<V> valueSerde) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 6190b88..b7d41b4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -53,7 +53,6 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
         this.valueSerde = valueSerde;
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
         underlying.init(context, root);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 9a4a97c..b786ce4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -63,7 +63,6 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
         this.cacheFunction = new SegmentedCacheFunction(keySchema, segmentInterval);
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
         underlying.init(context, root);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java
index 9a826c4..34fe8f6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java
@@ -67,7 +67,6 @@ class ChangeLoggingSegmentedBytesStore extends WrappedStateStore.AbstractStateSt
 
 
     @Override
-    @SuppressWarnings("unchecked")
     public void init(final ProcessorContext context, final StateStore root) {
         bytesStore.init(context, root);
         changeLogger = new StoreChangeLogger<>(

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 7034592..4d93a9a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -239,7 +239,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         }
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public synchronized void put(K key, V value) {
         Objects.requireNonNull(key, "key cannot be null");

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index 82e5b23..75b7910 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -318,7 +318,7 @@ public class KStreamBuilderTest {
         final String topicName = "topic-1";
         
         builder.stream(TopologyBuilder.AutoOffsetReset.EARLIEST, topicName);
-        
+
         assertTrue(builder.earliestResetTopicsPattern().matcher(topicName).matches());
         assertFalse(builder.latestResetTopicsPattern().matcher(topicName).matches());
     }
@@ -373,7 +373,7 @@ public class KStreamBuilderTest {
         final String topic = "topic-5";
 
         builder.stream(topicPattern);
-        
+
         assertFalse(builder.latestResetTopicsPattern().matcher(topic).matches());
         assertFalse(builder.earliestResetTopicsPattern().matcher(topic).matches());
 
@@ -401,7 +401,6 @@ public class KStreamBuilderTest {
         assertFalse(builder.earliestResetTopicsPattern().matcher(topicTwo).matches());
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void kStreamTimestampExtractorShouldBeNull() throws Exception {
         builder.stream("topic");
@@ -409,7 +408,6 @@ public class KStreamBuilderTest {
         assertNull(processorTopology.source("topic").getTimestampExtractor());
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldAddTimestampExtractorToStreamWithKeyValSerdePerSource() throws Exception {
         builder.stream(new MockTimestampExtractor(), null, null, "topic");
@@ -419,7 +417,6 @@ public class KStreamBuilderTest {
         }
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldAddTimestampExtractorToStreamWithOffsetResetPerSource() throws Exception {
         builder.stream(null, new MockTimestampExtractor(), null, null, "topic");
@@ -427,7 +424,6 @@ public class KStreamBuilderTest {
         assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldAddTimestampExtractorToTablePerSource() throws Exception {
         builder.table("topic", "store");
@@ -435,7 +431,6 @@ public class KStreamBuilderTest {
         assertNull(processorTopology.source("topic").getTimestampExtractor());
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void kTableTimestampExtractorShouldBeNull() throws Exception {
         builder.table("topic", "store");
@@ -443,7 +438,6 @@ public class KStreamBuilderTest {
         assertNull(processorTopology.source("topic").getTimestampExtractor());
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldAddTimestampExtractorToTableWithKeyValSerdePerSource() throws Exception {
         builder.table(null, new MockTimestampExtractor(), null, null, "topic", "store");

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index c69cd70..0662944 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -47,7 +47,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-@SuppressWarnings("unchecked")
 public class KStreamSessionWindowAggregateProcessorTest {
 
     private static final long GAP_MS = 5 * 60 * 1000L;
@@ -84,7 +83,6 @@ public class KStreamSessionWindowAggregateProcessorTest {
     private MockProcessorContext context;
 
 
-    @SuppressWarnings("unchecked")
     @Before
     public void initializeStore() {
         final File stateDir = TestUtils.tempDirectory();

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
index 316494d..98bb346 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
@@ -89,6 +89,8 @@ public class WindowedStreamPartitionerTest {
                 assertEquals(expected, actual);
             }
         }
+
+        defaultPartitioner.close();
     }
 
     @Test
@@ -113,6 +115,8 @@ public class WindowedStreamPartitionerTest {
         Serializer<?> inner1 = windowedSerializer1.innerSerializer();
         assertNotNull("Inner serializer should be not null", inner1);
         assertTrue("Inner serializer type should be ByteArraySerializer", inner1 instanceof ByteArraySerializer);
+        windowedSerializer.close();
+        windowedSerializer1.close();
     }
 
     @Test
@@ -137,5 +141,7 @@ public class WindowedStreamPartitionerTest {
         Deserializer<?> inner1 = windowedDeserializer1.innerDeserializer();
         assertNotNull("Inner deserializer should be not null", inner1);
         assertTrue("Inner deserializer type should be ByteArrayDeserializer", inner1 instanceof ByteArrayDeserializer);
+        windowedDeserializer.close();
+        windowedDeserializer1.close();
     }
 }
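The partitioner-test changes above release each WindowedSerializer, WindowedDeserializer, and the DefaultPartitioner once the assertions are done. Note that a close() placed at the end of a test body is skipped when an earlier assertion throws; where that matters, a try/finally gives a stronger guarantee. A sketch of that alternative shape — the Integer type parameter and intSerializer constructor argument are assumptions, not shown in this diff:

    WindowedSerializer<Integer> windowedSerializer = new WindowedSerializer<>(intSerializer);
    try {
        Serializer<?> inner = windowedSerializer.innerSerializer();
        assertNotNull("Inner serializer should be not null", inner);
    } finally {
        windowedSerializer.close();  // runs even if the assertion above fails
    }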

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index c0ce9e7..bad193a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -78,7 +78,7 @@ public class TopologyBuilderTest {
     @Test
     public void shouldAddSourcePatternWithOffsetReset() {
         final TopologyBuilder builder = new TopologyBuilder();
-        
+
         final String earliestTopicPattern = "earliest.*Topic";
         final String latestTopicPattern = "latest.*Topic";
 
@@ -107,7 +107,7 @@ public class TopologyBuilderTest {
         final TopologyBuilder builder = new TopologyBuilder();
         final Serde<String> stringSerde = Serdes.String();
         final Pattern expectedPattern = Pattern.compile("test-.*");
-        
+
         builder.addSource("source", stringSerde.deserializer(), stringSerde.deserializer(), Pattern.compile("test-.*"));
 
         assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern());
@@ -143,7 +143,7 @@ public class TopologyBuilderTest {
     }
 
 
-    
+
     @Test(expected = TopologyBuilderException.class)
     public void testAddSourceWithSameName() {
         final TopologyBuilder builder = new TopologyBuilder();
@@ -579,7 +579,6 @@ public class TopologyBuilderTest {
         assertEquals(2, properties.size());
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldAddInternalTopicConfigWithCompactForNonWindowStores() throws Exception {
         final TopologyBuilder builder = new TopologyBuilder();
@@ -596,7 +595,6 @@ public class TopologyBuilderTest {
         assertEquals(1, properties.size());
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldAddInternalTopicConfigWithCleanupPolicyDeleteForInternalTopics() throws Exception {
         final TopologyBuilder builder = new TopologyBuilder();
@@ -702,7 +700,6 @@ public class TopologyBuilderTest {
 
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldAddTimestampExtractorPerSource() throws Exception {
         final TopologyBuilder builder = new TopologyBuilder();
@@ -711,7 +708,6 @@ public class TopologyBuilderTest {
         assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldAddTimestampExtractorWithOffsetResetPerSource() throws Exception {
         final TopologyBuilder builder = new TopologyBuilder();
@@ -720,7 +716,6 @@ public class TopologyBuilderTest {
         assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldAddTimestampExtractorWithPatternPerSource() throws Exception {
         final TopologyBuilder builder = new TopologyBuilder();
@@ -730,7 +725,6 @@ public class TopologyBuilderTest {
         assertThat(processorTopology.source(pattern.pattern()).getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldAddTimestampExtractorWithOffsetResetAndPatternPerSource() throws Exception {
         final TopologyBuilder builder = new TopologyBuilder();
@@ -740,7 +734,6 @@ public class TopologyBuilderTest {
         assertThat(processorTopology.source(pattern.pattern()).getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldAddTimestampExtractorWithOffsetResetAndKeyValSerdesPerSource() throws Exception {
         final TopologyBuilder builder = new TopologyBuilder();
@@ -749,7 +742,6 @@ public class TopologyBuilderTest {
         assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldAddTimestampExtractorWithOffsetResetAndKeyValSerdesAndPatternPerSource() throws Exception {
         final TopologyBuilder builder = new TopologyBuilder();

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/processor/internals/QuickUnionTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/QuickUnionTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/QuickUnionTest.java
index 56e2410..e6cca87 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/QuickUnionTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/QuickUnionTest.java
@@ -26,7 +26,6 @@ import static org.junit.Assert.assertNotEquals;
 
 public class QuickUnionTest {
 
-    @SuppressWarnings("unchecked")
     @Test
     public void testUnite() {
         QuickUnion<Long> qu = new QuickUnion<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index a358be5..25c3cbd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -177,7 +177,6 @@ public class StandbyTaskTest {
 
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void testUpdate() throws Exception {
         StreamsConfig config = createConfig(baseDir);
@@ -224,7 +223,6 @@ public class StandbyTaskTest {
 
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void testUpdateKTable() throws Exception {
         consumer.assign(Utils.mkList(ktable));

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 17eb50a..a6d1179 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -131,7 +131,6 @@ public class StreamPartitionAssignorTest {
         partitionAssignor.configure(configurationMap);
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void testSubscription() throws Exception {
         builder.addSource("source1", "topic1");

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 1a0bebe..a27fb62 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -724,7 +724,6 @@ public class StreamTaskTest {
         });
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldThrowExceptionIfAnyExceptionsRaisedDuringCloseButStillCloseAllProcessorNodesTopology() throws Exception {
         task.close(true);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index b4598fd..c6d12c6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -185,7 +185,6 @@ public class KeyValueStoreTestDriver<K, V> {
         final Producer<byte[], byte[]> producer = new MockProducer<>(true, rawSerializer, rawSerializer);
 
         final RecordCollector recordCollector = new RecordCollectorImpl(producer, "KeyValueStoreTestDriver") {
-            @SuppressWarnings("unchecked")
             @Override
             public <K1, V1> void send(final String topic,
                                       final K1 key,

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java
index a77d4ac..ff7cdc3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java
@@ -61,7 +61,6 @@ public class ChangeLoggingSegmentedBytesStoreTest {
     private final ChangeLoggingSegmentedBytesStore store = new ChangeLoggingSegmentedBytesStore(bytesStore);
     private final Map sent = new HashMap<>();
 
-    @SuppressWarnings("unchecked")
     @Before
     public void setUp() throws Exception {
         context.setTime(0);
@@ -74,7 +73,6 @@ public class ChangeLoggingSegmentedBytesStoreTest {
         store.close();
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldLogPuts() throws Exception {
         final byte[] value1 = {0};
@@ -88,7 +86,6 @@ public class ChangeLoggingSegmentedBytesStoreTest {
         assertArrayEquals(value2, (byte[]) sent.get(key2));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldLogRemoves() throws Exception {
         final Bytes key1 = Bytes.wrap(new byte[]{0});

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
index 4054990..0fa5216 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
@@ -46,7 +46,6 @@ public class CompositeReadOnlyKeyValueStoreTest {
     private KeyValueStore<String, String>
         otherUnderlyingStore;
 
-    @SuppressWarnings("unchecked")
     @Before
     public void before() {
         final StateStoreProviderStub stubProviderOne = new StateStoreProviderStub(false);
@@ -141,8 +140,6 @@ public class CompositeReadOnlyKeyValueStoreTest {
         } catch (UnsupportedOperationException e) { }
     }
 
-
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldFindValueForKeyWhenMultiStores() throws Exception {
         final KeyValueStore<String, String> cache = newStoreInstance();
@@ -167,7 +164,6 @@ public class CompositeReadOnlyKeyValueStoreTest {
         assertEquals(2, results.size());
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldSupportRangeAcrossMultipleKVStores() throws Exception {
         final KeyValueStore<String, String> cache = newStoreInstance();

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java
index bc21a7a..4baecb1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java
@@ -43,6 +43,7 @@ public class DelegatingPeekingKeyValueIteratorTest {
         assertEquals("A", peekingIterator.peekNextKey());
         assertEquals("A", peekingIterator.peekNextKey());
         assertTrue(peekingIterator.hasNext());
+        peekingIterator.close();
     }
 
     @Test
@@ -52,6 +53,7 @@ public class DelegatingPeekingKeyValueIteratorTest {
         assertEquals(KeyValue.pair("A", "A"), peekingIterator.peekNext());
         assertEquals(KeyValue.pair("A", "A"), peekingIterator.peekNext());
         assertTrue(peekingIterator.hasNext());
+        peekingIterator.close();
     }
 
     @Test
@@ -71,18 +73,21 @@ public class DelegatingPeekingKeyValueIteratorTest {
             index++;
         }
         assertEquals(kvs.length, index);
+        peekingIterator.close();
     }
 
     @Test(expected = NoSuchElementException.class)
     public void shouldThrowNoSuchElementWhenNoMoreItemsLeftAndNextCalled() throws Exception {
         final DelegatingPeekingKeyValueIterator<String, String> peekingIterator = new DelegatingPeekingKeyValueIterator<>(name, store.all());
         peekingIterator.next();
+        peekingIterator.close();
     }
 
     @Test(expected = NoSuchElementException.class)
     public void shouldThrowNoSuchElementWhenNoMoreItemsLeftAndPeekNextCalled() throws Exception {
         final DelegatingPeekingKeyValueIterator<String, String> peekingIterator = new DelegatingPeekingKeyValueIterator<>(name, store.all());
         peekingIterator.peekNextKey();
+        peekingIterator.close();
     }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
index 6e0059f..89a4d63 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
@@ -64,6 +64,7 @@ public class MergedSortedCacheKeyValueStoreIteratorTest {
             values[index++] = value;
             assertArrayEquals(bytes[bytesIndex++], value);
         }
+        iterator.close();
     }
 
 
@@ -171,6 +172,7 @@ public class MergedSortedCacheKeyValueStoreIteratorTest {
             assertArrayEquals(bytes[bytesIndex++], keys);
             iterator.next();
         }
+        iterator.close();
     }
 
     private MergedSortedCacheKeyValueStoreIterator<byte[], byte[]> createIterator() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
index fed39b7..2088fbe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
@@ -80,6 +80,7 @@ public class MergedSortedCacheWrappedWindowStoreIteratorTest {
             assertArrayEquals(expected.value, next.value);
             assertEquals(expected.key, next.key);
         }
+        iterator.close();
     }
 
     @Test
@@ -98,6 +99,7 @@ public class MergedSortedCacheWrappedWindowStoreIteratorTest {
         assertThat(iterator.peekNextKey(), equalTo(0L));
         iterator.next();
         assertThat(iterator.peekNextKey(), equalTo(10L));
+        iterator.close();
     }
 
     @Test
@@ -112,5 +114,6 @@ public class MergedSortedCacheWrappedWindowStoreIteratorTest {
         assertThat(iterator.peekNextKey(), equalTo(0L));
         iterator.next();
         assertThat(iterator.peekNextKey(), equalTo(10L));
+        iterator.close();
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java
index a7e1aed..3e935cf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java
@@ -46,7 +46,6 @@ public class MeteredSegmentedBytesStoreTest {
     private final Set<String> latencyRecorded = new HashSet<>();
     private final Set<String> throughputRecorded = new HashSet<>();
 
-    @SuppressWarnings("unchecked")
     @Before
     public void setUp() throws Exception {
         final Metrics metrics = new Metrics();

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
index 237514e..ff7d234 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
@@ -133,7 +133,6 @@ public class RocksDBKeyValueStoreSupplierTest {
         assertThat(store, is(instanceOf(MeteredKeyValueStore.class)));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldHaveMeteredStoreWhenCached() throws Exception {
         store = createStore(false, true);
@@ -142,7 +141,6 @@ public class RocksDBKeyValueStoreSupplierTest {
         assertFalse(metrics.metrics().isEmpty());
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldHaveMeteredStoreWhenLogged() throws Exception {
         store = createStore(true, false);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
index 70f3708..97936fc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
@@ -112,7 +112,6 @@ public class RocksDBSessionStoreSupplierTest {
         assertThat(store, is(instanceOf(RocksDBSessionStore.class)));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldHaveMeteredStoreWhenCached() throws Exception {
         store = createStore(false, true);
@@ -121,7 +120,6 @@ public class RocksDBSessionStoreSupplierTest {
         assertFalse(metrics.metrics().isEmpty());
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldHaveMeteredStoreWhenLogged() throws Exception {
         store = createStore(true, false);
@@ -130,7 +128,6 @@ public class RocksDBSessionStoreSupplierTest {
         assertFalse(metrics.metrics().isEmpty());
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldHaveMeteredStoreWhenNotLoggedOrCached() throws Exception {
         store = createStore(false, false);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
index 77fe8ee..f177aa3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
@@ -135,7 +135,6 @@ public class RocksDBWindowStoreSupplierTest {
         assertThat(store, is(instanceOf(RocksDBWindowStore.class)));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldHaveMeteredStoreWhenCached() throws Exception {
         store = createStore(false, true, 3);
@@ -144,7 +143,6 @@ public class RocksDBWindowStoreSupplierTest {
         assertFalse(metrics.metrics().isEmpty());
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldHaveMeteredStoreWhenLogged() throws Exception {
         store = createStore(true, false, 3);
@@ -153,7 +151,6 @@ public class RocksDBWindowStoreSupplierTest {
         assertFalse(metrics.metrics().isEmpty());
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldHaveMeteredStoreWhenNotLoggedOrCached() throws Exception {
         store = createStore(false, false, 3);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
index 8a3a8ba..c2b03c0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
@@ -39,7 +39,6 @@ public class StoreChangeLoggerTest {
 
     private final MockProcessorContext context = new MockProcessorContext(StateSerdes.withBuiltinTypes(topic, Integer.class, String.class),
             new RecordCollectorImpl(null, "StoreChangeLoggerTest") {
-                @SuppressWarnings("unchecked")
                 @Override
                 public <K1, V1> void send(final String topic,
                                           final K1 key,
@@ -71,7 +70,6 @@ public class StoreChangeLoggerTest {
         context.close();
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void testAddRemove() throws Exception {
         context.setTime(1);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
index 0af2594..c01e169 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
@@ -101,6 +101,7 @@ public class BrokerCompatibilityTest {
 
 
         System.out.println("close Kafka Streams");
+        producer.close();
         streams.close();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index 11e1ae8..9193d1d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -77,7 +77,6 @@ public class SmokeTestDriver extends SmokeTestUtil {
     // This main() is not used by the system test. It is intended to be used for local debugging.
     public static void main(String[] args) throws Exception {
         final String kafka = "localhost:9092";
-        final String zookeeper = "localhost:2181";
         final File stateDir = TestUtils.tempDirectory();
 
         final int numKeys = 20;

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
index b9288d7..a9537a7 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
@@ -370,108 +370,109 @@ public class ClientCompatibilityTest {
         consumerProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 512);
         ClientCompatibilityTestDeserializer deserializer =
             new ClientCompatibilityTestDeserializer(testConfig.expectClusterId);
-        final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps, deserializer, deserializer);
-        final List<PartitionInfo> partitionInfos = consumer.partitionsFor(testConfig.topic);
-        if (partitionInfos.size() < 1)
-            throw new RuntimeException("Expected at least one partition for topic " + testConfig.topic);
-        final Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
-        final LinkedList<TopicPartition> topicPartitions = new LinkedList<>();
-        for (PartitionInfo partitionInfo : partitionInfos) {
-            TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
-            timestampsToSearch.put(topicPartition, prodTimeMs);
-            topicPartitions.add(topicPartition);
-        }
-        final OffsetsForTime offsetsForTime = new OffsetsForTime();
-        tryFeature("offsetsForTimes", testConfig.offsetsForTimesSupported,
-                new Invoker() {
-                    @Override
-                    public void invoke() {
-                        offsetsForTime.result = consumer.offsetsForTimes(timestampsToSearch);
-                    }
-                },
-                new ResultTester() {
-                    @Override
-                    public void test() {
-                        log.info("offsetsForTime = {}", offsetsForTime.result);
-                    }
-                });
-        // Whether or not offsetsForTimes works, beginningOffsets and endOffsets
-        // should work.
-        consumer.beginningOffsets(timestampsToSearch.keySet());
-        consumer.endOffsets(timestampsToSearch.keySet());
-
-        consumer.assign(topicPartitions);
-        consumer.seekToBeginning(topicPartitions);
-        final Iterator<byte[]> iter = new Iterator<byte[]>() {
-            private static final int TIMEOUT_MS = 10000;
-            private Iterator<ConsumerRecord<byte[], byte[]>> recordIter = null;
-            private byte[] next = null;
-
-            private byte[] fetchNext() {
-                while (true) {
-                    long curTime = Time.SYSTEM.milliseconds();
-                    if (curTime - prodTimeMs > TIMEOUT_MS)
-                        throw new RuntimeException("Timed out after " + TIMEOUT_MS + " ms.");
-                    if (recordIter == null) {
-                        ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
-                        recordIter = records.iterator();
+        try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps, deserializer, deserializer)) {
+            final List<PartitionInfo> partitionInfos = consumer.partitionsFor(testConfig.topic);
+            if (partitionInfos.size() < 1)
+                throw new RuntimeException("Expected at least one partition for topic " + testConfig.topic);
+            final Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
+            final LinkedList<TopicPartition> topicPartitions = new LinkedList<>();
+            for (PartitionInfo partitionInfo : partitionInfos) {
+                TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
+                timestampsToSearch.put(topicPartition, prodTimeMs);
+                topicPartitions.add(topicPartition);
+            }
+            final OffsetsForTime offsetsForTime = new OffsetsForTime();
+            tryFeature("offsetsForTimes", testConfig.offsetsForTimesSupported,
+                    new Invoker() {
+                        @Override
+                        public void invoke() {
+                            offsetsForTime.result = consumer.offsetsForTimes(timestampsToSearch);
+                        }
+                    },
+                    new ResultTester() {
+                        @Override
+                        public void test() {
+                            log.info("offsetsForTime = {}", offsetsForTime.result);
+                        }
+                    });
+            // Whether or not offsetsForTimes works, beginningOffsets and endOffsets
+            // should work.
+            consumer.beginningOffsets(timestampsToSearch.keySet());
+            consumer.endOffsets(timestampsToSearch.keySet());
+
+            consumer.assign(topicPartitions);
+            consumer.seekToBeginning(topicPartitions);
+            final Iterator<byte[]> iter = new Iterator<byte[]>() {
+                private static final int TIMEOUT_MS = 10000;
+                private Iterator<ConsumerRecord<byte[], byte[]>> recordIter = null;
+                private byte[] next = null;
+
+                private byte[] fetchNext() {
+                    while (true) {
+                        long curTime = Time.SYSTEM.milliseconds();
+                        if (curTime - prodTimeMs > TIMEOUT_MS)
+                            throw new RuntimeException("Timed out after " + TIMEOUT_MS + " ms.");
+                        if (recordIter == null) {
+                            ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
+                            recordIter = records.iterator();
+                        }
+                        if (recordIter.hasNext())
+                            return recordIter.next().value();
+                        recordIter = null;
                     }
-                    if (recordIter.hasNext())
-                        return recordIter.next().value();
-                    recordIter = null;
                 }
-            }
 
-            @Override
-            public boolean hasNext() {
-                if (next != null)
-                    return true;
-                next = fetchNext();
-                return next != null;
-            }
+                @Override
+                public boolean hasNext() {
+                    if (next != null)
+                        return true;
+                    next = fetchNext();
+                    return next != null;
+                }
 
-            @Override
-            public byte[] next() {
-                if (!hasNext())
-                    throw new NoSuchElementException();
-                byte[] cur = next;
-                next = null;
-                return cur;
-            }
+                @Override
+                public byte[] next() {
+                    if (!hasNext())
+                        throw new NoSuchElementException();
+                    byte[] cur = next;
+                    next = null;
+                    return cur;
+                }
 
-            @Override
-            public void remove() {
-                throw new UnsupportedOperationException();
-            }
-        };
-        byte[] next = iter.next();
-        try {
-            compareArrays(message1, next);
-            log.debug("Found first message...");
-        } catch (RuntimeException e) {
-            throw new RuntimeException("The first message in this topic was not ours. Please use a new topic when " +
-                    "running this program.");
-        }
-        try {
-            next = iter.next();
-            if (testConfig.expectRecordTooLargeException)
-                throw new RuntimeException("Expected to get a RecordTooLargeException when reading a record " +
-                        "bigger than " + ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG);
+                @Override
+                public void remove() {
+                    throw new UnsupportedOperationException();
+                }
+            };
+            byte[] next = iter.next();
             try {
-                compareArrays(message2, next);
+                compareArrays(message1, next);
+                log.debug("Found first message...");
             } catch (RuntimeException e) {
-                System.out.println("The second message in this topic was not ours. Please use a new " +
-                    "topic when running this program.");
-                Exit.exit(1);
+                throw new RuntimeException("The first message in this topic was not ours. Please use a new topic when " +
+                        "running this program.");
+            }
+            try {
+                next = iter.next();
+                if (testConfig.expectRecordTooLargeException) {
+                    throw new RuntimeException("Expected to get a RecordTooLargeException when reading a record " +
+                            "bigger than " + ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG);
+                }
+                try {
+                    compareArrays(message2, next);
+                } catch (RuntimeException e) {
+                    System.out.println("The second message in this topic was not ours. Please use a new " +
+                        "topic when running this program.");
+                    Exit.exit(1);
+                }
+            } catch (RecordTooLargeException e) {
+                log.debug("Got RecordTooLargeException", e);
+                if (!testConfig.expectRecordTooLargeException)
+                    throw new RuntimeException("Got an unexpected RecordTooLargeException when reading a record " +
+                        "bigger than " + ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG);
             }
-        } catch (RecordTooLargeException e) {
-            log.debug("Got RecordTooLargeException", e);
-            if (!testConfig.expectRecordTooLargeException)
-                throw new RuntimeException("Got an unexpected RecordTooLargeException when reading a record " +
-                    "bigger than " + ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG);
+            log.debug("Closing consumer.");
         }
-        log.debug("Closing consumer.");
-        consumer.close();
         log.info("Closed consumer.");
     }
 


[2/2] kafka git commit: MINOR: Code Cleanup

Posted by jg...@apache.org.
MINOR: Code Cleanup

The cleanup includes:

- Switching try-catch-finally blocks to try-with-resources when possible (see the sketch after this list)
- Removing some seemingly unnecessary `SuppressWarnings` annotations
- Resolving some Java warnings
- Closing unclosed Closeable objects
- Removing unused code
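
For reference, the try-with-resources switch follows the pattern below. This is a
minimal sketch adapted from the KafkaConsumerTest.testPollWithEmptySubscription
change further down in this diff; newConsumer() is that test's own factory method
for a KafkaConsumer, and the surrounding test scaffolding is omitted.

    // Before: the consumer must be closed by hand in a finally block,
    // which is easy to forget and adds boilerplate.
    KafkaConsumer<byte[], byte[]> consumer = newConsumer();
    consumer.subscribe(Collections.<String>emptyList());
    try {
        consumer.poll(0);
    } finally {
        consumer.close();
    }

    // After: the resource is declared in the try header and is closed
    // automatically, even if poll(0) throws.
    try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
        consumer.subscribe(Collections.<String>emptyList());
        consumer.poll(0);
    }

The same mechanics apply to the producers, iterators, and stores closed elsewhere
in this commit: any resource implementing AutoCloseable can be declared in the try
header instead of being closed manually.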

Author: Vahid Hashemian <va...@us.ibm.com>

Reviewers: Balint Molnar <ba...@gmail.com>, Guozhang Wang <wa...@gmail.com>, Matthias J. Sax <ma...@confluent.io>, Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>

Closes #3222 from vahidhashemian/minor/code_cleanup_1706


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f87d58b7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f87d58b7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f87d58b7

Branch: refs/heads/trunk
Commit: f87d58b796977fdaefb089d17cb30b2071cd4485
Parents: 3bfc073
Author: Vahid Hashemian <va...@us.ibm.com>
Authored: Wed Jul 19 10:51:28 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed Jul 19 10:51:28 2017 -0700

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   |   3 +-
 .../producer/internals/TransactionManager.java  |   1 -
 .../requests/OffsetsForLeaderEpochRequest.java  |   4 +-
 .../clients/consumer/KafkaConsumerTest.java     |  17 +-
 .../clients/producer/KafkaProducerTest.java     |  33 ++--
 .../clients/producer/MockProducerTest.java      |  71 ++++++-
 .../clients/producer/internals/SenderTest.java  |   9 +-
 .../utils/ByteBufferOutputStreamTest.java       |  10 +-
 .../connect/file/FileStreamSourceTaskTest.java  |   1 +
 .../connect/runtime/isolation/Plugins.java      |   1 -
 .../connect/storage/FileOffsetBackingStore.java |   4 +-
 .../runtime/WorkerSinkTaskThreadedTest.java     |   1 -
 .../storage/KafkaConfigBackingStoreTest.java    |  17 +-
 .../kafka/connect/util/TopicAdminTest.java      |   1 -
 .../kafka/connect/transforms/CastTest.java      | 144 ++++++--------
 .../connect/transforms/ExtractFieldTest.java    |   9 +-
 .../kafka/connect/transforms/FlattenTest.java   |  54 +++---
 .../connect/transforms/HoistFieldTest.java      |   9 +-
 .../connect/transforms/InsertFieldTest.java     |  14 +-
 .../connect/transforms/RegexRouterTest.java     |   5 +-
 .../connect/transforms/ReplaceFieldTest.java    |  11 +-
 .../transforms/SetSchemaMetadataTest.java       |  11 +-
 .../transforms/TimestampConverterTest.java      | 128 ++++++-------
 .../connect/transforms/TimestampRouterTest.java |   8 +-
 .../connect/transforms/ValueToKeyTest.java      |   9 +-
 .../apache/kafka/streams/kstream/KStream.java   |   1 -
 .../kafka/streams/kstream/KStreamBuilder.java   |   6 +-
 .../kstream/internals/KGroupedStreamImpl.java   |   7 -
 .../kstream/internals/KTableKTableJoin.java     |   1 -
 .../kstream/internals/KTableKTableLeftJoin.java |   1 -
 .../internals/KTableKTableOuterJoin.java        |   1 -
 .../internals/KTableKTableRightJoin.java        |   1 -
 .../internals/GlobalStateUpdateTask.java        |   1 -
 .../processor/internals/StreamThread.java       |   1 -
 .../internals/assignment/AssignmentInfo.java    |   4 +-
 .../apache/kafka/streams/state/StateSerdes.java |   1 -
 .../state/internals/CachingKeyValueStore.java   |   1 -
 .../state/internals/CachingWindowStore.java     |   1 -
 .../ChangeLoggingSegmentedBytesStore.java       |   1 -
 .../streams/state/internals/RocksDBStore.java   |   1 -
 .../streams/kstream/KStreamBuilderTest.java     |  10 +-
 ...reamSessionWindowAggregateProcessorTest.java |   2 -
 .../WindowedStreamPartitionerTest.java          |   6 +
 .../streams/processor/TopologyBuilderTest.java  |  14 +-
 .../processor/internals/QuickUnionTest.java     |   1 -
 .../processor/internals/StandbyTaskTest.java    |   2 -
 .../internals/StreamPartitionAssignorTest.java  |   1 -
 .../processor/internals/StreamTaskTest.java     |   1 -
 .../streams/state/KeyValueStoreTestDriver.java  |   1 -
 .../ChangeLoggingSegmentedBytesStoreTest.java   |   3 -
 .../CompositeReadOnlyKeyValueStoreTest.java     |   4 -
 .../DelegatingPeekingKeyValueIteratorTest.java  |   5 +
 ...gedSortedCacheKeyValueStoreIteratorTest.java |   2 +
 ...rtedCacheWrappedWindowStoreIteratorTest.java |   3 +
 .../MeteredSegmentedBytesStoreTest.java         |   1 -
 .../RocksDBKeyValueStoreSupplierTest.java       |   2 -
 .../RocksDBSessionStoreSupplierTest.java        |   3 -
 .../RocksDBWindowStoreSupplierTest.java         |   3 -
 .../state/internals/StoreChangeLoggerTest.java  |   2 -
 .../streams/tests/BrokerCompatibilityTest.java  |   1 +
 .../kafka/streams/tests/SmokeTestDriver.java    |   1 -
 .../kafka/tools/ClientCompatibilityTest.java    | 187 ++++++++++---------
 62 files changed, 419 insertions(+), 440 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index c54b739..5f854d1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -302,7 +302,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                 keySerializer, valueSerializer);
     }
 
-    @SuppressWarnings({"unchecked", "deprecation"})
+    @SuppressWarnings("unchecked")
     private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
         try {
             log.trace("Starting the Kafka producer");
@@ -339,7 +339,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                 config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
                 this.valueSerializer = ensureExtended(valueSerializer);
             }
-            
 
             // load interceptors and make sure they get clientId
             userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 8cee794..c5542e4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -628,7 +628,6 @@ public class TransactionManager {
         }
 
         @Override
-        @SuppressWarnings("unchecked")
         public void onComplete(ClientResponse response) {
             if (response.requestHeader().correlationId() != inFlightRequestCorrelationId) {
                 fatalError(new RuntimeException("Detected more than one in-flight transactional request."));

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
index f898a75..fc31d75 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
@@ -42,7 +42,7 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
     }
 
     public static class Builder extends AbstractRequest.Builder<OffsetsForLeaderEpochRequest> {
-        private Map<TopicPartition, Integer> epochsByPartition = new HashMap();
+        private Map<TopicPartition, Integer> epochsByPartition = new HashMap<>();
 
         public Builder() {
             super(ApiKeys.OFFSET_FOR_LEADER_EPOCH);
@@ -129,7 +129,7 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
     @Override
     public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         Errors error = Errors.forException(e);
-        Map<TopicPartition, EpochEndOffset> errorResponse = new HashMap();
+        Map<TopicPartition, EpochEndOffset> errorResponse = new HashMap<>();
         for (TopicPartition tp : epochsByPartition.keySet()) {
             errorResponse.put(tp, new EpochEndOffset(error, EpochEndOffset.UNDEFINED_EPOCH_OFFSET));
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 219c3f6..9fd7e19 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -129,13 +129,12 @@ public class KafkaConsumerTest {
         final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
         try {
             new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+            Assert.fail("should have caught an exception and returned");
         } catch (KafkaException e) {
             assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get());
             assertEquals(oldCloseCount + 1, MockMetricsReporter.CLOSE_COUNT.get());
             assertEquals("Failed to construct kafka consumer", e.getMessage());
-            return;
         }
-        Assert.fail("should have caught an exception and returned");
     }
 
     @Test
@@ -1191,23 +1190,17 @@ public class KafkaConsumerTest {
 
     @Test(expected = IllegalStateException.class)
     public void testPollWithEmptySubscription() {
-        KafkaConsumer<byte[], byte[]> consumer = newConsumer();
-        consumer.subscribe(Collections.<String>emptyList());
-        try {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+            consumer.subscribe(Collections.<String>emptyList());
             consumer.poll(0);
-        } finally {
-            consumer.close();
         }
     }
 
     @Test(expected = IllegalStateException.class)
     public void testPollWithEmptyUserAssignment() {
-        KafkaConsumer<byte[], byte[]> consumer = newConsumer();
-        consumer.assign(Collections.<TopicPartition>emptySet());
-        try {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+            consumer.assign(Collections.<TopicPartition>emptySet());
             consumer.poll(0);
-        } finally {
-            consumer.close();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 56c1b18..dd62457 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -87,16 +87,13 @@ public class KafkaProducerTest {
 
         final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
         final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
-        try {
-            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(
-                    props, new ByteArraySerializer(), new ByteArraySerializer());
+        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) {
+            fail("should have caught an exception and returned");
         } catch (KafkaException e) {
             assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get());
             assertEquals(oldCloseCount + 1, MockMetricsReporter.CLOSE_COUNT.get());
             assertEquals("Failed to construct kafka producer", e.getMessage());
-            return;
         }
-        fail("should have caught an exception and returned");
     }
 
     @Test
@@ -172,9 +169,7 @@ public class KafkaProducerTest {
         config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         config.put(ProducerConfig.SEND_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE);
         config.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE);
-        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(
-                config, new ByteArraySerializer(), new ByteArraySerializer());
-        producer.close();
+        new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer()).close();
     }
 
     @Test(expected = KafkaException.class)
@@ -355,7 +350,7 @@ public class KafkaProducerTest {
         }
         Assert.assertTrue("Topic should still exist in metadata", metadata.containsTopic(topic));
     }
-    
+
     @PrepareOnlyThisForTest(Metadata.class)
     @Test
     public void testHeaders() throws Exception {
@@ -369,8 +364,6 @@ public class KafkaProducerTest {
         MemberModifier.field(KafkaProducer.class, "metadata").set(producer, metadata);
 
         String topic = "topic";
-        Collection<Node> nodes = Collections.singletonList(new Node(0, "host1", 1000));
-
         final Cluster cluster = new Cluster(
                 "dummy",
                 Collections.singletonList(new Node(0, "host1", 1000)),
@@ -395,9 +388,9 @@ public class KafkaProducerTest {
 
         //ensure headers can be mutated pre send.
         record.headers().add(new RecordHeader("test", "header2".getBytes()));
-        
+
         producer.send(record, null);
-        
+
         //ensure headers are closed and cannot be mutated post send
         try {
             record.headers().add(new RecordHeader("test", "test".getBytes()));
@@ -405,7 +398,7 @@ public class KafkaProducerTest {
         } catch (IllegalStateException ise) {
             //expected
         }
-        
+
         //ensure existing headers are not changed, and last header for key is still original value
         assertTrue(Arrays.equals(record.headers().lastHeader("test").value(), "header2".getBytes()));
 
@@ -436,7 +429,7 @@ public class KafkaProducerTest {
             assertEquals(Sensor.RecordingLevel.DEBUG, producer.metrics.config().recordLevel());
         }
     }
-    
+
     @PrepareOnlyThisForTest(Metadata.class)
     @Test
     public void testInterceptorPartitionSetOnTooLargeRecord() throws Exception {
@@ -445,7 +438,7 @@ public class KafkaProducerTest {
         props.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "1");
         String topic = "topic";
         ProducerRecord<String, String> record = new ProducerRecord<>(topic, "value");
-        
+
         KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(),
                 new StringSerializer());
         Metadata metadata = PowerMock.createNiceMock(Metadata.class);
@@ -457,20 +450,18 @@ public class KafkaProducerTest {
             Collections.<String>emptySet(),
             Collections.<String>emptySet());
         EasyMock.expect(metadata.fetch()).andReturn(cluster).once();
-        
+
         // Mock interceptors field
         ProducerInterceptors interceptors = PowerMock.createMock(ProducerInterceptors.class);
         EasyMock.expect(interceptors.onSend(record)).andReturn(record);
         interceptors.onSendError(EasyMock.eq(record), EasyMock.<TopicPartition>notNull(), EasyMock.<Exception>notNull());
         EasyMock.expectLastCall();
         MemberModifier.field(KafkaProducer.class, "interceptors").set(producer, interceptors);
-        
+
         PowerMock.replay(metadata);
         EasyMock.replay(interceptors);
         producer.send(record);
-        
+
         EasyMock.verify(interceptors);
-        
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index eeb9b5f..d343194 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.test.MockSerializer;
+import org.junit.After;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -47,14 +48,23 @@ import static org.junit.Assert.fail;
 public class MockProducerTest {
 
     private final String topic = "topic";
-    private final MockProducer<byte[], byte[]> producer = new MockProducer<>(true, new MockSerializer(), new MockSerializer());
+    private MockProducer<byte[], byte[]> producer;
     private final ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(topic, "key1".getBytes(), "value1".getBytes());
     private final ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(topic, "key2".getBytes(), "value2".getBytes());
 
+    private void buildMockProducer(boolean autoComplete) {
+        this.producer = new MockProducer<>(autoComplete, new MockSerializer(), new MockSerializer());
+    }
+
+    @After
+    public void cleanup() {
+        if (this.producer != null && !this.producer.closed())
+            this.producer.close();
+    }
 
     @Test
-    @SuppressWarnings("unchecked")
     public void testAutoCompleteMock() throws Exception {
+        buildMockProducer(true);
         Future<RecordMetadata> metadata = producer.send(record1);
         assertTrue("Send should be immediately complete", metadata.isDone());
         assertFalse("Send should be successful", isError(metadata));
@@ -77,11 +87,12 @@ public class MockProducerTest {
         assertEquals("Partition should be correct", 1, metadata.get().partition());
         producer.clear();
         assertEquals("Clear should erase our history", 0, producer.history().size());
+        producer.close();
     }
 
     @Test
     public void testManualCompletion() throws Exception {
-        MockProducer<byte[], byte[]> producer = new MockProducer<>(false, new MockSerializer(), new MockSerializer());
+        buildMockProducer(false);
         Future<RecordMetadata> md1 = producer.send(record1);
         assertFalse("Send shouldn't have completed", md1.isDone());
         Future<RecordMetadata> md2 = producer.send(record2);
@@ -98,7 +109,7 @@ public class MockProducerTest {
             assertEquals(e, err.getCause());
         }
         assertFalse("No more requests to complete", producer.completeNext());
-        
+
         Future<RecordMetadata> md3 = producer.send(record1);
         Future<RecordMetadata> md4 = producer.send(record2);
         assertTrue("Requests should not be completed.", !md3.isDone() && !md4.isDone());
@@ -108,12 +119,14 @@ public class MockProducerTest {
 
     @Test
     public void shouldInitTransactions() {
+        buildMockProducer(true);
         producer.initTransactions();
         assertTrue(producer.transactionInitialized());
     }
 
     @Test
     public void shouldThrowOnInitTransactionIfProducerAlreadyInitializedForTransactions() {
+        buildMockProducer(true);
         producer.initTransactions();
         try {
             producer.initTransactions();
@@ -123,11 +136,13 @@ public class MockProducerTest {
 
     @Test(expected = IllegalStateException.class)
     public void shouldThrowOnBeginTransactionIfTransactionsNotInitialized() {
+        buildMockProducer(true);
         producer.beginTransaction();
     }
 
     @Test
     public void shouldBeginTransactions() {
+        buildMockProducer(true);
         producer.initTransactions();
         producer.beginTransaction();
         assertTrue(producer.transactionInFlight());
@@ -135,11 +150,13 @@ public class MockProducerTest {
 
     @Test(expected = IllegalStateException.class)
     public void shouldThrowOnSendOffsetsToTransactionIfTransactionsNotInitialized() {
+        buildMockProducer(true);
         producer.sendOffsetsToTransaction(null, null);
     }
 
     @Test
     public void shouldThrowOnSendOffsetsToTransactionTransactionIfNoTransactionGotStarted() {
+        buildMockProducer(true);
         producer.initTransactions();
         try {
             producer.sendOffsetsToTransaction(null, null);
@@ -149,11 +166,13 @@ public class MockProducerTest {
 
     @Test(expected = IllegalStateException.class)
     public void shouldThrowOnCommitIfTransactionsNotInitialized() {
+        buildMockProducer(true);
         producer.commitTransaction();
     }
 
     @Test
     public void shouldThrowOnCommitTransactionIfNoTransactionGotStarted() {
+        buildMockProducer(true);
         producer.initTransactions();
         try {
             producer.commitTransaction();
@@ -163,6 +182,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldCommitEmptyTransaction() {
+        buildMockProducer(true);
         producer.initTransactions();
         producer.beginTransaction();
         producer.commitTransaction();
@@ -173,6 +193,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldCountCommittedTransaction() {
+        buildMockProducer(true);
         producer.initTransactions();
         producer.beginTransaction();
 
@@ -183,6 +204,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldNotCountAbortedTransaction() {
+        buildMockProducer(true);
         producer.initTransactions();
 
         producer.beginTransaction();
@@ -195,11 +217,13 @@ public class MockProducerTest {
 
     @Test(expected = IllegalStateException.class)
     public void shouldThrowOnAbortIfTransactionsNotInitialized() {
+        buildMockProducer(true);
         producer.abortTransaction();
     }
 
     @Test
     public void shouldThrowOnAbortTransactionIfNoTransactionGotStarted() {
+        buildMockProducer(true);
         producer.initTransactions();
         try {
             producer.abortTransaction();
@@ -209,6 +233,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldAbortEmptyTransaction() {
+        buildMockProducer(true);
         producer.initTransactions();
         producer.beginTransaction();
         producer.abortTransaction();
@@ -219,6 +244,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldAbortInFlightTransactionOnClose() {
+        buildMockProducer(true);
         producer.initTransactions();
         producer.beginTransaction();
         producer.close();
@@ -229,11 +255,13 @@ public class MockProducerTest {
 
     @Test(expected = IllegalStateException.class)
     public void shouldThrowFenceProducerIfTransactionsNotInitialized() {
+        buildMockProducer(true);
         producer.fenceProducer();
     }
 
     @Test
     public void shouldThrowOnBeginTransactionsIfProducerGotFenced() {
+        buildMockProducer(true);
         producer.initTransactions();
         producer.fenceProducer();
         try {
@@ -244,6 +272,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldThrowOnSendIfProducerGotFenced() {
+        buildMockProducer(true);
         producer.initTransactions();
         producer.fenceProducer();
         try {
@@ -254,6 +283,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldThrowOnFlushIfProducerGotFenced() {
+        buildMockProducer(true);
         producer.initTransactions();
         producer.fenceProducer();
         try {
@@ -264,6 +294,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldThrowOnSendOffsetsToTransactionIfProducerGotFenced() {
+        buildMockProducer(true);
         producer.initTransactions();
         producer.fenceProducer();
         try {
@@ -274,6 +305,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldThrowOnCommitTransactionIfProducerGotFenced() {
+        buildMockProducer(true);
         producer.initTransactions();
         producer.fenceProducer();
         try {
@@ -284,6 +316,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldThrowOnAbortTransactionIfProducerGotFenced() {
+        buildMockProducer(true);
         producer.initTransactions();
         producer.fenceProducer();
         try {
@@ -294,6 +327,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldPublishMessagesOnlyAfterCommitIfTransactionsAreEnabled() {
+        buildMockProducer(true);
         producer.initTransactions();
         producer.beginTransaction();
 
@@ -313,7 +347,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldFlushOnCommitForNonAutoCompleteIfTransactionsAreEnabled() {
-        MockProducer<byte[], byte[]> producer = new MockProducer<>(false, new MockSerializer(), new MockSerializer());
+        buildMockProducer(false);
         producer.initTransactions();
         producer.beginTransaction();
 
@@ -331,6 +365,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldDropMessagesOnAbortIfTransactionsAreEnabled() {
+        buildMockProducer(true);
         producer.initTransactions();
 
         producer.beginTransaction();
@@ -346,7 +381,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldThrowOnAbortForNonAutoCompleteIfTransactionsAreEnabled() throws Exception {
-        MockProducer<byte[], byte[]> producer = new MockProducer<>(false, new MockSerializer(), new MockSerializer());
+        buildMockProducer(false);
         producer.initTransactions();
         producer.beginTransaction();
 
@@ -359,6 +394,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldPreserveCommittedMessagesOnAbortIfTransactionsAreEnabled() {
+        buildMockProducer(true);
         producer.initTransactions();
 
         producer.beginTransaction();
@@ -378,6 +414,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldPublishConsumerGroupOffsetsOnlyAfterCommitIfTransactionsAreEnabled() {
+        buildMockProducer(true);
         producer.initTransactions();
         producer.beginTransaction();
 
@@ -410,6 +447,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldThrowOnNullConsumerGroupIdWhenSendOffsetsToTransaction() {
+        buildMockProducer(true);
         producer.initTransactions();
         producer.beginTransaction();
 
@@ -421,6 +459,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldIgnoreEmptyOffsetsWhenSendOffsetsToTransaction() {
+        buildMockProducer(true);
         producer.initTransactions();
         producer.beginTransaction();
         producer.sendOffsetsToTransaction(Collections.<TopicPartition, OffsetAndMetadata>emptyMap(), "groupId");
@@ -429,6 +468,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldAddOffsetsWhenSendOffsetsToTransaction() {
+        buildMockProducer(true);
         producer.initTransactions();
         producer.beginTransaction();
 
@@ -445,6 +485,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldResetSentOffsetsFlagOnlyWhenBeginningNewTransaction() {
+        buildMockProducer(true);
         producer.initTransactions();
         producer.beginTransaction();
 
@@ -465,6 +506,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldPublishLatestAndCumulativeConsumerGroupOffsetsOnlyAfterCommitIfTransactionsAreEnabled() {
+        buildMockProducer(true);
         producer.initTransactions();
         producer.beginTransaction();
 
@@ -501,6 +543,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldDropConsumerGroupOffsetsOnAbortIfTransactionsAreEnabled() {
+        buildMockProducer(true);
         producer.initTransactions();
         producer.beginTransaction();
 
@@ -521,6 +564,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldPreserveCommittedConsumerGroupsOffsetsOnAbortIfTransactionsAreEnabled() {
+        buildMockProducer(true);
         producer.initTransactions();
         producer.beginTransaction();
 
@@ -545,6 +589,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldThrowOnInitTransactionIfProducerIsClosed() {
+        buildMockProducer(true);
         producer.close();
         try {
             producer.initTransactions();
@@ -554,6 +599,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldThrowOnSendIfProducerIsClosed() {
+        buildMockProducer(true);
         producer.close();
         try {
             producer.send(null);
@@ -563,6 +609,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldThrowOnBeginTransactionIfProducerIsClosed() {
+        buildMockProducer(true);
         producer.close();
         try {
             producer.beginTransaction();
@@ -572,6 +619,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldThrowSendOffsetsToTransactionIfProducerIsClosed() {
+        buildMockProducer(true);
         producer.close();
         try {
             producer.sendOffsetsToTransaction(null, null);
@@ -581,6 +629,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldThrowOnCommitTransactionIfProducerIsClosed() {
+        buildMockProducer(true);
         producer.close();
         try {
             producer.commitTransaction();
@@ -590,6 +639,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldThrowOnAbortTransactionIfProducerIsClosed() {
+        buildMockProducer(true);
         producer.close();
         try {
             producer.abortTransaction();
@@ -599,6 +649,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldThrowOnCloseIfProducerIsClosed() {
+        buildMockProducer(true);
         producer.close();
         try {
             producer.close();
@@ -608,6 +659,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldThrowOnFenceProducerIfProducerIsClosed() {
+        buildMockProducer(true);
         producer.close();
         try {
             producer.fenceProducer();
@@ -617,6 +669,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldThrowOnFlushProducerIfProducerIsClosed() {
+        buildMockProducer(true);
         producer.close();
         try {
             producer.flush();
@@ -626,25 +679,27 @@ public class MockProducerTest {
 
     @Test
     public void shouldBeFlushedIfNoBufferedRecords() {
+        buildMockProducer(true);
         assertTrue(producer.flushed());
     }
 
     @Test
     public void shouldBeFlushedWithAutoCompleteIfBufferedRecords() {
+        buildMockProducer(true);
         producer.send(record1);
         assertTrue(producer.flushed());
     }
 
     @Test
     public void shouldNotBeFlushedWithNoAutoCompleteIfBufferedRecords() {
-        MockProducer<byte[], byte[]> producer = new MockProducer<>(false, new MockSerializer(), new MockSerializer());
+        buildMockProducer(false);
         producer.send(record1);
         assertFalse(producer.flushed());
     }
 
     @Test
     public void shouldNotBeFlushedAfterFlush() {
-        MockProducer<byte[], byte[]> producer = new MockProducer<>(false, new MockSerializer(), new MockSerializer());
+        buildMockProducer(false);
         producer.send(record1);
         producer.flush();
         assertTrue(producer.flushed());

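A note on the refactoring above: each test now calls buildMockProducer(boolean) instead of constructing its own producer. The helper is defined earlier in this diff; judging from the constructor calls it replaces, it presumably amounts to the following sketch (the exact definition may differ):

    private void buildMockProducer(boolean autoComplete) {
        // assumed: assigns the shared field that the tests above exercise
        this.producer = new MockProducer<>(autoComplete, new MockSerializer(), new MockSerializer());
    }

The flag controls whether sends auto-complete, which is why the flush-related tests pass false.
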
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 0537a35..d587de4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -633,10 +633,9 @@ public class SenderTest {
         String topic = tp.topic();
         // Set a good compression ratio.
         CompressionRatioEstimator.setEstimation(topic, CompressionType.GZIP, 0.2f);
-        Metrics m = new Metrics();
-        accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.GZIP, 0L, 0L, m, time,
-                new ApiVersions(), txnManager);
-        try {
+        try (Metrics m = new Metrics()) {
+            accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.GZIP, 0L, 0L, m, time,
+                    new ApiVersions(), txnManager);
             Sender sender = new Sender(client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
                     m, time, REQUEST_TIMEOUT, 1000L, txnManager, new ApiVersions());
             // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
@@ -701,8 +700,6 @@ public class SenderTest {
 
             assertTrue("There should be a split",
                     m.metrics().get(m.metricName("batch-split-rate", "producer-metrics")).value() > 0);
-        } finally {
-            m.close();
         }
     }
 

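The SenderTest hunk above replaces a manual close in a finally block with try-with-resources. Since Metrics implements Closeable, the compiler generates the equivalent cleanup; the general shape for any such resource:

    try (Metrics m = new Metrics()) {
        // use m; m.close() runs on every exit path, including when an
        // assertion fails or the code under test throws
    }
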
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java
index fbac719..26ba3b8 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.utils;
 
 import org.junit.Test;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import static org.junit.Assert.assertArrayEquals;
@@ -49,6 +50,7 @@ public class ByteBufferOutputStreamTest {
         byte[] bytes = new byte[5];
         buffer.get(bytes);
         assertArrayEquals("hello".getBytes(), bytes);
+        output.close();
     }
 
     @Test
@@ -75,19 +77,20 @@ public class ByteBufferOutputStreamTest {
         byte[] bytes = new byte[5];
         buffer.get(bytes);
         assertArrayEquals("hello".getBytes(), bytes);
+        output.close();
     }
 
     @Test
-    public void testWriteByteBuffer() {
+    public void testWriteByteBuffer() throws IOException {
         testWriteByteBuffer(ByteBuffer.allocate(16));
     }
 
     @Test
-    public void testWriteDirectByteBuffer() {
+    public void testWriteDirectByteBuffer() throws IOException {
         testWriteByteBuffer(ByteBuffer.allocateDirect(16));
     }
 
-    private void testWriteByteBuffer(ByteBuffer input) {
+    private void testWriteByteBuffer(ByteBuffer input) throws IOException {
         long value = 234239230L;
         input.putLong(value);
         input.flip();
@@ -97,6 +100,7 @@ public class ByteBufferOutputStreamTest {
         assertEquals(8, input.position());
         assertEquals(8, output.position());
         assertEquals(value, output.buffer().getLong(0));
+        output.close();
     }
 
 }

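The added throws IOException clauses follow from the new close() calls: ByteBufferOutputStream takes its close() from java.io.OutputStream, whose signature declares IOException, so the test methods must propagate it even though a buffer-backed stream is unlikely to throw. Inside a method declared throws IOException:

    ByteBufferOutputStream output = new ByteBufferOutputStream(ByteBuffer.allocate(16));
    output.write("hello".getBytes());
    output.close();  // declared to throw IOException by the OutputStream contract
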
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
index eb91dbd..03fb774 100644
--- a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
+++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
@@ -123,6 +123,7 @@ public class FileStreamSourceTaskTest {
         assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(0).sourcePartition());
         assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 48L), records.get(0).sourceOffset());
 
+        os.close();
         task.stop();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index 654f485..f7a5553 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -137,7 +137,6 @@ public class Plugins {
         return delegatingLoader.transformations();
     }
 
-    @SuppressWarnings("unchecked")
     public Connector newConnector(String connectorClassOrAlias) {
         Class<? extends Connector> klass;
         try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java
index 32f5a38..0547fe6 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java
@@ -69,8 +69,7 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
 
     @SuppressWarnings("unchecked")
     private void load() {
-        try {
-            ObjectInputStream is = new ObjectInputStream(new FileInputStream(file));
+        try (ObjectInputStream is = new ObjectInputStream(new FileInputStream(file))) {
             Object obj = is.readObject();
             if (!(obj instanceof HashMap))
                 throw new ConnectException("Expected HashMap but found " + obj.getClass());
@@ -81,7 +80,6 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
                 ByteBuffer value = (mapEntry.getValue() != null) ? ByteBuffer.wrap(mapEntry.getValue()) : null;
                 data.put(key, value);
             }
-            is.close();
         } catch (FileNotFoundException | EOFException e) {
             // FileNotFoundException: Ignore, may be new.
             // EOFException: Ignore, this means the file was missing or corrupt

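Worth noting on the FileOffsetBackingStore hunk: the removed explicit is.close() only ran on the success path, so a failure in readObject() or the HashMap check leaked the stream. With try-with-resources the stream is closed on every exit, including the exception paths handled by the catch blocks below the hunk.
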
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index 29a6b52..6f77f65 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -115,7 +115,6 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
     private long recordsReturned;
 
 
-    @SuppressWarnings("unchecked")
     @Override
     public void setup() {
         super.setup();

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index 07d192b..e9dd18e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -135,7 +135,6 @@ public class KafkaConfigBackingStoreTest {
     KafkaBasedLog<String, byte[]> storeLog;
     private KafkaConfigBackingStore configStorage;
 
-    private String internalTopic;
     private Capture<String> capturedTopic = EasyMock.newCapture();
     private Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
     private Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
@@ -363,7 +362,7 @@ public class KafkaConfigBackingStoreTest {
                 new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
                 new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(3)),
                 new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)));
-        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
         deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
@@ -403,7 +402,7 @@ public class KafkaConfigBackingStoreTest {
                 new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
                 new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
                 new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)));
-        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
         deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
@@ -445,7 +444,7 @@ public class KafkaConfigBackingStoreTest {
                 new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
                 new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
                 new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)));
-        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
         deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
@@ -494,7 +493,7 @@ public class KafkaConfigBackingStoreTest {
                 new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
                 new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(3)),
                 new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)));
-        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
         deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
@@ -540,7 +539,7 @@ public class KafkaConfigBackingStoreTest {
                 // Connector after root update should make it through, task update shouldn't
                 new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)),
                 new ConsumerRecord<>(TOPIC, 0, 6, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(6)));
-        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
         deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
@@ -594,7 +593,7 @@ public class KafkaConfigBackingStoreTest {
                 new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
                 new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)));
 
-        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
         deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
@@ -640,7 +639,7 @@ public class KafkaConfigBackingStoreTest {
             new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)),
             new ConsumerRecord<>(TOPIC, 0, 6, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(6)),
             new ConsumerRecord<>(TOPIC, 0, 7, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(7)));
-        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
         deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
@@ -691,7 +690,7 @@ public class KafkaConfigBackingStoreTest {
                 new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
                 new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
                 new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)));
-        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
         deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);

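The repeated LinkedHashMap change in this file removes raw-type construction. The raw form compiles but triggers an unchecked-assignment warning; the diamond operator infers the type arguments from the declaration:

    LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();  // inferred as LinkedHashMap<byte[], Struct>
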
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
index bafbce8..f90b77f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
@@ -49,7 +49,6 @@ public class TopicAdminTest {
     @Test
     public void returnNullWithApiVersionMismatch() {
         final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
-        boolean internal = false;
         Cluster cluster = createCluster(1);
         try (MockKafkaAdminClientEnv env = new MockKafkaAdminClientEnv(cluster)) {
             env.kafkaClient().setNode(cluster.controller());

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
index 88afafc..b190189 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.After;
 import org.junit.Test;
 
 import java.util.Collections;
@@ -34,42 +35,44 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 public class CastTest {
+    private final Cast<SourceRecord> xformKey = new Cast.Key<>();
+    private final Cast<SourceRecord> xformValue = new Cast.Value<>();
+
+    @After
+    public void teardown() {
+        xformKey.close();
+        xformValue.close();
+    }
 
     @Test(expected = ConfigException.class)
     public void testConfigEmpty() {
-        final Cast<SourceRecord> xform = new Cast.Key<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, ""));
+        xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, ""));
     }
 
     @Test(expected = ConfigException.class)
     public void testConfigInvalidSchemaType() {
-        final Cast<SourceRecord> xform = new Cast.Key<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:faketype"));
+        xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:faketype"));
     }
 
     @Test(expected = ConfigException.class)
     public void testConfigInvalidTargetType() {
-        final Cast<SourceRecord> xform = new Cast.Key<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:array"));
+        xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:array"));
     }
 
     @Test(expected = ConfigException.class)
     public void testConfigInvalidMap() {
-        final Cast<SourceRecord> xform = new Cast.Key<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int8:extra"));
+        xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int8:extra"));
     }
 
     @Test(expected = ConfigException.class)
     public void testConfigMixWholeAndFieldTransformation() {
-        final Cast<SourceRecord> xform = new Cast.Key<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int8,int32"));
+        xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int8,int32"));
     }
 
     @Test
     public void castWholeRecordKeyWithSchema() {
-        final Cast<SourceRecord> xform = new Cast.Key<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
+        SourceRecord transformed = xformKey.apply(new SourceRecord(null, null, "topic", 0,
                 Schema.INT32_SCHEMA, 42, Schema.STRING_SCHEMA, "bogus"));
 
         assertEquals(Schema.Type.INT8, transformed.keySchema().type());
@@ -78,9 +81,8 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueWithSchemaInt8() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 Schema.INT32_SCHEMA, 42));
 
         assertEquals(Schema.Type.INT8, transformed.valueSchema().type());
@@ -89,9 +91,8 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueWithSchemaInt16() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int16"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int16"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 Schema.INT32_SCHEMA, 42));
 
         assertEquals(Schema.Type.INT16, transformed.valueSchema().type());
@@ -100,9 +101,8 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueWithSchemaInt32() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int32"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int32"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 Schema.INT32_SCHEMA, 42));
 
         assertEquals(Schema.Type.INT32, transformed.valueSchema().type());
@@ -111,9 +111,8 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueWithSchemaInt64() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int64"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int64"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 Schema.INT32_SCHEMA, 42));
 
         assertEquals(Schema.Type.INT64, transformed.valueSchema().type());
@@ -122,9 +121,8 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueWithSchemaFloat32() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "float32"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "float32"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 Schema.INT32_SCHEMA, 42));
 
         assertEquals(Schema.Type.FLOAT32, transformed.valueSchema().type());
@@ -133,9 +131,8 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueWithSchemaFloat64() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "float64"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "float64"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 Schema.INT32_SCHEMA, 42));
 
         assertEquals(Schema.Type.FLOAT64, transformed.valueSchema().type());
@@ -144,9 +141,8 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueWithSchemaBooleanTrue() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "boolean"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "boolean"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 Schema.INT32_SCHEMA, 42));
 
         assertEquals(Schema.Type.BOOLEAN, transformed.valueSchema().type());
@@ -155,9 +151,8 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueWithSchemaBooleanFalse() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "boolean"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "boolean"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 Schema.INT32_SCHEMA, 0));
 
         assertEquals(Schema.Type.BOOLEAN, transformed.valueSchema().type());
@@ -166,9 +161,8 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueWithSchemaString() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "string"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "string"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 Schema.INT32_SCHEMA, 42));
 
         assertEquals(Schema.Type.STRING, transformed.valueSchema().type());
@@ -178,9 +172,8 @@ public class CastTest {
     @Test
     public void castWholeRecordDefaultValue() {
         // Validate default value in schema is correctly converted
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int32"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int32"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 SchemaBuilder.float32().defaultValue(-42.125f).build(), 42.125f));
 
         assertEquals(Schema.Type.INT32, transformed.valueSchema().type());
@@ -190,9 +183,8 @@ public class CastTest {
 
     @Test
     public void castWholeRecordKeySchemaless() {
-        final Cast<SourceRecord> xform = new Cast.Key<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
+        SourceRecord transformed = xformKey.apply(new SourceRecord(null, null, "topic", 0,
                 null, 42, Schema.STRING_SCHEMA, "bogus"));
 
         assertNull(transformed.keySchema());
@@ -201,9 +193,8 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueSchemalessInt8() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 null, 42));
 
         assertNull(transformed.valueSchema());
@@ -212,9 +203,8 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueSchemalessInt16() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int16"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int16"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 null, 42));
 
         assertNull(transformed.valueSchema());
@@ -223,9 +213,8 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueSchemalessInt32() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int32"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int32"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 null, 42));
 
         assertNull(transformed.valueSchema());
@@ -234,9 +223,8 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueSchemalessInt64() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int64"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int64"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 null, 42));
 
         assertNull(transformed.valueSchema());
@@ -245,9 +233,8 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueSchemalessFloat32() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "float32"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "float32"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 null, 42));
 
         assertNull(transformed.valueSchema());
@@ -256,9 +243,8 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueSchemalessFloat64() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "float64"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "float64"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 null, 42));
 
         assertNull(transformed.valueSchema());
@@ -267,9 +253,8 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueSchemalessBooleanTrue() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "boolean"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "boolean"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 null, 42));
 
         assertNull(transformed.valueSchema());
@@ -278,9 +263,8 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueSchemalessBooleanFalse() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "boolean"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "boolean"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 null, 0));
 
         assertNull(transformed.valueSchema());
@@ -289,9 +273,8 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueSchemalessString() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "string"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "string"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 null, 42));
 
         assertNull(transformed.valueSchema());
@@ -300,16 +283,14 @@ public class CastTest {
 
     @Test(expected = DataException.class)
     public void castWholeRecordValueSchemalessUnsupportedType() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
-        xform.apply(new SourceRecord(null, null, "topic", 0, null, Collections.singletonList("foo")));
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
+        xformValue.apply(new SourceRecord(null, null, "topic", 0, null, Collections.singletonList("foo")));
     }
 
 
     @Test
     public void castFieldsWithSchema() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8:int16,int16:int32,int32:int64,int64:boolean,float32:float64,float64:boolean,boolean:int8,string:int32,optional:int32"));
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8:int16,int16:int32,int32:int64,int64:boolean,float32:float64,float64:boolean,boolean:int8,string:int32,optional:int32"));
 
         // Include optional fields and fields with defaults to validate their values are passed through properly
         SchemaBuilder builder = SchemaBuilder.struct();
@@ -336,7 +317,7 @@ public class CastTest {
         recordValue.put("string", "42");
         // optional field intentionally omitted
 
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 supportedTypesSchema, recordValue));
 
         assertEquals((short) 8, ((Struct) transformed.value()).get("int8"));
@@ -356,8 +337,7 @@ public class CastTest {
     @SuppressWarnings("unchecked")
     @Test
     public void castFieldsSchemaless() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8:int16,int16:int32,int32:int64,int64:boolean,float32:float64,float64:boolean,boolean:int8,string:int32"));
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8:int16,int16:int32,int32:int64,int64:boolean,float32:float64,float64:boolean,boolean:int8,string:int32"));
         Map<String, Object> recordValue = new HashMap<>();
         recordValue.put("int8", (byte) 8);
         recordValue.put("int16", (short) 16);
@@ -367,7 +347,7 @@ public class CastTest {
         recordValue.put("float64", -64.);
         recordValue.put("boolean", true);
         recordValue.put("string", "42");
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 null, recordValue));
 
         assertNull(transformed.valueSchema());

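CastTest now follows the same fixture pattern as the other transform tests in this commit: the transformation is a field of the test instance, each test configures it, and an @After method closes it so the resource is released even when a test fails before reaching its own cleanup. A generic sketch of the pattern (names are illustrative):

    public class SomeTransformTest {
        private final SomeTransform<SourceRecord> xform = new SomeTransform.Value<>();

        @After
        public void teardown() {
            xform.close();  // JUnit 4 runs this after every test, pass or fail
        }
    }
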
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java
index b54a908..0b7ce96 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.After;
 import org.junit.Test;
 
 import java.util.Collections;
@@ -28,10 +29,15 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
 public class ExtractFieldTest {
+    private final ExtractField<SinkRecord> xform = new ExtractField.Key<>();
+
+    @After
+    public void teardown() {
+        xform.close();
+    }
 
     @Test
     public void schemaless() {
-        final ExtractField<SinkRecord> xform = new ExtractField.Key<>();
         xform.configure(Collections.singletonMap("field", "magic"));
 
         final SinkRecord record = new SinkRecord("test", 0, null, Collections.singletonMap("magic", 42), null, null, 0);
@@ -43,7 +49,6 @@ public class ExtractFieldTest {
 
     @Test
     public void withSchema() {
-        final ExtractField<SinkRecord> xform = new ExtractField.Key<>();
         xform.configure(Collections.singletonMap("field", "magic"));
 
         final Schema keySchema = SchemaBuilder.struct().field("magic", Schema.INT32_SCHEMA).build();

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
index 86851f3..d709054 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.After;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -36,25 +37,30 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 public class FlattenTest {
+    private final Flatten<SourceRecord> xformKey = new Flatten.Key<>();
+    private final Flatten<SourceRecord> xformValue = new Flatten.Value<>();
+
+    @After
+    public void teardown() {
+        xformKey.close();
+        xformValue.close();
+    }
 
     @Test(expected = DataException.class)
     public void topLevelStructRequired() {
-        final Flatten<SourceRecord> xform = new Flatten.Value<>();
-        xform.configure(Collections.<String, String>emptyMap());
-        xform.apply(new SourceRecord(null, null, "topic", 0, Schema.INT32_SCHEMA, 42));
+        xformValue.configure(Collections.<String, String>emptyMap());
+        xformValue.apply(new SourceRecord(null, null, "topic", 0, Schema.INT32_SCHEMA, 42));
     }
 
     @Test(expected = DataException.class)
     public void topLevelMapRequired() {
-        final Flatten<SourceRecord> xform = new Flatten.Value<>();
-        xform.configure(Collections.<String, String>emptyMap());
-        xform.apply(new SourceRecord(null, null, "topic", 0, null, 42));
+        xformValue.configure(Collections.<String, String>emptyMap());
+        xformValue.apply(new SourceRecord(null, null, "topic", 0, null, 42));
     }
 
     @Test
     public void testNestedStruct() {
-        final Flatten<SourceRecord> xform = new Flatten.Value<>();
-        xform.configure(Collections.<String, String>emptyMap());
+        xformValue.configure(Collections.<String, String>emptyMap());
 
         SchemaBuilder builder = SchemaBuilder.struct();
         builder.field("int8", Schema.INT8_SCHEMA);
@@ -93,7 +99,7 @@ public class FlattenTest {
         Struct twoLevelNestedStruct = new Struct(twoLevelNestedSchema);
         twoLevelNestedStruct.put("A", oneLevelNestedStruct);
 
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null,
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null,
                 "topic", 0,
                 twoLevelNestedSchema, twoLevelNestedStruct));
 
@@ -113,8 +119,7 @@ public class FlattenTest {
 
     @Test
     public void testNestedMapWithDelimiter() {
-        final Flatten<SourceRecord> xform = new Flatten.Value<>();
-        xform.configure(Collections.singletonMap("delimiter", "#"));
+        xformValue.configure(Collections.singletonMap("delimiter", "#"));
 
         Map<String, Object> supportedTypes = new HashMap<>();
         supportedTypes.put("int8", (byte) 8);
@@ -130,7 +135,7 @@ public class FlattenTest {
         Map<String, Object> oneLevelNestedMap = Collections.singletonMap("B", (Object) supportedTypes);
         Map<String, Object> twoLevelNestedMap = Collections.singletonMap("A", (Object) oneLevelNestedMap);
 
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null,
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null,
                 "topic", 0,
                 null, twoLevelNestedMap));
 
@@ -151,8 +156,7 @@ public class FlattenTest {
 
     @Test
     public void testOptionalFieldStruct() {
-        final Flatten<SourceRecord> xform = new Flatten.Value<>();
-        xform.configure(Collections.<String, String>emptyMap());
+        xformValue.configure(Collections.<String, String>emptyMap());
 
         SchemaBuilder builder = SchemaBuilder.struct();
         builder.field("opt_int32", Schema.OPTIONAL_INT32_SCHEMA);
@@ -168,7 +172,7 @@ public class FlattenTest {
         Struct oneLevelNestedStruct = new Struct(oneLevelNestedSchema);
         oneLevelNestedStruct.put("B", supportedTypes);
 
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null,
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null,
                 "topic", 0,
                 oneLevelNestedSchema, oneLevelNestedStruct));
 
@@ -179,15 +183,14 @@ public class FlattenTest {
 
     @Test
     public void testOptionalFieldMap() {
-        final Flatten<SourceRecord> xform = new Flatten.Value<>();
-        xform.configure(Collections.<String, String>emptyMap());
+        xformValue.configure(Collections.<String, String>emptyMap());
 
         Map<String, Object> supportedTypes = new HashMap<>();
         supportedTypes.put("opt_int32", null);
 
         Map<String, Object> oneLevelNestedMap = Collections.singletonMap("B", (Object) supportedTypes);
 
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null,
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null,
                 "topic", 0,
                 null, oneLevelNestedMap));
 
@@ -200,12 +203,11 @@ public class FlattenTest {
 
     @Test
     public void testKey() {
-        final Flatten<SourceRecord> xform = new Flatten.Key<>();
-        xform.configure(Collections.<String, String>emptyMap());
+        xformKey.configure(Collections.<String, String>emptyMap());
 
         Map<String, Map<String, Integer>> key = Collections.singletonMap("A", Collections.singletonMap("B", 12));
         SourceRecord src = new SourceRecord(null, null, "topic", null, key, null, null);
-        SourceRecord transformed = xform.apply(src);
+        SourceRecord transformed = xformKey.apply(src);
 
         assertNull(transformed.keySchema());
         assertTrue(transformed.key() instanceof Map);
@@ -215,10 +217,9 @@ public class FlattenTest {
 
     @Test(expected = DataException.class)
     public void testUnsupportedTypeInMap() {
-        final Flatten<SourceRecord> xform = new Flatten.Value<>();
-        xform.configure(Collections.<String, String>emptyMap());
+        xformValue.configure(Collections.<String, String>emptyMap());
         Object value = Collections.singletonMap("foo", Arrays.asList("bar", "baz"));
-        xform.apply(new SourceRecord(null, null, "topic", 0, null, value));
+        xformValue.apply(new SourceRecord(null, null, "topic", 0, null, value));
     }
 
     @Test
@@ -227,8 +228,7 @@ public class FlattenTest {
         // children should also be optional. Similarly, if the parent Struct has a default value, the default value for
         // the flattened field should be derived from the parent's default.
 
-        final Flatten<SourceRecord> xform = new Flatten.Value<>();
-        xform.configure(Collections.<String, String>emptyMap());
+        xformValue.configure(Collections.<String, String>emptyMap());
 
         SchemaBuilder builder = SchemaBuilder.struct().optional();
         builder.field("req_field", Schema.STRING_SCHEMA);
@@ -240,7 +240,7 @@ public class FlattenTest {
         // Intentionally leave this entire value empty since it is optional
         Struct value = new Struct(schema);
 
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, schema, value));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, schema, value));
 
         assertNotNull(transformed);
         Schema transformedSchema = transformed.valueSchema();

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java
index 299aab3..1135b85 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.transforms;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.After;
 import org.junit.Test;
 
 import java.util.Collections;
@@ -27,10 +28,15 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
 public class HoistFieldTest {
+    private final HoistField<SinkRecord> xform = new HoistField.Key<>();
+
+    @After
+    public void teardown() {
+        xform.close();
+    }
 
     @Test
     public void schemaless() {
-        final HoistField<SinkRecord> xform = new HoistField.Key<>();
         xform.configure(Collections.singletonMap("field", "magic"));
 
         final SinkRecord record = new SinkRecord("test", 0, null, 42, null, null, 0);
@@ -42,7 +48,6 @@ public class HoistFieldTest {
 
     @Test
     public void withSchema() {
-        final HoistField<SinkRecord> xform = new HoistField.Key<>();
         xform.configure(Collections.singletonMap("field", "magic"));
 
         final SinkRecord record = new SinkRecord("test", 0, Schema.INT32_SCHEMA, 42, null, null, 0);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java
index 4ce6ad4..a0a0975 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.data.Timestamp;
 import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.After;
 import org.junit.Test;
 
 import java.util.Collections;
@@ -32,14 +33,17 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertSame;
 
 public class InsertFieldTest {
+    private final InsertField<SourceRecord> xform = new InsertField.Value<>();
+
+    @After
+    public void teardown() {
+        xform.close();
+    }
 
     @Test(expected = DataException.class)
     public void topLevelStructRequired() {
-        final InsertField<SourceRecord> xform = new InsertField.Value<>();
         xform.configure(Collections.singletonMap("topic.field", "topic_field"));
-        xform.apply(new SourceRecord(null, null,
-                "", 0,
-                Schema.INT32_SCHEMA, 42));
+        xform.apply(new SourceRecord(null, null, "", 0, Schema.INT32_SCHEMA, 42));
     }
 
     @Test
@@ -51,7 +55,6 @@ public class InsertFieldTest {
         props.put("static.field", "instance_id");
         props.put("static.value", "my-instance-id");
 
-        final InsertField<SourceRecord> xform = new InsertField.Value<>();
         xform.configure(props);
 
         final Schema simpleStructSchema = SchemaBuilder.struct().name("name").version(1).doc("doc").field("magic", Schema.OPTIONAL_INT64_SCHEMA).build();
@@ -94,7 +97,6 @@ public class InsertFieldTest {
         props.put("static.field", "instance_id");
         props.put("static.value", "my-instance-id");
 
-        final InsertField<SourceRecord> xform = new InsertField.Value<>();
         xform.configure(props);
 
         final SourceRecord record = new SourceRecord(null, null, "test", 0,

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java
index cc001f1..aa47001 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java
@@ -32,8 +32,9 @@ public class RegexRouterTest {
         props.put("replacement", replacement);
         final RegexRouter<SinkRecord> router = new RegexRouter<>();
         router.configure(props);
-        return router.apply(new SinkRecord(topic, 0, null, null, null, null, 0))
-                .topic();
+        String sinkTopic = router.apply(new SinkRecord(topic, 0, null, null, null, null, 0)).topic();
+        router.close();
+        return sinkTopic;
     }
 
     @Test
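
One nit on the helper above: router.close() is skipped if apply() throws, so
the router leaks on that path. A try/finally closes it on every path; a
sketch of the same helper with that change (not what the commit does):

    private static String apply(String regex, String replacement, String topic) {
        final Map<String, String> props = new HashMap<>();
        props.put("regex", regex);
        props.put("replacement", replacement);
        final RegexRouter<SinkRecord> router = new RegexRouter<>();
        router.configure(props);
        try {
            // Route a dummy record and report the rewritten topic.
            return router.apply(new SinkRecord(topic, 0, null, null, null, null, 0)).topic();
        } finally {
            router.close();
        }
    }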

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
index e3d9d3a..6a1a13a 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.After;
 import org.junit.Test;
 
 import java.util.HashMap;
@@ -28,11 +29,15 @@ import java.util.Map;
 import static org.junit.Assert.assertEquals;
 
 public class ReplaceFieldTest {
+    private final ReplaceField<SinkRecord> xform = new ReplaceField.Value<>();
+
+    @After
+    public void teardown() {
+        xform.close();
+    }
 
     @Test
     public void schemaless() {
-        final ReplaceField<SinkRecord> xform = new ReplaceField.Value<>();
-
         final Map<String, String> props = new HashMap<>();
         props.put("blacklist", "dont");
         props.put("renames", "abc:xyz,foo:bar");
@@ -57,8 +62,6 @@ public class ReplaceFieldTest {
 
     @Test
     public void withSchema() {
-        final ReplaceField<SinkRecord> xform = new ReplaceField.Value<>();
-
         final Map<String, String> props = new HashMap<>();
         props.put("whitelist", "abc,foo");
         props.put("renames", "abc:xyz,foo:bar");

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java
index 206c51e..257b382 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.After;
 import org.junit.Test;
 
 import java.util.Collections;
@@ -31,10 +32,15 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertSame;
 
 public class SetSchemaMetadataTest {
+    private final SetSchemaMetadata<SinkRecord> xform = new SetSchemaMetadata.Value<>();
+
+    @After
+    public void teardown() {
+        xform.close();
+    }
 
     @Test
     public void schemaNameUpdate() {
-        final SetSchemaMetadata<SinkRecord> xform = new SetSchemaMetadata.Value<>();
         xform.configure(Collections.singletonMap("schema.name", "foo"));
         final SinkRecord record = new SinkRecord("", 0, null, null, SchemaBuilder.struct().build(), null, 0);
         final SinkRecord updatedRecord = xform.apply(record);
@@ -43,7 +49,6 @@ public class SetSchemaMetadataTest {
 
     @Test
     public void schemaVersionUpdate() {
-        final SetSchemaMetadata<SinkRecord> xform = new SetSchemaMetadata.Value<>();
         xform.configure(Collections.singletonMap("schema.version", 42));
         final SinkRecord record = new SinkRecord("", 0, null, null, SchemaBuilder.struct().build(), null, 0);
         final SinkRecord updatedRecord = xform.apply(record);
@@ -56,7 +61,6 @@ public class SetSchemaMetadataTest {
         props.put("schema.name", "foo");
         props.put("schema.version", "42");
 
-        final SetSchemaMetadata<SinkRecord> xform = new SetSchemaMetadata.Value<>();
         xform.configure(props);
 
         final SinkRecord record = new SinkRecord("", 0, null, null, SchemaBuilder.struct().build(), null, 0);
@@ -83,7 +87,6 @@ public class SetSchemaMetadataTest {
         final Map<String, String> props = new HashMap<>();
         props.put("schema.name", "foo");
         props.put("schema.version", "42");
-        final SetSchemaMetadata<SinkRecord> xform = new SetSchemaMetadata.Value<>();
         xform.configure(props);
 
         final SinkRecord record = new SinkRecord("", 0, null, null, schema, value, 0);