You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/03/15 03:14:43 UTC

kafka git commit: KAFKA-3375; Suppress deprecated warnings where reasonable and tweak compiler settings

Repository: kafka
Updated Branches:
  refs/heads/trunk ffbe624e6 -> 241c3ebb2


KAFKA-3375; Suppress deprecated warnings where reasonable and tweak compiler settings

* Fix and suppress number of unchecked warnings (except for Kafka Streams)
* Add `SafeVarargs` annotation to fix warnings
* Suppress unfixable deprecation warnings
* Replace deprecated by non-deprecated usage where possible
* Avoid reflective calls via structural types in Scala
* Tweak compiler settings for scalac and javac

Once we drop Java 7 and Scala 2.10, we can tweak the compiler settings further so that they warn us about more things.

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Grant Henke, Gwen Shapira, Guozhang Wang

Closes #1042 from ijuma/kafka-3375-suppress-depreccated-tweak-compiler


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/241c3ebb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/241c3ebb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/241c3ebb

Branch: refs/heads/trunk
Commit: 241c3ebb2803f1e09306fb06f20a66e7a60ca3c8
Parents: ffbe624
Author: Ismael Juma <is...@juma.me.uk>
Authored: Mon Mar 14 19:14:36 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Mon Mar 14 19:14:36 2016 -0700

