You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/06/02 19:42:24 UTC

[kafka] branch trunk updated: KAFKA-6955: Use Java AdminClient in DeleteRecordsCommand (#5088)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 341d5db  KAFKA-6955: Use Java AdminClient in DeleteRecordsCommand (#5088)
341d5db is described below

commit 341d5db2604f20a8bd2a69e9ad08d14462829068
Author: Vahid Hashemian <va...@us.ibm.com>
AuthorDate: Sat Jun 2 12:41:55 2018 -0700

    KAFKA-6955: Use Java AdminClient in DeleteRecordsCommand (#5088)
    
    - Removed internal kafka.admin.AdminClient.deleteRecordsBefore since it's
    no longer used.
    - Removed redundant tests and rewrote non-redundant ones to use the Java
    AdminClient.
    
    Reviewers: Viktor Somogyi <vi...@cloudera.com>, Manikumar Reddy <ma...@gmail.com>, Ismael Juma <is...@juma.me.uk>
---
 core/src/main/scala/kafka/admin/AdminClient.scala  |  70 ------------
 .../scala/kafka/admin/DeleteRecordsCommand.scala   |  28 +++--
 .../kafka/api/AdminClientIntegrationTest.scala     |  81 +++++++++++---
 .../kafka/api/LegacyAdminClientTest.scala          | 121 +--------------------
 docs/upgrade.html                                  |   1 +
 5 files changed, 87 insertions(+), 214 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index bcc11fd..ea42530 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -18,7 +18,6 @@ import java.util.{Collections, Properties}
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.concurrent.{ConcurrentLinkedQueue, Future, TimeUnit}
 
-import kafka.admin.AdminClient.DeleteRecordsResult
 import kafka.common.KafkaException
 import kafka.coordinator.group.GroupOverview
 import kafka.utils.Logging
@@ -216,73 +215,6 @@ class AdminClient(val time: Time,
       broker -> Try[NodeApiVersions](new NodeApiVersions(getApiVersions(broker).asJava))
     }.toMap
 
-  /*
-   * Remove all the messages whose offset is smaller than the given offset of the corresponding partition
-   *
-   * DeleteRecordsResult contains either lowWatermark of the partition or exception. We list the possible exception
-   * and their interpretations below:
-   *
-   * - DisconnectException if leader node of the partition is not available. Need retry by user.
-   * - PolicyViolationException if the topic is configured as non-deletable.
-   * - TopicAuthorizationException if the topic doesn't exist and the user doesn't have the authority to create the topic
-   * - TimeoutException if response is not available within the timeout specified by either Future's timeout or AdminClient's request timeout
-   * - UnknownTopicOrPartitionException if the partition doesn't exist or if the user doesn't have the authority to describe the topic
-   * - NotLeaderForPartitionException if broker is not leader of the partition. Need retry by user.
-   * - OffsetOutOfRangeException if the offset is larger than high watermark of this partition
-   *
-   */
-
-  def deleteRecordsBefore(offsets: Map[TopicPartition, Long]): Future[Map[TopicPartition, DeleteRecordsResult]] = {
-    val metadataRequest = new MetadataRequest.Builder(offsets.keys.map(_.topic).toSet.toList.asJava, true)
-    val response = sendAnyNode(ApiKeys.METADATA, metadataRequest).asInstanceOf[MetadataResponse]
-    val errors = response.errors
-    if (!errors.isEmpty)
-      error(s"Metadata request contained errors: $errors")
-
-    val (partitionsWithoutError, partitionsWithError) = offsets.partition{ partitionAndOffset =>
-      !response.errors().containsKey(partitionAndOffset._1.topic())}
-
-    val (partitionsWithLeader, partitionsWithoutLeader) = partitionsWithoutError.partition{ partitionAndOffset =>
-      response.cluster().leaderFor(partitionAndOffset._1) != null}
-
-    val partitionsWithErrorResults = partitionsWithError.keys.map( partition =>
-      partition -> DeleteRecordsResult(DeleteRecordsResponse.INVALID_LOW_WATERMARK, response.errors().get(partition.topic()).exception())).toMap
-
-    val partitionsWithoutLeaderResults = partitionsWithoutLeader.mapValues( _ =>
-      DeleteRecordsResult(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.LEADER_NOT_AVAILABLE.exception()))
-
-    val partitionsGroupByLeader = partitionsWithLeader.groupBy(partitionAndOffset =>
-      response.cluster().leaderFor(partitionAndOffset._1))
-
-    // prepare requests and generate Future objects
-    val futures = partitionsGroupByLeader.map{ case (node, partitionAndOffsets) =>
-      val convertedMap: java.util.Map[TopicPartition, java.lang.Long] = partitionAndOffsets.mapValues(_.asInstanceOf[java.lang.Long]).asJava
-      val future = client.send(node, new DeleteRecordsRequest.Builder(requestTimeoutMs, convertedMap))
-      pendingFutures.add(future)
-      future.compose(new RequestFutureAdapter[ClientResponse, Map[TopicPartition, DeleteRecordsResult]]() {
-          override def onSuccess(response: ClientResponse, future: RequestFuture[Map[TopicPartition, DeleteRecordsResult]]) {
-            val deleteRecordsResponse = response.responseBody().asInstanceOf[DeleteRecordsResponse]
-            val result = deleteRecordsResponse.responses().asScala.mapValues(v => DeleteRecordsResult(v.lowWatermark, v.error.exception())).toMap
-            future.complete(result)
-            pendingFutures.remove(future)
-          }
-
-          override def onFailure(e: RuntimeException, future: RequestFuture[Map[TopicPartition, DeleteRecordsResult]]) {
-            val result = partitionAndOffsets.mapValues(_ => DeleteRecordsResult(DeleteRecordsResponse.INVALID_LOW_WATERMARK, e))
-            future.complete(result)
-            pendingFutures.remove(future)
-          }
-
-        })
-    }
-
-    // default output if not receiving DeleteRecordsResponse before timeout
-    val defaultResults = offsets.mapValues(_ =>
-      DeleteRecordsResult(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.REQUEST_TIMED_OUT.exception())) ++ partitionsWithErrorResults ++ partitionsWithoutLeaderResults
-
-    new CompositeFuture(time, defaultResults, futures.toList)
-  }
-
   /**
    * Case class used to represent a consumer of a consumer group
    */
@@ -473,8 +405,6 @@ object AdminClient {
     config
   }
 
-  case class DeleteRecordsResult(lowWatermark: Long, error: Exception)
-
   class AdminConfig(originals: Map[_,_]) extends AbstractConfig(AdminConfigDef, originals.asJava, false)
 
   def createSimplePlaintext(brokerUrl: String): AdminClient = {
diff --git a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
index 2715490..14d38ec 100644
--- a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
+++ b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
@@ -20,14 +20,17 @@ package kafka.admin
 import java.io.PrintStream
 import java.util.Properties
 
-import kafka.admin.AdminClient.DeleteRecordsResult
 import kafka.common.AdminCommandFailedException
-import kafka.utils.{CoreUtils, Json, CommandLineUtils}
+import kafka.utils.{CommandLineUtils, CoreUtils, Json}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.clients.admin
+import org.apache.kafka.clients.admin.RecordsToDelete
 import org.apache.kafka.clients.CommonClientConfigs
 import joptsimple._
 
+import scala.collection.JavaConverters._
+
 /**
  * A command for delete records of the given partitions down to the specified offset.
  */
@@ -61,26 +64,31 @@ object DeleteRecordsCommand {
     if (duplicatePartitions.nonEmpty)
       throw new AdminCommandFailedException("Offset json file contains duplicate topic partitions: %s".format(duplicatePartitions.mkString(",")))
 
+    val recordsToDelete = offsetSeq.map { case (topicPartition, offset) =>
+      (topicPartition, RecordsToDelete.beforeOffset(offset))
+    }.toMap.asJava
+
     out.println("Executing records delete operation")
-    val deleteRecordsResult: Map[TopicPartition, DeleteRecordsResult] = adminClient.deleteRecordsBefore(offsetSeq.toMap).get()
+    val deleteRecordsResult = adminClient.deleteRecords(recordsToDelete)
     out.println("Records delete operation completed:")
 
-    deleteRecordsResult.foreach{ case (tp, partitionResult) => {
-      if (partitionResult.error == null)
-        out.println(s"partition: $tp\tlow_watermark: ${partitionResult.lowWatermark}")
-      else
-        out.println(s"partition: $tp\terror: ${partitionResult.error.toString}")
+    deleteRecordsResult.lowWatermarks.asScala.foreach { case (tp, partitionResult) => {
+      try out.println(s"partition: $tp\tlow_watermark: ${partitionResult.get.lowWatermark}")
+      catch {
+        case e: Exception => out.println(s"partition: $tp\terror: ${e.getMessage}")
+      }
     }}
+
     adminClient.close()
   }
 
-  private def createAdminClient(opts: DeleteRecordsCommandOptions): AdminClient = {
+  private def createAdminClient(opts: DeleteRecordsCommandOptions): admin.AdminClient = {
     val props = if (opts.options.has(opts.commandConfigOpt))
       Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
     else
       new Properties()
     props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
-    AdminClient.create(props)
+    admin.AdminClient.create(props)
   }
 
   class DeleteRecordsCommandOptions(args: Array[String]) {
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 231b1e7..5e4b893 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -49,8 +49,6 @@ import scala.collection.JavaConverters._
 import java.lang.{Long => JLong}
 
 import kafka.zk.KafkaZkClient
-import org.apache.kafka.common.internals.Topic
-import org.scalatest.Assertions.intercept
 
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, Future}
@@ -291,8 +289,6 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
         )
       }
     }
-
-    client.close()
   }
 
   @Test
@@ -746,7 +742,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     assertEquals(0L, consumer.position(topicPartition))
 
     val result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(5L)).asJava)
