You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/02/28 07:50:23 UTC

[GitHub] [spark] Yaohua628 opened a new pull request #35676: [SPARK-38323][SQL][Streaming] Support the hidden file metadata in Streaming

Yaohua628 opened a new pull request #35676:
URL: https://github.com/apache/spark/pull/35676


   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
     2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
     4. Be sure to keep the PR description updated to reflect all changes.
     5. Please write your PR title to summarize what this PR proposes.
     6. If possible, provide a concise example to reproduce the issue for a faster review.
     7. If you want to add a new configuration, please read the guideline first for naming configurations in
        'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
     8. If you want to add or modify an error type or message, please read the guideline first in
        'core/src/main/resources/error/README.md'.
   -->
   
   ### What changes were proposed in this pull request?
   <!--
   Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue. 
   If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
     1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
     2. If you fix some SQL features, you can provide some references of other DBMSes.
     3. If there is design documentation, please add the link.
     4. If there is a discussion in the mailing list, please add the link.
   -->
   
   
   ### Why are the changes needed?
   Before this PR, querying the hidden file metadata struct `_metadata` will fail using `readStream`, `writeStream` streaming APIs. 
   ```
   spark
     .readStream
     ...
     .select("_metadata")
     .writeStream
     ...
     .start()
   ```
   This PR brings the file source hidden file metadata to the structured streaming.
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   Add a new UT
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #35676: [SPARK-38323][SQL][Streaming] Support the hidden file metadata in Streaming

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #35676:
URL: https://github.com/apache/spark/pull/35676#issuecomment-1054976945


   Can one of the admins verify this patch?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Yaohua628 commented on pull request #35676: [SPARK-38323][SQL][Streaming] Support the hidden file metadata in Streaming

Posted by GitBox <gi...@apache.org>.
Yaohua628 commented on pull request #35676:
URL: https://github.com/apache/spark/pull/35676#issuecomment-1053981590


   @cloud-fan appreciate it if you can take a look, thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on a change in pull request #35676: [SPARK-38323][SQL][Streaming] Support the hidden file metadata in Streaming

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #35676:
URL: https://github.com/apache/spark/pull/35676#discussion_r818270346



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
##########
@@ -510,4 +510,58 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
       )
     }
   }
+
+  metadataColumnsTest("file metadata in streaming", schema) { (df, _, _) =>
+    withTempDir { dir =>
+      df.coalesce(1).write.format("json").save(dir.getCanonicalPath + "/source/new-streaming-data")
+
+      val stream = spark.readStream.format("json")
+        .schema(schema)
+        .load(dir.getCanonicalPath + "/source/new-streaming-data")
+        .select("*", "_metadata")
+        .writeStream.format("json")
+        .option("checkpointLocation", dir.getCanonicalPath + "/target/checkpoint")
+        .start(dir.getCanonicalPath + "/target/new-streaming-data")
+
+      stream.processAllAvailable()
+      stream.stop()
+
+      val newDF = spark.read.format("json")
+        .load(dir.getCanonicalPath + "/target/new-streaming-data")
+
+      val sourceFile = new File(dir, "/source/new-streaming-data").listFiles()
+        .filter(_.getName.endsWith(".json")).head
+      val sourceFileMetadata = Map(
+        METADATA_FILE_PATH -> sourceFile.toURI.toString,
+        METADATA_FILE_NAME -> sourceFile.getName,
+        METADATA_FILE_SIZE -> sourceFile.length(),
+        METADATA_FILE_MODIFICATION_TIME -> new Timestamp(sourceFile.lastModified())
+      )
+
+      // SELECT * will have: name, age, info, _metadata of /source/new-streaming-data
+      assert(newDF.select("*").columns.toSet == Set("name", "age", "info", "_metadata"))

Review comment:
       @Yaohua628
   
   Sorry for the post review. I haven't had a time to review this in time.
   
   Just to make clear, select("*").show() should not expose a hidden column, right? Since you've included "_metadata" from the list of columns so I would like to double confirm that it is not user facing.
   
   And given we include the new column, `dropDuplicate` without explicitly mentioning columns in streaming query would be broken. state schema would somehow include the hidden column in the schema, whereas state schema from older version of checkpoint does not include the hidden column (as they didn't exist).
   
   cc. @cloud-fan 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on pull request #35676: [SPARK-38323][SQL][Streaming] Support the hidden file metadata in Streaming

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on pull request #35676:
URL: https://github.com/apache/spark/pull/35676#issuecomment-1054177608


   Can you fill the `What changes were proposed in this pull request?` section and fix merge conflicts? The code change looks good. Also cc @HeartSaVioR 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on a change in pull request #35676: [SPARK-38323][SQL][Streaming] Support the hidden file metadata in Streaming

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #35676:
URL: https://github.com/apache/spark/pull/35676#discussion_r818274250



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
##########
@@ -510,4 +510,58 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
       )
     }
   }
