Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2022/02/09 11:25:35 UTC

[GitHub] [drill] rymarm opened a new pull request #2456: DRILL-8122: Change kafka metadata obtaining due to KAFKA-5697

rymarm opened a new pull request #2456:
URL: https://github.com/apache/drill/pull/2456


   # [DRILL-8122](https://issues.apache.org/jira/browse/DRILL-8122): Change kafka metadata obtaining due to KAFKA-5697
   
   ## Description
   
   [`Consumer#poll(long)`](https://javadoc.io/static/org.apache.kafka/kafka-clients/3.1.0/org/apache/kafka/clients/consumer/Consumer.html#poll-long-) has been deprecated since Kafka 2.0. In Drill, `Consumer#poll` is used in two places:
   1. [For its direct purpose](https://github.com/apache/drill/blob/15b2f52260e4f0026f2dfafa23c5d32e0fb66502/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java#L82)
   2. As the only way to make a Kafka consumer [update metadata](https://github.com/apache/drill/blob/15b2f52260e4f0026f2dfafa23c5d32e0fb66502/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java#L185)
   
   Kafka [hasn't implemented](https://cwiki.apache.org/confluence/display/KAFKA/KIP-505%3A+Add+new+public+method+to+only+update+assignment+metadata+in+consumer) a separate method to update metadata, and the new [`Consumer#poll(Duration)`](https://javadoc.io/static/org.apache.kafka/kafka-clients/3.1.0/org/apache/kafka/clients/consumer/Consumer.html#poll-java.time.Duration-) doesn't work with the hack that Drill uses, `poll(0)`, because its logic changed (it no longer blocks beyond the timeout while awaiting partition assignment): https://github.com/apache/kafka/pull/4855 . That is why I had to use a loop with a timeout to work around the missing method.
   ## Documentation
   \-
   
   ## Testing
   Unit tests
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] rymarm commented on a change in pull request #2456: DRILL-8122: Change kafka metadata obtaining due to KAFKA-5697

Posted by GitBox <gi...@apache.org>.
rymarm commented on a change in pull request #2456:
URL: https://github.com/apache/drill/pull/2456#discussion_r806118920



##########
File path: contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
##########
@@ -227,6 +229,33 @@ private void init() {
     }
   }
 
+
+  /** Workaround for Kafka > 2.0 version due to <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-505%3A+Add+new+public+method+to+only+update+assignment+metadata+in+consumer">KIP-505</a>.

Review comment:
       Why do you prefer to remove the direct link? I think it's very convenient and makes it fast to check whether the issue has been resolved.







[GitHub] [drill] rymarm commented on a change in pull request #2456: DRILL-8122: Change kafka metadata obtaining due to KAFKA-5697

Posted by GitBox <gi...@apache.org>.
rymarm commented on a change in pull request #2456:
URL: https://github.com/apache/drill/pull/2456#discussion_r806624404



##########
File path: contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
##########
@@ -227,6 +229,33 @@ private void init() {
     }
   }
 
+
+  /** Workaround for Kafka > 2.0 version due to <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-505%3A+Add+new+public+method+to+only+update+assignment+metadata+in+consumer">KIP-505</a>.

Review comment:
       @luocooong Thank you for explaining! Followed your suggestion. 







[GitHub] [drill] luocooong commented on a change in pull request #2456: DRILL-8122: Change kafka metadata obtaining due to KAFKA-5697

Posted by GitBox <gi...@apache.org>.
luocooong commented on a change in pull request #2456:
URL: https://github.com/apache/drill/pull/2456#discussion_r803786802



##########
File path: contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
##########
@@ -227,6 +229,33 @@ private void init() {
     }
   }
 
+
+  /** Workaround for Kafka > 2.0 version due to <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-505%3A+Add+new+public+method+to+only+update+assignment+metadata+in+consumer">KIP-505</a>.
+   * It can be replaced with Kafka implementation once it will be introduced.
+   * @param consumer Kafka consumer whom need to get assignments
+   * @return
+   * @throws InterruptedException
+   */
+  private Set<TopicPartition> waitForConsumerAssignment(Consumer consumer) throws InterruptedException {
+    Set<TopicPartition> assignments = consumer.assignment();
+
+    long waitingForAssigmentTimeout = kafkaStoragePlugin.getContext().getOptionManager().getLong(ExecConstants.KAFKA_POLL_TIMEOUT);
+    long timeout = 0;
+
+    while (assignments.isEmpty() && timeout < waitingForAssigmentTimeout) {
+      Thread.sleep(500);
+      timeout += 500;
+      assignments = consumer.assignment();
+    }
+
+    if (timeout >= waitingForAssigmentTimeout) {
+      logger.error("Consumer assignment wasn't completed within the timeout {}", waitingForAssigmentTimeout);
+      throw UserException.dataReadError().build(logger);

Review comment:
       ```
   throw UserException.dataReadError()
     .message("Consumer assignment wasn't completed within the timeout %s", waitingForAssigmentTimeout)
     .build(logger);
   ```
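
A minimal sketch of how the loop and the suggested message could fit together, assuming stand-ins for the Drill and Kafka types: a `Supplier` replaces `consumer.assignment()`, `IllegalStateException` replaces `UserException`, and the class and method names are hypothetical. It checks `assignments.isEmpty()` after the loop rather than the elapsed counter, because in the hunk above the counter check would still report a timeout when the assignment arrives on the final poll.

```java
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical helper; not part of Drill or Kafka.
final class AssignmentTimeoutCheck {

  private AssignmentTimeoutCheck() {
  }

  /**
   * Polls the source every stepMs milliseconds until it returns a
   * non-empty set or roughly timeoutMs milliseconds have been waited.
   *
   * @param source stand-in for consumer.assignment() (assumption)
   * @param timeoutMs upper bound on the accumulated sleep time
   * @param stepMs sleep time between two polls
   * @return the first non-empty set returned by the source
   * @throws IllegalStateException stand-in for UserException when the
   *         source is still empty after the last poll
   */
  static <T> Set<T> waitForAssignment(Supplier<Set<T>> source, long timeoutMs, long stepMs)
      throws InterruptedException {
    Set<T> assignments = source.get();
    long waited = 0;

    while (assignments.isEmpty() && waited < timeoutMs) {
      Thread.sleep(stepMs);
      waited += stepMs;
      assignments = source.get();
    }

    // Decide on the result itself, not on the counter: the last poll may
    // succeed exactly when the counter reaches the timeout.
    if (assignments.isEmpty()) {
      throw new IllegalStateException(String.format(
          "Consumer assignment wasn't completed within the timeout %s", timeoutMs));
    }
    return assignments;
  }
}
```

With a 5000 ms timeout and a 500 ms step, this version returns the assignment even if it first appears on the last poll, when the counter has just reached 5000.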

##########
File path: contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
##########
@@ -227,6 +229,33 @@ private void init() {
     }
   }
 
+
+  /** Workaround for Kafka > 2.0 version due to <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-505%3A+Add+new+public+method+to+only+update+assignment+metadata+in+consumer">KIP-505</a>.

Review comment:
       ```suggestion
     /**
     Workaround for Kafka > 2.0 version due to KIP-505. It can be replaced with Kafka implementation once it will be introduced.
   ```

##########
File path: contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
##########
@@ -156,6 +157,29 @@ public void testInformationSchema() throws Exception {
     }
   }
 
+  private Set<TopicPartition> waitForConsumerAssignment(Consumer consumer)  {
+    Set<TopicPartition> assignments = consumer.assignment();
+
+    long waitingForAssigmentTimeout = 5000;
+    long timeout = 0;
+
+    while (assignments.isEmpty() && timeout < waitingForAssigmentTimeout) {
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException e) {
+        e.printStackTrace();

Review comment:
       Printing the stack trace with `e.printStackTrace()` is not recommended.
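
One common alternative, sketched here under assumptions rather than taken from the PR, is to restore the thread's interrupt flag and stop waiting. The `Supplier` stands in for `consumer.assignment()`, and the class and method names are hypothetical.

```java
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical helper; not part of Drill or Kafka.
final class InterruptAwareWait {

  private InterruptAwareWait() {
  }

  /**
   * Polls the source until it is non-empty, the timeout is reached, or
   * the current thread is interrupted. Returns the last set that was
   * seen, which may be empty.
   */
  static <T> Set<T> waitForAssignment(Supplier<Set<T>> source, long timeoutMs, long stepMs) {
    Set<T> assignments = source.get();
    long waited = 0;

    while (assignments.isEmpty() && waited < timeoutMs) {
      try {
        Thread.sleep(stepMs);
      } catch (InterruptedException e) {
        // Thread.sleep cleared the flag when it threw; set it again so
        // callers can still observe the interruption, then stop waiting.
        Thread.currentThread().interrupt();
        break;
      }
      waited += stepMs;
      assignments = source.get();
    }
    return assignments;
  }
}
```

In a test class, simply declaring `throws InterruptedException` on the helper and on the test method is another option that avoids the `catch` block entirely.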







[GitHub] [drill] luocooong commented on a change in pull request #2456: DRILL-8122: Change kafka metadata obtaining due to KAFKA-5697

Posted by GitBox <gi...@apache.org>.
luocooong commented on a change in pull request #2456:
URL: https://github.com/apache/drill/pull/2456#discussion_r806362857



##########
File path: contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
##########
@@ -227,6 +229,33 @@ private void init() {
     }
   }
 
+
+  /** Workaround for Kafka > 2.0 version due to <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-505%3A+Add+new+public+method+to+only+update+assignment+metadata+in+consumer">KIP-505</a>.

Review comment:
       @rymarm Thanks for the question.
   In general, we can record information about limitations in a plugin's README.md, such as `contrib/format-xml/README.md`. That lets us keep the comment as brief as possible, and here we focus on providing an alternative function. So the KIP-505 keyword is enough.







[GitHub] [drill] jnturton merged pull request #2456: DRILL-8122: Change kafka metadata obtaining due to KAFKA-5697

Posted by GitBox <gi...@apache.org>.
jnturton merged pull request #2456:
URL: https://github.com/apache/drill/pull/2456


   

