You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/01/27 20:10:48 UTC

[GitHub] [hudi] zhedoubushishi commented on a change in pull request #2485: [HUDI-1109] Support Spark Structured Streaming read from Hudi table

zhedoubushishi commented on a change in pull request #2485:
URL: https://github.com/apache/hudi/pull/2485#discussion_r565508953



##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming

Review comment:
       Do we need to add ```org.apache.spark``` as the prefix? Shall we just use ```org.apache.hudi.streaming```?

##########
File path: hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.DataSourceWriteOptions.{PRECOMBINE_FIELD_OPT_KEY, RECORDKEY_FIELD_OPT_KEY}
+import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.config.HoodieWriteConfig.{DELETE_PARALLELISM, INSERT_PARALLELISM, TABLE_NAME, UPSERT_PARALLELISM}
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.{Row, SaveMode}
+
+class TestStreamingSource  extends StreamTest {

Review comment:
       [nit] duplicated space

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
+import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}

Review comment:
       This class is not compatible with Spark 3. I am thinking if we could move this class to ```hudi-spark2``` module (or any class that is not compatible with Spark 3). In the long term, we should make this change be compatible with both Spark 2 and Spark 3 but I think you can create a JIRA to track this for now. 
   
   You may run ```mvn clean install -Dspark3 -DskipTests``` to see if your change is work with Spark 3.
   
   

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import java.io.{BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.util.Date
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.{DataSourceReadOptions, IncrementalRelation, MergeOnReadIncrementalRelation}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.TablePathUtils
+import org.apache.spark.sql.hudi.streaming.HoodieStreamSource.VERSION
+import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, Source}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+  * The Struct Stream Source for Hudi to consume the data by streaming job.
+  * @param sqlContext
+  * @param metadataPath
+  * @param schemaOption
+  * @param parameters
+  */
+class HoodieStreamSource(
+    sqlContext: SQLContext,
+    metadataPath: String,
+    schemaOption: Option[StructType],
+    parameters: Map[String, String])
+  extends Source with Logging with Serializable {
+
+  @transient private val hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+  private lazy val tablePath: Path = {
+    val path = new Path(parameters.getOrElse("path", "Missing 'path' option"))
+    val fs = path.getFileSystem(hadoopConf)
+    TablePathUtils.getTablePath(fs, path).get()
+  }
+  @transient private lazy val metaClient = new HoodieTableMetaClient(hadoopConf, tablePath.toString)
+  private lazy val tableType = metaClient.getTableType
+
+  @transient private var lastOffset: HoodieSourceOffset = _
+  @transient private lazy val initialPartitionOffsets = {
+    val metadataLog =
+      new HDFSMetadataLog[HoodieSourceOffset](sqlContext.sparkSession, metadataPath) {

Review comment:
       For my understanding, is ```HDFSMetadataLog``` also compatible with other DFS like s3?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org