Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/03/03 02:53:29 UTC

[GitHub] [spark] Yaohua628 commented on a change in pull request #35676: [SPARK-38323][SQL][Streaming] Support the hidden file metadata in Streaming

Yaohua628 commented on a change in pull request #35676:
URL: https://github.com/apache/spark/pull/35676#discussion_r818272375



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
##########
@@ -510,4 +510,58 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
       )
     }
   }
+
+  metadataColumnsTest("file metadata in streaming", schema) { (df, _, _) =>
+    withTempDir { dir =>
+      df.coalesce(1).write.format("json").save(dir.getCanonicalPath + "/source/new-streaming-data")
+
+      val stream = spark.readStream.format("json")
+        .schema(schema)
+        .load(dir.getCanonicalPath + "/source/new-streaming-data")
+        .select("*", "_metadata")
+        .writeStream.format("json")
+        .option("checkpointLocation", dir.getCanonicalPath + "/target/checkpoint")
+        .start(dir.getCanonicalPath + "/target/new-streaming-data")
+
+      stream.processAllAvailable()
+      stream.stop()
+
+      val newDF = spark.read.format("json")
+        .load(dir.getCanonicalPath + "/target/new-streaming-data")
+
+      val sourceFile = new File(dir, "/source/new-streaming-data").listFiles()
+        .filter(_.getName.endsWith(".json")).head
+      val sourceFileMetadata = Map(
+        METADATA_FILE_PATH -> sourceFile.toURI.toString,
+        METADATA_FILE_NAME -> sourceFile.getName,
+        METADATA_FILE_SIZE -> sourceFile.length(),
+        METADATA_FILE_MODIFICATION_TIME -> new Timestamp(sourceFile.lastModified())
+      )
+
+      // SELECT * will have: name, age, info, _metadata of /source/new-streaming-data
+      assert(newDF.select("*").columns.toSet == Set("name", "age", "info", "_metadata"))

Review comment:
       Yep, `select("*")` won't expose the hidden file metadata column.
   
   But what I did here is `readStream`, explicitly select both `*` and `_metadata` ([here](https://github.com/apache/spark/pull/35676/files/ae9959c4ea6a5212d8023f9561f4551470815c43#diff-5abdd1a23ad2a7e61719d62d51208148a0bc556ae1c05923226ccfe4ac67cf15R521)), and `writeStream` the result to a target table at `/target/new-streaming-data`. So the target table itself has an ordinary data column called `_metadata`, which is why `newDF.select("*")` returns it.
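
   A minimal standalone sketch of the same behavior, outside the test harness. It assumes Spark 3.3 or later on the classpath (the first release expected to include this streaming support); the object name `MetadataColumnSketch`, the temp-directory layout, and the two-column schema are hypothetical and only for illustration:

```scala
import java.nio.file.Files

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

// Hypothetical sketch, not part of the PR.
object MetadataColumnSketch {
  // Returns (columns of the source table, columns of the target table).
  def run(): (Set[String], Set[String]) = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("metadata-column-sketch")
      .getOrCreate()
    import spark.implicits._

    // Assumed scratch layout, loosely mirroring the directories used in the test.
    val base = Files.createTempDirectory("metadata-sketch").toFile.getCanonicalPath
    val source = base + "/source"
    val target = base + "/target"

    try {
      // Source table: plain data columns only.
      Seq(("jack", 24)).toDF("name", "age")
        .coalesce(1).write.format("json").save(source)

      // select("*") on the source does not include the hidden _metadata column.
      val sourceColumns =
        spark.read.format("json").load(source).select("*").columns.toSet

      // Selecting _metadata explicitly writes it out as a regular column.
      val schema = new StructType().add("name", StringType).add("age", IntegerType)
      val query = spark.readStream.format("json")
        .schema(schema)
        .load(source)
        .select("*", "_metadata")
        .writeStream.format("json")
        .option("checkpointLocation", base + "/checkpoint")
        .start(target)
      query.processAllAvailable()
      query.stop()

      // In the target, _metadata is ordinary data, so select("*") returns it.
      val targetColumns =
        spark.read.format("json").load(target).select("*").columns.toSet

      (sourceColumns, targetColumns)
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (sourceColumns, targetColumns) = run()
    assert(sourceColumns == Set("name", "age"))
    assert(targetColumns == Set("name", "age", "_metadata"))
  }
}
```

   The asserts in `main` express the point of the comment above: the column is hidden when reading the source, but becomes plain data once it has been selected and written to the target.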




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


