You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/06/02 19:42:24 UTC
[kafka] branch trunk updated: KAFKA-6955: Use Java AdminClient in
DeleteRecordsCommand (#5088)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 341d5db KAFKA-6955: Use Java AdminClient in DeleteRecordsCommand (#5088)
341d5db is described below
commit 341d5db2604f20a8bd2a69e9ad08d14462829068
Author: Vahid Hashemian <va...@us.ibm.com>
AuthorDate: Sat Jun 2 12:41:55 2018 -0700
KAFKA-6955: Use Java AdminClient in DeleteRecordsCommand (#5088)
- Removed internal kafka.admin.AdminClient.deleteRecordsBefore since it's
no longer used.
- Removed redundant tests and rewrote non redundant ones to use the Java
AdminClient.
Reviewers: Viktor Somogyi <vi...@cloudera.com>, Manikumar Reddy <ma...@gmail.com>, Ismael Juma <is...@juma.me.uk>
---
core/src/main/scala/kafka/admin/AdminClient.scala | 70 ------------
.../scala/kafka/admin/DeleteRecordsCommand.scala | 28 +++--
.../kafka/api/AdminClientIntegrationTest.scala | 81 +++++++++++---
.../kafka/api/LegacyAdminClientTest.scala | 121 +--------------------
docs/upgrade.html | 1 +
5 files changed, 87 insertions(+), 214 deletions(-)
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index bcc11fd..ea42530 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -18,7 +18,6 @@ import java.util.{Collections, Properties}
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{ConcurrentLinkedQueue, Future, TimeUnit}
-import kafka.admin.AdminClient.DeleteRecordsResult
import kafka.common.KafkaException
import kafka.coordinator.group.GroupOverview
import kafka.utils.Logging
@@ -216,73 +215,6 @@ class AdminClient(val time: Time,
broker -> Try[NodeApiVersions](new NodeApiVersions(getApiVersions(broker).asJava))
}.toMap
- /*
- * Remove all the messages whose offset is smaller than the given offset of the corresponding partition
- *
- * DeleteRecordsResult contains either lowWatermark of the partition or exception. We list the possible exception
- * and their interpretations below:
- *
- * - DisconnectException if leader node of the partition is not available. Need retry by user.
- * - PolicyViolationException if the topic is configured as non-deletable.
- * - TopicAuthorizationException if the topic doesn't exist and the user doesn't have the authority to create the topic
- * - TimeoutException if response is not available within the timeout specified by either Future's timeout or AdminClient's request timeout
- * - UnknownTopicOrPartitionException if the partition doesn't exist or if the user doesn't have the authority to describe the topic
- * - NotLeaderForPartitionException if broker is not leader of the partition. Need retry by user.
- * - OffsetOutOfRangeException if the offset is larger than high watermark of this partition
- *
- */
-
- def deleteRecordsBefore(offsets: Map[TopicPartition, Long]): Future[Map[TopicPartition, DeleteRecordsResult]] = {
- val metadataRequest = new MetadataRequest.Builder(offsets.keys.map(_.topic).toSet.toList.asJava, true)
- val response = sendAnyNode(ApiKeys.METADATA, metadataRequest).asInstanceOf[MetadataResponse]
- val errors = response.errors
- if (!errors.isEmpty)
- error(s"Metadata request contained errors: $errors")
-
- val (partitionsWithoutError, partitionsWithError) = offsets.partition{ partitionAndOffset =>
- !response.errors().containsKey(partitionAndOffset._1.topic())}
-
- val (partitionsWithLeader, partitionsWithoutLeader) = partitionsWithoutError.partition{ partitionAndOffset =>
- response.cluster().leaderFor(partitionAndOffset._1) != null}
-
- val partitionsWithErrorResults = partitionsWithError.keys.map( partition =>
- partition -> DeleteRecordsResult(DeleteRecordsResponse.INVALID_LOW_WATERMARK, response.errors().get(partition.topic()).exception())).toMap
-
- val partitionsWithoutLeaderResults = partitionsWithoutLeader.mapValues( _ =>
- DeleteRecordsResult(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.LEADER_NOT_AVAILABLE.exception()))
-
- val partitionsGroupByLeader = partitionsWithLeader.groupBy(partitionAndOffset =>
- response.cluster().leaderFor(partitionAndOffset._1))
-
- // prepare requests and generate Future objects
- val futures = partitionsGroupByLeader.map{ case (node, partitionAndOffsets) =>
- val convertedMap: java.util.Map[TopicPartition, java.lang.Long] = partitionAndOffsets.mapValues(_.asInstanceOf[java.lang.Long]).asJava
- val future = client.send(node, new DeleteRecordsRequest.Builder(requestTimeoutMs, convertedMap))
- pendingFutures.add(future)
- future.compose(new RequestFutureAdapter[ClientResponse, Map[TopicPartition, DeleteRecordsResult]]() {
- override def onSuccess(response: ClientResponse, future: RequestFuture[Map[TopicPartition, DeleteRecordsResult]]) {
- val deleteRecordsResponse = response.responseBody().asInstanceOf[DeleteRecordsResponse]
- val result = deleteRecordsResponse.responses().asScala.mapValues(v => DeleteRecordsResult(v.lowWatermark, v.error.exception())).toMap
- future.complete(result)
- pendingFutures.remove(future)
- }
-
- override def onFailure(e: RuntimeException, future: RequestFuture[Map[TopicPartition, DeleteRecordsResult]]) {
- val result = partitionAndOffsets.mapValues(_ => DeleteRecordsResult(DeleteRecordsResponse.INVALID_LOW_WATERMARK, e))
- future.complete(result)
- pendingFutures.remove(future)
- }
-
- })
- }
-
- // default output if not receiving DeleteRecordsResponse before timeout
- val defaultResults = offsets.mapValues(_ =>
- DeleteRecordsResult(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.REQUEST_TIMED_OUT.exception())) ++ partitionsWithErrorResults ++ partitionsWithoutLeaderResults
-
- new CompositeFuture(time, defaultResults, futures.toList)
- }
-
/**
* Case class used to represent a consumer of a consumer group
*/
@@ -473,8 +405,6 @@ object AdminClient {
config
}
- case class DeleteRecordsResult(lowWatermark: Long, error: Exception)
-
class AdminConfig(originals: Map[_,_]) extends AbstractConfig(AdminConfigDef, originals.asJava, false)
def createSimplePlaintext(brokerUrl: String): AdminClient = {
diff --git a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
index 2715490..14d38ec 100644
--- a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
+++ b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
@@ -20,14 +20,17 @@ package kafka.admin
import java.io.PrintStream
import java.util.Properties
-import kafka.admin.AdminClient.DeleteRecordsResult
import kafka.common.AdminCommandFailedException
-import kafka.utils.{CoreUtils, Json, CommandLineUtils}
+import kafka.utils.{CommandLineUtils, CoreUtils, Json}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.clients.admin
+import org.apache.kafka.clients.admin.RecordsToDelete
import org.apache.kafka.clients.CommonClientConfigs
import joptsimple._
+import scala.collection.JavaConverters._
+
/**
* A command for delete records of the given partitions down to the specified offset.
*/
@@ -61,26 +64,31 @@ object DeleteRecordsCommand {
if (duplicatePartitions.nonEmpty)
throw new AdminCommandFailedException("Offset json file contains duplicate topic partitions: %s".format(duplicatePartitions.mkString(",")))
+ val recordsToDelete = offsetSeq.map { case (topicPartition, offset) =>
+ (topicPartition, RecordsToDelete.beforeOffset(offset))
+ }.toMap.asJava
+
out.println("Executing records delete operation")
- val deleteRecordsResult: Map[TopicPartition, DeleteRecordsResult] = adminClient.deleteRecordsBefore(offsetSeq.toMap).get()
+ val deleteRecordsResult = adminClient.deleteRecords(recordsToDelete)
out.println("Records delete operation completed:")
- deleteRecordsResult.foreach{ case (tp, partitionResult) => {
- if (partitionResult.error == null)
- out.println(s"partition: $tp\tlow_watermark: ${partitionResult.lowWatermark}")
- else
- out.println(s"partition: $tp\terror: ${partitionResult.error.toString}")
+ deleteRecordsResult.lowWatermarks.asScala.foreach { case (tp, partitionResult) => {
+ try out.println(s"partition: $tp\tlow_watermark: ${partitionResult.get.lowWatermark}")
+ catch {
+ case e: Exception => out.println(s"partition: $tp\terror: ${e.getMessage}")
+ }
}}
+
adminClient.close()
}
- private def createAdminClient(opts: DeleteRecordsCommandOptions): AdminClient = {
+ private def createAdminClient(opts: DeleteRecordsCommandOptions): admin.AdminClient = {
val props = if (opts.options.has(opts.commandConfigOpt))
Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
else
new Properties()
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
- AdminClient.create(props)
+ admin.AdminClient.create(props)
}
class DeleteRecordsCommandOptions(args: Array[String]) {
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 231b1e7..5e4b893 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -49,8 +49,6 @@ import scala.collection.JavaConverters._
import java.lang.{Long => JLong}
import kafka.zk.KafkaZkClient
-import org.apache.kafka.common.internals.Topic
-import org.scalatest.Assertions.intercept
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
@@ -291,8 +289,6 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
)
}
}
-
- client.close()
}
@Test
@@ -746,7 +742,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
assertEquals(0L, consumer.position(topicPartition))
val result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(5L)).asJava)
- val lowWatermark = result.lowWatermarks().get(topicPartition).get().lowWatermark()
+ val lowWatermark = result.lowWatermarks().get(topicPartition).get.lowWatermark
assertEquals(5L, lowWatermark)
consumer.seekToBeginning(Collections.singletonList(topicPartition))
@@ -755,7 +751,9 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
consumer.seek(topicPartition, 7L)
assertEquals(7L, consumer.position(topicPartition))
- client.close()
+ client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(DeleteRecordsRequest.HIGH_WATERMARK)).asJava).all.get
+ consumer.seekToBeginning(Collections.singletonList(topicPartition))
+ assertEquals(10L, consumer.position(topicPartition))
}
@Test
@@ -794,7 +792,6 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
e.getCause.isInstanceOf[NotLeaderForPartitionException] => false
}
}, s"Expected low watermark of the partition to be 5 but got ${lowWatermark.getOrElse("no response within the timeout")}")
- client.close()
}
@Test
@@ -807,13 +804,11 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
sendRecords(producers.head, 10, topicPartition)
val result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(3L)).asJava)
- val lowWatermark = result.lowWatermarks().get(topicPartition).get().lowWatermark()
+ val lowWatermark = result.lowWatermarks.get(topicPartition).get.lowWatermark
assertEquals(3L, lowWatermark)
for (i <- 0 until serverCount)
assertEquals(3, servers(i).replicaManager.getReplica(topicPartition).get.logStartOffset)
-
- client.close()
}
@Test
@@ -829,14 +824,68 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
assertEquals(0L, consumer.offsetsForTimes(Map(topicPartition -> JLong.valueOf(0L)).asJava).get(topicPartition).offset())
var result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(5L)).asJava)
- result.all().get()
+ result.all.get
assertEquals(5L, consumer.offsetsForTimes(Map(topicPartition -> JLong.valueOf(0L)).asJava).get(topicPartition).offset())
result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(DeleteRecordsRequest.HIGH_WATERMARK)).asJava)
- result.all().get()
+ result.all.get
assertNull(consumer.offsetsForTimes(Map(topicPartition -> JLong.valueOf(0L)).asJava).get(topicPartition))
+ }
- client.close()
+ @Test
+ def testConsumeAfterDeleteRecords(): Unit = {
+ val consumer = consumers.head
+ subscribeAndWaitForAssignment(topic, consumer)
+
+ client = AdminClient.create(createConfig)
+
+ sendRecords(producers.head, 10, topicPartition)
+ var messageCount = 0
+ TestUtils.waitUntilTrue(() => {
+ messageCount += consumer.poll(0).count
+ messageCount == 10
+ }, "Expected 10 messages", 3000L)
+
+ client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(3L)).asJava).all.get
+ consumer.seek(topicPartition, 1)
+ messageCount = 0
+ TestUtils.waitUntilTrue(() => {
+ messageCount += consumer.poll(0).count
+ messageCount == 7
+ }, "Expected 7 messages", 3000L)
+
+ client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(8L)).asJava).all.get
+ consumer.seek(topicPartition, 1)
+ messageCount = 0
+ TestUtils.waitUntilTrue(() => {
+ messageCount += consumer.poll(0).count
+ messageCount == 2
+ }, "Expected 2 messages", 3000L)
+ }
+
+ @Test
+ def testDeleteRecordsWithException(): Unit = {
+ subscribeAndWaitForAssignment(topic, consumers.head)
+
+ client = AdminClient.create(createConfig)
+
+ sendRecords(producers.head, 10, topicPartition)
+
+ assertEquals(5L, client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(5L)).asJava)
+ .lowWatermarks.get(topicPartition).get.lowWatermark)
+
+ // OffsetOutOfRangeException if offset > high_watermark
+ var cause = intercept[ExecutionException] {
+ client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(20L)).asJava).lowWatermarks.get(topicPartition).get
+ }.getCause
+ assertEquals(classOf[OffsetOutOfRangeException], cause.getClass)
+
+ val nonExistPartition = new TopicPartition(topic, 3)
+ // LeaderNotAvailableException if non existent partition
+ cause = intercept[ExecutionException] {
+ client.deleteRecords(Map(nonExistPartition -> RecordsToDelete.beforeOffset(20L)).asJava).lowWatermarks.get(nonExistPartition).get
+ }.getCause
+ assertEquals(classOf[LeaderNotAvailableException], cause.getClass)
}
@Test
@@ -856,8 +905,6 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
val describeResult2 = client.describeConfigs(Collections.singletonList(invalidTopic))
assertTrue(intercept[ExecutionException](describeResult2.values.get(invalidTopic).get).getCause.isInstanceOf[InvalidTopicException])
-
- client.close()
}
private def subscribeAndWaitForAssignment(topic: String, consumer: KafkaConsumer[Array[Byte], Array[Byte]]): Unit = {
@@ -902,7 +949,6 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
classOf[SecurityDisabledException])
assertFutureExceptionTypeEquals(client.deleteAcls(Collections.singleton(ACL1.toFilter())).all(),
classOf[SecurityDisabledException])
- client.close()
}
/**
@@ -955,7 +1001,6 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
assertFutureExceptionTypeEquals(future, classOf[TimeoutException])
val endTimeMs = Time.SYSTEM.milliseconds()
assertTrue("Expected the timeout to take at least one millisecond.", endTimeMs > startTimeMs);
- client.close()
}
/**
@@ -973,7 +1018,6 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
val future2 = client.createTopics(Seq("mytopic3", "mytopic4").map(new NewTopic(_, 1, 1)).asJava,
new CreateTopicsOptions().validateOnly(true)).all()
future2.get
- client.close()
assertEquals(1, factory.failuresInjected)
}
@@ -1091,6 +1135,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
Utils.closeQuietly(client, "adminClient")
}
}
+
}
object AdminClientIntegrationTest {
diff --git a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
index 2f6fa01..b78946c 100644
--- a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
@@ -17,20 +17,19 @@
package kafka.api
import java.util.Collections
-import java.util.concurrent.TimeUnit
import kafka.admin.AdminClient
-import kafka.admin.AdminClient.DeleteRecordsResult
import kafka.server.KafkaConfig
import java.lang.{Long => JLong}
+
import kafka.utils.{Logging, TestUtils}
-import org.apache.kafka.clients.consumer.{KafkaConsumer, ConsumerConfig}
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, ProducerConfig}
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.protocol.{Errors, ApiKeys}
-import org.apache.kafka.common.requests.DeleteRecordsRequest
+import org.apache.kafka.common.protocol.ApiKeys
import org.junit.{After, Before, Test}
import org.junit.Assert._
+
import scala.collection.JavaConverters._
/**
@@ -79,122 +78,12 @@ class LegacyAdminClientTest extends IntegrationTestHarness with Logging {
}
@Test
- def testSeekToBeginningAfterDeleteRecords() {
- val consumer = consumers.head
- subscribeAndWaitForAssignment(topic, consumer)
-
- sendRecords(producers.head, 10, tp)
- consumer.seekToBeginning(Collections.singletonList(tp))
- assertEquals(0L, consumer.position(tp))
-
- client.deleteRecordsBefore(Map((tp, 5L))).get()
- consumer.seekToBeginning(Collections.singletonList(tp))
- assertEquals(5L, consumer.position(tp))
-
- client.deleteRecordsBefore(Map((tp, DeleteRecordsRequest.HIGH_WATERMARK))).get()
- consumer.seekToBeginning(Collections.singletonList(tp))
- assertEquals(10L, consumer.position(tp))
- }
-
- @Test
- def testConsumeAfterDeleteRecords() {
- val consumer = consumers.head
- subscribeAndWaitForAssignment(topic, consumer)
-
- sendRecords(producers.head, 10, tp)
- var messageCount = 0
- TestUtils.waitUntilTrue(() => {
- messageCount += consumer.poll(0).count()
- messageCount == 10
- }, "Expected 10 messages", 3000L)
-
- client.deleteRecordsBefore(Map((tp, 3L))).get()
- consumer.seek(tp, 1)
- messageCount = 0
- TestUtils.waitUntilTrue(() => {
- messageCount += consumer.poll(0).count()
- messageCount == 7
- }, "Expected 7 messages", 3000L)
-
- client.deleteRecordsBefore(Map((tp, 8L))).get()
- consumer.seek(tp, 1)
- messageCount = 0
- TestUtils.waitUntilTrue(() => {
- messageCount += consumer.poll(0).count()
- messageCount == 2
- }, "Expected 2 messages", 3000L)
- }
-
- @Test
- def testLogStartOffsetCheckpoint() {
- subscribeAndWaitForAssignment(topic, consumers.head)
-
- sendRecords(producers.head, 10, tp)
- assertEquals(DeleteRecordsResult(5L, null), client.deleteRecordsBefore(Map((tp, 5L))).get()(tp))
-
- for (i <- 0 until serverCount)
- killBroker(i)
- restartDeadBrokers()
-
- client.close()
- brokerList = TestUtils.bootstrapServers(servers, listenerName)
- client = AdminClient.createSimplePlaintext(brokerList)
-
- TestUtils.waitUntilTrue(() => {
- // Need to retry if leader is not available for the partition
- client.deleteRecordsBefore(Map((tp, 0L))).get(1000L, TimeUnit.MILLISECONDS)(tp).equals(DeleteRecordsResult(5L, null))
- }, "Expected low watermark of the partition to be 5L")
- }
-
- @Test
- def testLogStartOffsetAfterDeleteRecords() {
- subscribeAndWaitForAssignment(topic, consumers.head)
-
- sendRecords(producers.head, 10, tp)
- client.deleteRecordsBefore(Map((tp, 3L))).get()
-
- for (i <- 0 until serverCount)
- assertEquals(3, servers(i).replicaManager.getReplica(tp).get.logStartOffset)
- }
-
- @Test
def testOffsetsForTimesWhenOffsetNotFound() {
val consumer = consumers.head
assertNull(consumer.offsetsForTimes(Map(tp -> JLong.valueOf(0L)).asJava).get(tp))
}
@Test
- def testOffsetsForTimesAfterDeleteRecords() {
- val consumer = consumers.head
- subscribeAndWaitForAssignment(topic, consumer)
-
- sendRecords(producers.head, 10, tp)
- assertEquals(0L, consumer.offsetsForTimes(Map(tp -> JLong.valueOf(0L)).asJava).get(tp).offset())
-
- client.deleteRecordsBefore(Map((tp, 5L))).get()
- assertEquals(5L, consumer.offsetsForTimes(Map(tp -> JLong.valueOf(0L)).asJava).get(tp).offset())
-
- client.deleteRecordsBefore(Map((tp, DeleteRecordsRequest.HIGH_WATERMARK))).get()
- assertNull(consumer.offsetsForTimes(Map(tp -> JLong.valueOf(0L)).asJava).get(tp))
- }
-
- @Test
- def testDeleteRecordsWithException() {
- subscribeAndWaitForAssignment(topic, consumers.head)
-
- sendRecords(producers.head, 10, tp)
- // Should get success result
- assertEquals(DeleteRecordsResult(5L, null), client.deleteRecordsBefore(Map((tp, 5L))).get()(tp))
- // OffsetOutOfRangeException if offset > high_watermark
- assertEquals(DeleteRecordsResult(-1L, Errors.OFFSET_OUT_OF_RANGE.exception()), client.deleteRecordsBefore(Map((tp, 20))).get()(tp))
-
- val nonExistPartition = new TopicPartition(topic, 3)
- // UnknownTopicOrPartitionException if user tries to delete records of a non-existent partition
- assertEquals(DeleteRecordsResult(-1L, Errors.LEADER_NOT_AVAILABLE.exception()),
- client.deleteRecordsBefore(Map((nonExistPartition, 20))).get()(nonExistPartition))
- }
-
- @Test
def testListGroups() {
subscribeAndWaitForAssignment(topic, consumers.head)
diff --git a/docs/upgrade.html b/docs/upgrade.html
index ba2d930..03d1feb 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -95,6 +95,7 @@
timeout behavior for blocking APIs. In particular, a new <code>poll(Duration)</code> API has been added which
does not block for dynamic partition assignment. The old <code>poll(long)</code> API has been deprecated and
will be removed in a future version.</li>
+ <li>The internal method <code>kafka.admin.AdminClient.deleteRecordsBefore</code> has been removed. Users are encouraged to migrate to <code>org.apache.kafka.clients.admin.AdminClient.deleteRecords</code>.</li>
</ul>
<h5><a id="upgrade_200_new_protocols" href="#upgrade_200_new_protocols">New Protocol Versions</a></h5>
--
To stop receiving notification emails like this one, please contact
ijuma@apache.org.