You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/07/08 19:11:34 UTC

[GitHub] [iceberg] gustavoatt opened a new pull request #1184: Gustavoatt parquet read int96 timestamps

gustavoatt opened a new pull request #1184:
URL: https://github.com/apache/iceberg/pull/1184


   ## Summary
   
   Add read support for Parquet INT96 timestamps (fixes https://github.com/apache/iceberg/issues/1138). This is needed so that parquet files written by Spark, that used INT96 timestamps, are able to be read by Iceberg without having to rewrite these files. This is specially useful for migrations.
   
   https://github.com/apache/parquet-format/pull/49 has more information about how parquet int96 timestamps are stored. Note that I only implemented read support since this representation has many issues (as visible in the conversation in the `parquet-format` PR).
   
   ## Testing
   
   - [x] Added unit test for spark readers
   - [ ] Unsure about what is the best place to add unit-tests for the non-spark parquet readers. Would gladly add one.
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1184: Read support for parquet int96 timestamps

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1184:
URL: https://github.com/apache/iceberg/pull/1184#discussion_r459136129



##########
File path: spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
##########
@@ -67,4 +78,49 @@ protected void writeAndValidate(Schema schema) throws IOException {
       Assert.assertFalse("Should not have extra rows", rows.hasNext());
     }
   }
+
+  protected List<InternalRow> rowsFromFile(InputFile inputFile, Schema schema) throws IOException {
+    try (CloseableIterable<InternalRow> reader =
+        Parquet.read(inputFile)
+            .project(schema)
+            .createReaderFunc(type -> SparkParquetReaders.buildReader(schema, type))
+            .build()) {
+      return Lists.newArrayList(reader);
+    }
+  }
+
+  @Test
+  public void testInt96TimestampProducedBySparkIsReadCorrectly() throws IOException {
+    final SparkSession spark =
+        SparkSession.builder()
+            .master("local[2]")
+            .config("spark.sql.parquet.int96AsTimestamp", "false")
+            .getOrCreate();

Review comment:
       At one point, we supported writing to Parquet using Spark's built-in ReadSupport. I think we can probably get that working again to create the files.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] gustavoatt commented on a change in pull request #1184: Read support for parquet int96 timestamps

Posted by GitBox <gi...@apache.org>.
gustavoatt commented on a change in pull request #1184:
URL: https://github.com/apache/iceberg/pull/1184#discussion_r459728793



##########
File path: spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
##########
@@ -67,4 +78,49 @@ protected void writeAndValidate(Schema schema) throws IOException {
       Assert.assertFalse("Should not have extra rows", rows.hasNext());
     }
   }
+
+  protected List<InternalRow> rowsFromFile(InputFile inputFile, Schema schema) throws IOException {
+    try (CloseableIterable<InternalRow> reader =
+        Parquet.read(inputFile)
+            .project(schema)
+            .createReaderFunc(type -> SparkParquetReaders.buildReader(schema, type))
+            .build()) {
+      return Lists.newArrayList(reader);
+    }
+  }
+
+  @Test
+  public void testInt96TimestampProducedBySparkIsReadCorrectly() throws IOException {
+    final SparkSession spark =
+        SparkSession.builder()
+            .master("local[2]")
+            .config("spark.sql.parquet.int96AsTimestamp", "false")
+            .getOrCreate();

Review comment:
       Yes, looking at one of the tests we do support writing parquet files using Spark's WriteSupport.
   
    To be able to use a `FileAppender` I had to add a TimestampAsInt96 type (that can only be written using Spark's builtin WriteSupport) so that schema conversion within Iceberg's `ParquetWriteSupport` knows that this timestamps should be encoded as int96 in the  parquet  schema.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] gustavoatt commented on pull request #1184: Read support for parquet int96 timestamps

Posted by GitBox <gi...@apache.org>.
gustavoatt commented on pull request #1184:
URL: https://github.com/apache/iceberg/pull/1184#issuecomment-655706117


   cc: @rdblue @edgarRd 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] gustavoatt commented on pull request #1184: Read support for parquet int96 timestamps

Posted by GitBox <gi...@apache.org>.
gustavoatt commented on pull request #1184:
URL: https://github.com/apache/iceberg/pull/1184#issuecomment-663738403


   Thanks for merging and for the review @rdblue!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] gustavoatt commented on a change in pull request #1184: Read support for parquet int96 timestamps

Posted by GitBox <gi...@apache.org>.
gustavoatt commented on a change in pull request #1184:
URL: https://github.com/apache/iceberg/pull/1184#discussion_r460131668



