Posted to commits@kafka.apache.org by ij...@apache.org on 2018/06/20 12:15:52 UTC

[kafka] branch 2.0 updated: MINOR: Use exceptions in o.a.k.common if possible and deprecate ZkUtils (#5255)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new b07d67c  MINOR: Use exceptions in o.a.k.common if possible and deprecate ZkUtils (#5255)
b07d67c is described below

commit b07d67ccb899de20f23fd17c81bcecd208087ba2
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Wed Jun 20 05:05:50 2018 -0700

    MINOR: Use exceptions in o.a.k.common if possible and deprecate ZkUtils (#5255)
    
    Also:
    - Remove exceptions in `kafka.common` that are no longer used.
    - Keep `kafka.common.KafkaException` as it's still used by `ZkUtils`,
    `kafka.admin.AdminClient` and `kafka.security.auth` classes and
    we would like to maintain compatibility for now.
    - Add deprecated annotation to `kafka.admin.AdminClient`. The scaladoc
    stated that the class is deprecated, but the annotation was missing.
    
    Reviewers: Rajini Sivaram <ra...@googlemail.com>, Manikumar Reddy <ma...@gmail.com>
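
    For readers maintaining tools built on these classes, a minimal sketch (not part of this
    commit) of the Scala deprecation pattern applied to `kafka.admin.AdminClient` and
    `ZkUtils` below: annotate both the class and its companion object with a message and a
    `since` release, so every reference is flagged when compiling with `-deprecation`. The
    OldClient/NewClient names here are hypothetical placeholders.

        @deprecated("Use org.example.NewClient instead.", since = "2.0.0")
        class OldClient {
          def ping(): String = "pong"
        }

        @deprecated("Use org.example.NewClient instead.", since = "2.0.0")
        object OldClient {
          val DefaultTimeoutMs = 5000
        }

        object Demo extends App {
          // Compiling this file with `scalac -deprecation` reports a warning at each of the
          // two references below, carrying the annotation's message and since-version.
          val client = new OldClient
          println(client.ping() + " " + OldClient.DefaultTimeoutMs)
        }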
---
 core/src/main/scala/kafka/admin/AdminClient.scala  |  7 ++++--
 .../src/main/scala/kafka/admin/ConfigCommand.scala | 10 ++++-----
 core/src/main/scala/kafka/api/ApiUtils.scala       |  5 +++--
 .../main/scala/kafka/cluster/BrokerEndPoint.scala  |  2 +-
 core/src/main/scala/kafka/cluster/EndPoint.scala   |  2 +-
 core/src/main/scala/kafka/cluster/Replica.scala    |  3 +--
 core/src/main/scala/kafka/common/Config.scala      |  5 +++--
 .../common/IndexOffsetOverflowException.scala      |  2 +-
 .../kafka/common/InvalidConfigException.scala      | 25 ---------------------
 .../kafka/common/InvalidOffsetException.scala      | 22 ------------------
 .../main/scala/kafka/common/KafkaException.scala   |  5 ++++-
 .../kafka/common/LeaderNotAvailableException.scala | 26 ----------------------
 .../common/LogSegmentOffsetOverflowException.scala |  2 +-
 .../controller/ControllerChannelManager.scala      |  3 +--
 .../scala/kafka/controller/KafkaController.scala   |  2 +-
 .../coordinator/group/GroupMetadataManager.scala   |  4 ++--
 .../transaction/ProducerIdManager.scala            |  2 +-
 .../coordinator/transaction/TransactionLog.scala   |  4 ++--
 .../transaction/TransactionStateManager.scala      |  3 +--
 core/src/main/scala/kafka/log/Log.scala            |  6 ++---
 core/src/main/scala/kafka/log/LogCleaner.scala     |  2 +-
 core/src/main/scala/kafka/log/LogManager.scala     |  3 +--
 core/src/main/scala/kafka/log/OffsetIndex.scala    |  2 +-
 .../scala/kafka/log/ProducerStateManager.scala     |  3 +--
 core/src/main/scala/kafka/log/TimeIndex.scala      |  2 +-
 .../scala/kafka/message/CompressionCodec.scala     |  4 +++-
 .../main/scala/kafka/network/SocketServer.scala    |  3 +--
 .../scala/kafka/server/AbstractFetcherThread.scala |  6 ++---
 .../main/scala/kafka/tools/ConsoleProducer.scala   |  1 +
 core/src/main/scala/kafka/utils/Pool.scala         |  3 ++-
 core/src/main/scala/kafka/utils/ZkUtils.scala      |  4 ++++
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   |  3 +--
 core/src/main/scala/kafka/zk/ZkData.scala          |  4 ++--
 .../scala/unit/kafka/admin/ConfigCommandTest.scala |  4 ++--
 .../test/scala/unit/kafka/api/ApiUtilsTest.scala   |  4 +++-
 .../scala/unit/kafka/cluster/PartitionTest.scala   |  3 +--
 .../transaction/ProducerIdManagerTest.scala        |  2 +-
 .../kafka/integration/KafkaServerTestHarness.scala |  2 +-
 .../test/scala/unit/kafka/log/LogManagerTest.scala |  3 +--
 core/src/test/scala/unit/kafka/log/LogTest.scala   |  4 ++--
 .../scala/unit/kafka/log/OffsetIndexTest.scala     |  2 +-
 .../test/scala/unit/kafka/log/TimeIndexTest.scala  |  6 ++---
 .../unit/kafka/server/ServerStartupTest.scala      |  2 +-
 .../scala/unit/kafka/utils/CoreUtilsTest.scala     |  2 +-
 44 files changed, 75 insertions(+), 139 deletions(-)
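
 Several of the hunks below (ConfigCommand.scala, Config.scala and their tests) swap the
 removed kafka.common.InvalidConfigException for
 org.apache.kafka.common.errors.InvalidConfigurationException. A self-contained sketch of
 the caller-side effect, assuming only the kafka-clients jar on the classpath; ConfigCheck
 and requireKnown are hypothetical names, not part of the commit:

     import org.apache.kafka.common.errors.InvalidConfigurationException

     object ConfigCheck {
       // Reject deletions of configs that were never set, mirroring the validation style
       // used by ConfigCommand, but throwing the exception from o.a.k.common.errors.
       def requireKnown(known: Set[String], toDelete: Seq[String]): Unit = {
         val invalid = toDelete.filterNot(known.contains)
         if (invalid.nonEmpty)
           throw new InvalidConfigurationException(s"Invalid config(s): ${invalid.mkString(",")}")
       }

       def main(args: Array[String]): Unit = {
         try requireKnown(Set("retention.ms"), Seq("segment.bytes"))
         catch {
           // Callers that previously matched kafka.common.InvalidConfigException now
           // match the public exception type instead.
           case e: InvalidConfigurationException => println(s"Rejected: ${e.getMessage}")
         }
       }
     }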

diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index d847881..1009bc5 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -42,9 +42,10 @@ import scala.util.{Failure, Success, Try}
 
 /**
   * A Scala administrative client for Kafka which supports managing and inspecting topics, brokers,
-  * and configurations.  This client is deprecated, and will be replaced by KafkaAdminClient.
-  * @see KafkaAdminClient
+  * and configurations. This client is deprecated, and will be replaced by org.apache.kafka.clients.admin.AdminClient.
   */
+@deprecated("This class is deprecated in favour of org.apache.kafka.clients.admin.AdminClient and it will be removed in " +
+  "a future release.", since = "0.11.0")
 class AdminClient(val time: Time,
                   val requestTimeoutMs: Int,
                   val retryBackoffMs: Long,
@@ -364,6 +365,8 @@ class CompositeFuture[T](time: Time,
   }
 }
 
+@deprecated("This class is deprecated in favour of org.apache.kafka.clients.admin.AdminClient and it will be removed in " +
+  "a future release.", since = "0.11.0")
 object AdminClient {
   val DefaultConnectionMaxIdleMs = 9 * 60 * 1000
   val DefaultRequestTimeoutMs = 5000
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 6ac0a01..d8dade0 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -22,7 +22,6 @@ import java.util.{Collections, Properties}
 
 import joptsimple._
 import kafka.common.Config
-import kafka.common.InvalidConfigException
 import kafka.log.LogConfig
 import kafka.server.{ConfigEntityName, ConfigType, Defaults, DynamicBrokerConfig, DynamicConfig, KafkaConfig}
 import kafka.utils.{CommandLineUtils, Exit, PasswordEncoder}
@@ -32,6 +31,7 @@ import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.admin.{AlterConfigsOptions, ConfigEntry, DescribeConfigsOptions, AdminClient => JAdminClient, Config => JConfig}
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.config.types.Password
+import org.apache.kafka.common.errors.InvalidConfigurationException
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter, ScramMechanism}
 import org.apache.kafka.common.utils.{Sanitizer, Time, Utils}
@@ -83,7 +83,7 @@ object ConfigCommand extends Config {
         processBrokerConfig(opts)
       }
     } catch {
-      case e @ (_: IllegalArgumentException | _: InvalidConfigException | _: OptionException) =>
+      case e @ (_: IllegalArgumentException | _: InvalidConfigurationException | _: OptionException) =>
         logger.debug(s"Failed config command with args '${args.mkString(" ")}'", e)
         System.err.println(e.getMessage)
         Exit.exit(1)
@@ -145,7 +145,7 @@ object ConfigCommand extends Config {
     // fail the command if any of the configs to be deleted does not exist
     val invalidConfigs = configsToBeDeleted.filterNot(configs.containsKey(_))
     if (invalidConfigs.nonEmpty)
-      throw new InvalidConfigException(s"Invalid config(s): ${invalidConfigs.mkString(",")}")
+      throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}")
 
     configs ++= configsToBeAdded
     configsToBeDeleted.foreach(configs.remove(_))
@@ -307,12 +307,12 @@ object ConfigCommand extends Config {
     // fail the command if any of the configs to be deleted does not exist
     val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
     if (invalidConfigs.nonEmpty)
-      throw new InvalidConfigException(s"Invalid config(s): ${invalidConfigs.mkString(",")}")
+      throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}")
 
     val newEntries = oldConfig ++ configsToBeAdded -- configsToBeDeleted
     val sensitiveEntries = newEntries.filter(_._2.value == null)
     if (sensitiveEntries.nonEmpty)
-      throw new InvalidConfigException(s"All sensitive broker config entries must be specified for --alter, missing entries: ${sensitiveEntries.keySet}")
+      throw new InvalidConfigurationException(s"All sensitive broker config entries must be specified for --alter, missing entries: ${sensitiveEntries.keySet}")
     val newConfig = new JConfig(newEntries.asJava.values)
 
     val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
diff --git a/core/src/main/scala/kafka/api/ApiUtils.scala b/core/src/main/scala/kafka/api/ApiUtils.scala
index 63fece7..4a0c8b0 100644
--- a/core/src/main/scala/kafka/api/ApiUtils.scala
+++ b/core/src/main/scala/kafka/api/ApiUtils.scala
@@ -16,8 +16,9 @@
  */
 package kafka.api
 
-import java.nio._
-import kafka.common._
+import java.nio.ByteBuffer
+
+import org.apache.kafka.common.KafkaException
 
 /**
  * Helper functions specific to parsing or serializing requests and responses
diff --git a/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala b/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala
index 847e959..986d352 100644
--- a/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala
+++ b/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala
@@ -19,7 +19,7 @@ package kafka.cluster
 import java.nio.ByteBuffer
 
 import kafka.api.ApiUtils._
-import kafka.common.KafkaException
+import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.utils.Utils._
 
 object BrokerEndPoint {
diff --git a/core/src/main/scala/kafka/cluster/EndPoint.scala b/core/src/main/scala/kafka/cluster/EndPoint.scala
index 57ef0da..2bca5c8 100644
--- a/core/src/main/scala/kafka/cluster/EndPoint.scala
+++ b/core/src/main/scala/kafka/cluster/EndPoint.scala
@@ -17,7 +17,7 @@
 
 package kafka.cluster
 
-import kafka.common.KafkaException
+import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.Utils
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index 4b65e43..962aaff 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -20,8 +20,7 @@ package kafka.cluster
 import kafka.log.Log
 import kafka.utils.Logging
 import kafka.server.{LogOffsetMetadata, LogReadResult}
-import kafka.common.KafkaException
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.errors.OffsetOutOfRangeException
 import org.apache.kafka.common.utils.Time
 
diff --git a/core/src/main/scala/kafka/common/Config.scala b/core/src/main/scala/kafka/common/Config.scala
index d24fb0d..4110ba7 100644
--- a/core/src/main/scala/kafka/common/Config.scala
+++ b/core/src/main/scala/kafka/common/Config.scala
@@ -19,6 +19,7 @@ package kafka.common
 
 import util.matching.Regex
 import kafka.utils.Logging
+import org.apache.kafka.common.errors.InvalidConfigurationException
 
 trait Config extends Logging {
 
@@ -29,8 +30,8 @@ trait Config extends Logging {
     rgx.findFirstIn(value) match {
       case Some(t) =>
         if (!t.equals(value))
-          throw new InvalidConfigException(prop + " " + value + " is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'")
-      case None => throw new InvalidConfigException(prop + " " + value + " is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'")
+          throw new InvalidConfigurationException(prop + " " + value + " is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'")
+      case None => throw new InvalidConfigurationException(prop + " " + value + " is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'")
     }
   }
 }
diff --git a/core/src/main/scala/kafka/common/IndexOffsetOverflowException.scala b/core/src/main/scala/kafka/common/IndexOffsetOverflowException.scala
index 7f3ea11..5dd9b43 100644
--- a/core/src/main/scala/kafka/common/IndexOffsetOverflowException.scala
+++ b/core/src/main/scala/kafka/common/IndexOffsetOverflowException.scala
@@ -20,6 +20,6 @@ package kafka.common
 /**
  * Indicates that an attempt was made to append a message whose offset could cause the index offset to overflow.
  */
-class IndexOffsetOverflowException(message: String, cause: Throwable) extends KafkaException(message, cause) {
+class IndexOffsetOverflowException(message: String, cause: Throwable) extends org.apache.kafka.common.KafkaException(message, cause) {
   def this(message: String) = this(message, null)
 }
diff --git a/core/src/main/scala/kafka/common/InvalidConfigException.scala b/core/src/main/scala/kafka/common/InvalidConfigException.scala
deleted file mode 100644
index 6437846..0000000
--- a/core/src/main/scala/kafka/common/InvalidConfigException.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package kafka.common
-
-/**
- * Indicates that the given config parameter has invalid value
- */
-class InvalidConfigException(message: String) extends RuntimeException(message) {
-  def this() = this(null)
-}
diff --git a/core/src/main/scala/kafka/common/InvalidOffsetException.scala b/core/src/main/scala/kafka/common/InvalidOffsetException.scala
deleted file mode 100644
index c6811d7..0000000
--- a/core/src/main/scala/kafka/common/InvalidOffsetException.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.common
-
-class InvalidOffsetException(message: String) extends RuntimeException(message) {
-  def this() = this(null)
-}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/common/KafkaException.scala b/core/src/main/scala/kafka/common/KafkaException.scala
index e72d151..61b3ba3 100644
--- a/core/src/main/scala/kafka/common/KafkaException.scala
+++ b/core/src/main/scala/kafka/common/KafkaException.scala
@@ -17,7 +17,10 @@
 package kafka.common
 
 /**
- * Generic Kafka exception
+ * Usage of this class is discouraged. Use org.apache.kafka.common.KafkaException instead.
+ *
+ * This class will be removed once ZkUtils and the kafka.security.auth classes are removed.
+ * The former is internal, but widely used, so we are leaving it in the codebase for now.
 */
 class KafkaException(message: String, t: Throwable) extends RuntimeException(message, t) {
   def this(message: String) = this(message, null)
diff --git a/core/src/main/scala/kafka/common/LeaderNotAvailableException.scala b/core/src/main/scala/kafka/common/LeaderNotAvailableException.scala
deleted file mode 100644
index 972728e..0000000
--- a/core/src/main/scala/kafka/common/LeaderNotAvailableException.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.common
-
-/**
- * Thrown when a request is made for partition, but no leader exists for that partition
- */
-class LeaderNotAvailableException(message: String, cause: Throwable) extends RuntimeException(message, cause) {
-  def this(message: String) = this(message, null)
-  def this() = this(null, null)
-}
diff --git a/core/src/main/scala/kafka/common/LogSegmentOffsetOverflowException.scala b/core/src/main/scala/kafka/common/LogSegmentOffsetOverflowException.scala
index 9a24efe..2de5906 100644
--- a/core/src/main/scala/kafka/common/LogSegmentOffsetOverflowException.scala
+++ b/core/src/main/scala/kafka/common/LogSegmentOffsetOverflowException.scala
@@ -26,5 +26,5 @@ import kafka.log.LogSegment
  * do not have any segments with offset overflow.
  */
 class LogSegmentOffsetOverflowException(val segment: LogSegment, val offset: Long)
-  extends KafkaException(s"Detected offset overflow at offset $offset in segment $segment") {
+  extends org.apache.kafka.common.KafkaException(s"Detected offset overflow at offset $offset in segment $segment") {
 }
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index addd88d..096b2b4 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -22,7 +22,6 @@ import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue, TimeUnit}
 import com.yammer.metrics.core.{Gauge, Timer}
 import kafka.api._
 import kafka.cluster.Broker
-import kafka.common.KafkaException
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.KafkaConfig
 import kafka.utils._
@@ -35,7 +34,7 @@ import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.JaasContext
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.{LogContext, Time}
-import org.apache.kafka.common.{Node, TopicPartition}
+import org.apache.kafka.common.{KafkaException, Node, TopicPartition}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 9c33874..11d22fd 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -28,7 +28,7 @@ import kafka.utils._
 import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
 import kafka.zk._
 import kafka.zookeeper.{StateChangeHandler, ZNodeChangeHandler, ZNodeChildChangeHandler}
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 233a76e..02ba13a 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -26,14 +26,14 @@ import java.util.concurrent.locks.ReentrantLock
 
 import com.yammer.metrics.core.Gauge
 import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0}
-import kafka.common.{KafkaException, MessageFormatter, OffsetAndMetadata}
+import kafka.common.{MessageFormatter, OffsetAndMetadata}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.ReplicaManager
 import kafka.utils.CoreUtils.inLock
 import kafka.utils._
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.clients.consumer.ConsumerRecord
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.protocol.types.Type._
diff --git a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
index c3c9f7c..5c22c8e 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
@@ -18,9 +18,9 @@ package kafka.coordinator.transaction
 
 import java.nio.charset.StandardCharsets
 
-import kafka.common.KafkaException
 import kafka.utils.{Json, Logging}
 import kafka.zk.{KafkaZkClient, ProducerIdBlockZNode}
+import org.apache.kafka.common.KafkaException
 
 import scala.collection.JavaConverters._
 
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
index 2c7178e..2dc6e38 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
@@ -16,9 +16,9 @@
  */
 package kafka.coordinator.transaction
 
-import kafka.common.{KafkaException, MessageFormatter}
+import kafka.common.MessageFormatter
 import org.apache.kafka.clients.consumer.ConsumerRecord
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.protocol.types.Type._
 import org.apache.kafka.common.protocol.types._
 import java.io.PrintStream
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index e3b0321..a358515 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -22,7 +22,6 @@ import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
-import kafka.common.KafkaException
 import kafka.log.LogConfig
 import kafka.message.UncompressedCodec
 import kafka.server.Defaults
@@ -30,7 +29,7 @@ import kafka.server.ReplicaManager
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils.{Logging, Pool, Scheduler}
 import kafka.zk.KafkaZkClient
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.{FileRecords, MemoryRecords, SimpleRecord}
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 64048fb..3036018 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -28,15 +28,15 @@ import java.util.regex.Pattern
 
 import com.yammer.metrics.core.Gauge
 import kafka.api.KAFKA_0_10_0_IV0
-import kafka.common.{InvalidOffsetException, KafkaException, LogSegmentOffsetOverflowException, LongRef, UnexpectedAppendOffsetException, OffsetsOutOfOrderException}
+import kafka.common.{LogSegmentOffsetOverflowException, LongRef, OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
 import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.checkpoints.{LeaderEpochCheckpointFile, LeaderEpochFile}
 import kafka.server.epoch.{LeaderEpochCache, LeaderEpochFileCache}
 import kafka.server.{BrokerTopicStats, FetchDataInfo, LogDirFailureChannel, LogOffsetMetadata}
 import kafka.utils._
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.{CorruptRecordException, KafkaStorageException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, UnsupportedForMessageFormatException}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.errors.{CorruptRecordException, InvalidOffsetException, KafkaStorageException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, UnsupportedForMessageFormatException}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
 import org.apache.kafka.common.requests.{IsolationLevel, ListOffsetRequest}
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 08bfa4f..91ddbf0 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -27,7 +27,7 @@ import kafka.common._
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.{BrokerReconfigurable, KafkaConfig, LogDirFailureChannel}
 import kafka.utils._
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.common.errors.{CorruptRecordException, KafkaStorageException}
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 3bb5ee6..32203ac 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -22,13 +22,12 @@ import java.nio.file.Files
 import java.util.concurrent._
 
 import com.yammer.metrics.core.Gauge
-import kafka.common.KafkaException
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.server.{BrokerState, RecoveringFromUncleanShutdown, _}
 import kafka.utils._
 import kafka.zk.KafkaZkClient
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.errors.{KafkaStorageException, LogDirNotFoundException}
 
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index d185631..2babd00 100755
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -21,7 +21,7 @@ import java.io.File
 import java.nio.ByteBuffer
 
 import kafka.utils.CoreUtils.inLock
-import kafka.common.{IndexOffsetOverflowException, InvalidOffsetException}
+import org.apache.kafka.common.errors.InvalidOffsetException
 
 /**
  * An index that maps offsets to physical file locations for a particular log segment. This index may be sparse:
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index abeac6e..caca9a8 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -20,11 +20,10 @@ import java.io._
 import java.nio.ByteBuffer
 import java.nio.file.Files
 
-import kafka.common.KafkaException
 import kafka.log.Log.offsetFromFile
 import kafka.server.LogOffsetMetadata
 import kafka.utils.{Logging, nonthreadsafe, threadsafe}
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.protocol.types._
diff --git a/core/src/main/scala/kafka/log/TimeIndex.scala b/core/src/main/scala/kafka/log/TimeIndex.scala
index 7fae130..1661cba 100644
--- a/core/src/main/scala/kafka/log/TimeIndex.scala
+++ b/core/src/main/scala/kafka/log/TimeIndex.scala
@@ -20,9 +20,9 @@ package kafka.log
 import java.io.File
 import java.nio.ByteBuffer
 
-import kafka.common.InvalidOffsetException
 import kafka.utils.CoreUtils._
 import kafka.utils.Logging
+import org.apache.kafka.common.errors.InvalidOffsetException
 import org.apache.kafka.common.record.RecordBatch
 
 /**
diff --git a/core/src/main/scala/kafka/message/CompressionCodec.scala b/core/src/main/scala/kafka/message/CompressionCodec.scala
index a485271..64e0aaa 100644
--- a/core/src/main/scala/kafka/message/CompressionCodec.scala
+++ b/core/src/main/scala/kafka/message/CompressionCodec.scala
@@ -19,6 +19,8 @@ package kafka.message
 
 import java.util.Locale
 
+import kafka.common.UnknownCodecException
+
 object CompressionCodec {
   def getCompressionCodec(codec: Int): CompressionCodec = {
     codec match {
@@ -26,7 +28,7 @@ object CompressionCodec {
       case GZIPCompressionCodec.codec => GZIPCompressionCodec
       case SnappyCompressionCodec.codec => SnappyCompressionCodec
       case LZ4CompressionCodec.codec => LZ4CompressionCodec
-      case _ => throw new kafka.common.UnknownCodecException("%d is an unknown compression codec".format(codec))
+      case _ => throw new UnknownCodecException("%d is an unknown compression codec".format(codec))
     }
   }
   def getCompressionCodec(name: String): CompressionCodec = {
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 06da8df..62fc7a5 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -26,13 +26,12 @@ import java.util.concurrent.atomic._
 
 import com.yammer.metrics.core.Gauge
 import kafka.cluster.{BrokerEndPoint, EndPoint}
-import kafka.common.KafkaException
 import kafka.metrics.KafkaMetricsGroup
 import kafka.network.RequestChannel.{CloseConnectionResponse, EndThrottlingResponse, NoOpResponse, SendResponse, StartThrottlingResponse}
 import kafka.security.CredentialProvider
 import kafka.server.KafkaConfig
 import kafka.utils._
-import org.apache.kafka.common.Reconfigurable
+import org.apache.kafka.common.{KafkaException, Reconfigurable}
 import org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool}
 import org.apache.kafka.common.metrics._
 import org.apache.kafka.common.metrics.stats.Meter
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index f27dbfe..e056ad6 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -19,11 +19,11 @@ package kafka.server
 
 import java.util.concurrent.locks.ReentrantLock
 
-import kafka.cluster.{Replica, BrokerEndPoint}
+import kafka.cluster.{BrokerEndPoint, Replica}
 import kafka.utils.{DelayedItem, Pool, ShutdownableThread}
 import org.apache.kafka.common.errors.{CorruptRecordException, KafkaStorageException}
 import org.apache.kafka.common.requests.EpochEndOffset._
-import kafka.common.{ClientIdAndBroker, KafkaException}
+import kafka.common.ClientIdAndBroker
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.CoreUtils.inLock
 import org.apache.kafka.common.protocol.Errors
@@ -35,7 +35,7 @@ import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicLong
 
 import com.yammer.metrics.core.Gauge
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.internals.{FatalExitError, PartitionStates}
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.requests.EpochEndOffset
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index 3e64b93..8d8c42d 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -28,6 +28,7 @@ import java.nio.charset.StandardCharsets
 import joptsimple._
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.utils.Utils
 
 import scala.collection.JavaConverters._
diff --git a/core/src/main/scala/kafka/utils/Pool.scala b/core/src/main/scala/kafka/utils/Pool.scala
index 4ddf557..742d3dc 100644
--- a/core/src/main/scala/kafka/utils/Pool.scala
+++ b/core/src/main/scala/kafka/utils/Pool.scala
@@ -19,9 +19,10 @@ package kafka.utils
 
 import java.util.concurrent._
 
+import org.apache.kafka.common.KafkaException
+
 import collection.mutable
 import collection.JavaConverters._
-import kafka.common.KafkaException
 
 class Pool[K,V](valueFactory: Option[K => V] = None) extends Iterable[(K, V)] {
 
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 004ab3d..d47af0d 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -36,6 +36,8 @@ import scala.collection._
 import scala.collection.JavaConverters._
 import org.apache.kafka.common.TopicPartition
 
+@deprecated("This is an internal class that is no longer used by Kafka and will be removed in a future release. Please " +
+  "use org.apache.kafka.clients.admin.AdminClient instead.", since = "2.0.0")
 object ZkUtils {
 
   private val UseDefaultAcls = new java.util.ArrayList[ACL]
@@ -183,6 +185,8 @@ object ZkUtils {
 /**
  * Legacy class for interacting with ZooKeeper. Whenever possible, ``KafkaZkClient`` should be used instead.
  */
+@deprecated("This is an internal class that is no longer used by Kafka and will be removed in a future release. Please " +
+  "use org.apache.kafka.clients.admin.AdminClient instead.", since = "2.0.0")
 class ZkUtils(val zkClient: ZkClient,
               val zkConnection: ZkConnection,
               val isSecure: Boolean) extends Logging {
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index d5beae8..bb34294 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -21,7 +21,6 @@ import java.util.Properties
 import com.yammer.metrics.core.MetricName
 import kafka.api.LeaderAndIsr
 import kafka.cluster.Broker
-import kafka.common.KafkaException
 import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.log.LogConfig
 import kafka.metrics.KafkaMetricsGroup
@@ -30,7 +29,7 @@ import kafka.security.auth.{Acl, Resource, ResourceType}
 import kafka.server.ConfigType
 import kafka.utils.Logging
 import kafka.zookeeper._
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.resource.PatternType
 import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
 import org.apache.kafka.common.utils.{Time, Utils}
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index d782ae0..d2b2333 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -23,14 +23,14 @@ import com.fasterxml.jackson.annotation.JsonProperty
 import com.fasterxml.jackson.core.JsonProcessingException
 import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, LeaderAndIsr}
 import kafka.cluster.{Broker, EndPoint}
-import kafka.common.{KafkaException, NotificationHandler, ZkNodeChangeNotificationListener}
+import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener}
 import kafka.controller.{IsrChangeNotificationHandler, LeaderIsrAndControllerEpoch}
 import kafka.security.auth.Resource.Separator
 import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
 import kafka.security.auth.{Acl, Resource, ResourceType}
 import kafka.server.{ConfigType, DelegationTokenManager}
 import kafka.utils.Json
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.errors.UnsupportedVersionException
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.resource.PatternType
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index 2644dcc..2e8179c 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -22,7 +22,6 @@ import java.util.Properties
 import kafka.admin.ConfigCommand.ConfigCommandOptions
 import kafka.api.ApiVersion
 import kafka.cluster.{Broker, EndPoint}
-import kafka.common.InvalidConfigException
 import kafka.server.{ConfigEntityName, KafkaConfig}
 import kafka.utils.{Exit, Logging}
 import kafka.zk.{AdminZkClient, BrokerInfo, KafkaZkClient, ZooKeeperTestHarness}
@@ -30,6 +29,7 @@ import org.apache.kafka.clients.admin._
 import org.apache.kafka.common.config.{ConfigException, ConfigResource}
 import org.apache.kafka.common.internals.KafkaFutureImpl
 import org.apache.kafka.common.Node
+import org.apache.kafka.common.errors.InvalidConfigurationException
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils
@@ -425,7 +425,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
     ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient))
   }
 
-  @Test (expected = classOf[InvalidConfigException])
+  @Test (expected = classOf[InvalidConfigurationException])
   def shouldNotUpdateBrokerConfigIfNonExistingConfigIsDeleted(): Unit = {
     val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
       "--entity-name", "my-topic",
diff --git a/core/src/test/scala/unit/kafka/api/ApiUtilsTest.scala b/core/src/test/scala/unit/kafka/api/ApiUtilsTest.scala
index b71b00b..9fe4cbf 100644
--- a/core/src/test/scala/unit/kafka/api/ApiUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/api/ApiUtilsTest.scala
@@ -20,10 +20,12 @@ package kafka.api
 import org.junit._
 import org.scalatest.junit.JUnitSuite
 import org.junit.Assert._
+
 import scala.util.Random
 import java.nio.ByteBuffer
-import kafka.common.KafkaException
+
 import kafka.utils.TestUtils
+import org.apache.kafka.common.KafkaException
 
 object ApiUtilsTest {
   val rnd: Random = new Random()
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index fe5d578..fe9038a 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -22,10 +22,9 @@ import java.util.Properties
 import java.util.concurrent.atomic.AtomicBoolean
 
 import kafka.common.UnexpectedAppendOffsetException
-import kafka.log.{Log, LogConfig, LogManager, CleanerConfig}
+import kafka.log.{LogConfig, LogManager, CleanerConfig}
 import kafka.server._
 import kafka.utils.{MockTime, TestUtils, MockScheduler}
-import kafka.utils.timer.MockTimer
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.ReplicaNotAvailableException
 import org.apache.kafka.common.metrics.Metrics
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
index 88aebd3..660e623 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
@@ -16,8 +16,8 @@
  */
 package kafka.coordinator.transaction
 
-import kafka.common.KafkaException
 import kafka.zk.KafkaZkClient
+import org.apache.kafka.common.KafkaException
 import org.easymock.{Capture, EasyMock, IAnswer}
 import org.junit.{After, Test}
 import org.junit.Assert._
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 2c4a988..0c97357 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -20,7 +20,6 @@ package kafka.integration
 import java.io.File
 import java.util.Arrays
 
-import kafka.common.KafkaException
 import kafka.server._
 import kafka.utils.TestUtils
 import kafka.zk.ZooKeeperTestHarness
@@ -30,6 +29,7 @@ import org.junit.{After, Before}
 import scala.collection.mutable.{ArrayBuffer, Buffer}
 import java.util.Properties
 
+import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.utils.Time
 
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 3fc6c1c..38d6f71 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -20,10 +20,9 @@ package kafka.log
 import java.io._
 import java.util.Properties
 
-import kafka.common._
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils._
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.errors.OffsetOutOfRangeException
 import org.apache.kafka.common.utils.Utils
 import org.junit.Assert._
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index f3b4e95..3b5b2fa 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -22,12 +22,12 @@ import java.nio.ByteBuffer
 import java.nio.file.{Files, Paths}
 import java.util.Properties
 
-import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException, KafkaException}
+import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
 import kafka.log.Log.DeleteDirSuffix
 import kafka.server.epoch.{EpochEntry, LeaderEpochCache, LeaderEpochFileCache}
 import kafka.server.{BrokerTopicStats, FetchDataInfo, KafkaConfig, LogDirFailureChannel}
 import kafka.utils._
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
diff --git a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
index 1e4e892..1529597 100644
--- a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
+++ b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
@@ -29,7 +29,7 @@ import org.scalatest.junit.JUnitSuite
 import scala.collection._
 import scala.util.Random
 import kafka.utils.TestUtils
-import kafka.common.InvalidOffsetException
+import org.apache.kafka.common.errors.InvalidOffsetException
 
 class OffsetIndexTest extends JUnitSuite {
   
diff --git a/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala b/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala
index 8520f89..b9478cd 100644
--- a/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala
+++ b/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala
@@ -19,10 +19,10 @@ package kafka.log
 
 import java.io.File
 
-import kafka.common.InvalidOffsetException
 import kafka.utils.TestUtils
-import org.junit.{Test, After, Before}
-import org.junit.Assert.{assertEquals}
+import org.apache.kafka.common.errors.InvalidOffsetException
+import org.junit.{After, Before, Test}
+import org.junit.Assert.assertEquals
 import org.scalatest.junit.JUnitSuite
 
 /**
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
index 64647de..67d083c 100755
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -17,9 +17,9 @@
 
 package kafka.server
 
-import kafka.common.KafkaException
 import kafka.utils.TestUtils
 import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.KafkaException
 import org.apache.zookeeper.KeeperException.NodeExistsException
 import org.easymock.EasyMock
 import org.junit.Assert._
diff --git a/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala
index 7c416a2..93578c6 100755
--- a/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala
@@ -26,8 +26,8 @@ import java.util.regex.Pattern
 
 import org.scalatest.junit.JUnitSuite
 import org.junit.Assert._
-import kafka.common.KafkaException
 import kafka.utils.CoreUtils.inLock
+import org.apache.kafka.common.KafkaException
 import org.junit.Test
 import org.apache.kafka.common.utils.Utils
 import org.slf4j.event.Level