-    val lowWatermark = result.lowWatermarks().get(topicPartition).get().lowWatermark()
+    val lowWatermark = result.lowWatermarks().get(topicPartition).get.lowWatermark
     assertEquals(5L, lowWatermark)
 
     consumer.seekToBeginning(Collections.singletonList(topicPartition))
@@ -755,7 +751,9 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     consumer.seek(topicPartition, 7L)
     assertEquals(7L, consumer.position(topicPartition))
 
-    client.close()
+    client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(DeleteRecordsRequest.HIGH_WATERMARK)).asJava).all.get
+    consumer.seekToBeginning(Collections.singletonList(topicPartition))
+    assertEquals(10L, consumer.position(topicPartition))
   }
 
   @Test
@@ -794,7 +792,6 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
           e.getCause.isInstanceOf[NotLeaderForPartitionException] => false
         }
     }, s"Expected low watermark of the partition to be 5 but got ${lowWatermark.getOrElse("no response within the timeout")}")
-    client.close()
   }
 
   @Test
@@ -807,13 +804,11 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
 
     sendRecords(producers.head, 10, topicPartition)
     val result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(3L)).asJava)
-    val lowWatermark = result.lowWatermarks().get(topicPartition).get().lowWatermark()
+    val lowWatermark = result.lowWatermarks.get(topicPartition).get.lowWatermark
     assertEquals(3L, lowWatermark)
 
     for (i <- 0 until serverCount)
       assertEquals(3, servers(i).replicaManager.getReplica(topicPartition).get.logStartOffset)