----------------------------------------------------------------------
 build.gradle                                      | 18 ++++++++++++++++++
 .../kafka/clients/consumer/ConsumerRecords.java   |  2 ++
 .../consumer/internals/ConsumerCoordinator.java   |  4 ++--
 .../clients/consumer/internals/RequestFuture.java |  6 +++---
 .../kafka/clients/producer/KafkaProducer.java     |  2 +-
 .../kafka/common/network/SaslChannelBuilder.java  |  1 +
 .../common/security/kerberos/LoginManager.java    |  2 +-
 .../kafka/common/security/ssl/SslFactory.java     |  3 ++-
 .../java/org/apache/kafka/common/utils/Utils.java |  2 ++
 .../common/requests/RequestResponseTest.java      |  1 +
 .../org/apache/kafka/connect/data/Struct.java     |  2 ++
 .../apache/kafka/connect/json/JsonConverter.java  |  2 +-
 .../org/apache/kafka/connect/runtime/Worker.java  |  1 +
 .../kafka/connect/storage/KafkaConfigStorage.java |  1 +
 .../connect/storage/KafkaStatusBackingStore.java  |  2 ++
 .../connect/storage/OffsetStorageReaderImpl.java  |  1 +
 .../apache/kafka/connect/storage/OffsetUtils.java |  1 +
 .../runtime/WorkerSinkTaskThreadedTest.java       |  1 +
 .../connect/runtime/WorkerSourceTaskTest.java     |  1 +
 .../rest/resources/ConnectorsResourceTest.java    |  1 +
 .../runtime/standalone/StandaloneHerderTest.java  |  1 +
 .../connect/storage/KafkaConfigStorageTest.java   |  1 +
 .../storage/KafkaOffsetBackingStoreTest.java      |  1 +
 .../storage/KafkaStatusBackingStoreTest.java      |  1 +
 .../util/ByteArrayProducerRecordEquals.java       |  1 +
 .../scala/integration/kafka/api/QuotasTest.scala  |  1 -
 .../test/scala/kafka/tools/TestLogCleaning.scala  |  4 ++--
 .../ZkNodeChangeNotificationListenerTest.scala    |  8 ++++----
 28 files changed, 56 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/241c3ebb/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index c2bd228..321fc3f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -103,6 +103,12 @@ subprojects {
 
   sourceCompatibility = 1.7
 
+  compileJava {
+    options.encoding = 'UTF-8'
+    // Add unchecked once we drop support for Java 7 as @SuppressWarnings("unchecked") is too buggy in Java 7
+    options.compilerArgs << "-Xlint:deprecation"
+  }
+
   if (JavaVersion.current().isJava8Compatible()) {
     tasks.withType(Javadoc) {
         // disable the crazy super-strict doclint tool in Java 8
@@ -220,6 +226,18 @@ subprojects {
   tasks.withType(ScalaCompile) {
     scalaCompileOptions.useAnt = false
 
+    scalaCompileOptions.additionalParameters = [
+      "-deprecation",
+      "-unchecked",
+      "-encoding", "utf8",
+      "-target:jvm-${sourceCompatibility}".toString(),
+      "-Xlog-reflective-calls",
+      "-feature",
+      "-language:postfixOps",
+      "-language:implicitConversions",
+      "-language:existentials"
+    ]
+
     configure(scalaCompileOptions.forkOptions) {
       memoryMaximumSize = '1g'
       jvmArgs = ['-XX:MaxPermSize=512m', '-Xss2m']

http://git-wip-us.apache.org/repos/asf/kafka/blob/241c3ebb/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
index 8ee9be2..3d7ec60 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
@@ -28,6 +28,8 @@ import java.util.Set;
  * partition returned by a {@link Consumer#poll(long)} operation.
  */
 public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
+
+    @SuppressWarnings("unchecked")
     public static final ConsumerRecords<Object, Object> EMPTY = new ConsumerRecords<>(Collections.EMPTY_MAP);
 
     private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;

http://git-wip-us.apache.org/repos/asf/kafka/blob/241c3ebb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index b6b46c1..2ae1437 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -68,7 +68,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     private final OffsetCommitCallback defaultOffsetCommitCallback;
     private final boolean autoCommitEnabled;
     private final AutoCommitTask autoCommitTask;
-    private final ConsumerInterceptors interceptors;
+    private final ConsumerInterceptors<?, ?> interceptors;
 
     /**
      * Initialize the coordination manager.
@@ -87,7 +87,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                                OffsetCommitCallback defaultOffsetCommitCallback,
                                boolean autoCommitEnabled,
                                long autoCommitIntervalMs,
-                               ConsumerInterceptors interceptors) {
+                               ConsumerInterceptors<?, ?> interceptors) {
         super(client,
                 groupId,
                 sessionTimeoutMs,

http://git-wip-us.apache.org/repos/asf/kafka/blob/241c3ebb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
index 7be99bd..71c16fa 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
@@ -42,7 +42,7 @@ public class RequestFuture<T> {
     private boolean isDone = false;
     private T value;
     private RuntimeException exception;
-    private List<RequestFutureListener<T>> listeners = new ArrayList<RequestFutureListener<T>>();
+    private List<RequestFutureListener<T>> listeners = new ArrayList<>();
 
 
     /**
@@ -129,12 +129,12 @@ public class RequestFuture<T> {
     }
 
     private void fireSuccess() {
-        for (RequestFutureListener listener: listeners)
+        for (RequestFutureListener<T> listener : listeners)
             listener.onSuccess(value);
     }
 
     private void fireFailure() {
-        for (RequestFutureListener listener: listeners)
+        for (RequestFutureListener<T> listener : listeners)
             listener.onFailure(exception);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/241c3ebb/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 85ba9ef..c87973a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -202,7 +202,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
              keySerializer, valueSerializer);
     }
 
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({"unchecked", "deprecation"})
     private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
         try {
             log.trace("Starting the Kafka producer");

http://git-wip-us.apache.org/repos/asf/kafka/blob/241c3ebb/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index b3db4e1..0cd5bfe 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -61,6 +61,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
                 defaultRealm = "";
             }
 
+            @SuppressWarnings("unchecked")
             List<String> principalToLocalRules = (List<String>) configs.get(SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES);
             if (principalToLocalRules != null)
                 kerberosShortNamer = KerberosShortNamer.fromUnparsedRules(defaultRealm, principalToLocalRules);

http://git-wip-us.apache.org/repos/asf/kafka/blob/241c3ebb/clients/src/main/java/org/apache/kafka/common/security/kerberos/LoginManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/LoginManager.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/LoginManager.java
index cf68d20..e163ba8 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/kerberos/LoginManager.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/LoginManager.java
@@ -31,7 +31,7 @@ import org.apache.kafka.common.security.JaasUtils;
 
 public class LoginManager {
 
-    private static final EnumMap<LoginType, LoginManager> CACHED_INSTANCES = new EnumMap(LoginType.class);
+    private static final EnumMap<LoginType, LoginManager> CACHED_INSTANCES = new EnumMap<>(LoginType.class);
 
     private final Login login;
     private final String serviceName;

http://git-wip-us.apache.org/repos/asf/kafka/blob/241c3ebb/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
index 0d4d2ce..d0fe2e8 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
@@ -69,11 +69,12 @@ public class SslFactory implements Configurable {
         this.protocol =  (String) configs.get(SslConfigs.SSL_PROTOCOL_CONFIG);
         this.provider = (String) configs.get(SslConfigs.SSL_PROVIDER_CONFIG);
 
-
+        @SuppressWarnings("unchecked")
         List<String> cipherSuitesList = (List<String>) configs.get(SslConfigs.SSL_CIPHER_SUITES_CONFIG);
         if (cipherSuitesList != null)
             this.cipherSuites = cipherSuitesList.toArray(new String[cipherSuitesList.size()]);
 
+        @SuppressWarnings("unchecked")
         List<String> enabledProtocolsList = (List<String>) configs.get(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG);
         if (enabledProtocolsList != null)
             this.enabledProtocols = enabledProtocolsList.toArray(new String[enabledProtocolsList.size()]);

http://git-wip-us.apache.org/repos/asf/kafka/blob/241c3ebb/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index daef458..4c4225b 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -578,6 +578,7 @@ public class Utils {
      * @param <T> the type of element
      * @return Set
      */
+    @SafeVarargs
     public static <T> Set<T> mkSet(T... elems) {
         return new HashSet<>(Arrays.asList(elems));
     }
@@ -588,6 +589,7 @@ public class Utils {
      * @param <T> the type of element
      * @return List
      */
+    @SafeVarargs
     public static <T> List<T> mkList(T... elems) {
         return Arrays.asList(elems);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/241c3ebb/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 3023837..7ccf079 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -370,6 +370,7 @@ public class RequestResponseTest {
         return new LeaderAndIsrResponse(Errors.NONE.code(), responses);
     }
 
+    @SuppressWarnings("deprecation")
     private AbstractRequest createUpdateMetadataRequest(int version) {
         Map<TopicPartition, UpdateMetadataRequest.PartitionState> partitionStates = new HashMap<>();
         List<Integer> isr = Arrays.asList(1, 2);

http://git-wip-us.apache.org/repos/asf/kafka/blob/241c3ebb/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java
index 4ca37c3..a598259 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java
@@ -172,6 +172,7 @@ public class Struct {
     /**
      * Equivalent to calling {@link #get(String)} and casting the result to a List.
      */
+    @SuppressWarnings("unchecked")
     public <T> List<T> getArray(String fieldName) {
         return (List<T>) getCheckType(fieldName, Schema.Type.ARRAY);
     }
@@ -179,6 +180,7 @@ public class Struct {
     /**
      * Equivalent to calling {@link #get(String)} and casting the result to a Map.
      */
+    @SuppressWarnings("unchecked")
     public <K, V> Map<K, V> getMap(String fieldName) {
         return (Map<K, V>) getCheckType(fieldName, Schema.Type.MAP);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/241c3ebb/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
----------------------------------------------------------------------
diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
index a70cadd..d9a6859 100644
--- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
+++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
@@ -425,7 +425,7 @@ public class JsonConverter implements Converter {
             ObjectNode jsonSchemaParams = JsonNodeFactory.instance.objectNode();
             for (Map.Entry<String, String> prop : schema.parameters().entrySet())
                 jsonSchemaParams.put(prop.getKey(), prop.getValue());
-            jsonSchema.put(JsonSchema.SCHEMA_PARAMETERS_FIELD_NAME, jsonSchemaParams);
+            jsonSchema.set(JsonSchema.SCHEMA_PARAMETERS_FIELD_NAME, jsonSchemaParams);
         }
         if (schema.defaultValue() != null)
             jsonSchema.set(JsonSchema.SCHEMA_DEFAULT_FIELD_NAME, convertToJson(schema, schema.defaultValue()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/241c3ebb/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 4c0d016..aa57493 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -189,6 +189,7 @@ public class Worker {
         return SinkConnector.class.isAssignableFrom(workerConnector.delegate.getClass());
     }
 
+    @SuppressWarnings("unchecked")
     private Class<? extends Connector> getConnectorClass(String connectorAlias) {
         // Avoid the classpath scan if the full class name was provided
         try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/241c3ebb/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java
index 7f2fb83..08c528c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java
@@ -375,6 +375,7 @@ public class KafkaConfigStorage {
         return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, new SystemTime());
     }
 
+    @SuppressWarnings("unchecked")
     private final Callback<ConsumerRecord<String, byte[]>> consumedCallback = new Callback<ConsumerRecord<String, byte[]>>() {
         @Override
         public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/241c3ebb/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index eb9a48c..d24645e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -300,6 +300,7 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
                 return null;
             }
 
+            @SuppressWarnings("unchecked")
             Map<String, Object> statusMap = (Map<String, Object>) schemaAndValue.value();
             TaskStatus.State state = TaskStatus.State.valueOf((String) statusMap.get(STATE_KEY_NAME));
             String trace = (String) statusMap.get(TRACE_KEY_NAME);
@@ -319,6 +320,7 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
                 log.error("Invalid connector status type {}", schemaAndValue.value().getClass());
                 return null;
             }
+            @SuppressWarnings("unchecked")
             Map<String, Object> statusMap = (Map<String, Object>) schemaAndValue.value();
             TaskStatus.State state = TaskStatus.State.valueOf((String) statusMap.get(STATE_KEY_NAME));
             String trace = (String) statusMap.get(TRACE_KEY_NAME);

http://git-wip-us.apache.org/repos/asf/kafka/blob/241c3ebb/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java
index 23c1019..b404de2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java
@@ -55,6 +55,7 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public <T> Map<Map<String, T>, Map<String, Object>> offsets(Collection<Map<String, T>> partitions) {
         // Serialize keys so backing store can work with them
         Map<ByteBuffer, Map<String, T>> serializedToOriginal = new HashMap<>(partitions.size());

http://git-wip-us.apache.org/repos/asf/kafka/blob/241c3ebb/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java
index f31715a..b457b12 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java
@@ -24,6 +24,7 @@ import org.apache.kafka.connect.errors.DataException;
 import java.util.Map;
 
 public class OffsetUtils {
+    @SuppressWarnings("unchecked")
     public static void validateFormat(Object offsetData) {
         if (offsetData == null)
             return;

http://git-wip-us.apache.org/repos/asf/kafka/blob/241c3ebb/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index ac10d59..1099d7a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -535,6 +535,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
         return capturedRecords;
     }
 
+    @SuppressWarnings("unchecked")
     private IExpectationSetters<Object> expectOnePoll() {
         // Currently the SinkTask's put() method will not be invoked unless we provide some data, so instead of
         // returning empty data, we return one record. The expectation is that the data will be ignored by the

http://git-wip-us.apache.org/repos/asf/kafka/blob/241c3ebb/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 404be0b..9b0133a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -365,6 +365,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         return latch;
     }
 
+    @SuppressWarnings("unchecked")
     private void expectSendRecordSyncFailure(Throwable error) throws InterruptedException {
         expectConvertKeyValue(false);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/241c3ebb/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index 1feab0d..4659ae8 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -56,6 +56,7 @@ import static org.junit.Assert.assertEquals;
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(RestServer.class)
 @PowerMockIgnore("javax.management.*")
+@SuppressWarnings("unchecked")
 public class ConnectorsResourceTest {
     // Note trailing / and that we do *not* use LEADER_URL to construct our reference values. This checks that we handle
     // URL construction properly, avoiding //, which will mess up routing in the REST server

http://git-wip-us.apache.org/repos/asf/kafka/blob/241c3ebb/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 07d0e3d..3959ff8 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -63,6 +63,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 @RunWith(PowerMockRunner.class)
+@SuppressWarnings("unchecked")
 public class StandaloneHerderTest {
     private static final String CONNECTOR_NAME = "test";
     private static final List<String> TOPICS_LIST = Arrays.asList("topic1", "topic2");

http://git-wip-us.apache.org/repos/asf/kafka/blob/241c3ebb/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java
index f95704c..5e79a8d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java
@@ -62,6 +62,7 @@ import static org.junit.Assert.fail;
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(KafkaConfigStorage.class)
 @PowerMockIgnore("javax.management.*")
+@SuppressWarnings("unchecked")
 public class KafkaConfigStorageTest {
     private static final String TOPIC = "connect-configs";
     private static final Map<String, String> DEFAULT_CONFIG_STORAGE_PROPS = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/241c3ebb/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
index aa92942..38e0f7b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
@@ -59,6 +59,7 @@ import static org.junit.Assert.fail;
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(KafkaOffsetBackingStore.class)
 @PowerMockIgnore("javax.management.*")
+@SuppressWarnings("unchecked")
 public class KafkaOffsetBackingStoreTest {
     private static final String TOPIC = "connect-offsets";
     private static final Map<String, String> DEFAULT_PROPS = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/241c3ebb/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java
index 8acd31f..45ccdd5 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java
@@ -45,6 +45,7 @@ import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.newCapture;
 import static org.junit.Assert.assertEquals;
 
+@SuppressWarnings("unchecked")
 public class KafkaStatusBackingStoreTest extends EasyMockSupport {
 
     private static final String STATUS_TOPIC = "status-topic";

http://git-wip-us.apache.org/repos/asf/kafka/blob/241c3ebb/connect/runtime/src/test/java/org/apache/kafka/connect/util/ByteArrayProducerRecordEquals.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ByteArrayProducerRecordEquals.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ByteArrayProducerRecordEquals.java
index 4d17ac4..bcfcc23 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ByteArrayProducerRecordEquals.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ByteArrayProducerRecordEquals.java
@@ -36,6 +36,7 @@ public class ByteArrayProducerRecordEquals implements IArgumentMatcher {
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public boolean matches(Object argument) {
         if (!(argument instanceof ProducerRecord))
             return false;

http://git-wip-us.apache.org/repos/asf/kafka/blob/241c3ebb/core/src/test/scala/integration/kafka/api/QuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/QuotasTest.scala b/core/src/test/scala/integration/kafka/api/QuotasTest.scala
index 23be120..b6a0ae5 100644
--- a/core/src/test/scala/integration/kafka/api/QuotasTest.scala
+++ b/core/src/test/scala/integration/kafka/api/QuotasTest.scala
@@ -71,7 +71,6 @@ class QuotasTest extends KafkaServerTestHarness {
     val producerProps = new Properties()
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
     producerProps.put(ProducerConfig.ACKS_CONFIG, "0")
-    producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false")
     producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, producerBufferSize.toString)
     producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, producerId1)
     producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,

http://git-wip-us.apache.org/repos/asf/kafka/blob/241c3ebb/core/src/test/scala/kafka/tools/TestLogCleaning.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/tools/TestLogCleaning.scala b/core/src/test/scala/kafka/tools/TestLogCleaning.scala
index dcbfbe1..2e288ec 100755
--- a/core/src/test/scala/kafka/tools/TestLogCleaning.scala
+++ b/core/src/test/scala/kafka/tools/TestLogCleaning.scala
@@ -247,7 +247,7 @@ object TestLogCleaning {
                       dups: Int,
                       percentDeletes: Int): File = {
     val producerProps = new Properties
-    producerProps.setProperty(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")
+    producerProps.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MaxValue.toString)
     producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
     producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
@@ -316,4 +316,4 @@ case class TestRecord(val topic: String, val key: Int, val value: Long, val dele
   def this(line: String) = this(line.split("\t"))
   override def toString() = topic + "\t" +  key + "\t" + value + "\t" + (if(delete) "d" else "u")
   def topicAndKey = topic + key
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/241c3ebb/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
index 50496f0..8d48609 100644
--- a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
@@ -27,9 +27,9 @@ class ZkNodeChangeNotificationListenerTest extends KafkaServerTestHarness {
 
   @Test
   def testProcessNotification() {
+    @volatile var notification: String = null
+    @volatile var invocationCount = 0
     val notificationHandler = new NotificationHandler {
-      @volatile var notification: String = _
-      @volatile var invocationCount: Integer = 0
       override def processNotification(notificationMessage: String): Unit = {
         notification = notificationMessage
         invocationCount += 1
@@ -48,7 +48,7 @@ class ZkNodeChangeNotificationListenerTest extends KafkaServerTestHarness {
 
     zkUtils.createSequentialPersistentPath(seqNodePath, notificationMessage1)
 
-    TestUtils.waitUntilTrue(() => notificationHandler.invocationCount == 1 && notificationHandler.notification == notificationMessage1, "failed to send/process notification message in the timeout period.")
+    TestUtils.waitUntilTrue(() => invocationCount == 1 && notification == notificationMessage1, "failed to send/process notification message in the timeout period.")
 
     /*There is no easy way to test that purging. Even if we mock kafka time with MockTime, the purging compares kafka time with the time stored in zookeeper stat and the
     embeded zookeeper server does not provide a way to mock time. so to test purging we will have to use SystemTime.sleep(changeExpirationMs + 1) issue a write and check
@@ -56,6 +56,6 @@ class ZkNodeChangeNotificationListenerTest extends KafkaServerTestHarness {
     depending on how threads get scheduled.*/
 
     zkUtils.createSequentialPersistentPath(seqNodePath, notificationMessage2)
-    TestUtils.waitUntilTrue(() => notificationHandler.invocationCount == 2 && notificationHandler.notification == notificationMessage2, "failed to send/process notification message in the timeout period.")
+    TestUtils.waitUntilTrue(() => invocationCount == 2 && notification == notificationMessage2, "failed to send/process notification message in the timeout period.")
   }
 }