You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/07/06 18:33:21 UTC

[GitHub] [hudi] garyli1019 commented on a change in pull request #1722: [HUDI-69] Support Spark Datasource for MOR table

garyli1019 commented on a change in pull request #1722:
URL: https://github.com/apache/hudi/pull/1722#discussion_r450390920



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
##########
@@ -58,26 +60,28 @@ class DefaultSource extends RelationProvider
       throw new HoodieException("'path' must be specified.")
     }
 
+    // Try to create hoodie table meta client from the give path
+    // TODO: Smarter path handling
+    val metaClient = try {
+      val conf = sqlContext.sparkContext.hadoopConfiguration
+      Option(new HoodieTableMetaClient(conf, path.get, true))

Review comment:
       At this point we have:
   - RO, Snapshot query for COW: Support glob and basePath
   - Snapshot for MOR: only support basePath
   - Incremental: Only support basePath
   
   What I am trying to do here is:
   - If the `path` contains glob, fall back to RO. This is the current behavior. Create metaClient will throw an Exception but handled below.
   - If the `path` is basePath, we create the metaClient. If COW table, go RO relation. If MOR, go snapshot relation.

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
##########
@@ -123,4 +127,25 @@ class DefaultSource extends RelationProvider
   }
 
   override def shortName(): String = "hudi"
+
+  private def getReadOptimizedView(sqlContext: SQLContext,

Review comment:
       sure, will do

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
##########
@@ -58,26 +60,28 @@ class DefaultSource extends RelationProvider
       throw new HoodieException("'path' must be specified.")
     }
 
+    // Try to create hoodie table meta client from the give path
+    // TODO: Smarter path handling
+    val metaClient = try {
+      val conf = sqlContext.sparkContext.hadoopConfiguration
+      Option(new HoodieTableMetaClient(conf, path.get, true))
+    } catch {
+      case e: HoodieException => Option.empty

Review comment:
       I used this as a flag that the `path` is not basePath. This is a temporary solution to not change the query behavior.
   This will be handled better with: https://github.com/apache/hudi/pull/1702/files#diff-9a21766ebf794414f94b302bcb968f41R31

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
##########
@@ -65,7 +66,7 @@ object DataSourceReadOptions {
     * This eases migration from old configs to new configs.
     */
   def translateViewTypesToQueryTypes(optParams: Map[String, String]) : Map[String, String] = {
-    val translation = Map(VIEW_TYPE_READ_OPTIMIZED_OPT_VAL -> QUERY_TYPE_SNAPSHOT_OPT_VAL,
+    val translation = Map(VIEW_TYPE_READ_OPTIMIZED_OPT_VAL -> QUERY_TYPE_READ_OPTIMIZED_OPT_VAL,

Review comment:
       Sorry, my previous comments are confusing, let me rephrase.
   What I trying to do here is to not change the query behavior. Since before we don't support snapshot query for MOR, so RO and snapshot query type will behave the same regardless of its COW or MOR. 
   If we don't change this mapping, the user will have different behavior after upgrade to the next release. If they are using `VIEW_TYPE_READ_OPTIMIZED_OPT_VAL(deprecated)` on MOR in their code, after upgrade to the next release, the code will run snapshot query instead of RO query. This could give users surprise even this key was deprecated.

##########
File path: hudi-spark/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetRealtimeFileFormat.scala
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.{FileSplit, JobConf}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
+import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, ParquetRecordReader}
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeRow}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+import java.net.URI
+import scala.collection.JavaConverters._
+
+/**
+ * This class is an extension of ParquetFileFormat from Spark SQL.
+ * The file split, record reader, record reader iterator are customized to read Hudi MOR table.
+ */
+class HoodieParquetRealtimeFileFormat extends ParquetFileFormat {

Review comment:
       If we use the `FileFormat` approach, we probably can't avoid copy some Spark code. For datasource V2, we need to copy more code since Spark 3 use `case class` for all the `FileFormat`
   I will try to use udit's `RDD` approach https://github.com/apache/hudi/pull/1702/files#diff-809772c649e85ffb321055d9871e37e0R39
   I think that's doable. In that approach, we can get rid of this, but need to try that after his PR merged. Will need to reuse many code from that PR.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org