-
-    client.close()
   }
 
   @Test
@@ -829,14 +824,68 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     assertEquals(0L, consumer.offsetsForTimes(Map(topicPartition -> JLong.valueOf(0L)).asJava).get(topicPartition).offset())
 
     var result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(5L)).asJava)
-    result.all().get()
+    result.all.get
     assertEquals(5L, consumer.offsetsForTimes(Map(topicPartition -> JLong.valueOf(0L)).asJava).get(topicPartition).offset())
 
     result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(DeleteRecordsRequest.HIGH_WATERMARK)).asJava)
-    result.all().get()
+    result.all.get
     assertNull(consumer.offsetsForTimes(Map(topicPartition -> JLong.valueOf(0L)).asJava).get(topicPartition))
+  }
 
-    client.close()
+  @Test
+  def testConsumeAfterDeleteRecords(): Unit = {
+    val consumer = consumers.head
+    subscribeAndWaitForAssignment(topic, consumer)
+
+    client = AdminClient.create(createConfig)
+
+    sendRecords(producers.head, 10, topicPartition)
+    var messageCount = 0
+    TestUtils.waitUntilTrue(() => {
+      messageCount += consumer.poll(0).count
+      messageCount == 10
+    }, "Expected 10 messages", 3000L)
+
+    client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(3L)).asJava).all.get
+    consumer.seek(topicPartition, 1)
+    messageCount = 0
+    TestUtils.waitUntilTrue(() => {
+      messageCount += consumer.poll(0).count
+      messageCount == 7
+    }, "Expected 7 messages", 3000L)
+
+    client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(8L)).asJava).all.get
+    consumer.seek(topicPartition, 1)
+    messageCount = 0
+    TestUtils.waitUntilTrue(() => {
+      messageCount += consumer.poll(0).count
+      messageCount == 2
+    }, "Expected 2 messages", 3000L)
+  }
+
+  @Test
+  def testDeleteRecordsWithException(): Unit = {
+    subscribeAndWaitForAssignment(topic, consumers.head)
+
+    client = AdminClient.create(createConfig)
+
+    sendRecords(producers.head, 10, topicPartition)
+
+    assertEquals(5L, client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(5L)).asJava)
+      .lowWatermarks.get(topicPartition).get.lowWatermark)
+
+    // OffsetOutOfRangeException if offset > high_watermark
+    var cause = intercept[ExecutionException] {
+      client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(20L)).asJava).lowWatermarks.get(topicPartition).get
+    }.getCause
+    assertEquals(classOf[OffsetOutOfRangeException], cause.getClass)
+
+    val nonExistPartition = new TopicPartition(topic, 3)
+    // LeaderNotAvailableException if non existent partition
+    cause = intercept[ExecutionException] {
+      client.deleteRecords(Map(nonExistPartition -> RecordsToDelete.beforeOffset(20L)).asJava).lowWatermarks.get(nonExistPartition).get
+    }.getCause
+    assertEquals(classOf[LeaderNotAvailableException], cause.getClass)
   }
 
   @Test
