Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/10/22 21:14:56 UTC

[GitHub] [spark] HeartSaVioR commented on a change in pull request #26158: [SPARK-29509][SQL][SS] Deduplicate codes from Kafka data source

URL: https://github.com/apache/spark/pull/26158#discussion_r337753823
 
 

 ##########
 File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala
 ##########
 @@ -113,49 +113,43 @@ private[kafka010] abstract class KafkaRowWriter(
   }
 
   private def createProjection = {
+    def expression(attrName: String)(defaultFn: () => Expression): Expression = {
+      inputSchema.find(_.name == attrName).getOrElse(defaultFn())
+    }
+
+    def assertDataType(attrName: String, desired: Seq[DataType], actual: DataType): Unit = {
+      if (!desired.exists(_.sameType(actual))) {
+        throw new IllegalStateException(s"$attrName attribute unsupported type " +
+          s"${actual.catalogString}")
+      }
+    }
+
     val topicExpression = topic.map(Literal(_)).orElse {
 
 Review comment:
   Yeah, I thought it would be complicated or require more changes to the expression, but it turned out that it doesn't. I'll make a change like the one below:
   
   ```
       val topicExpression = topic.map(Literal(_)).getOrElse(
         expression(KafkaWriter.TOPIC_ATTRIBUTE_NAME) { () =>
           throw new IllegalStateException(s"topic option required when no " +
             s"'${KafkaWriter.TOPIC_ATTRIBUTE_NAME}' attribute is present")
         }
       )
   ```
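
For readers without the Spark sources at hand, the same pattern can be sketched in a self-contained form. This is only an illustration under stated assumptions: `Expr`, `Lit`, `Attr`, and the `TopicResolution` object are hypothetical stand-ins for Catalyst's `Expression`, `Literal`, and attribute types, and `inputSchema` is passed as a parameter rather than read from the enclosing class. It is meant to show that the `topic` option takes precedence, that the attribute is looked up only when the option is absent, and that the exception is thrown only when both are missing.

```scala
// Hypothetical, simplified stand-ins for Catalyst's Expression hierarchy.
sealed trait Expr
final case class Lit(value: String) extends Expr
final case class Attr(name: String) extends Expr

object TopicResolution {
  // Assumed constant name; plays the role of KafkaWriter.TOPIC_ATTRIBUTE_NAME.
  val TopicAttributeName = "topic"

  // Same shape as the `expression` helper in the diff: return the attribute
  // with the given name, or evaluate the fallback function if it is absent.
  def expression(inputSchema: Seq[Attr], attrName: String)(defaultFn: () => Expr): Expr =
    inputSchema.find(_.name == attrName).getOrElse(defaultFn())

  // Option.getOrElse takes its argument by name, so the attribute lookup
  // (and the possible throw) runs only when `topic` is None.
  def topicExpression(topic: Option[String], inputSchema: Seq[Attr]): Expr =
    topic.map(Lit(_)).getOrElse(
      expression(inputSchema, TopicAttributeName) { () =>
        throw new IllegalStateException("topic option required when no " +
          s"'$TopicAttributeName' attribute is present")
      }
    )
}
```

With these stand-ins, `TopicResolution.topicExpression(Some("events"), Seq.empty)` yields the literal, `TopicResolution.topicExpression(None, Seq(Attr("topic")))` yields the attribute, and calling it with `None` and a schema lacking a `topic` attribute throws `IllegalStateException`.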