##########
File path: spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
##########
@@ -67,4 +76,41 @@ protected void writeAndValidate(Schema schema) throws IOException {
       Assert.assertFalse("Should not have extra rows", rows.hasNext());
     }
   }
+
+  protected List<InternalRow> rowsFromFile(InputFile inputFile, Schema schema) throws IOException {
+    try (CloseableIterable<InternalRow> reader =
+        Parquet.read(inputFile)
+            .project(schema)
+            .createReaderFunc(type -> SparkParquetReaders.buildReader(schema, type))
+            .build()) {
+      return Lists.newArrayList(reader);
+    }
+  }
+
+  @Test
+  public void testInt96TimestampProducedBySparkIsReadCorrectly() throws IOException {
+    final Schema schema = new Schema(required(1, "ts", Types.TimestampType.asSparkInt96()));
+    final StructType sparkSchema = SparkSchemaUtil.convert(schema);
+    final Path parquetFile = Paths.get(temp.getRoot().getAbsolutePath(), "parquet_int96.parquet");

Review comment:
       I initially tried that way but the writer fails because the file already exists.

##########
File path: api/src/main/java/org/apache/iceberg/types/Types.java
##########
@@ -219,16 +220,30 @@ public static TimestampType withoutZone() {
       return INSTANCE_WITHOUT_ZONE;
     }
 
+    /**
+     * @return Timestamp type (with timezone) represented as INT96. This is only added for compatibility reasons
+     * and can only be written using a Spark's ParquetWriteSupport. Writing this type should be avoided.
+     */

Review comment:
       Agreed. I found a way to have tests running that doesn't add a new type, I had to create an implementation of `ParquetWriter.Builder` that uses Spark's `ParquetWriteSupport` and Iceberg's `ParquetWriteAdapter` to avoid creating a `SparkSession`.

##########
File path: spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
##########
@@ -67,4 +76,41 @@ protected void writeAndValidate(Schema schema) throws IOException {
       Assert.assertFalse("Should not have extra rows", rows.hasNext());
     }
   }
+
+  protected List<InternalRow> rowsFromFile(InputFile inputFile, Schema schema) throws IOException {
+    try (CloseableIterable<InternalRow> reader =
+        Parquet.read(inputFile)
+            .project(schema)
+            .createReaderFunc(type -> SparkParquetReaders.buildReader(schema, type))
+            .build()) {
+      return Lists.newArrayList(reader);
+    }
+  }
+
+  @Test
+  public void testInt96TimestampProducedBySparkIsReadCorrectly() throws IOException {
+    final Schema schema = new Schema(required(1, "ts", Types.TimestampType.asSparkInt96()));
+    final StructType sparkSchema = SparkSchemaUtil.convert(schema);
+    final Path parquetFile = Paths.get(temp.getRoot().getAbsolutePath(), "parquet_int96.parquet");
+    final List<InternalRow> rows = Lists.newArrayList(RandomData.generateSpark(schema, 10, 0L));

Review comment:
       Done. Removed these final modifiers.

##########
File path: spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
##########
@@ -67,4 +76,41 @@ protected void writeAndValidate(Schema schema) throws IOException {
       Assert.assertFalse("Should not have extra rows", rows.hasNext());
     }
   }
+
+  protected List<InternalRow> rowsFromFile(InputFile inputFile, Schema schema) throws IOException {
+    try (CloseableIterable<InternalRow> reader =
+        Parquet.read(inputFile)
+            .project(schema)
+            .createReaderFunc(type -> SparkParquetReaders.buildReader(schema, type))
+            .build()) {
+      return Lists.newArrayList(reader);
+    }
+  }
+
+  @Test
+  public void testInt96TimestampProducedBySparkIsReadCorrectly() throws IOException {
+    final Schema schema = new Schema(required(1, "ts", Types.TimestampType.asSparkInt96()));
+    final StructType sparkSchema = SparkSchemaUtil.convert(schema);
+    final Path parquetFile = Paths.get(temp.getRoot().getAbsolutePath(), "parquet_int96.parquet");
+    final List<InternalRow> rows = Lists.newArrayList(RandomData.generateSpark(schema, 10, 0L));
+
+    try (FileAppender<InternalRow> writer =
+        Parquet.write(Files.localOutput(parquetFile.toString()))
+            .writeSupport(
+                new org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport())
+            .set("org.apache.spark.sql.parquet.row.attributes", sparkSchema.json())
+            .set("org.apache.spark.legacyDateTime", "false")
+            .set("spark.sql.parquet.int96AsTimestamp", "true")
+            .set("spark.sql.parquet.writeLegacyFormat", "false")
+            .set("spark.sql.parquet.outputTimestampType", "INT96")
+            .schema(schema)

Review comment:
       I'm not sure I fully understand this comment.
   
   But I did change my approach here, and while still writing `InternalRow` I removed most of these properties and left only the relevant ones to make sure that Spark writes these as int96.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] thesquelched commented on pull request #1184: Read support for parquet int96 timestamps

Posted by GitBox <gi...@apache.org>.
thesquelched commented on pull request #1184:
URL: https://github.com/apache/iceberg/pull/1184#issuecomment-663731426


   Awesome possum, thanks for resolving this


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1184: Read support for parquet int96 timestamps

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1184:
URL: https://github.com/apache/iceberg/pull/1184#discussion_r459826436



##########
File path: spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
##########
@@ -67,4 +76,41 @@ protected void writeAndValidate(Schema schema) throws IOException {
       Assert.assertFalse("Should not have extra rows", rows.hasNext());
     }
   }