@@ -856,8 +905,6 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     val describeResult2 = client.describeConfigs(Collections.singletonList(invalidTopic))
 
     assertTrue(intercept[ExecutionException](describeResult2.values.get(invalidTopic).get).getCause.isInstanceOf[InvalidTopicException])
-
-    client.close()
   }
 
   private def subscribeAndWaitForAssignment(topic: String, consumer: KafkaConsumer[Array[Byte], Array[Byte]]): Unit = {
@@ -902,7 +949,6 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
         classOf[SecurityDisabledException])
     assertFutureExceptionTypeEquals(client.deleteAcls(Collections.singleton(ACL1.toFilter())).all(),
       classOf[SecurityDisabledException])
-    client.close()
   }
 
   /**
@@ -955,7 +1001,6 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     assertFutureExceptionTypeEquals(future, classOf[TimeoutException])
     val endTimeMs = Time.SYSTEM.milliseconds()
     assertTrue("Expected the timeout to take at least one millisecond.", endTimeMs > startTimeMs);
-    client.close()
   }
 
   /**
@@ -973,7 +1018,6 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     val future2 = client.createTopics(Seq("mytopic3", "mytopic4").map(new NewTopic(_, 1, 1)).asJava,
       new CreateTopicsOptions().validateOnly(true)).all()
     future2.get
-    client.close()
     assertEquals(1, factory.failuresInjected)
   }
 
@@ -1091,6 +1135,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
       Utils.closeQuietly(client, "adminClient")
     }
   }
+
 }
 
 object AdminClientIntegrationTest {
diff --git a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
index 2f6fa01..b78946c 100644
--- a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
@@ -17,20 +17,19 @@
 package kafka.api
 
 import java.util.Collections
-import java.util.concurrent.TimeUnit
 
 import kafka.admin.AdminClient
-import kafka.admin.AdminClient.DeleteRecordsResult
 import kafka.server.KafkaConfig
 import java.lang.{Long => JLong}
+
 import kafka.utils.{Logging, TestUtils}
-import org.apache.kafka.clients.consumer.{KafkaConsumer, ConsumerConfig}
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, ProducerConfig}
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.protocol.{Errors, ApiKeys}
-import org.apache.kafka.common.requests.DeleteRecordsRequest
+import org.apache.kafka.common.protocol.ApiKeys
 import org.junit.{After, Before, Test}
 import org.junit.Assert._
+
 import scala.collection.JavaConverters._
 
 /**
@@ -79,122 +78,12 @@ class LegacyAdminClientTest extends IntegrationTestHarness with Logging {
   }
 
   @Test
-  def testSeekToBeginningAfterDeleteRecords() {
-    val consumer = consumers.head
-    subscribeAndWaitForAssignment(topic, consumer)
-
-    sendRecords(producers.head, 10, tp)
-    consumer.seekToBeginning(Collections.singletonList(tp))
-    assertEquals(0L, consumer.position(tp))
-
-    client.deleteRecordsBefore(Map((tp, 5L))).get()
-    consumer.seekToBeginning(Collections.singletonList(tp))
-    assertEquals(5L, consumer.position(tp))
-
-    client.deleteRecordsBefore(Map((tp, DeleteRecordsRequest.HIGH_WATERMARK))).get()
-    consumer.seekToBeginning(Collections.singletonList(tp))
-    assertEquals(10L, consumer.position(tp))
-  }
-
-  @Test
-  def testConsumeAfterDeleteRecords() {
-    val consumer = consumers.head
-    subscribeAndWaitForAssignment(topic, consumer)
-
-    sendRecords(producers.head, 10, tp)
-    var messageCount = 0
-    TestUtils.waitUntilTrue(() => {
-      messageCount += consumer.poll(0).count()
-      messageCount == 10
-    }, "Expected 10 messages", 3000L)
-
-    client.deleteRecordsBefore(Map((tp, 3L))).get()
-    consumer.seek(tp, 1)
-    messageCount = 0
-    TestUtils.waitUntilTrue(() => {
-      messageCount += consumer.poll(0).count()
-      messageCount == 7
-    }, "Expected 7 messages", 3000L)
-
-    client.deleteRecordsBefore(Map((tp, 8L))).get()
-    consumer.seek(tp, 1)
-    messageCount = 0
-    TestUtils.waitUntilTrue(() => {
-      messageCount += consumer.poll(0).count()
-      messageCount == 2
-    }, "Expected 2 messages", 3000L)
-  }
-
-  @Test
-  def testLogStartOffsetCheckpoint() {
-    subscribeAndWaitForAssignment(topic, consumers.head)
-
-    sendRecords(producers.head, 10, tp)
-    assertEquals(DeleteRecordsResult(5L, null), client.deleteRecordsBefore(Map((tp, 5L))).get()(tp))
-
-    for (i <- 0 until serverCount)
-      killBroker(i)
-    restartDeadBrokers()
-
-    client.close()
-    brokerList = TestUtils.bootstrapServers(servers, listenerName)
-    client = AdminClient.createSimplePlaintext(brokerList)
-
-    TestUtils.waitUntilTrue(() => {
-      // Need to retry if leader is not available for the partition
-      client.deleteRecordsBefore(Map((tp, 0L))).get(1000L, TimeUnit.MILLISECONDS)(tp).equals(DeleteRecordsResult(5L, null))
-    }, "Expected low watermark of the partition to be 5L")
-  }
-
-  @Test
-  def testLogStartOffsetAfterDeleteRecords() {
-    subscribeAndWaitForAssignment(topic, consumers.head)
-
-    sendRecords(producers.head, 10, tp)
-    client.deleteRecordsBefore(Map((tp, 3L))).get()
-
-    for (i <- 0 until serverCount)
-      assertEquals(3, servers(i).replicaManager.getReplica(tp).get.logStartOffset)
-  }
-
-  @Test
   def testOffsetsForTimesWhenOffsetNotFound() {
     val consumer = consumers.head
     assertNull(consumer.offsetsForTimes(Map(tp -> JLong.valueOf(0L)).asJava).get(tp))
   }
 
   @Test
-  def testOffsetsForTimesAfterDeleteRecords() {
-    val consumer = consumers.head
-    subscribeAndWaitForAssignment(topic, consumer)
-
-    sendRecords(producers.head, 10, tp)
-    assertEquals(0L, consumer.offsetsForTimes(Map(tp -> JLong.valueOf(0L)).asJava).get(tp).offset())
-
-    client.deleteRecordsBefore(Map((tp, 5L))).get()
-    assertEquals(5L, consumer.offsetsForTimes(Map(tp -> JLong.valueOf(0L)).asJava).get(tp).offset())
-
-    client.deleteRecordsBefore(Map((tp, DeleteRecordsRequest.HIGH_WATERMARK))).get()
-    assertNull(consumer.offsetsForTimes(Map(tp -> JLong.valueOf(0L)).asJava).get(tp))
-  }
-
-  @Test
-  def testDeleteRecordsWithException() {
-    subscribeAndWaitForAssignment(topic, consumers.head)
-
-    sendRecords(producers.head, 10, tp)
-    // Should get success result
-    assertEquals(DeleteRecordsResult(5L, null), client.deleteRecordsBefore(Map((tp, 5L))).get()(tp))
-    // OffsetOutOfRangeException if offset > high_watermark
-    assertEquals(DeleteRecordsResult(-1L, Errors.OFFSET_OUT_OF_RANGE.exception()), client.deleteRecordsBefore(Map((tp, 20))).get()(tp))
-
-    val nonExistPartition = new TopicPartition(topic, 3)
-    // UnknownTopicOrPartitionException if user tries to delete records of a non-existent partition
-    assertEquals(DeleteRecordsResult(-1L, Errors.LEADER_NOT_AVAILABLE.exception()),
-                 client.deleteRecordsBefore(Map((nonExistPartition, 20))).get()(nonExistPartition))
-  }
-
-  @Test
   def testListGroups() {
     subscribeAndWaitForAssignment(topic, consumers.head)
 
diff --git a/docs/upgrade.html b/docs/upgrade.html
index ba2d930..03d1feb 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -95,6 +95,7 @@
         timeout behavior for blocking APIs. In particular, a new <code>poll(Duration)</code> API has been added which
         does not block for dynamic partition assignment. The old <code>poll(long)</code> API has been deprecated and
         will be removed in a future version.</li>
+    <li>The internal method <code>kafka.admin.AdminClient.deleteRecordsBefore</code> has been removed. Users are encouraged to migrate to <code>org.apache.kafka.clients.admin.AdminClient.deleteRecords</code>.</li>
 </ul>
 
 <h5><a id="upgrade_200_new_protocols" href="#upgrade_200_new_protocols">New Protocol Versions</a></h5>

-- 
To stop receiving notification emails like this one, please contact
ijuma@apache.org.