Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/05/05 05:28:23 UTC

[GitHub] [hudi] umehrot2 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

umehrot2 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r626252334



##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command
+
+import org.apache.hudi.{DataSourceWriteOptions, SparkSqlAdapterSupport}
+import org.apache.hudi.DataSourceWriteOptions.{KEYGENERATOR_CLASS_OPT_KEY, OPERATION_OPT_KEY, PARTITIONPATH_FIELD_OPT_KEY, RECORDKEY_FIELD_OPT_KEY}
+import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME
+import org.apache.hudi.keygen.ComplexKeyGenerator
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, SubqueryAlias}
+import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.hudi.HoodieOptionConfig
+import org.apache.spark.sql.hudi.HoodieSqlUtils._
+
+case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends RunnableCommand
+  with SparkSqlAdapterSupport {
+
+  private val table = deleteTable.table
+
+  private val tableId = table match {
+    case SubqueryAlias(name, _) => sparkSqlAdapter.toTableIdentify(name)
+    case _ => throw new IllegalArgumentException(s"Illegal table: $table")
+  }
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    logInfo(s"start execute delete command for $tableId")
+
+    // Remove meta fields from the data frame
+    var df = removeMetaFields(Dataset.ofRows(sparkSession, table))
+    if (deleteTable.condition.isDefined) {
+      df = df.filter(Column(deleteTable.condition.get))
+    }

Review comment:
       Can you confirm whether this scan also goes through the `Hudi datasource`? It needs to, so that it can filter down to the latest rows, right?

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/SparkSqlAdapter.scala
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import org.apache.hudi.client.utils.SparkRowSerDe
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.JoinType
+import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
+import org.apache.spark.sql.execution.datasources.SparkParsePartitionUtil
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+  * An interface to adapter the difference between spark2 and spark3
+  * in some spark sql related class.
+  */
+trait SparkSqlAdapter extends Serializable {
+
+  def createSparkRowSerDe(encoder: ExpressionEncoder[Row]): SparkRowSerDe

Review comment:
       Thanks for this refactoring, which gives better segregation between Spark 2 and Spark 3. Can you add Javadoc for each of these APIs?

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload
+import org.apache.hudi.common.table.HoodieTableConfig
+
+
+/**
+ * The HoodieOptionConfig defines some short name for the hoodie
+ * option key and value.
+ */
+object HoodieOptionConfig {
+
+  /**
+   * The short name for the value of COW_TABLE_TYPE_OPT_VAL.
+   */
+  val SQL_VALUE_TABLE_TYPE_COW = "cow"
+
+  /**
+   * The short name for the value of MOR_TABLE_TYPE_OPT_VAL.
+   */
+  val SQL_VALUE_TABLE_TYPE_MOR = "mor"
+
+
+  val SQL_KEY_TABLE_PRIMARY_KEY: HoodieOption[String] = buildConf()
+    .withSqlKey("primaryKey")
+    .withHoodieKey(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY)
+    .withTableConfigKey(HoodieTableConfig.HOODIE_TABLE_RECORDKEY_FIELDS)
+    .build()
+
+  val SQL_KEY_TABLE_TYPE: HoodieOption[String] = buildConf()
+    .withSqlKey("type")
+    .withHoodieKey(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY)
+    .withTableConfigKey(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME)
+    .defaultValue(SQL_VALUE_TABLE_TYPE_COW)
+    .build()
+
+  val SQL_KEY_VERSION_COLUMN: HoodieOption[String] = buildConf()
+    .withSqlKey("preCombineField")
+    .withHoodieKey(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY)
+    .withTableConfigKey(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD)
+    .build()
+
+  val SQL_PAYLOAD_CLASS: HoodieOption[String] = buildConf()
+    .withSqlKey("payloadClass")
+    .withHoodieKey(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY)
+    .withTableConfigKey(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME)
+    .defaultValue(classOf[DefaultHoodieRecordPayload].getName)
+    .build()
+
+  /**
+   * The mapping of the sql short name key to the hoodie's config key.
+   */
+  private lazy val keyMapping: Map[String, String] = {
+    HoodieOptionConfig.getClass.getDeclaredFields
+        .filter(f => f.getType == classOf[HoodieOption[_]])
+        .map(f => {f.setAccessible(true); f.get(HoodieOptionConfig).asInstanceOf[HoodieOption[_]]})

Review comment:
       We should avoid using reflection as much as possible. There should be a way to do this without using reflection.
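
One reflection-free shape for this, offered only as a sketch: keep an explicit list of the declared options and derive the mapping from it. The `HoodieOption` case class below is a simplified, hypothetical stand-in for the one in this PR (its field names are assumptions), and the string literals stand in for the `DataSourceWriteOptions` constants so that the snippet compiles without Hudi on the classpath.

```scala
// Hypothetical, simplified stand-in for the PR's HoodieOption (field names are assumptions).
final case class HoodieOption[T](
    sqlKey: String,
    hoodieKey: String,
    defaultValue: Option[T] = None)

object OptionRegistrySketch {

  // String literals stand in for the DataSourceWriteOptions constants used in the PR.
  val PrimaryKey: HoodieOption[String] =
    HoodieOption[String]("primaryKey", "hoodie.datasource.write.recordkey.field")

  val TableType: HoodieOption[String] =
    HoodieOption[String]("type", "hoodie.datasource.write.table.type", Some("cow"))

  val PreCombineField: HoodieOption[String] =
    HoodieOption[String]("preCombineField", "hoodie.datasource.write.precombine.field")

  // The explicit list replaces the getDeclaredFields scan: a new option
  // must be added here to take part in the mapping.
  val allOptions: Seq[HoodieOption[_]] = Seq(PrimaryKey, TableType, PreCombineField)

  // SQL short name -> Hudi config key, built without reflection.
  val keyMapping: Map[String, String] =
    allOptions.map(o => o.sqlKey -> o.hoodieKey).toMap
}
```

The trade-off is that a newly declared option has to be added to `allOptions` by hand; a builder whose `build()` registers the option would be another way to get the same result.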

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/SparkSqlAdapter.scala
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import org.apache.hudi.client.utils.SparkRowSerDe
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.JoinType
+import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
+import org.apache.spark.sql.execution.datasources.SparkParsePartitionUtil
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+  * An interface to adapter the difference between spark2 and spark3
+  * in some spark sql related class.
+  */
+trait SparkSqlAdapter extends Serializable {

Review comment:
       Shall we just call it `SparkAdapter`, since it has some functions that are used by both Spark DataSource and Spark SQL?

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command
+
+import org.apache.hudi.{DataSourceWriteOptions, SparkSqlAdapterSupport}
+import org.apache.hudi.DataSourceWriteOptions.{KEYGENERATOR_CLASS_OPT_KEY, OPERATION_OPT_KEY, PARTITIONPATH_FIELD_OPT_KEY, RECORDKEY_FIELD_OPT_KEY}
+import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME
+import org.apache.hudi.keygen.ComplexKeyGenerator
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, SubqueryAlias}
+import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.hudi.HoodieOptionConfig
+import org.apache.spark.sql.hudi.HoodieSqlUtils._
+
+case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends RunnableCommand
+  with SparkSqlAdapterSupport {
+
+  private val table = deleteTable.table
+
+  private val tableId = table match {
+    case SubqueryAlias(name, _) => sparkSqlAdapter.toTableIdentify(name)
+    case _ => throw new IllegalArgumentException(s"Illegal table: $table")
+  }
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    logInfo(s"start execute delete command for $tableId")
+
+    // Remove meta fields from the data frame
+    var df = removeMetaFields(Dataset.ofRows(sparkSession, table))
+    if (deleteTable.condition.isDefined) {
+      df = df.filter(Column(deleteTable.condition.get))
+    }
+    val config = buildHoodieConfig(sparkSession)
+    df.write
+      .format("hudi")
+      .mode(SaveMode.Append)
+      .options(config)
+      .save()
+    sparkSession.catalog.refreshTable(tableId.unquotedString)
+    logInfo(s"Finish execute delete command for $tableId")
+    Seq.empty[Row]
+  }
+
+  private def buildHoodieConfig(sparkSession: SparkSession): Map[String, String] = {
+    val targetTable = sparkSession.sessionState.catalog
+      .getTableMetadata(tableId)
+    val path = getTableLocation(targetTable, sparkSession)
+      .getOrElse(s"missing location for $tableId")
+
+    val primaryColumns = HoodieOptionConfig.getPrimaryColumns(targetTable.storage.properties)
+
+    assert(primaryColumns.nonEmpty,
+      s"There are no primary key in table $tableId, cannot execute delete operator")
+
+    withSparkConf(sparkSession, targetTable.storage.properties) {
+      Map(
+        "path" -> path.toString,
+        RECORDKEY_FIELD_OPT_KEY -> primaryColumns.mkString(","),
+        KEYGENERATOR_CLASS_OPT_KEY -> classOf[ComplexKeyGenerator].getCanonicalName,

Review comment:
       - What's the reason for passing these configs again here, if they are expected to be present in the catalog and will be read from there? Is it to be safe in case they don't come from the catalog for some reason? Or maybe it is to work with existing tables that don't have these properties registered in the catalog?
   
   - Why are we using `ComplexKeyGenerator` by default? You can check whether there is more than one primary column. If yes, use `complex`; otherwise use `simple`.
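
The key generator suggestion could look roughly like the following; this is a minimal sketch, not the PR's code. The object and method names are hypothetical, and the class names are written as strings (rather than `classOf[...]`) only so that the snippet compiles without Hudi on the classpath.

```scala
object KeyGeneratorChoiceSketch {

  // Fully qualified class names as strings; in the PR these would come from classOf[...].
  val SimpleKeyGen = "org.apache.hudi.keygen.SimpleKeyGenerator"
  val ComplexKeyGen = "org.apache.hudi.keygen.ComplexKeyGenerator"

  /** Picks the complex key generator only when there is more than one primary column. */
  def keyGeneratorFor(primaryColumns: Seq[String]): String = {
    require(primaryColumns.nonEmpty, "at least one primary key column is required")
    if (primaryColumns.size > 1) ComplexKeyGen else SimpleKeyGen
  }
}
```

In `buildHoodieConfig` the result would then replace the hard-coded value, for example `KEYGENERATOR_CLASS_OPT_KEY -> keyGeneratorFor(primaryColumns)`. Whether the number of partition path fields should also influence the choice is not covered by this sketch.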

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload
+import org.apache.hudi.common.table.HoodieTableConfig
+
+
+/**
+ * The HoodieOptionConfig defines some short name for the hoodie
+ * option key and value.
+ */
+object HoodieOptionConfig {
+
+  /**
+   * The short name for the value of COW_TABLE_TYPE_OPT_VAL.
+   */
+  val SQL_VALUE_TABLE_TYPE_COW = "cow"
+
+  /**
+   * The short name for the value of MOR_TABLE_TYPE_OPT_VAL.
+   */
+  val SQL_VALUE_TABLE_TYPE_MOR = "mor"
+
+
+  val SQL_KEY_TABLE_PRIMARY_KEY: HoodieOption[String] = buildConf()
+    .withSqlKey("primaryKey")

Review comment:
       I assume you are using this property name to sync to the catalog table properties? We should prefix all of these with the `hudi` keyword so that users can easily tell which properties are Hudi-related. The AWS Glue console, for example, displays the table properties, so this would be more intuitive.
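
As an illustration only, the prefixing could be centralized in one helper so that every SQL key gets it consistently. The exact prefix form (`hudi.` with a dot) and the helper name are assumptions, not something decided in this PR.

```scala
object SqlKeyPrefixSketch {

  // Assumed prefix form; the review only asks for a `hudi` keyword.
  val Prefix = "hudi."

  /** Adds the prefix once; keys that already carry it are returned unchanged. */
  def withHudiPrefix(sqlKey: String): String =
    if (sqlKey.startsWith(Prefix)) sqlKey else Prefix + sqlKey
}
```

With this, `.withSqlKey("primaryKey")` would become `.withSqlKey(withHudiPrefix("primaryKey"))`, which a catalog UI would show as `hudi.primaryKey`.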