+
+  protected List<InternalRow> rowsFromFile(InputFile inputFile, Schema schema) throws IOException {
+    try (CloseableIterable<InternalRow> reader =
+        Parquet.read(inputFile)
+            .project(schema)
+            .createReaderFunc(type -> SparkParquetReaders.buildReader(schema, type))
+            .build()) {
+      return Lists.newArrayList(reader);
+    }
+  }
+
+  @Test
+  public void testInt96TimestampProducedBySparkIsReadCorrectly() throws IOException {
+    final Schema schema = new Schema(required(1, "ts", Types.TimestampType.asSparkInt96()));
+    final StructType sparkSchema = SparkSchemaUtil.convert(schema);
+    final Path parquetFile = Paths.get(temp.getRoot().getAbsolutePath(), "parquet_int96.parquet");
+    final List<InternalRow> rows = Lists.newArrayList(RandomData.generateSpark(schema, 10, 0L));

Review comment:
       Nit: we don't use `final` for local variables.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue merged pull request #1184: Read support for parquet int96 timestamps

Posted by GitBox <gi...@apache.org>.
rdblue merged pull request #1184:
URL: https://github.com/apache/iceberg/pull/1184


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] gustavoatt commented on a change in pull request #1184: Read support for parquet int96 timestamps

Posted by GitBox <gi...@apache.org>.
gustavoatt commented on a change in pull request #1184:
URL: https://github.com/apache/iceberg/pull/1184#discussion_r459110702



##########
File path: spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
##########
@@ -67,4 +78,49 @@ protected void writeAndValidate(Schema schema) throws IOException {
       Assert.assertFalse("Should not have extra rows", rows.hasNext());
     }
   }
+
+  protected List<InternalRow> rowsFromFile(InputFile inputFile, Schema schema) throws IOException {
+    try (CloseableIterable<InternalRow> reader =
+        Parquet.read(inputFile)
+            .project(schema)
+            .createReaderFunc(type -> SparkParquetReaders.buildReader(schema, type))
+            .build()) {
+      return Lists.newArrayList(reader);
+    }
+  }
+
+  @Test
+  public void testInt96TimestampProducedBySparkIsReadCorrectly() throws IOException {
+    final SparkSession spark =
+        SparkSession.builder()
+            .master("local[2]")
+            .config("spark.sql.parquet.int96AsTimestamp", "false")
+            .getOrCreate();

Review comment:
       Another approach would be to check-in a parquet file written by a spark and have the test just read it?
   
   A drawback with that approach is that updating this file would be brittle, but I can check in the code that writes the file in an ignored test, but that should avoid us from creating a spark session during unit tests. What do you think @rdblue?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1184: Read support for parquet int96 timestamps

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1184:
URL: https://github.com/apache/iceberg/pull/1184#discussion_r460261628



##########
File path: data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
##########
@@ -345,6 +352,25 @@ public LocalDateTime read(LocalDateTime reuse) {
     }
   }
 
