You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/07/19 17:53:34 UTC
[1/2] kafka git commit: MINOR: Code Cleanup
Repository: kafka
Updated Branches:
refs/heads/trunk 3bfc073f0 -> f87d58b79
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java
index 12beef8..475066f 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.After;
import org.junit.Test;
import java.util.Calendar;
@@ -38,7 +39,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
public class TimestampConverterTest {
-
private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
private static final Calendar EPOCH;
private static final Calendar TIME;
@@ -48,6 +48,9 @@ public class TimestampConverterTest {
private static final String STRING_DATE_FMT = "yyyy MM dd HH mm ss SSS z";
private static final String DATE_PLUS_TIME_STRING;
+ private final TimestampConverter<SourceRecord> xformKey = new TimestampConverter.Key<>();
+ private final TimestampConverter<SourceRecord> xformValue = new TimestampConverter.Value<>();
+
static {
EPOCH = GregorianCalendar.getInstance(UTC);
EPOCH.setTimeInMillis(0L);
@@ -73,31 +76,33 @@ public class TimestampConverterTest {
// Configuration
+ @After
+ public void teardown() {
+ xformKey.close();
+ xformValue.close();
+ }
+
@Test(expected = ConfigException.class)
public void testConfigNoTargetType() {
- TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
- xform.configure(Collections.<String, String>emptyMap());
+ xformValue.configure(Collections.<String, String>emptyMap());
}
@Test(expected = ConfigException.class)
public void testConfigInvalidTargetType() {
- TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
- xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "invalid"));
+ xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "invalid"));
}
@Test(expected = ConfigException.class)
public void testConfigMissingFormat() {
- TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
- xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "string"));
+ xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "string"));
}
@Test(expected = ConfigException.class)
public void testConfigInvalidFormat() {
- TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
Map<String, String> config = new HashMap<>();
config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string");
config.put(TimestampConverter.FORMAT_CONFIG, "bad-format");
- xform.configure(config);
+ xformValue.configure(config);
}
@@ -105,9 +110,8 @@ public class TimestampConverterTest {
@Test
public void testSchemalessIdentity() {
- TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
- xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
+ xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
assertNull(transformed.valueSchema());
assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
@@ -115,9 +119,8 @@ public class TimestampConverterTest {
@Test
public void testSchemalessTimestampToDate() {
- TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
- xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Date"));
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
+ xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Date"));
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
assertNull(transformed.valueSchema());
assertEquals(DATE.getTime(), transformed.value());
@@ -125,9 +128,8 @@ public class TimestampConverterTest {
@Test
public void testSchemalessTimestampToTime() {
- TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
- xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Time"));
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
+ xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Time"));
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
assertNull(transformed.valueSchema());
assertEquals(TIME.getTime(), transformed.value());
@@ -135,9 +137,8 @@ public class TimestampConverterTest {
@Test
public void testSchemalessTimestampToUnix() {
- TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
- xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "unix"));
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
+ xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "unix"));
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
assertNull(transformed.valueSchema());
assertEquals(DATE_PLUS_TIME_UNIX, transformed.value());
@@ -145,12 +146,11 @@ public class TimestampConverterTest {
@Test
public void testSchemalessTimestampToString() {
- TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
Map<String, String> config = new HashMap<>();
config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string");
config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
- xform.configure(config);
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
+ xformValue.configure(config);
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
assertNull(transformed.valueSchema());
assertEquals(DATE_PLUS_TIME_STRING, transformed.value());
@@ -161,9 +161,8 @@ public class TimestampConverterTest {
@Test
public void testSchemalessDateToTimestamp() {
- TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
- xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE.getTime()));
+ xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, DATE.getTime()));
assertNull(transformed.valueSchema());
// No change expected since the source type is coarser-grained
@@ -172,9 +171,8 @@ public class TimestampConverterTest {
@Test
public void testSchemalessTimeToTimestamp() {
- TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
- xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, TIME.getTime()));
+ xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, TIME.getTime()));
assertNull(transformed.valueSchema());
// No change expected since the source type is coarser-grained
@@ -183,9 +181,8 @@ public class TimestampConverterTest {
@Test
public void testSchemalessUnixToTimestamp() {
- TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
- xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME_UNIX));
+ xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME_UNIX));
assertNull(transformed.valueSchema());
assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
@@ -193,12 +190,11 @@ public class TimestampConverterTest {
@Test
public void testSchemalessStringToTimestamp() {
- TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
Map<String, String> config = new HashMap<>();
config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp");
config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
- xform.configure(config);
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME_STRING));
+ xformValue.configure(config);
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME_STRING));
assertNull(transformed.valueSchema());
assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
@@ -209,9 +205,8 @@ public class TimestampConverterTest {
@Test
public void testWithSchemaIdentity() {
- TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
- xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
+ xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
@@ -219,9 +214,8 @@ public class TimestampConverterTest {
@Test
public void testWithSchemaTimestampToDate() {
- TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
- xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Date"));
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
+ xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Date"));
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
assertEquals(Date.SCHEMA, transformed.valueSchema());
assertEquals(DATE.getTime(), transformed.value());
@@ -229,9 +223,8 @@ public class TimestampConverterTest {
@Test
public void testWithSchemaTimestampToTime() {
- TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
- xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Time"));
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
+ xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Time"));
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
assertEquals(Time.SCHEMA, transformed.valueSchema());
assertEquals(TIME.getTime(), transformed.value());
@@ -239,9 +232,8 @@ public class TimestampConverterTest {
@Test
public void testWithSchemaTimestampToUnix() {
- TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
- xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "unix"));
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
+ xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "unix"));
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
assertEquals(Schema.INT64_SCHEMA, transformed.valueSchema());
assertEquals(DATE_PLUS_TIME_UNIX, transformed.value());
@@ -249,12 +241,11 @@ public class TimestampConverterTest {
@Test
public void testWithSchemaTimestampToString() {
- TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
Map<String, String> config = new HashMap<>();
config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string");
config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
- xform.configure(config);
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
+ xformValue.configure(config);
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
assertEquals(Schema.STRING_SCHEMA, transformed.valueSchema());
assertEquals(DATE_PLUS_TIME_STRING, transformed.value());
@@ -265,9 +256,8 @@ public class TimestampConverterTest {
@Test
public void testWithSchemaDateToTimestamp() {
- TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
- xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Date.SCHEMA, DATE.getTime()));
+ xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Date.SCHEMA, DATE.getTime()));
assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
// No change expected since the source type is coarser-grained
@@ -276,9 +266,8 @@ public class TimestampConverterTest {
@Test
public void testWithSchemaTimeToTimestamp() {
- TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
- xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Time.SCHEMA, TIME.getTime()));
+ xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Time.SCHEMA, TIME.getTime()));
assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
// No change expected since the source type is coarser-grained
@@ -287,9 +276,8 @@ public class TimestampConverterTest {
@Test
public void testWithSchemaUnixToTimestamp() {
- TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
- xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Schema.INT64_SCHEMA, DATE_PLUS_TIME_UNIX));
+ xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Schema.INT64_SCHEMA, DATE_PLUS_TIME_UNIX));
assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
@@ -297,12 +285,11 @@ public class TimestampConverterTest {
@Test
public void testWithSchemaStringToTimestamp() {
- TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
Map<String, String> config = new HashMap<>();
config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp");
config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
- xform.configure(config);
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Schema.STRING_SCHEMA, DATE_PLUS_TIME_STRING));
+ xformValue.configure(config);
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Schema.STRING_SCHEMA, DATE_PLUS_TIME_STRING));
assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
@@ -313,14 +300,13 @@ public class TimestampConverterTest {
@Test
public void testSchemalessFieldConversion() {
- TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
Map<String, String> config = new HashMap<>();
config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Date");
config.put(TimestampConverter.FIELD_CONFIG, "ts");
- xform.configure(config);
+ xformValue.configure(config);
Object value = Collections.singletonMap("ts", DATE_PLUS_TIME.getTime());
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, value));
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, value));
assertNull(transformed.valueSchema());
assertEquals(Collections.singletonMap("ts", DATE.getTime()), transformed.value());
@@ -328,11 +314,10 @@ public class TimestampConverterTest {
@Test
public void testWithSchemaFieldConversion() {
- TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
Map<String, String> config = new HashMap<>();
config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp");
config.put(TimestampConverter.FIELD_CONFIG, "ts");
- xform.configure(config);
+ xformValue.configure(config);
// ts field is a unix timestamp
Schema structWithTimestampFieldSchema = SchemaBuilder.struct()
@@ -343,7 +328,7 @@ public class TimestampConverterTest {
original.put("ts", DATE_PLUS_TIME_UNIX);
original.put("other", "test");
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, structWithTimestampFieldSchema, original));
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, structWithTimestampFieldSchema, original));
Schema expectedSchema = SchemaBuilder.struct()
.field("ts", Timestamp.SCHEMA)
@@ -359,9 +344,8 @@ public class TimestampConverterTest {
@Test
public void testKey() {
- TimestampConverter<SourceRecord> xform = new TimestampConverter.Key<>();
- xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime(), null, null));
+ xformKey.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
+ SourceRecord transformed = xformKey.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime(), null, null));
assertNull(transformed.keySchema());
assertEquals(DATE_PLUS_TIME.getTime(), transformed.key());
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java
index 595a71c..ba823ba 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.transforms;
import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.After;
import org.junit.Test;
import java.util.Collections;
@@ -24,10 +25,15 @@ import java.util.Collections;
import static org.junit.Assert.assertEquals;
public class TimestampRouterTest {
+ private final TimestampRouter<SourceRecord> xform = new TimestampRouter<>();
+
+ @After
+ public void teardown() {
+ xform.close();
+ }
@Test
public void defaultConfiguration() {
- final TimestampRouter<SourceRecord> xform = new TimestampRouter<>();
xform.configure(Collections.<String, Object>emptyMap()); // defaults
final SourceRecord record = new SourceRecord(
null, null,
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java
index 69fb026..e2dfa17 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.After;
import org.junit.Test;
import java.util.Collections;
@@ -29,10 +30,15 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
public class ValueToKeyTest {
+ private final ValueToKey<SinkRecord> xform = new ValueToKey<>();
+
+ @After
+ public void teardown() {
+ xform.close();
+ }
@Test
public void schemaless() {
- final ValueToKey<SinkRecord> xform = new ValueToKey<>();
xform.configure(Collections.singletonMap("fields", "a,b"));
final HashMap<String, Integer> value = new HashMap<>();
@@ -53,7 +59,6 @@ public class ValueToKeyTest {
@Test
public void withSchema() {
- final ValueToKey<SinkRecord> xform = new ValueToKey<>();
xform.configure(Collections.singletonMap("fields", "a,b"));
final Schema valueSchema = SchemaBuilder.struct()
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index ca0e916..9637927 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -53,7 +53,6 @@ import org.apache.kafka.streams.processor.TopologyBuilder;
* @see KGroupedStream
* @see KStreamBuilder#stream(String...)
*/
-@SuppressWarnings("unused")
@InterfaceStability.Evolving
public interface KStream<K, V> {
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index b941f78..46769eb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -1039,7 +1039,6 @@ public class KStreamBuilder extends TopologyBuilder {
* @param queryableStoreName the state store name; If {@code null} this is the equivalent of {@link KStreamBuilder#globalTable(Serde, Serde, String)} ()}
* @return a {@link GlobalKTable} for the specified topic
*/
- @SuppressWarnings("unchecked")
public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
final Serde<V> valSerde,
final TimestampExtractor timestampExtractor,
@@ -1083,14 +1082,13 @@ public class KStreamBuilder extends TopologyBuilder {
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a {@link GlobalKTable} for the specified topic
*/
- @SuppressWarnings("unchecked")
public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
final Serde<V> valSerde,
final String topic,
final StateStoreSupplier<KeyValueStore> storeSupplier) {
return doGlobalTable(keySerde, valSerde, null, topic, storeSupplier);
}
-
+
/**
* Create a {@link GlobalKTable} for the specified topic.
* The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
@@ -1121,7 +1119,6 @@ public class KStreamBuilder extends TopologyBuilder {
* {@link KStreamBuilder#globalTable(Serde, Serde, String)} ()}
* @return a {@link GlobalKTable} for the specified topic
*/
- @SuppressWarnings("unchecked")
public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
final Serde<V> valSerde,
final String topic,
@@ -1198,7 +1195,6 @@ public class KStreamBuilder extends TopologyBuilder {
* @param topic the topic name; cannot be {@code null}
* @return a {@link GlobalKTable} for the specified topic
*/
- @SuppressWarnings("unchecked")
public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
final Serde<V> valSerde,
final String topic) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index 6ed3e84..a1b40a3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -92,7 +92,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
}
- @SuppressWarnings("unchecked")
@Override
public <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final Windows<W> windows,
@@ -101,7 +100,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
return reduce(reducer, windows, windowedStore(keySerde, valSerde, windows, getOrCreateName(queryableStoreName, REDUCE_NAME)));
}
- @SuppressWarnings("unchecked")
@Override
public <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final Windows<W> windows) {
@@ -152,7 +150,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
storeSupplier);
}
- @SuppressWarnings("unchecked")
@Override
public <W extends Window, T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> aggregator,
@@ -163,7 +160,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
return aggregate(initializer, aggregator, windows, windowedStore(keySerde, aggValueSerde, windows, getOrCreateName(queryableStoreName, AGGREGATE_NAME)));
}
- @SuppressWarnings("unchecked")
@Override
public <W extends Window, T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> aggregator,
@@ -266,7 +262,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
}
- @SuppressWarnings("unchecked")
@Override
public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> aggregator,
@@ -309,7 +304,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
return count(sessionWindows, (String) null);
}
- @SuppressWarnings("unchecked")
@Override
public KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows,
final StateStoreSupplier<SessionStore> storeSupplier) {
@@ -350,7 +344,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
.sessionWindowed(sessionWindows.maintainMs()).build());
}
- @SuppressWarnings("unchecked")
@Override
public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final SessionWindows sessionWindows) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
index d30177c..1b26a5b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
@@ -67,7 +67,6 @@ class KTableKTableJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1,
this.valueGetter = valueGetter;
}
- @SuppressWarnings("unchecked")
@Override
public void init(ProcessorContext context) {
super.init(context);
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
index 8dc330d..c308a0d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
@@ -57,7 +57,6 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
this.valueGetter = valueGetter;
}
- @SuppressWarnings("unchecked")
@Override
public void init(ProcessorContext context) {
super.init(context);
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
index 90a9f77..9cee4f3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
@@ -56,7 +56,6 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
this.valueGetter = valueGetter;
}
- @SuppressWarnings("unchecked")
@Override
public void init(ProcessorContext context) {
super.init(context);
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
index 05ecf40..b43efaa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
@@ -57,7 +57,6 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
this.valueGetter = valueGetter;
}
- @SuppressWarnings("unchecked")
@Override
public void init(ProcessorContext context) {
super.init(context);
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
index 38beb63..4c2b40f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -49,7 +49,6 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
this.deserializationExceptionHandler = deserializationExceptionHandler;
}
- @SuppressWarnings("unchecked")
public Map<TopicPartition, Long> initialize() {
final Set<String> storeNames = stateMgr.initialize(processorContext);
final Map<String, String> storeNameToTopic = topology.storeToChangelogTopic();
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 9dc5640..eb75b14 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1135,7 +1135,6 @@ public class StreamThread extends Thread {
streamsMetrics.removeAllSensors();
}
- @SuppressWarnings("ThrowableNotThrown")
private void shutdownTasksAndState(final boolean cleanRun) {
log.debug("{} Shutting down all active tasks {}, standby tasks {}, suspended tasks {}, and suspended standby tasks {}",
logPrefix, activeTasks.keySet(), standbyTasks.keySet(),
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index 77fb58a..8607472 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -120,9 +120,8 @@ public class AssignmentInfo {
public static AssignmentInfo decode(ByteBuffer data) {
// ensure we are at the beginning of the ByteBuffer
data.rewind();
- DataInputStream in = new DataInputStream(new ByteBufferInputStream(data));
- try {
+ try (DataInputStream in = new DataInputStream(new ByteBufferInputStream(data))) {
// Decode version
int version = in.readInt();
if (version < 0 || version > CURRENT_VERSION) {
@@ -156,7 +155,6 @@ public class AssignmentInfo {
return new AssignmentInfo(activeTasks, standbyTasks, hostStateToTopicPartitions);
-
} catch (IOException ex) {
throw new TaskAssignmentException("Failed to decode AssignmentInfo", ex);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
index d43c613..2a54cb5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
@@ -61,7 +61,6 @@ public final class StateSerdes<K, V> {
* @param valueSerde the serde for values; cannot be null
* @throws IllegalArgumentException if key or value serde is null
*/
- @SuppressWarnings("unchecked")
public StateSerdes(final String topic,
final Serde<K> keySerde,
final Serde<V> valueSerde) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 6190b88..b7d41b4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -53,7 +53,6 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
this.valueSerde = valueSerde;
}
- @SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context, final StateStore root) {
underlying.init(context, root);
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 9a4a97c..b786ce4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -63,7 +63,6 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
this.cacheFunction = new SegmentedCacheFunction(keySchema, segmentInterval);
}
- @SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context, final StateStore root) {
underlying.init(context, root);
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java
index 9a826c4..34fe8f6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java
@@ -67,7 +67,6 @@ class ChangeLoggingSegmentedBytesStore extends WrappedStateStore.AbstractStateSt
@Override
- @SuppressWarnings("unchecked")
public void init(final ProcessorContext context, final StateStore root) {
bytesStore.init(context, root);
changeLogger = new StoreChangeLogger<>(
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 7034592..4d93a9a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -239,7 +239,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
}
}
- @SuppressWarnings("unchecked")
@Override
public synchronized void put(K key, V value) {
Objects.requireNonNull(key, "key cannot be null");
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index 82e5b23..75b7910 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -318,7 +318,7 @@ public class KStreamBuilderTest {
final String topicName = "topic-1";
builder.stream(TopologyBuilder.AutoOffsetReset.EARLIEST, topicName);
-
+
assertTrue(builder.earliestResetTopicsPattern().matcher(topicName).matches());
assertFalse(builder.latestResetTopicsPattern().matcher(topicName).matches());
}
@@ -373,7 +373,7 @@ public class KStreamBuilderTest {
final String topic = "topic-5";
builder.stream(topicPattern);
-
+
assertFalse(builder.latestResetTopicsPattern().matcher(topic).matches());
assertFalse(builder.earliestResetTopicsPattern().matcher(topic).matches());
@@ -401,7 +401,6 @@ public class KStreamBuilderTest {
assertFalse(builder.earliestResetTopicsPattern().matcher(topicTwo).matches());
}
- @SuppressWarnings("unchecked")
@Test
public void kStreamTimestampExtractorShouldBeNull() throws Exception {
builder.stream("topic");
@@ -409,7 +408,6 @@ public class KStreamBuilderTest {
assertNull(processorTopology.source("topic").getTimestampExtractor());
}
- @SuppressWarnings("unchecked")
@Test
public void shouldAddTimestampExtractorToStreamWithKeyValSerdePerSource() throws Exception {
builder.stream(new MockTimestampExtractor(), null, null, "topic");
@@ -419,7 +417,6 @@ public class KStreamBuilderTest {
}
}
- @SuppressWarnings("unchecked")
@Test
public void shouldAddTimestampExtractorToStreamWithOffsetResetPerSource() throws Exception {
builder.stream(null, new MockTimestampExtractor(), null, null, "topic");
@@ -427,7 +424,6 @@ public class KStreamBuilderTest {
assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
}
- @SuppressWarnings("unchecked")
@Test
public void shouldAddTimestampExtractorToTablePerSource() throws Exception {
builder.table("topic", "store");
@@ -435,7 +431,6 @@ public class KStreamBuilderTest {
assertNull(processorTopology.source("topic").getTimestampExtractor());
}
- @SuppressWarnings("unchecked")
@Test
public void kTableTimestampExtractorShouldBeNull() throws Exception {
builder.table("topic", "store");
@@ -443,7 +438,6 @@ public class KStreamBuilderTest {
assertNull(processorTopology.source("topic").getTimestampExtractor());
}
- @SuppressWarnings("unchecked")
@Test
public void shouldAddTimestampExtractorToTableWithKeyValSerdePerSource() throws Exception {
builder.table(null, new MockTimestampExtractor(), null, null, "topic", "store");
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index c69cd70..0662944 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -47,7 +47,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-@SuppressWarnings("unchecked")
public class KStreamSessionWindowAggregateProcessorTest {
private static final long GAP_MS = 5 * 60 * 1000L;
@@ -84,7 +83,6 @@ public class KStreamSessionWindowAggregateProcessorTest {
private MockProcessorContext context;
- @SuppressWarnings("unchecked")
@Before
public void initializeStore() {
final File stateDir = TestUtils.tempDirectory();
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
index 316494d..98bb346 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
@@ -89,6 +89,8 @@ public class WindowedStreamPartitionerTest {
assertEquals(expected, actual);
}
}
+
+ defaultPartitioner.close();
}
@Test
@@ -113,6 +115,8 @@ public class WindowedStreamPartitionerTest {
Serializer<?> inner1 = windowedSerializer1.innerSerializer();
assertNotNull("Inner serializer should be not null", inner1);
assertTrue("Inner serializer type should be ByteArraySerializer", inner1 instanceof ByteArraySerializer);
+ windowedSerializer.close();
+ windowedSerializer1.close();
}
@Test
@@ -137,5 +141,7 @@ public class WindowedStreamPartitionerTest {
Deserializer<?> inner1 = windowedDeserializer1.innerDeserializer();
assertNotNull("Inner deserializer should be not null", inner1);
assertTrue("Inner deserializer type should be ByteArrayDeserializer", inner1 instanceof ByteArrayDeserializer);
+ windowedDeserializer.close();
+ windowedDeserializer1.close();
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index c0ce9e7..bad193a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -78,7 +78,7 @@ public class TopologyBuilderTest {
@Test
public void shouldAddSourcePatternWithOffsetReset() {
final TopologyBuilder builder = new TopologyBuilder();
-
+
final String earliestTopicPattern = "earliest.*Topic";
final String latestTopicPattern = "latest.*Topic";
@@ -107,7 +107,7 @@ public class TopologyBuilderTest {
final TopologyBuilder builder = new TopologyBuilder();
final Serde<String> stringSerde = Serdes.String();
final Pattern expectedPattern = Pattern.compile("test-.*");
-
+
builder.addSource("source", stringSerde.deserializer(), stringSerde.deserializer(), Pattern.compile("test-.*"));
assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern());
@@ -143,7 +143,7 @@ public class TopologyBuilderTest {
}
-
+
@Test(expected = TopologyBuilderException.class)
public void testAddSourceWithSameName() {
final TopologyBuilder builder = new TopologyBuilder();
@@ -579,7 +579,6 @@ public class TopologyBuilderTest {
assertEquals(2, properties.size());
}
- @SuppressWarnings("unchecked")
@Test
public void shouldAddInternalTopicConfigWithCompactForNonWindowStores() throws Exception {
final TopologyBuilder builder = new TopologyBuilder();
@@ -596,7 +595,6 @@ public class TopologyBuilderTest {
assertEquals(1, properties.size());
}
- @SuppressWarnings("unchecked")
@Test
public void shouldAddInternalTopicConfigWithCleanupPolicyDeleteForInternalTopics() throws Exception {
final TopologyBuilder builder = new TopologyBuilder();
@@ -702,7 +700,6 @@ public class TopologyBuilderTest {
}
- @SuppressWarnings("unchecked")
@Test
public void shouldAddTimestampExtractorPerSource() throws Exception {
final TopologyBuilder builder = new TopologyBuilder();
@@ -711,7 +708,6 @@ public class TopologyBuilderTest {
assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
}
- @SuppressWarnings("unchecked")
@Test
public void shouldAddTimestampExtractorWithOffsetResetPerSource() throws Exception {
final TopologyBuilder builder = new TopologyBuilder();
@@ -720,7 +716,6 @@ public class TopologyBuilderTest {
assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
}
- @SuppressWarnings("unchecked")
@Test
public void shouldAddTimestampExtractorWithPatternPerSource() throws Exception {
final TopologyBuilder builder = new TopologyBuilder();
@@ -730,7 +725,6 @@ public class TopologyBuilderTest {
assertThat(processorTopology.source(pattern.pattern()).getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
}
- @SuppressWarnings("unchecked")
@Test
public void shouldAddTimestampExtractorWithOffsetResetAndPatternPerSource() throws Exception {
final TopologyBuilder builder = new TopologyBuilder();
@@ -740,7 +734,6 @@ public class TopologyBuilderTest {
assertThat(processorTopology.source(pattern.pattern()).getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
}
- @SuppressWarnings("unchecked")
@Test
public void shouldAddTimestampExtractorWithOffsetResetAndKeyValSerdesPerSource() throws Exception {
final TopologyBuilder builder = new TopologyBuilder();
@@ -749,7 +742,6 @@ public class TopologyBuilderTest {
assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
}
- @SuppressWarnings("unchecked")
@Test
public void shouldAddTimestampExtractorWithOffsetResetAndKeyValSerdesAndPatternPerSource() throws Exception {
final TopologyBuilder builder = new TopologyBuilder();
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/processor/internals/QuickUnionTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/QuickUnionTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/QuickUnionTest.java
index 56e2410..e6cca87 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/QuickUnionTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/QuickUnionTest.java
@@ -26,7 +26,6 @@ import static org.junit.Assert.assertNotEquals;
public class QuickUnionTest {
- @SuppressWarnings("unchecked")
@Test
public void testUnite() {
QuickUnion<Long> qu = new QuickUnion<>();
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index a358be5..25c3cbd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -177,7 +177,6 @@ public class StandbyTaskTest {
}
- @SuppressWarnings("unchecked")
@Test
public void testUpdate() throws Exception {
StreamsConfig config = createConfig(baseDir);
@@ -224,7 +223,6 @@ public class StandbyTaskTest {
}
- @SuppressWarnings("unchecked")
@Test
public void testUpdateKTable() throws Exception {
consumer.assign(Utils.mkList(ktable));
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 17eb50a..a6d1179 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -131,7 +131,6 @@ public class StreamPartitionAssignorTest {
partitionAssignor.configure(configurationMap);
}
- @SuppressWarnings("unchecked")
@Test
public void testSubscription() throws Exception {
builder.addSource("source1", "topic1");
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 1a0bebe..a27fb62 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -724,7 +724,6 @@ public class StreamTaskTest {
});
}
- @SuppressWarnings("unchecked")
@Test
public void shouldThrowExceptionIfAnyExceptionsRaisedDuringCloseButStillCloseAllProcessorNodesTopology() throws Exception {
task.close(true);
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index b4598fd..c6d12c6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -185,7 +185,6 @@ public class KeyValueStoreTestDriver<K, V> {
final Producer<byte[], byte[]> producer = new MockProducer<>(true, rawSerializer, rawSerializer);
final RecordCollector recordCollector = new RecordCollectorImpl(producer, "KeyValueStoreTestDriver") {
- @SuppressWarnings("unchecked")
@Override
public <K1, V1> void send(final String topic,
final K1 key,
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java
index a77d4ac..ff7cdc3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java
@@ -61,7 +61,6 @@ public class ChangeLoggingSegmentedBytesStoreTest {
private final ChangeLoggingSegmentedBytesStore store = new ChangeLoggingSegmentedBytesStore(bytesStore);
private final Map sent = new HashMap<>();
- @SuppressWarnings("unchecked")
@Before
public void setUp() throws Exception {
context.setTime(0);
@@ -74,7 +73,6 @@ public class ChangeLoggingSegmentedBytesStoreTest {
store.close();
}
- @SuppressWarnings("unchecked")
@Test
public void shouldLogPuts() throws Exception {
final byte[] value1 = {0};
@@ -88,7 +86,6 @@ public class ChangeLoggingSegmentedBytesStoreTest {
assertArrayEquals(value2, (byte[]) sent.get(key2));
}
- @SuppressWarnings("unchecked")
@Test
public void shouldLogRemoves() throws Exception {
final Bytes key1 = Bytes.wrap(new byte[]{0});
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
index 4054990..0fa5216 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
@@ -46,7 +46,6 @@ public class CompositeReadOnlyKeyValueStoreTest {
private KeyValueStore<String, String>
otherUnderlyingStore;
- @SuppressWarnings("unchecked")
@Before
public void before() {
final StateStoreProviderStub stubProviderOne = new StateStoreProviderStub(false);
@@ -141,8 +140,6 @@ public class CompositeReadOnlyKeyValueStoreTest {
} catch (UnsupportedOperationException e) { }
}
-
- @SuppressWarnings("unchecked")
@Test
public void shouldFindValueForKeyWhenMultiStores() throws Exception {
final KeyValueStore<String, String> cache = newStoreInstance();
@@ -167,7 +164,6 @@ public class CompositeReadOnlyKeyValueStoreTest {
assertEquals(2, results.size());
}
- @SuppressWarnings("unchecked")
@Test
public void shouldSupportRangeAcrossMultipleKVStores() throws Exception {
final KeyValueStore<String, String> cache = newStoreInstance();
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java
index bc21a7a..4baecb1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java
@@ -43,6 +43,7 @@ public class DelegatingPeekingKeyValueIteratorTest {
assertEquals("A", peekingIterator.peekNextKey());
assertEquals("A", peekingIterator.peekNextKey());
assertTrue(peekingIterator.hasNext());
+ peekingIterator.close();
}
@Test
@@ -52,6 +53,7 @@ public class DelegatingPeekingKeyValueIteratorTest {
assertEquals(KeyValue.pair("A", "A"), peekingIterator.peekNext());
assertEquals(KeyValue.pair("A", "A"), peekingIterator.peekNext());
assertTrue(peekingIterator.hasNext());
+ peekingIterator.close();
}
@Test
@@ -71,18 +73,21 @@ public class DelegatingPeekingKeyValueIteratorTest {
index++;
}
assertEquals(kvs.length, index);
+ peekingIterator.close();
}
@Test(expected = NoSuchElementException.class)
public void shouldThrowNoSuchElementWhenNoMoreItemsLeftAndNextCalled() throws Exception {
final DelegatingPeekingKeyValueIterator<String, String> peekingIterator = new DelegatingPeekingKeyValueIterator<>(name, store.all());
peekingIterator.next();
+ peekingIterator.close();
}
@Test(expected = NoSuchElementException.class)
public void shouldThrowNoSuchElementWhenNoMoreItemsLeftAndPeekNextCalled() throws Exception {
final DelegatingPeekingKeyValueIterator<String, String> peekingIterator = new DelegatingPeekingKeyValueIterator<>(name, store.all());
peekingIterator.peekNextKey();
+ peekingIterator.close();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
index 6e0059f..89a4d63 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
@@ -64,6 +64,7 @@ public class MergedSortedCacheKeyValueStoreIteratorTest {
values[index++] = value;
assertArrayEquals(bytes[bytesIndex++], value);
}
+ iterator.close();
}
@@ -171,6 +172,7 @@ public class MergedSortedCacheKeyValueStoreIteratorTest {
assertArrayEquals(bytes[bytesIndex++], keys);
iterator.next();
}
+ iterator.close();
}
private MergedSortedCacheKeyValueStoreIterator<byte[], byte[]> createIterator() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
index fed39b7..2088fbe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
@@ -80,6 +80,7 @@ public class MergedSortedCacheWrappedWindowStoreIteratorTest {
assertArrayEquals(expected.value, next.value);
assertEquals(expected.key, next.key);
}
+ iterator.close();
}
@Test
@@ -98,6 +99,7 @@ public class MergedSortedCacheWrappedWindowStoreIteratorTest {
assertThat(iterator.peekNextKey(), equalTo(0L));
iterator.next();
assertThat(iterator.peekNextKey(), equalTo(10L));
+ iterator.close();
}
@Test
@@ -112,5 +114,6 @@ public class MergedSortedCacheWrappedWindowStoreIteratorTest {
assertThat(iterator.peekNextKey(), equalTo(0L));
iterator.next();
assertThat(iterator.peekNextKey(), equalTo(10L));
+ iterator.close();
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java
index a7e1aed..3e935cf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java
@@ -46,7 +46,6 @@ public class MeteredSegmentedBytesStoreTest {
private final Set<String> latencyRecorded = new HashSet<>();
private final Set<String> throughputRecorded = new HashSet<>();
- @SuppressWarnings("unchecked")
@Before
public void setUp() throws Exception {
final Metrics metrics = new Metrics();
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
index 237514e..ff7d234 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
@@ -133,7 +133,6 @@ public class RocksDBKeyValueStoreSupplierTest {
assertThat(store, is(instanceOf(MeteredKeyValueStore.class)));
}
- @SuppressWarnings("unchecked")
@Test
public void shouldHaveMeteredStoreWhenCached() throws Exception {
store = createStore(false, true);
@@ -142,7 +141,6 @@ public class RocksDBKeyValueStoreSupplierTest {
assertFalse(metrics.metrics().isEmpty());
}
- @SuppressWarnings("unchecked")
@Test
public void shouldHaveMeteredStoreWhenLogged() throws Exception {
store = createStore(true, false);
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
index 70f3708..97936fc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
@@ -112,7 +112,6 @@ public class RocksDBSessionStoreSupplierTest {
assertThat(store, is(instanceOf(RocksDBSessionStore.class)));
}
- @SuppressWarnings("unchecked")
@Test
public void shouldHaveMeteredStoreWhenCached() throws Exception {
store = createStore(false, true);
@@ -121,7 +120,6 @@ public class RocksDBSessionStoreSupplierTest {
assertFalse(metrics.metrics().isEmpty());
}
- @SuppressWarnings("unchecked")
@Test
public void shouldHaveMeteredStoreWhenLogged() throws Exception {
store = createStore(true, false);
@@ -130,7 +128,6 @@ public class RocksDBSessionStoreSupplierTest {
assertFalse(metrics.metrics().isEmpty());
}
- @SuppressWarnings("unchecked")
@Test
public void shouldHaveMeteredStoreWhenNotLoggedOrCached() throws Exception {
store = createStore(false, false);
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
index 77fe8ee..f177aa3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
@@ -135,7 +135,6 @@ public class RocksDBWindowStoreSupplierTest {
assertThat(store, is(instanceOf(RocksDBWindowStore.class)));
}
- @SuppressWarnings("unchecked")
@Test
public void shouldHaveMeteredStoreWhenCached() throws Exception {
store = createStore(false, true, 3);
@@ -144,7 +143,6 @@ public class RocksDBWindowStoreSupplierTest {
assertFalse(metrics.metrics().isEmpty());
}
- @SuppressWarnings("unchecked")
@Test
public void shouldHaveMeteredStoreWhenLogged() throws Exception {
store = createStore(true, false, 3);
@@ -153,7 +151,6 @@ public class RocksDBWindowStoreSupplierTest {
assertFalse(metrics.metrics().isEmpty());
}
- @SuppressWarnings("unchecked")
@Test
public void shouldHaveMeteredStoreWhenNotLoggedOrCached() throws Exception {
store = createStore(false, false, 3);
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
index 8a3a8ba..c2b03c0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
@@ -39,7 +39,6 @@ public class StoreChangeLoggerTest {
private final MockProcessorContext context = new MockProcessorContext(StateSerdes.withBuiltinTypes(topic, Integer.class, String.class),
new RecordCollectorImpl(null, "StoreChangeLoggerTest") {
- @SuppressWarnings("unchecked")
@Override
public <K1, V1> void send(final String topic,
final K1 key,
@@ -71,7 +70,6 @@ public class StoreChangeLoggerTest {
context.close();
}
- @SuppressWarnings("unchecked")
@Test
public void testAddRemove() throws Exception {
context.setTime(1);
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
index 0af2594..c01e169 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
@@ -101,6 +101,7 @@ public class BrokerCompatibilityTest {
System.out.println("close Kafka Streams");
+ producer.close();
streams.close();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index 11e1ae8..9193d1d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -77,7 +77,6 @@ public class SmokeTestDriver extends SmokeTestUtil {
// This main() is not used by the system test. It is intended to be used for local debugging.
public static void main(String[] args) throws Exception {
final String kafka = "localhost:9092";
- final String zookeeper = "localhost:2181";
final File stateDir = TestUtils.tempDirectory();
final int numKeys = 20;
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
index b9288d7..a9537a7 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
@@ -370,108 +370,109 @@ public class ClientCompatibilityTest {
consumerProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 512);
ClientCompatibilityTestDeserializer deserializer =
new ClientCompatibilityTestDeserializer(testConfig.expectClusterId);
- final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps, deserializer, deserializer);
- final List<PartitionInfo> partitionInfos = consumer.partitionsFor(testConfig.topic);
- if (partitionInfos.size() < 1)
- throw new RuntimeException("Expected at least one partition for topic " + testConfig.topic);
- final Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
- final LinkedList<TopicPartition> topicPartitions = new LinkedList<>();
- for (PartitionInfo partitionInfo : partitionInfos) {
- TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
- timestampsToSearch.put(topicPartition, prodTimeMs);
- topicPartitions.add(topicPartition);
- }
- final OffsetsForTime offsetsForTime = new OffsetsForTime();
- tryFeature("offsetsForTimes", testConfig.offsetsForTimesSupported,
- new Invoker() {
- @Override
- public void invoke() {
- offsetsForTime.result = consumer.offsetsForTimes(timestampsToSearch);
- }
- },
- new ResultTester() {
- @Override
- public void test() {
- log.info("offsetsForTime = {}", offsetsForTime.result);
- }
- });
- // Whether or not offsetsForTimes works, beginningOffsets and endOffsets
- // should work.
- consumer.beginningOffsets(timestampsToSearch.keySet());
- consumer.endOffsets(timestampsToSearch.keySet());
-
- consumer.assign(topicPartitions);
- consumer.seekToBeginning(topicPartitions);
- final Iterator<byte[]> iter = new Iterator<byte[]>() {
- private static final int TIMEOUT_MS = 10000;
- private Iterator<ConsumerRecord<byte[], byte[]>> recordIter = null;
- private byte[] next = null;
-
- private byte[] fetchNext() {
- while (true) {
- long curTime = Time.SYSTEM.milliseconds();
- if (curTime - prodTimeMs > TIMEOUT_MS)
- throw new RuntimeException("Timed out after " + TIMEOUT_MS + " ms.");
- if (recordIter == null) {
- ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
- recordIter = records.iterator();
+ try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps, deserializer, deserializer)) {
+ final List<PartitionInfo> partitionInfos = consumer.partitionsFor(testConfig.topic);
+ if (partitionInfos.size() < 1)
+ throw new RuntimeException("Expected at least one partition for topic " + testConfig.topic);
+ final Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
+ final LinkedList<TopicPartition> topicPartitions = new LinkedList<>();
+ for (PartitionInfo partitionInfo : partitionInfos) {
+ TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
+ timestampsToSearch.put(topicPartition, prodTimeMs);
+ topicPartitions.add(topicPartition);
+ }
+ final OffsetsForTime offsetsForTime = new OffsetsForTime();
+ tryFeature("offsetsForTimes", testConfig.offsetsForTimesSupported,
+ new Invoker() {
+ @Override
+ public void invoke() {
+ offsetsForTime.result = consumer.offsetsForTimes(timestampsToSearch);
+ }
+ },
+ new ResultTester() {
+ @Override
+ public void test() {
+ log.info("offsetsForTime = {}", offsetsForTime.result);
+ }
+ });
+ // Whether or not offsetsForTimes works, beginningOffsets and endOffsets
+ // should work.
+ consumer.beginningOffsets(timestampsToSearch.keySet());
+ consumer.endOffsets(timestampsToSearch.keySet());
+
+ consumer.assign(topicPartitions);
+ consumer.seekToBeginning(topicPartitions);
+ final Iterator<byte[]> iter = new Iterator<byte[]>() {
+ private static final int TIMEOUT_MS = 10000;
+ private Iterator<ConsumerRecord<byte[], byte[]>> recordIter = null;
+ private byte[] next = null;
+
+ private byte[] fetchNext() {
+ while (true) {
+ long curTime = Time.SYSTEM.milliseconds();
+ if (curTime - prodTimeMs > TIMEOUT_MS)
+ throw new RuntimeException("Timed out after " + TIMEOUT_MS + " ms.");
+ if (recordIter == null) {
+ ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
+ recordIter = records.iterator();
+ }
+ if (recordIter.hasNext())
+ return recordIter.next().value();
+ recordIter = null;
}
- if (recordIter.hasNext())
- return recordIter.next().value();
- recordIter = null;
}
- }
- @Override
- public boolean hasNext() {
- if (next != null)
- return true;
- next = fetchNext();
- return next != null;
- }
+ @Override
+ public boolean hasNext() {
+ if (next != null)
+ return true;
+ next = fetchNext();
+ return next != null;
+ }
- @Override
- public byte[] next() {
- if (!hasNext())
- throw new NoSuchElementException();
- byte[] cur = next;
- next = null;
- return cur;
- }
+ @Override
+ public byte[] next() {
+ if (!hasNext())
+ throw new NoSuchElementException();
+ byte[] cur = next;
+ next = null;
+ return cur;
+ }
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- byte[] next = iter.next();
- try {
- compareArrays(message1, next);
- log.debug("Found first message...");
- } catch (RuntimeException e) {
- throw new RuntimeException("The first message in this topic was not ours. Please use a new topic when " +
- "running this program.");
- }
- try {
- next = iter.next();
- if (testConfig.expectRecordTooLargeException)
- throw new RuntimeException("Expected to get a RecordTooLargeException when reading a record " +
- "bigger than " + ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG);
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ byte[] next = iter.next();
try {
- compareArrays(message2, next);
+ compareArrays(message1, next);
+ log.debug("Found first message...");
} catch (RuntimeException e) {
- System.out.println("The second message in this topic was not ours. Please use a new " +
- "topic when running this program.");
- Exit.exit(1);
+ throw new RuntimeException("The first message in this topic was not ours. Please use a new topic when " +
+ "running this program.");
+ }
+ try {
+ next = iter.next();
+ if (testConfig.expectRecordTooLargeException) {
+ throw new RuntimeException("Expected to get a RecordTooLargeException when reading a record " +
+ "bigger than " + ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG);
+ }
+ try {
+ compareArrays(message2, next);
+ } catch (RuntimeException e) {
+ System.out.println("The second message in this topic was not ours. Please use a new " +
+ "topic when running this program.");
+ Exit.exit(1);
+ }
+ } catch (RecordTooLargeException e) {
+ log.debug("Got RecordTooLargeException", e);
+ if (!testConfig.expectRecordTooLargeException)
+ throw new RuntimeException("Got an unexpected RecordTooLargeException when reading a record " +
+ "bigger than " + ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG);
}
- } catch (RecordTooLargeException e) {
- log.debug("Got RecordTooLargeException", e);
- if (!testConfig.expectRecordTooLargeException)
- throw new RuntimeException("Got an unexpected RecordTooLargeException when reading a record " +
- "bigger than " + ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG);
+ log.debug("Closing consumer.");
}
- log.debug("Closing consumer.");
- consumer.close();
log.info("Closed consumer.");
}
[2/2] kafka git commit: MINOR: Code Cleanup
Posted by jg...@apache.org.
MINOR: Code Cleanup
Clean up includes:
- Switching try-catch-finally blocks to try-with-resources when possible
- Removing some seemingly unnecessary `SuppressWarnings` annotations
- Resolving some Java warnings
- Closing unclosed Closeable objects
- Removing unused code
Author: Vahid Hashemian <va...@us.ibm.com>
Reviewers: Balint Molnar <ba...@gmail.com>, Guozhang Wang <wa...@gmail.com>, Matthias J. Sax <ma...@confluent.io>, Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>
Closes #3222 from vahidhashemian/minor/code_cleanup_1706
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f87d58b7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f87d58b7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f87d58b7
Branch: refs/heads/trunk
Commit: f87d58b796977fdaefb089d17cb30b2071cd4485
Parents: 3bfc073
Author: Vahid Hashemian <va...@us.ibm.com>
Authored: Wed Jul 19 10:51:28 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed Jul 19 10:51:28 2017 -0700
----------------------------------------------------------------------
.../kafka/clients/producer/KafkaProducer.java | 3 +-
.../producer/internals/TransactionManager.java | 1 -
.../requests/OffsetsForLeaderEpochRequest.java | 4 +-
.../clients/consumer/KafkaConsumerTest.java | 17 +-
.../clients/producer/KafkaProducerTest.java | 33 ++--
.../clients/producer/MockProducerTest.java | 71 ++++++-
.../clients/producer/internals/SenderTest.java | 9 +-
.../utils/ByteBufferOutputStreamTest.java | 10 +-
.../connect/file/FileStreamSourceTaskTest.java | 1 +
.../connect/runtime/isolation/Plugins.java | 1 -
.../connect/storage/FileOffsetBackingStore.java | 4 +-
.../runtime/WorkerSinkTaskThreadedTest.java | 1 -
.../storage/KafkaConfigBackingStoreTest.java | 17 +-
.../kafka/connect/util/TopicAdminTest.java | 1 -
.../kafka/connect/transforms/CastTest.java | 144 ++++++--------
.../connect/transforms/ExtractFieldTest.java | 9 +-
.../kafka/connect/transforms/FlattenTest.java | 54 +++---
.../connect/transforms/HoistFieldTest.java | 9 +-
.../connect/transforms/InsertFieldTest.java | 14 +-
.../connect/transforms/RegexRouterTest.java | 5 +-
.../connect/transforms/ReplaceFieldTest.java | 11 +-
.../transforms/SetSchemaMetadataTest.java | 11 +-
.../transforms/TimestampConverterTest.java | 128 ++++++-------
.../connect/transforms/TimestampRouterTest.java | 8 +-
.../connect/transforms/ValueToKeyTest.java | 9 +-
.../apache/kafka/streams/kstream/KStream.java | 1 -
.../kafka/streams/kstream/KStreamBuilder.java | 6 +-
.../kstream/internals/KGroupedStreamImpl.java | 7 -
.../kstream/internals/KTableKTableJoin.java | 1 -
.../kstream/internals/KTableKTableLeftJoin.java | 1 -
.../internals/KTableKTableOuterJoin.java | 1 -
.../internals/KTableKTableRightJoin.java | 1 -
.../internals/GlobalStateUpdateTask.java | 1 -
.../processor/internals/StreamThread.java | 1 -
.../internals/assignment/AssignmentInfo.java | 4 +-
.../apache/kafka/streams/state/StateSerdes.java | 1 -
.../state/internals/CachingKeyValueStore.java | 1 -
.../state/internals/CachingWindowStore.java | 1 -
.../ChangeLoggingSegmentedBytesStore.java | 1 -
.../streams/state/internals/RocksDBStore.java | 1 -
.../streams/kstream/KStreamBuilderTest.java | 10 +-
...reamSessionWindowAggregateProcessorTest.java | 2 -
.../WindowedStreamPartitionerTest.java | 6 +
.../streams/processor/TopologyBuilderTest.java | 14 +-
.../processor/internals/QuickUnionTest.java | 1 -
.../processor/internals/StandbyTaskTest.java | 2 -
.../internals/StreamPartitionAssignorTest.java | 1 -
.../processor/internals/StreamTaskTest.java | 1 -
.../streams/state/KeyValueStoreTestDriver.java | 1 -
.../ChangeLoggingSegmentedBytesStoreTest.java | 3 -
.../CompositeReadOnlyKeyValueStoreTest.java | 4 -
.../DelegatingPeekingKeyValueIteratorTest.java | 5 +
...gedSortedCacheKeyValueStoreIteratorTest.java | 2 +
...rtedCacheWrappedWindowStoreIteratorTest.java | 3 +
.../MeteredSegmentedBytesStoreTest.java | 1 -
.../RocksDBKeyValueStoreSupplierTest.java | 2 -
.../RocksDBSessionStoreSupplierTest.java | 3 -
.../RocksDBWindowStoreSupplierTest.java | 3 -
.../state/internals/StoreChangeLoggerTest.java | 2 -
.../streams/tests/BrokerCompatibilityTest.java | 1 +
.../kafka/streams/tests/SmokeTestDriver.java | 1 -
.../kafka/tools/ClientCompatibilityTest.java | 187 ++++++++++---------
62 files changed, 419 insertions(+), 440 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index c54b739..5f854d1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -302,7 +302,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
keySerializer, valueSerializer);
}
- @SuppressWarnings({"unchecked", "deprecation"})
+ @SuppressWarnings("unchecked")
private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
try {
log.trace("Starting the Kafka producer");
@@ -339,7 +339,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
this.valueSerializer = ensureExtended(valueSerializer);
}
-
// load interceptors and make sure they get clientId
userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 8cee794..c5542e4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -628,7 +628,6 @@ public class TransactionManager {
}
@Override
- @SuppressWarnings("unchecked")
public void onComplete(ClientResponse response) {
if (response.requestHeader().correlationId() != inFlightRequestCorrelationId) {
fatalError(new RuntimeException("Detected more than one in-flight transactional request."));
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
index f898a75..fc31d75 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
@@ -42,7 +42,7 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
}
public static class Builder extends AbstractRequest.Builder<OffsetsForLeaderEpochRequest> {
- private Map<TopicPartition, Integer> epochsByPartition = new HashMap();
+ private Map<TopicPartition, Integer> epochsByPartition = new HashMap<>();
public Builder() {
super(ApiKeys.OFFSET_FOR_LEADER_EPOCH);
@@ -129,7 +129,7 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
Errors error = Errors.forException(e);
- Map<TopicPartition, EpochEndOffset> errorResponse = new HashMap();
+ Map<TopicPartition, EpochEndOffset> errorResponse = new HashMap<>();
for (TopicPartition tp : epochsByPartition.keySet()) {
errorResponse.put(tp, new EpochEndOffset(error, EpochEndOffset.UNDEFINED_EPOCH_OFFSET));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 219c3f6..9fd7e19 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -129,13 +129,12 @@ public class KafkaConsumerTest {
final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
try {
new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+ Assert.fail("should have caught an exception and returned");
} catch (KafkaException e) {
assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get());
assertEquals(oldCloseCount + 1, MockMetricsReporter.CLOSE_COUNT.get());
assertEquals("Failed to construct kafka consumer", e.getMessage());
- return;
}
- Assert.fail("should have caught an exception and returned");
}
@Test
@@ -1191,23 +1190,17 @@ public class KafkaConsumerTest {
@Test(expected = IllegalStateException.class)
public void testPollWithEmptySubscription() {
- KafkaConsumer<byte[], byte[]> consumer = newConsumer();
- consumer.subscribe(Collections.<String>emptyList());
- try {
+ try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+ consumer.subscribe(Collections.<String>emptyList());
consumer.poll(0);
- } finally {
- consumer.close();
}
}
@Test(expected = IllegalStateException.class)
public void testPollWithEmptyUserAssignment() {
- KafkaConsumer<byte[], byte[]> consumer = newConsumer();
- consumer.assign(Collections.<TopicPartition>emptySet());
- try {
+ try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+ consumer.assign(Collections.<TopicPartition>emptySet());
consumer.poll(0);
- } finally {
- consumer.close();
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 56c1b18..dd62457 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -87,16 +87,13 @@ public class KafkaProducerTest {
final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
- try {
- KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(
- props, new ByteArraySerializer(), new ByteArraySerializer());
+ try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) {
+ fail("should have caught an exception and returned");
} catch (KafkaException e) {
assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get());
assertEquals(oldCloseCount + 1, MockMetricsReporter.CLOSE_COUNT.get());
assertEquals("Failed to construct kafka producer", e.getMessage());
- return;
}
- fail("should have caught an exception and returned");
}
@Test
@@ -172,9 +169,7 @@ public class KafkaProducerTest {
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
config.put(ProducerConfig.SEND_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE);
config.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE);
- KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(
- config, new ByteArraySerializer(), new ByteArraySerializer());
- producer.close();
+ new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer()).close();
}
@Test(expected = KafkaException.class)
@@ -355,7 +350,7 @@ public class KafkaProducerTest {
}
Assert.assertTrue("Topic should still exist in metadata", metadata.containsTopic(topic));
}
-
+
@PrepareOnlyThisForTest(Metadata.class)
@Test
public void testHeaders() throws Exception {
@@ -369,8 +364,6 @@ public class KafkaProducerTest {
MemberModifier.field(KafkaProducer.class, "metadata").set(producer, metadata);
String topic = "topic";
- Collection<Node> nodes = Collections.singletonList(new Node(0, "host1", 1000));
-
final Cluster cluster = new Cluster(
"dummy",
Collections.singletonList(new Node(0, "host1", 1000)),
@@ -395,9 +388,9 @@ public class KafkaProducerTest {
//ensure headers can be mutated pre send.
record.headers().add(new RecordHeader("test", "header2".getBytes()));
-
+
producer.send(record, null);
-
+
//ensure headers are closed and cannot be mutated post send
try {
record.headers().add(new RecordHeader("test", "test".getBytes()));
@@ -405,7 +398,7 @@ public class KafkaProducerTest {
} catch (IllegalStateException ise) {
//expected
}
-
+
//ensure existing headers are not changed, and last header for key is still original value
assertTrue(Arrays.equals(record.headers().lastHeader("test").value(), "header2".getBytes()));
@@ -436,7 +429,7 @@ public class KafkaProducerTest {
assertEquals(Sensor.RecordingLevel.DEBUG, producer.metrics.config().recordLevel());
}
}
-
+
@PrepareOnlyThisForTest(Metadata.class)
@Test
public void testInterceptorPartitionSetOnTooLargeRecord() throws Exception {
@@ -445,7 +438,7 @@ public class KafkaProducerTest {
props.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "1");
String topic = "topic";
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "value");
-
+
KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(),
new StringSerializer());
Metadata metadata = PowerMock.createNiceMock(Metadata.class);
@@ -457,20 +450,18 @@ public class KafkaProducerTest {
Collections.<String>emptySet(),
Collections.<String>emptySet());
EasyMock.expect(metadata.fetch()).andReturn(cluster).once();
-
+
// Mock interceptors field
ProducerInterceptors interceptors = PowerMock.createMock(ProducerInterceptors.class);
EasyMock.expect(interceptors.onSend(record)).andReturn(record);
interceptors.onSendError(EasyMock.eq(record), EasyMock.<TopicPartition>notNull(), EasyMock.<Exception>notNull());
EasyMock.expectLastCall();
MemberModifier.field(KafkaProducer.class, "interceptors").set(producer, interceptors);
-
+
PowerMock.replay(metadata);
EasyMock.replay(interceptors);
producer.send(record);
-
+
EasyMock.verify(interceptors);
-
}
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index eeb9b5f..d343194 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.test.MockSerializer;
+import org.junit.After;
import org.junit.Test;
import java.util.ArrayList;
@@ -47,14 +48,23 @@ import static org.junit.Assert.fail;
public class MockProducerTest {
private final String topic = "topic";
- private final MockProducer<byte[], byte[]> producer = new MockProducer<>(true, new MockSerializer(), new MockSerializer());
+ private MockProducer<byte[], byte[]> producer;
private final ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(topic, "key1".getBytes(), "value1".getBytes());
private final ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(topic, "key2".getBytes(), "value2".getBytes());
+ private void buildMockProducer(boolean autoComplete) {
+ this.producer = new MockProducer<>(autoComplete, new MockSerializer(), new MockSerializer());
+ }
+
+ @After
+ public void cleanup() {
+ if (this.producer != null && !this.producer.closed())
+ this.producer.close();
+ }
@Test
- @SuppressWarnings("unchecked")
public void testAutoCompleteMock() throws Exception {
+ buildMockProducer(true);
Future<RecordMetadata> metadata = producer.send(record1);
assertTrue("Send should be immediately complete", metadata.isDone());
assertFalse("Send should be successful", isError(metadata));
@@ -77,11 +87,12 @@ public class MockProducerTest {
assertEquals("Partition should be correct", 1, metadata.get().partition());
producer.clear();
assertEquals("Clear should erase our history", 0, producer.history().size());
+ producer.close();
}
@Test
public void testManualCompletion() throws Exception {
- MockProducer<byte[], byte[]> producer = new MockProducer<>(false, new MockSerializer(), new MockSerializer());
+ buildMockProducer(false);
Future<RecordMetadata> md1 = producer.send(record1);
assertFalse("Send shouldn't have completed", md1.isDone());
Future<RecordMetadata> md2 = producer.send(record2);
@@ -98,7 +109,7 @@ public class MockProducerTest {
assertEquals(e, err.getCause());
}
assertFalse("No more requests to complete", producer.completeNext());
-
+
Future<RecordMetadata> md3 = producer.send(record1);
Future<RecordMetadata> md4 = producer.send(record2);
assertTrue("Requests should not be completed.", !md3.isDone() && !md4.isDone());
@@ -108,12 +119,14 @@ public class MockProducerTest {
@Test
public void shouldInitTransactions() {
+ buildMockProducer(true);
producer.initTransactions();
assertTrue(producer.transactionInitialized());
}
@Test
public void shouldThrowOnInitTransactionIfProducerAlreadyInitializedForTransactions() {
+ buildMockProducer(true);
producer.initTransactions();
try {
producer.initTransactions();
@@ -123,11 +136,13 @@ public class MockProducerTest {
@Test(expected = IllegalStateException.class)
public void shouldThrowOnBeginTransactionIfTransactionsNotInitialized() {
+ buildMockProducer(true);
producer.beginTransaction();
}
@Test
public void shouldBeginTransactions() {
+ buildMockProducer(true);
producer.initTransactions();
producer.beginTransaction();
assertTrue(producer.transactionInFlight());
@@ -135,11 +150,13 @@ public class MockProducerTest {
@Test(expected = IllegalStateException.class)
public void shouldThrowOnSendOffsetsToTransactionIfTransactionsNotInitialized() {
+ buildMockProducer(true);
producer.sendOffsetsToTransaction(null, null);
}
@Test
public void shouldThrowOnSendOffsetsToTransactionTransactionIfNoTransactionGotStarted() {
+ buildMockProducer(true);
producer.initTransactions();
try {
producer.sendOffsetsToTransaction(null, null);
@@ -149,11 +166,13 @@ public class MockProducerTest {
@Test(expected = IllegalStateException.class)
public void shouldThrowOnCommitIfTransactionsNotInitialized() {
+ buildMockProducer(true);
producer.commitTransaction();
}
@Test
public void shouldThrowOnCommitTransactionIfNoTransactionGotStarted() {
+ buildMockProducer(true);
producer.initTransactions();
try {
producer.commitTransaction();
@@ -163,6 +182,7 @@ public class MockProducerTest {
@Test
public void shouldCommitEmptyTransaction() {
+ buildMockProducer(true);
producer.initTransactions();
producer.beginTransaction();
producer.commitTransaction();
@@ -173,6 +193,7 @@ public class MockProducerTest {
@Test
public void shouldCountCommittedTransaction() {
+ buildMockProducer(true);
producer.initTransactions();
producer.beginTransaction();
@@ -183,6 +204,7 @@ public class MockProducerTest {
@Test
public void shouldNotCountAbortedTransaction() {
+ buildMockProducer(true);
producer.initTransactions();
producer.beginTransaction();
@@ -195,11 +217,13 @@ public class MockProducerTest {
@Test(expected = IllegalStateException.class)
public void shouldThrowOnAbortIfTransactionsNotInitialized() {
+ buildMockProducer(true);
producer.abortTransaction();
}
@Test
public void shouldThrowOnAbortTransactionIfNoTransactionGotStarted() {
+ buildMockProducer(true);
producer.initTransactions();
try {
producer.abortTransaction();
@@ -209,6 +233,7 @@ public class MockProducerTest {
@Test
public void shouldAbortEmptyTransaction() {
+ buildMockProducer(true);
producer.initTransactions();
producer.beginTransaction();
producer.abortTransaction();
@@ -219,6 +244,7 @@ public class MockProducerTest {
@Test
public void shouldAbortInFlightTransactionOnClose() {
+ buildMockProducer(true);
producer.initTransactions();
producer.beginTransaction();
producer.close();
@@ -229,11 +255,13 @@ public class MockProducerTest {
@Test(expected = IllegalStateException.class)
public void shouldThrowFenceProducerIfTransactionsNotInitialized() {
+ buildMockProducer(true);
producer.fenceProducer();
}
@Test
public void shouldThrowOnBeginTransactionsIfProducerGotFenced() {
+ buildMockProducer(true);
producer.initTransactions();
producer.fenceProducer();
try {
@@ -244,6 +272,7 @@ public class MockProducerTest {
@Test
public void shouldThrowOnSendIfProducerGotFenced() {
+ buildMockProducer(true);
producer.initTransactions();
producer.fenceProducer();
try {
@@ -254,6 +283,7 @@ public class MockProducerTest {
@Test
public void shouldThrowOnFlushIfProducerGotFenced() {
+ buildMockProducer(true);
producer.initTransactions();
producer.fenceProducer();
try {
@@ -264,6 +294,7 @@ public class MockProducerTest {
@Test
public void shouldThrowOnSendOffsetsToTransactionIfProducerGotFenced() {
+ buildMockProducer(true);
producer.initTransactions();
producer.fenceProducer();
try {
@@ -274,6 +305,7 @@ public class MockProducerTest {
@Test
public void shouldThrowOnCommitTransactionIfProducerGotFenced() {
+ buildMockProducer(true);
producer.initTransactions();
producer.fenceProducer();
try {
@@ -284,6 +316,7 @@ public class MockProducerTest {
@Test
public void shouldThrowOnAbortTransactionIfProducerGotFenced() {
+ buildMockProducer(true);
producer.initTransactions();
producer.fenceProducer();
try {
@@ -294,6 +327,7 @@ public class MockProducerTest {
@Test
public void shouldPublishMessagesOnlyAfterCommitIfTransactionsAreEnabled() {
+ buildMockProducer(true);
producer.initTransactions();
producer.beginTransaction();
@@ -313,7 +347,7 @@ public class MockProducerTest {
@Test
public void shouldFlushOnCommitForNonAutoCompleteIfTransactionsAreEnabled() {
- MockProducer<byte[], byte[]> producer = new MockProducer<>(false, new MockSerializer(), new MockSerializer());
+ buildMockProducer(false);
producer.initTransactions();
producer.beginTransaction();
@@ -331,6 +365,7 @@ public class MockProducerTest {
@Test
public void shouldDropMessagesOnAbortIfTransactionsAreEnabled() {
+ buildMockProducer(true);
producer.initTransactions();
producer.beginTransaction();
@@ -346,7 +381,7 @@ public class MockProducerTest {
@Test
public void shouldThrowOnAbortForNonAutoCompleteIfTransactionsAreEnabled() throws Exception {
- MockProducer<byte[], byte[]> producer = new MockProducer<>(false, new MockSerializer(), new MockSerializer());
+ buildMockProducer(false);
producer.initTransactions();
producer.beginTransaction();
@@ -359,6 +394,7 @@ public class MockProducerTest {
@Test
public void shouldPreserveCommittedMessagesOnAbortIfTransactionsAreEnabled() {
+ buildMockProducer(true);
producer.initTransactions();
producer.beginTransaction();
@@ -378,6 +414,7 @@ public class MockProducerTest {
@Test
public void shouldPublishConsumerGroupOffsetsOnlyAfterCommitIfTransactionsAreEnabled() {
+ buildMockProducer(true);
producer.initTransactions();
producer.beginTransaction();
@@ -410,6 +447,7 @@ public class MockProducerTest {
@Test
public void shouldThrowOnNullConsumerGroupIdWhenSendOffsetsToTransaction() {
+ buildMockProducer(true);
producer.initTransactions();
producer.beginTransaction();
@@ -421,6 +459,7 @@ public class MockProducerTest {
@Test
public void shouldIgnoreEmptyOffsetsWhenSendOffsetsToTransaction() {
+ buildMockProducer(true);
producer.initTransactions();
producer.beginTransaction();
producer.sendOffsetsToTransaction(Collections.<TopicPartition, OffsetAndMetadata>emptyMap(), "groupId");
@@ -429,6 +468,7 @@ public class MockProducerTest {
@Test
public void shouldAddOffsetsWhenSendOffsetsToTransaction() {
+ buildMockProducer(true);
producer.initTransactions();
producer.beginTransaction();
@@ -445,6 +485,7 @@ public class MockProducerTest {
@Test
public void shouldResetSentOffsetsFlagOnlyWhenBeginningNewTransaction() {
+ buildMockProducer(true);
producer.initTransactions();
producer.beginTransaction();
@@ -465,6 +506,7 @@ public class MockProducerTest {
@Test
public void shouldPublishLatestAndCumulativeConsumerGroupOffsetsOnlyAfterCommitIfTransactionsAreEnabled() {
+ buildMockProducer(true);
producer.initTransactions();
producer.beginTransaction();
@@ -501,6 +543,7 @@ public class MockProducerTest {
@Test
public void shouldDropConsumerGroupOffsetsOnAbortIfTransactionsAreEnabled() {
+ buildMockProducer(true);
producer.initTransactions();
producer.beginTransaction();
@@ -521,6 +564,7 @@ public class MockProducerTest {
@Test
public void shouldPreserveCommittedConsumerGroupsOffsetsOnAbortIfTransactionsAreEnabled() {
+ buildMockProducer(true);
producer.initTransactions();
producer.beginTransaction();
@@ -545,6 +589,7 @@ public class MockProducerTest {
@Test
public void shouldThrowOnInitTransactionIfProducerIsClosed() {
+ buildMockProducer(true);
producer.close();
try {
producer.initTransactions();
@@ -554,6 +599,7 @@ public class MockProducerTest {
@Test
public void shouldThrowOnSendIfProducerIsClosed() {
+ buildMockProducer(true);
producer.close();
try {
producer.send(null);
@@ -563,6 +609,7 @@ public class MockProducerTest {
@Test
public void shouldThrowOnBeginTransactionIfProducerIsClosed() {
+ buildMockProducer(true);
producer.close();
try {
producer.beginTransaction();
@@ -572,6 +619,7 @@ public class MockProducerTest {
@Test
public void shouldThrowSendOffsetsToTransactionIfProducerIsClosed() {
+ buildMockProducer(true);
producer.close();
try {
producer.sendOffsetsToTransaction(null, null);
@@ -581,6 +629,7 @@ public class MockProducerTest {
@Test
public void shouldThrowOnCommitTransactionIfProducerIsClosed() {
+ buildMockProducer(true);
producer.close();
try {
producer.commitTransaction();
@@ -590,6 +639,7 @@ public class MockProducerTest {
@Test
public void shouldThrowOnAbortTransactionIfProducerIsClosed() {
+ buildMockProducer(true);
producer.close();
try {
producer.abortTransaction();
@@ -599,6 +649,7 @@ public class MockProducerTest {
@Test
public void shouldThrowOnCloseIfProducerIsClosed() {
+ buildMockProducer(true);
producer.close();
try {
producer.close();
@@ -608,6 +659,7 @@ public class MockProducerTest {
@Test
public void shouldThrowOnFenceProducerIfProducerIsClosed() {
+ buildMockProducer(true);
producer.close();
try {
producer.fenceProducer();
@@ -617,6 +669,7 @@ public class MockProducerTest {
@Test
public void shouldThrowOnFlushProducerIfProducerIsClosed() {
+ buildMockProducer(true);
producer.close();
try {
producer.flush();
@@ -626,25 +679,27 @@ public class MockProducerTest {
@Test
public void shouldBeFlushedIfNoBufferedRecords() {
+ buildMockProducer(true);
assertTrue(producer.flushed());
}
@Test
public void shouldBeFlushedWithAutoCompleteIfBufferedRecords() {
+ buildMockProducer(true);
producer.send(record1);
assertTrue(producer.flushed());
}
@Test
public void shouldNotBeFlushedWithNoAutoCompleteIfBufferedRecords() {
- MockProducer<byte[], byte[]> producer = new MockProducer<>(false, new MockSerializer(), new MockSerializer());
+ buildMockProducer(false);
producer.send(record1);
assertFalse(producer.flushed());
}
@Test
public void shouldNotBeFlushedAfterFlush() {
- MockProducer<byte[], byte[]> producer = new MockProducer<>(false, new MockSerializer(), new MockSerializer());
+ buildMockProducer(false);
producer.send(record1);
producer.flush();
assertTrue(producer.flushed());
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 0537a35..d587de4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -633,10 +633,9 @@ public class SenderTest {
String topic = tp.topic();
// Set a good compression ratio.
CompressionRatioEstimator.setEstimation(topic, CompressionType.GZIP, 0.2f);
- Metrics m = new Metrics();
- accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.GZIP, 0L, 0L, m, time,
- new ApiVersions(), txnManager);
- try {
+ try (Metrics m = new Metrics()) {
+ accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.GZIP, 0L, 0L, m, time,
+ new ApiVersions(), txnManager);
Sender sender = new Sender(client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
m, time, REQUEST_TIMEOUT, 1000L, txnManager, new ApiVersions());
// Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
@@ -701,8 +700,6 @@ public class SenderTest {
assertTrue("There should be a split",
m.metrics().get(m.metricName("batch-split-rate", "producer-metrics")).value() > 0);
- } finally {
- m.close();
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java
index fbac719..26ba3b8 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.utils;
import org.junit.Test;
+import java.io.IOException;
import java.nio.ByteBuffer;
import static org.junit.Assert.assertArrayEquals;
@@ -49,6 +50,7 @@ public class ByteBufferOutputStreamTest {
byte[] bytes = new byte[5];
buffer.get(bytes);
assertArrayEquals("hello".getBytes(), bytes);
+ output.close();
}
@Test
@@ -75,19 +77,20 @@ public class ByteBufferOutputStreamTest {
byte[] bytes = new byte[5];
buffer.get(bytes);
assertArrayEquals("hello".getBytes(), bytes);
+ output.close();
}
@Test
- public void testWriteByteBuffer() {
+ public void testWriteByteBuffer() throws IOException {
testWriteByteBuffer(ByteBuffer.allocate(16));
}
@Test
- public void testWriteDirectByteBuffer() {
+ public void testWriteDirectByteBuffer() throws IOException {
testWriteByteBuffer(ByteBuffer.allocateDirect(16));
}
- private void testWriteByteBuffer(ByteBuffer input) {
+ private void testWriteByteBuffer(ByteBuffer input) throws IOException {
long value = 234239230L;
input.putLong(value);
input.flip();
@@ -97,6 +100,7 @@ public class ByteBufferOutputStreamTest {
assertEquals(8, input.position());
assertEquals(8, output.position());
assertEquals(value, output.buffer().getLong(0));
+ output.close();
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
index eb91dbd..03fb774 100644
--- a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
+++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
@@ -123,6 +123,7 @@ public class FileStreamSourceTaskTest {
assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(0).sourcePartition());
assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 48L), records.get(0).sourceOffset());
+ os.close();
task.stop();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index 654f485..f7a5553 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -137,7 +137,6 @@ public class Plugins {
return delegatingLoader.transformations();
}
- @SuppressWarnings("unchecked")
public Connector newConnector(String connectorClassOrAlias) {
Class<? extends Connector> klass;
try {
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java
index 32f5a38..0547fe6 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java
@@ -69,8 +69,7 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
@SuppressWarnings("unchecked")
private void load() {
- try {
- ObjectInputStream is = new ObjectInputStream(new FileInputStream(file));
+ try (ObjectInputStream is = new ObjectInputStream(new FileInputStream(file))) {
Object obj = is.readObject();
if (!(obj instanceof HashMap))
throw new ConnectException("Expected HashMap but found " + obj.getClass());
@@ -81,7 +80,6 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
ByteBuffer value = (mapEntry.getValue() != null) ? ByteBuffer.wrap(mapEntry.getValue()) : null;
data.put(key, value);
}
- is.close();
} catch (FileNotFoundException | EOFException e) {
// FileNotFoundException: Ignore, may be new.
// EOFException: Ignore, this means the file was missing or corrupt
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index 29a6b52..6f77f65 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -115,7 +115,6 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
private long recordsReturned;
- @SuppressWarnings("unchecked")
@Override
public void setup() {
super.setup();
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index 07d192b..e9dd18e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -135,7 +135,6 @@ public class KafkaConfigBackingStoreTest {
KafkaBasedLog<String, byte[]> storeLog;
private KafkaConfigBackingStore configStorage;
- private String internalTopic;
private Capture<String> capturedTopic = EasyMock.newCapture();
private Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
private Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
@@ -363,7 +362,7 @@ public class KafkaConfigBackingStoreTest {
new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(3)),
new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)));
- LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
+ LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
@@ -403,7 +402,7 @@ public class KafkaConfigBackingStoreTest {
new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)));
- LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
+ LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
@@ -445,7 +444,7 @@ public class KafkaConfigBackingStoreTest {
new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)));
- LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
+ LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
@@ -494,7 +493,7 @@ public class KafkaConfigBackingStoreTest {
new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(3)),
new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)));
- LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
+ LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
@@ -540,7 +539,7 @@ public class KafkaConfigBackingStoreTest {
// Connector after root update should make it through, task update shouldn't
new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)),
new ConsumerRecord<>(TOPIC, 0, 6, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(6)));
- LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
+ LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
@@ -594,7 +593,7 @@ public class KafkaConfigBackingStoreTest {
new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)));
- LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
+ LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
@@ -640,7 +639,7 @@ public class KafkaConfigBackingStoreTest {
new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)),
new ConsumerRecord<>(TOPIC, 0, 6, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(6)),
new ConsumerRecord<>(TOPIC, 0, 7, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(7)));
- LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
+ LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
@@ -691,7 +690,7 @@ public class KafkaConfigBackingStoreTest {
new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)));
- LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
+ LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
index bafbce8..f90b77f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
@@ -49,7 +49,6 @@ public class TopicAdminTest {
@Test
public void returnNullWithApiVersionMismatch() {
final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
- boolean internal = false;
Cluster cluster = createCluster(1);
try (MockKafkaAdminClientEnv env = new MockKafkaAdminClientEnv(cluster)) {
env.kafkaClient().setNode(cluster.controller());
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
index 88afafc..b190189 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.After;
import org.junit.Test;
import java.util.Collections;
@@ -34,42 +35,44 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class CastTest {
+ private final Cast<SourceRecord> xformKey = new Cast.Key<>();
+ private final Cast<SourceRecord> xformValue = new Cast.Value<>();
+
+ @After
+ public void teardown() {
+ xformKey.close();
+ xformValue.close();
+ }
@Test(expected = ConfigException.class)
public void testConfigEmpty() {
- final Cast<SourceRecord> xform = new Cast.Key<>();
- xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, ""));
+ xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, ""));
}
@Test(expected = ConfigException.class)
public void testConfigInvalidSchemaType() {
- final Cast<SourceRecord> xform = new Cast.Key<>();
- xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:faketype"));
+ xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:faketype"));
}
@Test(expected = ConfigException.class)
public void testConfigInvalidTargetType() {
- final Cast<SourceRecord> xform = new Cast.Key<>();
- xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:array"));
+ xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:array"));
}
@Test(expected = ConfigException.class)
public void testConfigInvalidMap() {
- final Cast<SourceRecord> xform = new Cast.Key<>();
- xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int8:extra"));
+ xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int8:extra"));
}
@Test(expected = ConfigException.class)
public void testConfigMixWholeAndFieldTransformation() {
- final Cast<SourceRecord> xform = new Cast.Key<>();
- xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int8,int32"));
+ xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int8,int32"));
}
@Test
public void castWholeRecordKeyWithSchema() {
- final Cast<SourceRecord> xform = new Cast.Key<>();
- xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+ xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
+ SourceRecord transformed = xformKey.apply(new SourceRecord(null, null, "topic", 0,
Schema.INT32_SCHEMA, 42, Schema.STRING_SCHEMA, "bogus"));
assertEquals(Schema.Type.INT8, transformed.keySchema().type());
@@ -78,9 +81,8 @@ public class CastTest {
@Test
public void castWholeRecordValueWithSchemaInt8() {
- final Cast<SourceRecord> xform = new Cast.Value<>();
- xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+ xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
Schema.INT32_SCHEMA, 42));
assertEquals(Schema.Type.INT8, transformed.valueSchema().type());
@@ -89,9 +91,8 @@ public class CastTest {
@Test
public void castWholeRecordValueWithSchemaInt16() {
- final Cast<SourceRecord> xform = new Cast.Value<>();
- xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int16"));
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+ xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int16"));
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
Schema.INT32_SCHEMA, 42));
assertEquals(Schema.Type.INT16, transformed.valueSchema().type());
@@ -100,9 +101,8 @@ public class CastTest {
@Test
public void castWholeRecordValueWithSchemaInt32() {
- final Cast<SourceRecord> xform = new Cast.Value<>();
- xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int32"));
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+ xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int32"));
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
Schema.INT32_SCHEMA, 42));
assertEquals(Schema.Type.INT32, transformed.valueSchema().type());
@@ -111,9 +111,8 @@ public class CastTest {
@Test
public void castWholeRecordValueWithSchemaInt64() {
- final Cast<SourceRecord> xform = new Cast.Value<>();
- xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int64"));
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+ xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int64"));
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
Schema.INT32_SCHEMA, 42));
assertEquals(Schema.Type.INT64, transformed.valueSchema().type());
@@ -122,9 +121,8 @@ public class CastTest {
@Test
public void castWholeRecordValueWithSchemaFloat32() {
- final Cast<SourceRecord> xform = new Cast.Value<>();
- xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "float32"));
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+ xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "float32"));
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
Schema.INT32_SCHEMA, 42));
assertEquals(Schema.Type.FLOAT32, transformed.valueSchema().type());
@@ -133,9 +131,8 @@ public class CastTest {
@Test
public void castWholeRecordValueWithSchemaFloat64() {
- final Cast<SourceRecord> xform = new Cast.Value<>();
- xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "float64"));
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+ xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "float64"));
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
Schema.INT32_SCHEMA, 42));
assertEquals(Schema.Type.FLOAT64, transformed.valueSchema().type());
@@ -144,9 +141,8 @@ public class CastTest {
@Test
public void castWholeRecordValueWithSchemaBooleanTrue() {
- final Cast<SourceRecord> xform = new Cast.Value<>();
- xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "boolean"));
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+ xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "boolean"));
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
Schema.INT32_SCHEMA, 42));
assertEquals(Schema.Type.BOOLEAN, transformed.valueSchema().type());
@@ -155,9 +151,8 @@ public class CastTest {
@Test
public void castWholeRecordValueWithSchemaBooleanFalse() {
- final Cast<SourceRecord> xform = new Cast.Value<>();
- xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "boolean"));
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+ xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "boolean"));
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
Schema.INT32_SCHEMA, 0));
assertEquals(Schema.Type.BOOLEAN, transformed.valueSchema().type());
@@ -166,9 +161,8 @@ public class CastTest {
@Test
public void castWholeRecordValueWithSchemaString() {
- final Cast<SourceRecord> xform = new Cast.Value<>();
- xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "string"));
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+ xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "string"));
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
Schema.INT32_SCHEMA, 42));
assertEquals(Schema.Type.STRING, transformed.valueSchema().type());
@@ -178,9 +172,8 @@ public class CastTest {
@Test
public void castWholeRecordDefaultValue() {
// Validate default value in schema is correctly converted
- final Cast<SourceRecord> xform = new Cast.Value<>();
- xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int32"));
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+ xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int32"));
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
SchemaBuilder.float32().defaultValue(-42.125f).build(), 42.125f));
assertEquals(Schema.Type.INT32, transformed.valueSchema().type());
@@ -190,9 +183,8 @@ public class CastTest {
@Test
public void castWholeRecordKeySchemaless() {
- final Cast<SourceRecord> xform = new Cast.Key<>();
- xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+ xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
+ SourceRecord transformed = xformKey.apply(new SourceRecord(null, null, "topic", 0,
null, 42, Schema.STRING_SCHEMA, "bogus"));
assertNull(transformed.keySchema());
@@ -201,9 +193,8 @@ public class CastTest {
@Test
public void castWholeRecordValueSchemalessInt8() {
- final Cast<SourceRecord> xform = new Cast.Value<>();
- xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+ xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
null, 42));
assertNull(transformed.valueSchema());
@@ -212,9 +203,8 @@ public class CastTest {
@Test
public void castWholeRecordValueSchemalessInt16() {
- final Cast<SourceRecord> xform = new Cast.Value<>();
- xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int16"));
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+ xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int16"));
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
null, 42));
assertNull(transformed.valueSchema());
@@ -223,9 +213,8 @@ public class CastTest {
@Test
public void castWholeRecordValueSchemalessInt32() {
- final Cast<SourceRecord> xform = new Cast.Value<>();
- xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int32"));
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+ xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int32"));
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
null, 42));
assertNull(transformed.valueSchema());
@@ -234,9 +223,8 @@ public class CastTest {
@Test
public void castWholeRecordValueSchemalessInt64() {
- final Cast<SourceRecord> xform = new Cast.Value<>();
- xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int64"));
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+ xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int64"));
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
null, 42));
assertNull(transformed.valueSchema());
@@ -245,9 +233,8 @@ public class CastTest {
@Test
public void castWholeRecordValueSchemalessFloat32() {
- final Cast<SourceRecord> xform = new Cast.Value<>();
- xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "float32"));
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+ xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "float32"));
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
null, 42));
assertNull(transformed.valueSchema());
@@ -256,9 +243,8 @@ public class CastTest {
@Test
public void castWholeRecordValueSchemalessFloat64() {
- final Cast<SourceRecord> xform = new Cast.Value<>();
- xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "float64"));
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+ xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "float64"));
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
null, 42));
assertNull(transformed.valueSchema());
@@ -267,9 +253,8 @@ public class CastTest {
@Test
public void castWholeRecordValueSchemalessBooleanTrue() {
- final Cast<SourceRecord> xform = new Cast.Value<>();
- xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "boolean"));
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+ xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "boolean"));
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
null, 42));
assertNull(transformed.valueSchema());
@@ -278,9 +263,8 @@ public class CastTest {
@Test
public void castWholeRecordValueSchemalessBooleanFalse() {
- final Cast<SourceRecord> xform = new Cast.Value<>();
- xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "boolean"));
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+ xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "boolean"));
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
null, 0));
assertNull(transformed.valueSchema());
@@ -289,9 +273,8 @@ public class CastTest {
@Test
public void castWholeRecordValueSchemalessString() {
- final Cast<SourceRecord> xform = new Cast.Value<>();
- xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "string"));
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+ xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "string"));
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
null, 42));
assertNull(transformed.valueSchema());
@@ -300,16 +283,14 @@ public class CastTest {
@Test(expected = DataException.class)
public void castWholeRecordValueSchemalessUnsupportedType() {
- final Cast<SourceRecord> xform = new Cast.Value<>();
- xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
- xform.apply(new SourceRecord(null, null, "topic", 0, null, Collections.singletonList("foo")));
+ xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
+ xformValue.apply(new SourceRecord(null, null, "topic", 0, null, Collections.singletonList("foo")));
}
@Test
public void castFieldsWithSchema() {
- final Cast<SourceRecord> xform = new Cast.Value<>();
- xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8:int16,int16:int32,int32:int64,int64:boolean,float32:float64,float64:boolean,boolean:int8,string:int32,optional:int32"));
+ xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8:int16,int16:int32,int32:int64,int64:boolean,float32:float64,float64:boolean,boolean:int8,string:int32,optional:int32"));
// Include optional fields and fields with defaults to validate their values are passed through properly
SchemaBuilder builder = SchemaBuilder.struct();
@@ -336,7 +317,7 @@ public class CastTest {
recordValue.put("string", "42");
// optional field intentionally omitted
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
supportedTypesSchema, recordValue));
assertEquals((short) 8, ((Struct) transformed.value()).get("int8"));
@@ -356,8 +337,7 @@ public class CastTest {
@SuppressWarnings("unchecked")
@Test
public void castFieldsSchemaless() {
- final Cast<SourceRecord> xform = new Cast.Value<>();
- xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8:int16,int16:int32,int32:int64,int64:boolean,float32:float64,float64:boolean,boolean:int8,string:int32"));
+ xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8:int16,int16:int32,int32:int64,int64:boolean,float32:float64,float64:boolean,boolean:int8,string:int32"));
Map<String, Object> recordValue = new HashMap<>();
recordValue.put("int8", (byte) 8);
recordValue.put("int16", (short) 16);
@@ -367,7 +347,7 @@ public class CastTest {
recordValue.put("float64", -64.);
recordValue.put("boolean", true);
recordValue.put("string", "42");
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
null, recordValue));
assertNull(transformed.valueSchema());
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java
index b54a908..0b7ce96 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.After;
import org.junit.Test;
import java.util.Collections;
@@ -28,10 +29,15 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
public class ExtractFieldTest {
+ private final ExtractField<SinkRecord> xform = new ExtractField.Key<>();
+
+ @After
+ public void teardown() {
+ xform.close();
+ }
@Test
public void schemaless() {
- final ExtractField<SinkRecord> xform = new ExtractField.Key<>();
xform.configure(Collections.singletonMap("field", "magic"));
final SinkRecord record = new SinkRecord("test", 0, null, Collections.singletonMap("magic", 42), null, null, 0);
@@ -43,7 +49,6 @@ public class ExtractFieldTest {
@Test
public void withSchema() {
- final ExtractField<SinkRecord> xform = new ExtractField.Key<>();
xform.configure(Collections.singletonMap("field", "magic"));
final Schema keySchema = SchemaBuilder.struct().field("magic", Schema.INT32_SCHEMA).build();
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
index 86851f3..d709054 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.After;
import org.junit.Test;
import java.util.Arrays;
@@ -36,25 +37,30 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class FlattenTest {
+ private final Flatten<SourceRecord> xformKey = new Flatten.Key<>();
+ private final Flatten<SourceRecord> xformValue = new Flatten.Value<>();
+
+ @After
+ public void teardown() {
+ xformKey.close();
+ xformValue.close();
+ }
@Test(expected = DataException.class)
public void topLevelStructRequired() {
- final Flatten<SourceRecord> xform = new Flatten.Value<>();
- xform.configure(Collections.<String, String>emptyMap());
- xform.apply(new SourceRecord(null, null, "topic", 0, Schema.INT32_SCHEMA, 42));
+ xformValue.configure(Collections.<String, String>emptyMap());
+ xformValue.apply(new SourceRecord(null, null, "topic", 0, Schema.INT32_SCHEMA, 42));
}
@Test(expected = DataException.class)
public void topLevelMapRequired() {
- final Flatten<SourceRecord> xform = new Flatten.Value<>();
- xform.configure(Collections.<String, String>emptyMap());
- xform.apply(new SourceRecord(null, null, "topic", 0, null, 42));
+ xformValue.configure(Collections.<String, String>emptyMap());
+ xformValue.apply(new SourceRecord(null, null, "topic", 0, null, 42));
}
@Test
public void testNestedStruct() {
- final Flatten<SourceRecord> xform = new Flatten.Value<>();
- xform.configure(Collections.<String, String>emptyMap());
+ xformValue.configure(Collections.<String, String>emptyMap());
SchemaBuilder builder = SchemaBuilder.struct();
builder.field("int8", Schema.INT8_SCHEMA);
@@ -93,7 +99,7 @@ public class FlattenTest {
Struct twoLevelNestedStruct = new Struct(twoLevelNestedSchema);
twoLevelNestedStruct.put("A", oneLevelNestedStruct);
- SourceRecord transformed = xform.apply(new SourceRecord(null, null,
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null,
"topic", 0,
twoLevelNestedSchema, twoLevelNestedStruct));
@@ -113,8 +119,7 @@ public class FlattenTest {
@Test
public void testNestedMapWithDelimiter() {
- final Flatten<SourceRecord> xform = new Flatten.Value<>();
- xform.configure(Collections.singletonMap("delimiter", "#"));
+ xformValue.configure(Collections.singletonMap("delimiter", "#"));
Map<String, Object> supportedTypes = new HashMap<>();
supportedTypes.put("int8", (byte) 8);
@@ -130,7 +135,7 @@ public class FlattenTest {
Map<String, Object> oneLevelNestedMap = Collections.singletonMap("B", (Object) supportedTypes);
Map<String, Object> twoLevelNestedMap = Collections.singletonMap("A", (Object) oneLevelNestedMap);
- SourceRecord transformed = xform.apply(new SourceRecord(null, null,
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null,
"topic", 0,
null, twoLevelNestedMap));
@@ -151,8 +156,7 @@ public class FlattenTest {
@Test
public void testOptionalFieldStruct() {
- final Flatten<SourceRecord> xform = new Flatten.Value<>();
- xform.configure(Collections.<String, String>emptyMap());
+ xformValue.configure(Collections.<String, String>emptyMap());
SchemaBuilder builder = SchemaBuilder.struct();
builder.field("opt_int32", Schema.OPTIONAL_INT32_SCHEMA);
@@ -168,7 +172,7 @@ public class FlattenTest {
Struct oneLevelNestedStruct = new Struct(oneLevelNestedSchema);
oneLevelNestedStruct.put("B", supportedTypes);
- SourceRecord transformed = xform.apply(new SourceRecord(null, null,
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null,
"topic", 0,
oneLevelNestedSchema, oneLevelNestedStruct));
@@ -179,15 +183,14 @@ public class FlattenTest {
@Test
public void testOptionalFieldMap() {
- final Flatten<SourceRecord> xform = new Flatten.Value<>();
- xform.configure(Collections.<String, String>emptyMap());
+ xformValue.configure(Collections.<String, String>emptyMap());
Map<String, Object> supportedTypes = new HashMap<>();
supportedTypes.put("opt_int32", null);
Map<String, Object> oneLevelNestedMap = Collections.singletonMap("B", (Object) supportedTypes);
- SourceRecord transformed = xform.apply(new SourceRecord(null, null,
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null,
"topic", 0,
null, oneLevelNestedMap));
@@ -200,12 +203,11 @@ public class FlattenTest {
@Test
public void testKey() {
- final Flatten<SourceRecord> xform = new Flatten.Key<>();
- xform.configure(Collections.<String, String>emptyMap());
+ xformKey.configure(Collections.<String, String>emptyMap());
Map<String, Map<String, Integer>> key = Collections.singletonMap("A", Collections.singletonMap("B", 12));
SourceRecord src = new SourceRecord(null, null, "topic", null, key, null, null);
- SourceRecord transformed = xform.apply(src);
+ SourceRecord transformed = xformKey.apply(src);
assertNull(transformed.keySchema());
assertTrue(transformed.key() instanceof Map);
@@ -215,10 +217,9 @@ public class FlattenTest {
@Test(expected = DataException.class)
public void testUnsupportedTypeInMap() {
- final Flatten<SourceRecord> xform = new Flatten.Value<>();
- xform.configure(Collections.<String, String>emptyMap());
+ xformValue.configure(Collections.<String, String>emptyMap());
Object value = Collections.singletonMap("foo", Arrays.asList("bar", "baz"));
- xform.apply(new SourceRecord(null, null, "topic", 0, null, value));
+ xformValue.apply(new SourceRecord(null, null, "topic", 0, null, value));
}
@Test
@@ -227,8 +228,7 @@ public class FlattenTest {
// children should also be optional. Similarly, if the parent Struct has a default value, the default value for
// the flattened field
- final Flatten<SourceRecord> xform = new Flatten.Value<>();
- xform.configure(Collections.<String, String>emptyMap());
+ xformValue.configure(Collections.<String, String>emptyMap());
SchemaBuilder builder = SchemaBuilder.struct().optional();
builder.field("req_field", Schema.STRING_SCHEMA);
@@ -240,7 +240,7 @@ public class FlattenTest {
// Intentionally leave this entire value empty since it is optional
Struct value = new Struct(schema);
- SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, schema, value));
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, schema, value));
assertNotNull(transformed);
Schema transformedSchema = transformed.valueSchema();
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java
index 299aab3..1135b85 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.transforms;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.After;
import org.junit.Test;
import java.util.Collections;
@@ -27,10 +28,15 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
public class HoistFieldTest {
+ private final HoistField<SinkRecord> xform = new HoistField.Key<>();
+
+ @After
+ public void teardown() {
+ xform.close();
+ }
@Test
public void schemaless() {
- final HoistField<SinkRecord> xform = new HoistField.Key<>();
xform.configure(Collections.singletonMap("field", "magic"));
final SinkRecord record = new SinkRecord("test", 0, null, 42, null, null, 0);
@@ -42,7 +48,6 @@ public class HoistFieldTest {
@Test
public void withSchema() {
- final HoistField<SinkRecord> xform = new HoistField.Key<>();
xform.configure(Collections.singletonMap("field", "magic"));
final SinkRecord record = new SinkRecord("test", 0, Schema.INT32_SCHEMA, 42, null, null, 0);
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java
index 4ce6ad4..a0a0975 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.After;
import org.junit.Test;
import java.util.Collections;
@@ -32,14 +33,17 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
public class InsertFieldTest {
+ private InsertField<SourceRecord> xform = new InsertField.Value<>();
+
+ @After
+ public void teardown() {
+ xform.close();
+ }
@Test(expected = DataException.class)
public void topLevelStructRequired() {
- final InsertField<SourceRecord> xform = new InsertField.Value<>();
xform.configure(Collections.singletonMap("topic.field", "topic_field"));
- xform.apply(new SourceRecord(null, null,
- "", 0,
- Schema.INT32_SCHEMA, 42));
+ xform.apply(new SourceRecord(null, null, "", 0, Schema.INT32_SCHEMA, 42));
}
@Test
@@ -51,7 +55,6 @@ public class InsertFieldTest {
props.put("static.field", "instance_id");
props.put("static.value", "my-instance-id");
- final InsertField<SourceRecord> xform = new InsertField.Value<>();
xform.configure(props);
final Schema simpleStructSchema = SchemaBuilder.struct().name("name").version(1).doc("doc").field("magic", Schema.OPTIONAL_INT64_SCHEMA).build();
@@ -94,7 +97,6 @@ public class InsertFieldTest {
props.put("static.field", "instance_id");
props.put("static.value", "my-instance-id");
- final InsertField<SourceRecord> xform = new InsertField.Value<>();
xform.configure(props);
final SourceRecord record = new SourceRecord(null, null, "test", 0,
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java
index cc001f1..aa47001 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java
@@ -32,8 +32,9 @@ public class RegexRouterTest {
props.put("replacement", replacement);
final RegexRouter<SinkRecord> router = new RegexRouter<>();
router.configure(props);
- return router.apply(new SinkRecord(topic, 0, null, null, null, null, 0))
- .topic();
+ String sinkTopic = router.apply(new SinkRecord(topic, 0, null, null, null, null, 0)).topic();
+ router.close();
+ return sinkTopic;
}
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
index e3d9d3a..6a1a13a 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.After;
import org.junit.Test;
import java.util.HashMap;
@@ -28,11 +29,15 @@ import java.util.Map;
import static org.junit.Assert.assertEquals;
public class ReplaceFieldTest {
+ private ReplaceField<SinkRecord> xform = new ReplaceField.Value<>();
+
+ @After
+ public void teardown() {
+ xform.close();
+ }
@Test
public void schemaless() {
- final ReplaceField<SinkRecord> xform = new ReplaceField.Value<>();
-
final Map<String, String> props = new HashMap<>();
props.put("blacklist", "dont");
props.put("renames", "abc:xyz,foo:bar");
@@ -57,8 +62,6 @@ public class ReplaceFieldTest {
@Test
public void withSchema() {
- final ReplaceField<SinkRecord> xform = new ReplaceField.Value<>();
-
final Map<String, String> props = new HashMap<>();
props.put("whitelist", "abc,foo");
props.put("renames", "abc:xyz,foo:bar");
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java
index 206c51e..257b382 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.After;
import org.junit.Test;
import java.util.Collections;
@@ -31,10 +32,15 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
public class SetSchemaMetadataTest {
+ private final SetSchemaMetadata<SinkRecord> xform = new SetSchemaMetadata.Value<>();
+
+ @After
+ public void teardown() {
+ xform.close();
+ }
@Test
public void schemaNameUpdate() {
- final SetSchemaMetadata<SinkRecord> xform = new SetSchemaMetadata.Value<>();
xform.configure(Collections.singletonMap("schema.name", "foo"));
final SinkRecord record = new SinkRecord("", 0, null, null, SchemaBuilder.struct().build(), null, 0);
final SinkRecord updatedRecord = xform.apply(record);
@@ -43,7 +49,6 @@ public class SetSchemaMetadataTest {
@Test
public void schemaVersionUpdate() {
- final SetSchemaMetadata<SinkRecord> xform = new SetSchemaMetadata.Value<>();
xform.configure(Collections.singletonMap("schema.version", 42));
final SinkRecord record = new SinkRecord("", 0, null, null, SchemaBuilder.struct().build(), null, 0);
final SinkRecord updatedRecord = xform.apply(record);
@@ -56,7 +61,6 @@ public class SetSchemaMetadataTest {
props.put("schema.name", "foo");
props.put("schema.version", "42");
- final SetSchemaMetadata<SinkRecord> xform = new SetSchemaMetadata.Value<>();
xform.configure(props);
final SinkRecord record = new SinkRecord("", 0, null, null, SchemaBuilder.struct().build(), null, 0);
@@ -83,7 +87,6 @@ public class SetSchemaMetadataTest {
final Map<String, String> props = new HashMap<>();
props.put("schema.name", "foo");
props.put("schema.version", "42");
- final SetSchemaMetadata<SinkRecord> xform = new SetSchemaMetadata.Value<>();
xform.configure(props);
final SinkRecord record = new SinkRecord("", 0, null, null, schema, value, 0);