Posted to dev@samza.apache.org by GitBox <gi...@apache.org> on 2019/03/23 08:30:04 UTC

[GitHub] [samza] shanthoosh commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0

URL: https://github.com/apache/samza/pull/951#discussion_r268384606
 
 

 ##########
 File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
 ##########
 @@ -628,10 +589,12 @@ public void validateStream(StreamSpec streamSpec) throws StreamValidationExcepti
   @Override
   public void deleteMessages(Map<SystemStreamPartition, String> offsets) {
     if (deleteCommittedMessages) {
-      if (adminClientForDelete == null) {
-        adminClientForDelete = kafka.admin.AdminClient.create(createAdminClientProperties());
-      }
-      KafkaSystemAdminUtilsScala.deleteMessages(adminClientForDelete, offsets);
+      Map<TopicPartition, RecordsToDelete> recordsToDelete = offsets.entrySet()
+          .stream()
+          .collect(Collectors.toMap(entry ->
+              new TopicPartition(entry.getKey().getStream(), entry.getKey().getPartition().getPartitionId()),
+              entry -> RecordsToDelete.beforeOffset(Long.parseLong(entry.getValue()) + 1)));
+      adminClient.deleteRecords(recordsToDelete);
 
 Review comment:
   1. GitHub does not let me comment on the `createAdminClientProperties` API in this class (line 629 in this file). Since we're moving to the new [AdminClient](https://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/clients/admin/AdminClient.html), we can remove configurations such as zookeeper.servers that the new admin client does not support. Here's the list of valid [configurations](https://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/clients/admin/AdminClientConfig.html) accepted by the new AdminClient.
   2. Minor: It would be better to use the static constants exposed by `AdminClientConfig`, rather than those of `ConsumerConfig`, when creating the admin client. Ex: switch from `ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG` to `AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG`.
   3. Some AdminClient settings, such as request.backoff.ms and retry.backoff.ms, are configurable. It would be good to decide whether we want to let users override them (or whether we want to raise or lower the defaults).
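
   To make the three points above concrete, here is a rough sketch of what a trimmed-down `createAdminClientProperties` could look like. It is not the code in this PR: the class name, the allow-list, and the default value are hypothetical, and the string keys are written out only so the snippet runs without the Kafka jar on the classpath. In the real change these would be the `AdminClientConfig` constants (for example `AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG`).

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

// Hypothetical helper, for illustration only.
class AdminClientProps {
  // In real code, prefer the AdminClientConfig constants over string literals.
  static final String BOOTSTRAP_SERVERS = "bootstrap.servers";   // AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG
  static final String RETRY_BACKOFF_MS = "retry.backoff.ms";     // AdminClientConfig.RETRY_BACKOFF_MS_CONFIG
  static final String REQUEST_TIMEOUT_MS = "request.timeout.ms"; // AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG

  // Assumed allow-list: only keys the new admin client understands are passed through.
  static final Set<String> ALLOWED =
      new HashSet<>(Arrays.asList(BOOTSTRAP_SERVERS, RETRY_BACKOFF_MS, REQUEST_TIMEOUT_MS));

  // Illustrative default chosen for this sketch, not a recommendation.
  static final String DEFAULT_RETRY_BACKOFF_MS = "100";

  static Properties build(Map<String, String> userConfig) {
    Properties props = new Properties();
    props.setProperty(RETRY_BACKOFF_MS, DEFAULT_RETRY_BACKOFF_MS);

    // User-supplied values override the default; unsupported keys
    // (e.g. ZooKeeper settings) are dropped.
    for (Map.Entry<String, String> entry : userConfig.entrySet()) {
      if (ALLOWED.contains(entry.getKey())) {
        props.setProperty(entry.getKey(), entry.getValue());
      }
    }

    if (props.getProperty(BOOTSTRAP_SERVERS) == null) {
      throw new IllegalArgumentException(BOOTSTRAP_SERVERS + " is required");
    }
    return props;
  }

  public static void main(String[] args) {
    Map<String, String> userConfig = new HashMap<>();
    userConfig.put("bootstrap.servers", "localhost:9092");
    userConfig.put("zookeeper.servers", "localhost:2181"); // dropped by the allow-list
    userConfig.put("retry.backoff.ms", "250");             // overrides the default

    Properties props = build(userConfig);
    System.out.println(BOOTSTRAP_SERVERS + "=" + props.getProperty(BOOTSTRAP_SERVERS));
    System.out.println(RETRY_BACKOFF_MS + "=" + props.getProperty(RETRY_BACKOFF_MS));
    System.out.println("has zookeeper.servers: " + props.containsKey("zookeeper.servers"));
  }
}
```

   The resulting `Properties` could then be handed to `AdminClient.create(...)`. Whether the override list should be fixed like this or driven by the user's config prefix is a design choice for the PR author.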