+
+  metadataColumnsTest("file metadata in streaming", schema) { (df, _, _) =>
+    withTempDir { dir =>
+      df.coalesce(1).write.format("json").save(dir.getCanonicalPath + "/source/new-streaming-data")
+
+      val stream = spark.readStream.format("json")
+        .schema(schema)
+        .load(dir.getCanonicalPath + "/source/new-streaming-data")
+        .select("*", "_metadata")
+        .writeStream.format("json")
+        .option("checkpointLocation", dir.getCanonicalPath + "/target/checkpoint")
+        .start(dir.getCanonicalPath + "/target/new-streaming-data")
+
+      stream.processAllAvailable()
+      stream.stop()
+
+      val newDF = spark.read.format("json")
+        .load(dir.getCanonicalPath + "/target/new-streaming-data")
+
+      val sourceFile = new File(dir, "/source/new-streaming-data").listFiles()
+        .filter(_.getName.endsWith(".json")).head
+      val sourceFileMetadata = Map(
+        METADATA_FILE_PATH -> sourceFile.toURI.toString,
+        METADATA_FILE_NAME -> sourceFile.getName,
+        METADATA_FILE_SIZE -> sourceFile.length(),
+        METADATA_FILE_MODIFICATION_TIME -> new Timestamp(sourceFile.lastModified())
+      )
+
+      // SELECT * will have: name, age, info, _metadata of /source/new-streaming-data
+      assert(newDF.select("*").columns.toSet == Set("name", "age", "info", "_metadata"))

Review comment:
       Ah OK my bad. That is just checking the output.
   
   `dropDuplicate()` still remains a question. How we deal with this? We removed the column by default for Kafka header to not break compatibility with `dropDuplicate()`, but not sure we would like to add more config.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Yaohua628 commented on pull request #35676: [SPARK-38323][SQL][Streaming] Support the hidden file metadata in Streaming

Posted by GitBox <gi...@apache.org>.
Yaohua628 commented on pull request #35676:
URL: https://github.com/apache/spark/pull/35676#issuecomment-1057840425


   After manual testing locally, changes of `_metadata` in streaming won't cause any backward-compatible issues for `dropDuplicates()`:
   1. with **Spark 3.2**: `readStream...select("*").dropDuplicates().writeStream...option("checkpointLocation", ".../checkpoint")`
   2. verify results (duplicated data did get dropped), verify checkpoint dir (commits, offsets, sources, state)
   3. add some new files including duplicates
   4. with **Spark master (3.3 with _metadata feature)** with the **_same_** checkpoint location: `readStream...select("*").dropDuplicates().writeStream...option("checkpointLocation", ".../checkpoint")`
   5. verify results (duplicated data did get dropped), verify checkpoint dir (commits, offsets, sources, state)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Yaohua628 commented on a change in pull request #35676: [SPARK-38323][SQL][Streaming] Support the hidden file metadata in Streaming

Posted by GitBox <gi...@apache.org>.
Yaohua628 commented on a change in pull request #35676:
URL: https://github.com/apache/spark/pull/35676#discussion_r818277389



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
##########
@@ -510,4 +510,58 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
       )
     }
   }
+
+  metadataColumnsTest("file metadata in streaming", schema) { (df, _, _) =>
+    withTempDir { dir =>
+      df.coalesce(1).write.format("json").save(dir.getCanonicalPath + "/source/new-streaming-data")
+
+      val stream = spark.readStream.format("json")
+        .schema(schema)
+        .load(dir.getCanonicalPath + "/source/new-streaming-data")
+        .select("*", "_metadata")
+        .writeStream.format("json")
+        .option("checkpointLocation", dir.getCanonicalPath + "/target/checkpoint")
+        .start(dir.getCanonicalPath + "/target/new-streaming-data")
+
+      stream.processAllAvailable()
+      stream.stop()
+
+      val newDF = spark.read.format("json")
+        .load(dir.getCanonicalPath + "/target/new-streaming-data")
+
+      val sourceFile = new File(dir, "/source/new-streaming-data").listFiles()
+        .filter(_.getName.endsWith(".json")).head
+      val sourceFileMetadata = Map(
+        METADATA_FILE_PATH -> sourceFile.toURI.toString,
+        METADATA_FILE_NAME -> sourceFile.getName,
+        METADATA_FILE_SIZE -> sourceFile.length(),
+        METADATA_FILE_MODIFICATION_TIME -> new Timestamp(sourceFile.lastModified())
+      )
+
+      // SELECT * will have: name, age, info, _metadata of /source/new-streaming-data
+      assert(newDF.select("*").columns.toSet == Set("name", "age", "info", "_metadata"))

Review comment:
       I see! I can test with `dropDuplicates()` and see how it goes




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on a change in pull request #35676: [SPARK-38323][SQL][Streaming] Support the hidden file metadata in Streaming

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #35676:
URL: https://github.com/apache/spark/pull/35676#discussion_r818270346



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
##########
@@ -510,4 +510,58 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
       )
     }
   }
