Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/03/03 02:48:48 UTC

[GitHub] [spark] HeartSaVioR commented on a change in pull request #35676: [SPARK-38323][SQL][Streaming] Support the hidden file metadata in Streaming

HeartSaVioR commented on a change in pull request #35676:
URL: https://github.com/apache/spark/pull/35676#discussion_r818270346



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
##########
@@ -510,4 +510,58 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
       )
     }
   }
+
+  metadataColumnsTest("file metadata in streaming", schema) { (df, _, _) =>
+    withTempDir { dir =>
+      df.coalesce(1).write.format("json").save(dir.getCanonicalPath + "/source/new-streaming-data")
+
+      val stream = spark.readStream.format("json")
+        .schema(schema)
+        .load(dir.getCanonicalPath + "/source/new-streaming-data")
+        .select("*", "_metadata")
+        .writeStream.format("json")
+        .option("checkpointLocation", dir.getCanonicalPath + "/target/checkpoint")
+        .start(dir.getCanonicalPath + "/target/new-streaming-data")
+
+      stream.processAllAvailable()
+      stream.stop()
+
+      val newDF = spark.read.format("json")
+        .load(dir.getCanonicalPath + "/target/new-streaming-data")
+
+      val sourceFile = new File(dir, "/source/new-streaming-data").listFiles()
+        .filter(_.getName.endsWith(".json")).head
+      val sourceFileMetadata = Map(
+        METADATA_FILE_PATH -> sourceFile.toURI.toString,
+        METADATA_FILE_NAME -> sourceFile.getName,
+        METADATA_FILE_SIZE -> sourceFile.length(),
+        METADATA_FILE_MODIFICATION_TIME -> new Timestamp(sourceFile.lastModified())
+      )
+
+      // SELECT * will have: name, age, info, _metadata of /source/new-streaming-data
+      assert(newDF.select("*").columns.toSet == Set("name", "age", "info", "_metadata"))

Review comment:
       @Yaohua628
   
   Sorry for the post-hoc review. I didn't have time to review this earlier.
   
   Just to be clear, `select("*").show()` should not expose a hidden column, right? Since this test includes "_metadata" in the expected list of columns, I would like to double-check that it is not user-facing.
   
   And if we do include the new column, `dropDuplicates` without explicitly specified columns in a streaming query would be broken. The state schema would then include the hidden column, whereas the state schema in a checkpoint written by an older version does not include it (as the column didn't exist at the time).
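
To make the concern above concrete, here is a minimal sketch in plain Scala with no Spark dependency. It is a simplified model, not Spark's actual implementation: a schema is reduced to a list of column names, and `dedupKeySchema` and `isCompatible` are hypothetical helpers that only stand in for "deduplicate on all output columns unless a subset is given" and "state restored from a checkpoint must match the current key schema". It assumes the hidden column would be part of the query output, which is exactly what the question above is asking about.

```scala
// Simplified model of the concern; NOT Spark's actual implementation.
object HiddenColumnStateSchemaSketch {
  // A schema is modelled as an ordered list of column names only.
  type Schema = Seq[String]

  // With no explicit subset, deduplication keys default to every output column.
  def dedupKeySchema(outputColumns: Schema, subset: Option[Schema] = None): Schema =
    subset.getOrElse(outputColumns)

  // Hypothetical compatibility rule: the checkpointed key schema must equal the current one.
  def isCompatible(checkpointed: Schema, current: Schema): Boolean =
    checkpointed == current

  // Output columns before the change, and after it IF "_metadata" were exposed.
  val oldOutput: Schema = Seq("name", "age", "info")
  val newOutput: Schema = oldOutput :+ "_metadata"

  def main(args: Array[String]): Unit = {
    // Key schema stored by a query that ran before the hidden column existed.
    val checkpointed = dedupKeySchema(oldOutput)

    // Same query restarted with the extra column in its output: schemas differ.
    println(isCompatible(checkpointed, dedupKeySchema(newOutput))) // false

    // Naming the columns explicitly keeps the key schema stable.
    println(isCompatible(checkpointed, dedupKeySchema(newOutput, Some(oldOutput)))) // true
  }
}
```

In this model the mismatch disappears either when the hidden column stays out of the default output or when the deduplication columns are listed explicitly.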
   
   cc. @cloud-fan 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org