+  private static class TimestampInt96Reader extends ParquetValueReaders.PrimitiveReader<LocalDateTime> {
+    private static final long UNIX_EPOCH_JULIAN = 2_440_588L;
+
+    private TimestampInt96Reader(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public LocalDateTime read(LocalDateTime reuse) {
+      final ByteBuffer byteBuffer = column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);

Review comment:
       Note for reviewers (and future me): `toByteBuffer` returns a duplicate of the internal buffer so that it is safe for uses of it to modify the buffer's position with methods like `getLong`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1184: Read support for parquet int96 timestamps

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1184:
URL: https://github.com/apache/iceberg/pull/1184#discussion_r459826175



##########
File path: api/src/main/java/org/apache/iceberg/types/Types.java
##########
@@ -219,16 +220,30 @@ public static TimestampType withoutZone() {
       return INSTANCE_WITHOUT_ZONE;
     }
 
+    /**
+     * @return Timestamp type (with timezone) represented as INT96. This is only added for compatibility reasons
+     * and can only be written using a Spark's ParquetWriteSupport. Writing this type should be avoided.
+     */

Review comment:
       I don't think we should change the type system to support this. INT96 may be something that we can read, but Iceberg cannot write it, per the spec.
   
   Was this needed to build the tests?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] gustavoatt commented on a change in pull request #1184: Read support for parquet int96 timestamps

Posted by GitBox <gi...@apache.org>.
gustavoatt commented on a change in pull request #1184:
URL: https://github.com/apache/iceberg/pull/1184#discussion_r454597256



##########
File path: spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
##########
@@ -67,4 +78,49 @@ protected void writeAndValidate(Schema schema) throws IOException {
       Assert.assertFalse("Should not have extra rows", rows.hasNext());
     }
   }
