Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/08/30 03:08:25 UTC

[GitHub] [spark] wenxuanguan commented on a change in pull request #25618: [SPARK-28908][SS]Implement Kafka EOS sink for Structured Streaming

wenxuanguan commented on a change in pull request #25618: [SPARK-28908][SS]Implement Kafka EOS sink for Structured Streaming
URL: https://github.com/apache/spark/pull/25618#discussion_r319342797
 
 

 ##########
 File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataWriter.scala
 ##########
 @@ -18,16 +18,160 @@
 package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
+import java.util.concurrent.atomic.AtomicInteger
+
+import com.google.common.cache._
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.util.Utils
+
+/**
+ * A [[WriterCommitMessage]] for Kafka commit message.
+ * @param transactionalId Unique transactionalId for each producer.
+ * @param epoch Transactional epoch.
+ * @param producerId Transactional producerId for producer, got when init transaction.
+ */
+private[kafka010] case class ProducerTransactionMetaData(
+    transactionalId: String,
+    epoch: Short,
+    producerId: Long)
+  extends WriterCommitMessage
+
+/**
+ * Empty commit message for resume transaction.
+ */
+private case object EmptyCommitMessage extends WriterCommitMessage
+
+private[kafka010] case object ProducerTransactionMetaData {
+  val VERSION = 1
+
+  def toTransactionId(
+      executorId: String,
+      taskIndex: String,
+      transactionalIdSuffix: String): String = {
+    toTransactionId(toProducerIdentity(executorId, taskIndex), transactionalIdSuffix)
+  }
+
+  def toTransactionId(producerIdentity: String, transactionalIdSuffix: String): String = {
+    s"$producerIdentity||$transactionalIdSuffix"
+  }
+
+  def toTransactionalIdSuffix(transactionalId: String): String = {
+    transactionalId.split("\\|\\|", 2)(1)
+  }
+
+  def toProducerIdentity(transactionalId: String): String = {
+    transactionalId.split("\\|\\|", 2)(0)
+  }
+
+  def toExecutorId(transactionalId: String): String = {
+    val producerIdentity = toProducerIdentity(transactionalId)
+    producerIdentity.split("-", 2)(0)
+  }
+
+  def toTaskIndex(transactionalId: String): String = {
+    val producerIdentity = toProducerIdentity(transactionalId)
+    producerIdentity.split("-", 2)(1)
+  }
+
+  def toProducerIdentity(executorId: String, taskIndex: String): String = {
+    s"$executorId-$taskIndex"
+  }
+}
+
+/**
+ * A [[DataWriter]] for Kafka transactional writing. One data writer will be created
+ * in each partition to process incoming rows.
+ *
+ * @param targetTopic The topic that this data writer is targeting. If None, topic will be inferred
+ *                    from a `topic` field in the incoming data.
+ * @param producerParams Parameters to use for the Kafka producer.
+ * @param inputSchema The attributes in the input data.
+ */
+private[kafka010] class KafkaTransactionDataWriter(
+    targetTopic: Option[String],
+    producerParams: ju.Map[String, Object],
+    inputSchema: Seq[Attribute])
+  extends KafkaRowWriter(inputSchema, targetTopic) with DataWriter[InternalRow] {
+
+  private lazy val producer = {
+    val kafkaProducer = CachedKafkaProducer.getOrCreate(producerParams)
 
 Review comment:
   I think the caching logic is fine: it lets us control producer creation per task, and it also supports failover through the transactional.id in producerParams.
   A transactional producer is not thread safe, so I use one producer per task within a micro-batch. The next micro-batch reuses that producer instead of creating a new one, since each transaction completes within its micro-batch. Because transactional.id is part of producerParams, it differs between tasks in the same micro-batch but stays the same for a given task in the next micro-batch.
   So if the number of tasks on each executor is the same in every micro-batch, no new producers are created after the first micro-batch.
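
   To illustrate the reuse described above, here is a minimal sketch rather than the PR's actual code. `StubProducer`, `ProducerCache`, and `TransactionIds` are hypothetical names, and the stub stands in for a real transactional Kafka producer. The id format mirrors `toTransactionId` in the diff, and the sketch assumes the transactionalIdSuffix stays constant across micro-batches.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a transactional producer; not the real KafkaProducer.
final class StubProducer(val transactionalId: String)

object TransactionIds {
  // Same layout as toTransactionId in the diff: "<executorId>-<taskIndex>||<suffix>".
  def toTransactionId(executorId: String, taskIndex: String, suffix: String): String =
    s"$executorId-$taskIndex||$suffix"
}

// Hypothetical cache keyed by transactional.id: one producer per distinct id.
final class ProducerCache {
  private val cache = mutable.Map.empty[String, StubProducer]

  // Returns the cached producer for this id, creating it only on first use.
  def getOrCreate(transactionalId: String): StubProducer = synchronized {
    cache.getOrElseUpdate(transactionalId, new StubProducer(transactionalId))
  }

  // Number of producers created so far.
  def size: Int = synchronized(cache.size)
}
```

   With three tasks on one executor, the first micro-batch creates three producers. A second micro-batch that calls `getOrCreate` with the same executor id, task indexes, and suffix gets the same three instances back, so `size` stays at 3.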
