Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/08/16 12:43:35 UTC

[GitHub] [spark] gaborgsomogyi commented on a change in pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to KafkaDataConsumer

gaborgsomogyi commented on a change in pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to KafkaDataConsumer
URL: https://github.com/apache/spark/pull/22138#discussion_r314703946
 
 

 ##########
 File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ##########
 @@ -18,228 +18,253 @@
 package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
+import java.io.Closeable
 import java.util.concurrent.TimeoutException
 
 import scala.collection.JavaConverters._
 
 import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetOutOfRangeException}
 import org.apache.kafka.common.TopicPartition
 
-import org.apache.spark.{SparkEnv, SparkException, TaskContext}
+import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.kafka010.KafkaConfigUpdater
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.{AvailableOffsetRange, UNKNOWN_OFFSET}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
-import org.apache.spark.util.UninterruptibleThread
+import org.apache.spark.util.{ShutdownHookManager, UninterruptibleThread}
+
+/**
+ * This class simplifies the usage of the Kafka consumer in the Spark SQL Kafka connector.
+ *
+ * NOTE: Like KafkaConsumer, this class is not thread-safe.
+ * NOTE for contributors: the instance may be used by multiple callers over its lifetime,
+ * so no method should rely on the current cursor position; seek explicitly instead.
+ */
+private[kafka010] class InternalKafkaConsumer(
+    val topicPartition: TopicPartition,
+    val kafkaParams: ju.Map[String, Object]) extends Closeable with Logging {
+
+  val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+
+  private val consumer = createConsumer
 
-private[kafka010] sealed trait KafkaDataConsumer {
   /**
-   * Get the record for the given offset if available.
-   *
-   * If the record is invisible (either a
-   * transaction message, or an aborted message when the consumer's `isolation.level` is
-   * `read_committed`), it will be skipped and this method will try to fetch next available record
-   * within [offset, untilOffset).
+   * Polls messages from Kafka starting at `offset` and returns a pair of "list of consumer
+   * records" and "offset after poll". The list may be empty if the Kafka consumer fetched
+   * some messages but none of them are visible (either transaction messages, or aborted
+   * messages when `isolation.level` is `read_committed`).
    *
-   * This method also will try its best to detect data loss. If `failOnDataLoss` is `true`, it will
-   * throw an exception when we detect an unavailable offset. If `failOnDataLoss` is `false`, this
-   * method will try to fetch next available record within [offset, untilOffset).
-   *
-   * When this method tries to skip offsets due to either invisible messages or data loss and
-   * reaches `untilOffset`, it will return `null`.
-   *
-   * @param offset         the offset to fetch.
-   * @param untilOffset    the max offset to fetch. Exclusive.
-   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
-   * @param failOnDataLoss When `failOnDataLoss` is `true`, this method will either return record at
-   *                       offset if available, or throw an exception. When `failOnDataLoss` is `false`,
-   *                       this method will either return record at offset if available, or return
-   *                       the next earliest available record less than untilOffset, or null. It
-   *                       will not throw any exception.
+   * @throws OffsetOutOfRangeException if `offset` is out of range.
+   * @throws TimeoutException if the consumer position does not change after polling, meaning
+   *                          the consumer polled nothing before the timeout.
    */
-  def get(
-      offset: Long,
-      untilOffset: Long,
-      pollTimeoutMs: Long,
-      failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
-    internalConsumer.get(offset, untilOffset, pollTimeoutMs, failOnDataLoss)
+  def fetch(offset: Long, pollTimeoutMs: Long)
+  : (ju.List[ConsumerRecord[Array[Byte], Array[Byte]]], Long) = {
+    // Seek to the offset because we may call seekToBeginning or seekToEnd before this.
+    seek(offset)
+    val p = consumer.poll(pollTimeoutMs)
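
A minimal standalone sketch of the seek-then-poll pattern the quoted `fetch` method
documents, assuming kafka-clients 2.x (`poll(java.time.Duration)`); the object and
method names here are illustrative, not the PR's actual code:

    import java.time.Duration
    import java.util.concurrent.TimeoutException

    import scala.collection.JavaConverters._

    import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
    import org.apache.kafka.common.TopicPartition

    object FetchSketch {
      // Seek before every poll so the result never depends on a cursor
      // position left behind by an earlier caller.
      def fetchOnce(
          consumer: KafkaConsumer[Array[Byte], Array[Byte]],
          tp: TopicPartition,
          offset: Long,
          pollTimeoutMs: Long): (Seq[ConsumerRecord[Array[Byte], Array[Byte]]], Long) = {
        consumer.seek(tp, offset)
        val records = consumer.poll(Duration.ofMillis(pollTimeoutMs)).records(tp).asScala
        val offsetAfterPoll = consumer.position(tp)
        // Mirrors the documented contract: an unchanged position means the
        // consumer polled nothing before the timeout.
        if (records.isEmpty && offsetAfterPoll == offset) {
          throw new TimeoutException(s"Polled nothing from $tp within $pollTimeoutMs ms")
        }
        // The record list may still be empty if everything polled was invisible
        // (transaction markers, or aborted records under read_committed).
        (records, offsetAfterPoll)
      }
    }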
 
 Review comment:
   The PR needed a different approach, which requires a new Kafka API. Because of this, the PR was closed; the work can be done only after the new API is available, so this comment can be ignored.
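
For context on the PR's subject ([SPARK-25151][SS] Apply Apache Commons Pool to
KafkaDataConsumer), a minimal sketch of the Commons Pool 2 keyed-pool pattern being
applied; `CacheKey` and `PooledConsumer` are hypothetical stand-ins, not the PR's
actual classes:

    import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, PooledObject}
    import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericKeyedObjectPool}

    // Hypothetical stand-ins for the PR's cache key and consumer wrapper.
    case class CacheKey(groupId: String, topicPartition: String)
    class PooledConsumer(val key: CacheKey) { def close(): Unit = () }

    // Tells the pool how to create, wrap, and destroy pooled consumers.
    class ConsumerFactory extends BaseKeyedPooledObjectFactory[CacheKey, PooledConsumer] {
      override def create(key: CacheKey): PooledConsumer = new PooledConsumer(key)
      override def wrap(value: PooledConsumer): PooledObject[PooledConsumer] =
        new DefaultPooledObject(value)
      override def destroyObject(key: CacheKey, p: PooledObject[PooledConsumer]): Unit =
        p.getObject.close()
    }

    object PoolSketch {
      def main(args: Array[String]): Unit = {
        val pool = new GenericKeyedObjectPool(new ConsumerFactory)
        val key = CacheKey("group-1", "topic-0")
        val consumer = pool.borrowObject(key) // reuse an idle consumer or create one
        try {
          // ... fetch records with the borrowed consumer ...
        } finally {
          pool.returnObject(key, consumer)    // hand back to the pool instead of closing
        }
        pool.close()
      }
    }

Pooling keyed by (group id, topic partition) lets tasks on the same executor reuse a
warmed-up consumer instead of paying connection setup on every task, while the pool's
eviction settings bound how many idle consumers stay open.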

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org