+
+  protected List<InternalRow> rowsFromFile(InputFile inputFile, Schema schema) throws IOException {
+    try (CloseableIterable<InternalRow> reader =
+        Parquet.read(inputFile)
+            .project(schema)
+            .createReaderFunc(type -> SparkParquetReaders.buildReader(schema, type))
+            .build()) {
+      return Lists.newArrayList(reader);
+    }
+  }
+
+  @Test
+  public void testInt96TimestampProducedBySparkIsReadCorrectly() throws IOException {
+    final SparkSession spark =
+        SparkSession.builder()
+            .master("local[2]")
+            .config("spark.sql.parquet.int96AsTimestamp", "false")
+            .getOrCreate();

Review comment:
       Yes, I would much rather avoid creating a `SparkSession` here if possible. However, looking into [`ParquetFileFormat`](https://sourcegraph.com/github.com/apache/spark@d6a68e0b67ff7de58073c176dd097070e88ac831/-/blob/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L74:1) it seems like we would still need to pass a `SparkSession` to create the writer.
   
   I can look at [`ParquetOutputWriter`](https://sourcegraph.com/github.com/apache/spark@d6a68e0b67ff7de58073c176dd097070e88ac831/-/blob/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala#L32) but I might need to match the configuration there with what Spark uses to write int96.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #1184: Read support for parquet int96 timestamps

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1184:
URL: https://github.com/apache/iceberg/pull/1184#issuecomment-657906631


   Mostly looks good, but I'd like to fix up the test to avoid creating a SparkSession for just one case. Thanks @gustavoatt!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1184: Read support for parquet int96 timestamps

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1184:
URL: https://github.com/apache/iceberg/pull/1184#discussion_r459826759



##########
File path: spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
##########
@@ -67,4 +76,41 @@ protected void writeAndValidate(Schema schema) throws IOException {
       Assert.assertFalse("Should not have extra rows", rows.hasNext());
     }
   }
+
+  protected List<InternalRow> rowsFromFile(InputFile inputFile, Schema schema) throws IOException {
+    try (CloseableIterable<InternalRow> reader =
+        Parquet.read(inputFile)
+            .project(schema)
+            .createReaderFunc(type -> SparkParquetReaders.buildReader(schema, type))
+            .build()) {
+      return Lists.newArrayList(reader);
+    }
+  }
+
+  @Test
+  public void testInt96TimestampProducedBySparkIsReadCorrectly() throws IOException {
+    final Schema schema = new Schema(required(1, "ts", Types.TimestampType.asSparkInt96()));
+    final StructType sparkSchema = SparkSchemaUtil.convert(schema);
+    final Path parquetFile = Paths.get(temp.getRoot().getAbsolutePath(), "parquet_int96.parquet");
+    final List<InternalRow> rows = Lists.newArrayList(RandomData.generateSpark(schema, 10, 0L));
+
+    try (FileAppender<InternalRow> writer =
+        Parquet.write(Files.localOutput(parquetFile.toString()))
+            .writeSupport(
+                new org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport())
+            .set("org.apache.spark.sql.parquet.row.attributes", sparkSchema.json())
+            .set("org.apache.spark.legacyDateTime", "false")
+            .set("spark.sql.parquet.int96AsTimestamp", "true")
+            .set("spark.sql.parquet.writeLegacyFormat", "false")
+            .set("spark.sql.parquet.outputTimestampType", "INT96")
+            .schema(schema)

Review comment:
       I'd prefer to pass in a normal timestamp type and set a property, if needed, to enable INT96 support.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1184: Read support for parquet int96 timestamps

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1184:
URL: https://github.com/apache/iceberg/pull/1184#discussion_r459826650



##########
File path: spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
##########
@@ -67,4 +76,41 @@ protected void writeAndValidate(Schema schema) throws IOException {
       Assert.assertFalse("Should not have extra rows", rows.hasNext());
     }
   }
+
+  protected List<InternalRow> rowsFromFile(InputFile inputFile, Schema schema) throws IOException {
+    try (CloseableIterable<InternalRow> reader =
+        Parquet.read(inputFile)
+            .project(schema)
+            .createReaderFunc(type -> SparkParquetReaders.buildReader(schema, type))
+            .build()) {
+      return Lists.newArrayList(reader);
+    }
+  }
+
+  @Test
+  public void testInt96TimestampProducedBySparkIsReadCorrectly() throws IOException {
+    final Schema schema = new Schema(required(1, "ts", Types.TimestampType.asSparkInt96()));
+    final StructType sparkSchema = SparkSchemaUtil.convert(schema);
+    final Path parquetFile = Paths.get(temp.getRoot().getAbsolutePath(), "parquet_int96.parquet");

Review comment:
       Why not use `temp.newFile`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1184: Read support for parquet int96 timestamps

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1184:
URL: https://github.com/apache/iceberg/pull/1184#discussion_r454035805



##########
File path: spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
##########
@@ -67,4 +78,49 @@ protected void writeAndValidate(Schema schema) throws IOException {
       Assert.assertFalse("Should not have extra rows", rows.hasNext());
     }
   }
+
+  protected List<InternalRow> rowsFromFile(InputFile inputFile, Schema schema) throws IOException {
+    try (CloseableIterable<InternalRow> reader =
+        Parquet.read(inputFile)
+            .project(schema)
+            .createReaderFunc(type -> SparkParquetReaders.buildReader(schema, type))
+            .build()) {
+      return Lists.newArrayList(reader);
+    }
+  }
+
+  @Test
+  public void testInt96TimestampProducedBySparkIsReadCorrectly() throws IOException {
+    final SparkSession spark =
+        SparkSession.builder()
+            .master("local[2]")
+            .config("spark.sql.parquet.int96AsTimestamp", "false")
+            .getOrCreate();

Review comment:
       Is it possible to avoid creating a Spark session just to write a timestamp? What about calling Spark's `FileFormat` to write directly instead?
   
   We wrap Spark's `FileFormat` in our DSv2 table implementation: https://github.com/Netflix/iceberg/blob/netflix-spark-2.4/metacat/src/main/java/com/netflix/iceberg/batch/BatchPatternWrite.java#L90
   
   This test would run much faster by using that to create a file instead of creating a Spark context.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #1184: Read support for parquet int96 timestamps

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1184:
URL: https://github.com/apache/iceberg/pull/1184#issuecomment-663729533


   Merged. Thanks for fixing this, @gustavoatt!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1184: Read support for parquet int96 timestamps

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1184:
URL: https://github.com/apache/iceberg/pull/1184#discussion_r454036152



##########
File path: spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
##########
@@ -67,4 +78,49 @@ protected void writeAndValidate(Schema schema) throws IOException {
       Assert.assertFalse("Should not have extra rows", rows.hasNext());
     }
   }
+
+  protected List<InternalRow> rowsFromFile(InputFile inputFile, Schema schema) throws IOException {
+    try (CloseableIterable<InternalRow> reader =
+        Parquet.read(inputFile)
+            .project(schema)
+            .createReaderFunc(type -> SparkParquetReaders.buildReader(schema, type))
+            .build()) {
+      return Lists.newArrayList(reader);
+    }
+  }
+
+  @Test
+  public void testInt96TimestampProducedBySparkIsReadCorrectly() throws IOException {
+    final SparkSession spark =
+        SparkSession.builder()
+            .master("local[2]")
+            .config("spark.sql.parquet.int96AsTimestamp", "false")
+            .getOrCreate();
+
+    final String parquetPath = temp.getRoot().getAbsolutePath() + "/parquet_int96";
+    final java.sql.Timestamp ts = java.sql.Timestamp.valueOf("2014-01-01 23:00:01");
+    spark.createDataset(ImmutableList.of(ts), Encoders.TIMESTAMP()).write().parquet(parquetPath);

Review comment:
       Using Spark's `FileFormat` would also make this test easier. You'd be able to pass in a value in micros and validate that you get the same value back, unmodified. You'd also not need to locate the Parquet file using `find`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #1184: Read support for parquet int96 timestamps

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1184:
URL: https://github.com/apache/iceberg/pull/1184#issuecomment-663710298


   Nice work, @gustavoatt! Thank you for updating this so that the test is self-contained.
   
   I'll merge this when tests are passing.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org