Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/09/25 09:26:32 UTC

[GitHub] [hudi] xushiyan commented on a diff in pull request #6727: [HUDI-3478] Implement CDC Read in Spark

xushiyan commented on code in PR #6727:
URL: https://github.com/apache/hudi/pull/6727#discussion_r979378447


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/EmptyRelation.scala:
##########
@@ -19,31 +19,17 @@
 
 package org.apache.hudi
 
-import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.sources.{BaseRelation, TableScan}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{Row, SQLContext}
 
-import scala.util.control.NonFatal
-
 /**
  * BaseRelation representing empty RDD.
  * @param sqlContext instance of SqlContext.
  */
-class EmptyRelation(val sqlContext: SQLContext, metaClient: HoodieTableMetaClient) extends BaseRelation with TableScan {
-
-  override def schema: StructType = {
-    // do the best to find the table schema.
-    val schemaResolver = new TableSchemaResolver(metaClient)
-    try {
-      val avroSchema = schemaResolver.getTableAvroSchema
-      AvroConversionUtils.convertAvroSchemaToStructType(avroSchema)
-    } catch {
-      case NonFatal(e) =>
-        StructType(Nil)

Review Comment:
   Can you clarify why this removal is necessary for the CDC read?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCInferCase.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.cdc;
+
+/**
+ * Here define five cdc infer cases. The different cdc infer case will decide which file will be
+ * used to extract the change data, and how to do this.
+ *
+ * CDC_LOG_FILE:

Review Comment:
   Please update this doc accordingly. You may also point to the read section in the RFC.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SafeAvroProjection.scala:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.SafeAvroProjection.collectFieldOrdinals
+import org.apache.hudi.common.util.ValidationUtils.checkState
+
+import org.apache.avro.Schema
+import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
+
+import scala.collection.JavaConverters._
+
+// TODO extract to HoodieAvroSchemaUtils
+abstract class AvroProjection extends (GenericRecord => GenericRecord)
+
+class SafeAvroProjection(sourceSchema: Schema,

Review Comment:
   If you moved code around, please annotate it by commenting on the PR yourself, explaining where it was moved from and what was modified. Otherwise it will be hard for reviewers to decide whether to approve.



##########
hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -18,7 +18,9 @@
 package org.apache.spark.sql.avro

Review Comment:
   > It has to. The original logical (use `val converter: Any => Any = {`) has a bug that it will return the same value when we call this method twice continuously. And `HoodieCDCRDD` need these changes.
   
   So this looks like an improvement we can land and fix separately? Better to track it separately, since the CDC implementation does not need to know about this fix, right? The APIs remain the same.
   
   Another note: this was not obvious to reviewers until you explained it as above. For the sake of a faster review, please comment on such changes yourself and explain them proactively.
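
The thread does not show the actual cause of the "same value when called twice" symptom quoted above. One common way such a symptom arises is a converter that allocates its result buffer once, when the converter is built, and then returns that same mutable buffer on every call. The sketch below illustrates only that general pattern, in Java rather than Scala. Every name in it (`ConverterReuseSketch`, `sharedBufferConverter`, `freshBufferConverter`) is hypothetical and is not taken from Hudi or Spark.

```java
import java.util.function.Function;

// Hypothetical illustration only; none of these names come from Hudi or Spark.
class ConverterReuseSketch {

    // Problematic shape (assumed, not confirmed by the PR): the result buffer
    // is captured once, so every call returns the same mutable array.
    static Function<int[], int[]> sharedBufferConverter(int width) {
        final int[] result = new int[width]; // allocated once, shared by all calls
        return input -> {
            System.arraycopy(input, 0, result, 0, width);
            return result;
        };
    }

    // Safer shape: a fresh buffer is allocated on each call.
    static Function<int[], int[]> freshBufferConverter(int width) {
        return input -> {
            int[] result = new int[width];
            System.arraycopy(input, 0, result, 0, width);
            return result;
        };
    }

    public static void main(String[] args) {
        Function<int[], int[]> shared = sharedBufferConverter(2);
        int[] first = shared.apply(new int[] {1, 2});
        int[] second = shared.apply(new int[] {3, 4});
        // Both references point at the same array, so the first result was overwritten.
        System.out.println(first == second); // prints "true"
        System.out.println(first[0]);        // prints "3"

        Function<int[], int[]> fresh = freshBufferConverter(2);
        int[] a = fresh.apply(new int[] {1, 2});
        int[] b = fresh.apply(new int[] {3, 4});
        // Each call owns its own array, so the first result is preserved.
        System.out.println(a == b); // prints "false"
        System.out.println(a[0]);   // prints "1"
    }
}
```

If the real issue has this shape, a caller that holds on to two consecutive results would be affected, while a caller that consumes each result before the next call would not. That would be consistent with the fix being independent of the public API, but this is an assumption rather than something the thread confirms.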



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cdc
+
+import org.apache.hudi.AvroConversionUtils
+import org.apache.hudi.DataSourceReadOptions
+import org.apache.hudi.HoodieDataSourceHelper
+import org.apache.hudi.HoodieTableSchema
+import org.apache.hudi.common.table.cdc.HoodieCDCUtils._
+import org.apache.hudi.common.table.cdc.HoodieCDCOperation._
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.TableSchemaResolver
+import org.apache.hudi.common.table.cdc.HoodieCDCExtractor
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.internal.schema.InternalSchema
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.{Row, SQLContext, SparkSession}
+import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.collection.JavaConverters._
+import scala.util.{Failure, Success, Try}
+
+class CDCRelation(

Review Comment:
   Please add Scaladoc for this class.