+
+  metadataColumnsTest("file metadata in streaming", schema) { (df, _, _) =>
+    withTempDir { dir =>
+      df.coalesce(1).write.format("json").save(dir.getCanonicalPath + "/source/new-streaming-data")
+
+      val stream = spark.readStream.format("json")
+        .schema(schema)
+        .load(dir.getCanonicalPath + "/source/new-streaming-data")
+        .select("*", "_metadata")
+        .writeStream.format("json")
+        .option("checkpointLocation", dir.getCanonicalPath + "/target/checkpoint")
+        .start(dir.getCanonicalPath + "/target/new-streaming-data")
+
+      stream.processAllAvailable()
+      stream.stop()
+
+      val newDF = spark.read.format("json")
+        .load(dir.getCanonicalPath + "/target/new-streaming-data")
+
+      val sourceFile = new File(dir, "/source/new-streaming-data").listFiles()
+        .filter(_.getName.endsWith(".json")).head
+      val sourceFileMetadata = Map(
+        METADATA_FILE_PATH -> sourceFile.toURI.toString,
+        METADATA_FILE_NAME -> sourceFile.getName,
+        METADATA_FILE_SIZE -> sourceFile.length(),
+        METADATA_FILE_MODIFICATION_TIME -> new Timestamp(sourceFile.lastModified())
+      )
+
+      // SELECT * will have: name, age, info, _metadata of /source/new-streaming-data
+      assert(newDF.select("*").columns.toSet == Set("name", "age", "info", "_metadata"))

Review comment:
       @Yaohua628
   
   Sorry for the post review. I haven't had a time to review this in time.
   
   Just to make clear, select("*").show() should not expose a hidden column, right? Since you've included "_metadata" from the list of columns so I would like to double confirm that it is not user facing.
   
   And given we include the new column, `dropDuplicate` without explicitly mentioning columns in streaming query would be broken. state schema would somehow include the hidden column in the schema, whereas state schema from older version of checkpoint does not include the hidden column (as they didn't exist).
   
   We should test it, and if it fall into the case, we should mention this in the migration guide, or make this configurable and by default turn off. (We did this for adding Kafka header - #22282)
   
   cc. @cloud-fan 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on pull request #35676: [SPARK-38323][SQL][Streaming] Support the hidden file metadata in Streaming

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #35676:
URL: https://github.com/apache/spark/pull/35676#issuecomment-1057860523


   Thanks for checking it manually! We are good to go.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on pull request #35676: [SPARK-38323][SQL][Streaming] Support the hidden file metadata in Streaming

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on pull request #35676:
URL: https://github.com/apache/spark/pull/35676#issuecomment-1055427341


   thanks, merging to master!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan closed pull request #35676: [SPARK-38323][SQL][Streaming] Support the hidden file metadata in Streaming

Posted by GitBox <gi...@apache.org>.
cloud-fan closed pull request #35676:
URL: https://github.com/apache/spark/pull/35676


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Yaohua628 commented on a change in pull request #35676: [SPARK-38323][SQL][Streaming] Support the hidden file metadata in Streaming

Posted by GitBox <gi...@apache.org>.
Yaohua628 commented on a change in pull request #35676:
URL: https://github.com/apache/spark/pull/35676#discussion_r818272375



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
##########
@@ -510,4 +510,58 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
       )
     }
   }
+
+  metadataColumnsTest("file metadata in streaming", schema) { (df, _, _) =>
+    withTempDir { dir =>
+      df.coalesce(1).write.format("json").save(dir.getCanonicalPath + "/source/new-streaming-data")
+
+      val stream = spark.readStream.format("json")
+        .schema(schema)
+        .load(dir.getCanonicalPath + "/source/new-streaming-data")
+        .select("*", "_metadata")
+        .writeStream.format("json")
+        .option("checkpointLocation", dir.getCanonicalPath + "/target/checkpoint")
+        .start(dir.getCanonicalPath + "/target/new-streaming-data")
+
+      stream.processAllAvailable()
+      stream.stop()
+
+      val newDF = spark.read.format("json")
+        .load(dir.getCanonicalPath + "/target/new-streaming-data")
+
+      val sourceFile = new File(dir, "/source/new-streaming-data").listFiles()
+        .filter(_.getName.endsWith(".json")).head
+      val sourceFileMetadata = Map(
+        METADATA_FILE_PATH -> sourceFile.toURI.toString,
+        METADATA_FILE_NAME -> sourceFile.getName,
+        METADATA_FILE_SIZE -> sourceFile.length(),
+        METADATA_FILE_MODIFICATION_TIME -> new Timestamp(sourceFile.lastModified())
+      )
+
+      // SELECT * will have: name, age, info, _metadata of /source/new-streaming-data
+      assert(newDF.select("*").columns.toSet == Set("name", "age", "info", "_metadata"))

Review comment:
       Yep, `select("*")` won't expose the hidden file metadata column.
   
   But here what I did is `readStream` and explicitly selecting `*` and `_metadata` ([here](https://github.com/apache/spark/pull/35676/files/ae9959c4ea6a5212d8023f9561f4551470815c43#diff-5abdd1a23ad2a7e61719d62d51208148a0bc556ae1c05923226ccfe4ac67cf15R521)) and `writeStream` to a target table `/target/new-streaming-data`, so the target table itself will have a column called `_metadata`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org