Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/08/03 16:32:40 UTC

[GitHub] [iceberg] RussellSpitzer opened a new pull request #1287: Vectorized Reads of Parquet with Identity Partitions

RussellSpitzer opened a new pull request #1287:
URL: https://github.com/apache/iceberg/pull/1287


   Previously, vectorization was disabled whenever an Iceberg table used Parquet
   files and also used identity transforms in its partitioning.
   
   To fix this, we extend the DummyVectorReader into a ConstantVectorReader, which is
   used when a column's value can be determined from the PartitionSpec. Then, when
   constructing the reader, we use a ConstantColumnVector to fill in the missing
   column.
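
The idea of filling in a missing column from a constant can be sketched roughly as follows. This is a minimal illustration only: `Column`, `FileColumn`, and `ConstantColumn` are hypothetical stand-ins invented for this example, not the Iceberg or Spark classes touched by the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-ins; these are not the Iceberg or Spark classes.
class ConstantColumnSketch {

  /** Minimal column abstraction: one value per row position. */
  interface Column<T> {
    T get(int rowId);

    int numRows();
  }

  /** Column backed by values that were actually read from a data file. */
  static final class FileColumn<T> implements Column<T> {
    private final List<T> values;

    FileColumn(List<T> values) {
      this.values = values;
    }

    @Override
    public T get(int rowId) {
      return values.get(rowId);
    }

    @Override
    public int numRows() {
      return values.size();
    }
  }

  /** Column that stores a single value and returns it for every row. */
  static final class ConstantColumn<T> implements Column<T> {
    private final T value;
    private final int numRows;

    ConstantColumn(T value, int numRows) {
      this.value = value;
      this.numRows = numRows;
    }

    @Override
    public T get(int rowId) {
      if (rowId < 0 || rowId >= numRows) {
        throw new IndexOutOfBoundsException("row " + rowId);
      }
      return value;
    }

    @Override
    public int numRows() {
      return numRows;
    }
  }

  /** Renders a batch row by row, mixing file-backed and constant columns. */
  static List<String> rows(List<Column<?>> columns) {
    int numRows = columns.isEmpty() ? 0 : columns.get(0).numRows();
    List<String> out = new ArrayList<>();
    for (int row = 0; row < numRows; row++) {
      StringBuilder line = new StringBuilder();
      for (int c = 0; c < columns.size(); c++) {
        if (c > 0) {
          line.append(',');
        }
        line.append(columns.get(c).get(row));
      }
      out.add(line.toString());
    }
    return out;
  }

  /** "level" is absent from the file; its value is taken from partition metadata. */
  static List<String> demo() {
    Column<Integer> id = new FileColumn<>(Arrays.asList(1, 2, 3));
    Column<String> level = new ConstantColumn<>("INFO", 3);
    return rows(Arrays.<Column<?>>asList(id, level));
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

The constant column needs no per-row storage, so the partition value costs one field regardless of batch size.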


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1287: Vectorized Reads of Parquet with Identity Partitions

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1287:
URL: https://github.com/apache/iceberg/pull/1287#discussion_r464570559



##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java
##########
@@ -91,17 +91,44 @@ public int numValues() {
     return vector.getValueCount();
   }
 
+  public static VectorHolder constantHolder(int numRows, Object constantValue) {
+    return new DummyVectorHolder(numRows, constantValue);
+  }
+
   public static VectorHolder dummyHolder(int numRows) {
-    return new VectorHolder() {
-      @Override
-      public int numValues() {
-        return numRows;
-      }
-    };
+    return new DummyVectorHolder(numRows);
   }
 
   public boolean isDummy() {
     return vector == null;
   }
 
+  /**
+   * A Vector Holder which does not actually produce values, consumers of this class should
+   * use the constantValue to populate their ColumnVector implementation.
+   */
+  public static class DummyVectorHolder extends VectorHolder {

Review comment:
       Makes sense to me






[GitHub] [iceberg] rdblue commented on pull request #1287: Vectorized Reads of Parquet with Identity Partitions

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1287:
URL: https://github.com/apache/iceberg/pull/1287#issuecomment-668911804


   Merged. Thanks, @RussellSpitzer! Good to have this feature done.




[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1287: Vectorized Reads of Parquet with Identity Partitions

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1287:
URL: https://github.com/apache/iceberg/pull/1287#discussion_r464570186



##########
File path: spark/src/test/java/org/apache/iceberg/spark/source/TestIdentityPartitionData.java
##########
@@ -110,24 +113,52 @@ public static void stopSpark() {
   private Table table = null;
   private Dataset<Row> logs = null;
 
-  @Before
-  public void setupTable() throws Exception {
+  /*
+  Use the Hive Based table to make Identity Partition Columns with no duplication of the data in the underlying
+  parquet files. This makes sure that if the identity mapping fails, the test will also fail.
+   */
+  private void setupParquet() throws Exception {
     File location = temp.newFolder("logs");
+    File hiveLocation = temp.newFolder("hive");
+    String hiveTable = "hivetable";
     Assert.assertTrue("Temp folder should exist", location.exists());
 
     Map<String, String> properties = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format);
-    this.table = TABLES.create(LOG_SCHEMA, spec, properties, location.toString());
     this.logs = spark.createDataFrame(LOGS, LogMessage.class).select("id", "date", "level", "message");
+    spark.sql(String.format("DROP TABLE IF EXISTS %s", hiveTable));
+    logs.orderBy("date", "level", "id").write().partitionBy("date", "level").format("parquet")
+        .option("path", hiveLocation.toString()).saveAsTable(hiveTable);
+
+    this.table = TABLES.create(SparkSchemaUtil.schemaForTable(spark, hiveTable),
+        SparkSchemaUtil.specForTable(spark, hiveTable), properties, location.toString());
+
+    SparkTableUtil.importSparkTable(spark, new TableIdentifier(hiveTable), table, location.toString());
+  }
 
-    logs.orderBy("date", "level", "id").write().format("iceberg").mode("append").save(location.toString());
+  @Before
+  public void setupTable() throws Exception {
+    if (format.equals("parquet")) {
+      setupParquet();
+    } else {
+      File location = temp.newFolder("logs");
+      Assert.assertTrue("Temp folder should exist", location.exists());
+
+      Map<String, String> properties = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format);
+      this.table = TABLES.create(LOG_SCHEMA, spec, properties, location.toString());
+      this.logs = spark.createDataFrame(LOGS, LogMessage.class).select("id", "date", "level", "message");
+
+      logs.orderBy("date", "level", "id").write().format("iceberg").mode("append").save(location.toString());
+    }
   }
 
   @Test
   public void testFullProjection() {
     List<Row> expected = logs.orderBy("id").collectAsList();
     List<Row> actual = spark.read().format("iceberg")
         .option("vectorization-enabled", String.valueOf(vectorized))
-        .load(table.location()).orderBy("id").collectAsList();
+        .load(table.location()).orderBy("id")
+        .select("id", "date", "level", "message")

Review comment:
       When I added the Hive import, the schema came back in a different order. I think this may be an issue with the import code, but I'm not sure; I only know that the default column order does not come out the same way :/






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1287: Vectorized Reads of Parquet with Identity Partitions

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1287:
URL: https://github.com/apache/iceberg/pull/1287#discussion_r464590594



##########
File path: spark/src/test/java/org/apache/iceberg/spark/source/TestIdentityPartitionData.java
##########
@@ -110,24 +113,52 @@ public static void stopSpark() {
   private Table table = null;
   private Dataset<Row> logs = null;
 
-  @Before
-  public void setupTable() throws Exception {
+  /*
+  Use the Hive Based table to make Identity Partition Columns with no duplication of the data in the underlying
+  parquet files. This makes sure that if the identity mapping fails, the test will also fail.
+   */
+  private void setupParquet() throws Exception {
     File location = temp.newFolder("logs");
+    File hiveLocation = temp.newFolder("hive");
+    String hiveTable = "hivetable";
     Assert.assertTrue("Temp folder should exist", location.exists());
 
     Map<String, String> properties = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format);
-    this.table = TABLES.create(LOG_SCHEMA, spec, properties, location.toString());
     this.logs = spark.createDataFrame(LOGS, LogMessage.class).select("id", "date", "level", "message");
+    spark.sql(String.format("DROP TABLE IF EXISTS %s", hiveTable));
+    logs.orderBy("date", "level", "id").write().partitionBy("date", "level").format("parquet")
+        .option("path", hiveLocation.toString()).saveAsTable(hiveTable);
+
+    this.table = TABLES.create(SparkSchemaUtil.schemaForTable(spark, hiveTable),
+        SparkSchemaUtil.specForTable(spark, hiveTable), properties, location.toString());
+
+    SparkTableUtil.importSparkTable(spark, new TableIdentifier(hiveTable), table, location.toString());
+  }
 
-    logs.orderBy("date", "level", "id").write().format("iceberg").mode("append").save(location.toString());
+  @Before
+  public void setupTable() throws Exception {
+    if (format.equals("parquet")) {
+      setupParquet();
+    } else {
+      File location = temp.newFolder("logs");
+      Assert.assertTrue("Temp folder should exist", location.exists());
+
+      Map<String, String> properties = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format);
+      this.table = TABLES.create(LOG_SCHEMA, spec, properties, location.toString());
+      this.logs = spark.createDataFrame(LOGS, LogMessage.class).select("id", "date", "level", "message");
+
+      logs.orderBy("date", "level", "id").write().format("iceberg").mode("append").save(location.toString());
+    }
   }
 
   @Test
   public void testFullProjection() {
     List<Row> expected = logs.orderBy("id").collectAsList();
     List<Row> actual = spark.read().format("iceberg")
         .option("vectorization-enabled", String.valueOf(vectorized))
-        .load(table.location()).orderBy("id").collectAsList();
+        .load(table.location()).orderBy("id")
+        .select("id", "date", "level", "message")

Review comment:
       I'll try to figure out the actual issue today, but I agree it shouldn't work this way. My assumption is that either the Hive table schema is just being listed in a different order, or the order is getting scrambled when we use SparkSchemaUtil.






[GitHub] [iceberg] rdblue commented on a change in pull request #1287: Vectorized Reads of Parquet with Identity Partitions

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1287:
URL: https://github.com/apache/iceberg/pull/1287#discussion_r464568883



##########
File path: spark/src/test/java/org/apache/iceberg/spark/source/TestIdentityPartitionData.java
##########
@@ -110,24 +113,52 @@ public static void stopSpark() {
   private Table table = null;
   private Dataset<Row> logs = null;
 
-  @Before
-  public void setupTable() throws Exception {
+  /*
+  Use the Hive Based table to make Identity Partition Columns with no duplication of the data in the underlying
+  parquet files. This makes sure that if the identity mapping fails, the test will also fail.

Review comment:
       Nit: Looks like this Javadoc is missing the normal `* ` at the start of these lines?






[GitHub] [iceberg] rdblue commented on pull request #1287: Vectorized Reads of Parquet with Identity Partitions

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1287:
URL: https://github.com/apache/iceberg/pull/1287#issuecomment-668154679


   @samarthjain, can you help review this one?




[GitHub] [iceberg] RussellSpitzer commented on pull request #1287: Vectorized Reads of Parquet with Identity Partitions

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on pull request #1287:
URL: https://github.com/apache/iceberg/pull/1287#issuecomment-668270047


   Thanks @samarthjain and @rdblue, I applied all your comments! The only thing I couldn't address was the Hive (saveAsTable) reordering. Hopefully I can get some time to work on making save.format(iceberg) do some column pruning with identity transforms, and simplify this test later.




[GitHub] [iceberg] rdblue commented on a change in pull request #1287: Vectorized Reads of Parquet with Identity Partitions

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1287:
URL: https://github.com/apache/iceberg/pull/1287#discussion_r464567794



##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java
##########
@@ -91,17 +91,44 @@ public int numValues() {
     return vector.getValueCount();
   }
 
+  public static VectorHolder constantHolder(int numRows, Object constantValue) {
+    return new DummyVectorHolder(numRows, constantValue);
+  }
+
   public static VectorHolder dummyHolder(int numRows) {
-    return new VectorHolder() {
-      @Override
-      public int numValues() {
-        return numRows;
-      }
-    };
+    return new DummyVectorHolder(numRows);
   }
 
   public boolean isDummy() {
     return vector == null;
   }
 
+  /**
+   * A Vector Holder which does not actually produce values, consumers of this class should
+   * use the constantValue to populate their ColumnVector implementation.
+   */
+  public static class DummyVectorHolder extends VectorHolder {

Review comment:
       Could we rename this to `ConstantVectorHolder` instead of "dummy"? Now that it returns a constant value, I think it's more accurate to use "constant".
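
A rough sketch of what a constant-carrying holder and the consumer-side type check might look like is below. `Holder`, `ConstantHolder`, and `read` are hypothetical names made up for this illustration, and making the holder generic is an assumption; none of this is the actual `VectorHolder` code from the PR.

```java
// Hypothetical, simplified stand-ins; these are not Iceberg's actual VectorHolder classes.
class HolderSketch {

  /** Base holder: wraps values that were really read from a file. */
  static class Holder {
    private final Object[] values;

    Holder(Object[] values) {
      this.values = values;
    }

    int numValues() {
      return values.length;
    }

    Object valueAt(int row) {
      return values[row];
    }
  }

  /** Holder with no backing values, only a row count and one constant. */
  static final class ConstantHolder<T> extends Holder {
    private final int numRows;
    private final T constant;

    ConstantHolder(int numRows, T constant) {
      super(new Object[0]);
      this.numRows = numRows;
      this.constant = constant;
    }

    @Override
    int numValues() {
      return numRows;
    }

    T constantValue() {
      return constant;
    }
  }

  /** Consumer side: check the holder's type, then cast to reach the constant. */
  static Object read(Holder holder, int row) {
    if (holder instanceof ConstantHolder) {
      return ((ConstantHolder<?>) holder).constantValue();
    }
    return holder.valueAt(row);
  }

  public static void main(String[] args) {
    System.out.println(read(new ConstantHolder<>(5, "2020-08-03"), 4));
    System.out.println(read(new Holder(new Object[] {7, 8}), 1));
  }
}
```

Naming the subclass after what it carries (a constant) rather than what it lacks (a vector) makes the `instanceof` branch read as intent instead of as a special case.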






[GitHub] [iceberg] RussellSpitzer commented on pull request #1287: Vectorized Reads of Parquet with Identity Partitions

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on pull request #1287:
URL: https://github.com/apache/iceberg/pull/1287#issuecomment-668119361


   CC @aokolnychyi 




[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1287: Vectorized Reads of Parquet with Identity Partitions

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1287:
URL: https://github.com/apache/iceberg/pull/1287#discussion_r464683194



##########
File path: spark/src/test/java/org/apache/iceberg/spark/source/TestIdentityPartitionData.java
##########
@@ -110,24 +113,52 @@ public static void stopSpark() {
   private Table table = null;
   private Dataset<Row> logs = null;
 
-  @Before
-  public void setupTable() throws Exception {
+  /*
+  Use the Hive Based table to make Identity Partition Columns with no duplication of the data in the underlying
+  parquet files. This makes sure that if the identity mapping fails, the test will also fail.
+   */
+  private void setupParquet() throws Exception {
     File location = temp.newFolder("logs");
+    File hiveLocation = temp.newFolder("hive");
+    String hiveTable = "hivetable";
     Assert.assertTrue("Temp folder should exist", location.exists());
 
     Map<String, String> properties = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format);
-    this.table = TABLES.create(LOG_SCHEMA, spec, properties, location.toString());
     this.logs = spark.createDataFrame(LOGS, LogMessage.class).select("id", "date", "level", "message");
+    spark.sql(String.format("DROP TABLE IF EXISTS %s", hiveTable));
+    logs.orderBy("date", "level", "id").write().partitionBy("date", "level").format("parquet")
+        .option("path", hiveLocation.toString()).saveAsTable(hiveTable);
+
+    this.table = TABLES.create(SparkSchemaUtil.schemaForTable(spark, hiveTable),
+        SparkSchemaUtil.specForTable(spark, hiveTable), properties, location.toString());
+
+    SparkTableUtil.importSparkTable(spark, new TableIdentifier(hiveTable), table, location.toString());
+  }
 
-    logs.orderBy("date", "level", "id").write().format("iceberg").mode("append").save(location.toString());
+  @Before
+  public void setupTable() throws Exception {
+    if (format.equals("parquet")) {
+      setupParquet();
+    } else {
+      File location = temp.newFolder("logs");
+      Assert.assertTrue("Temp folder should exist", location.exists());
+
+      Map<String, String> properties = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format);
+      this.table = TABLES.create(LOG_SCHEMA, spec, properties, location.toString());
+      this.logs = spark.createDataFrame(LOGS, LogMessage.class).select("id", "date", "level", "message");
+
+      logs.orderBy("date", "level", "id").write().format("iceberg").mode("append").save(location.toString());
+    }
   }
 
   @Test
   public void testFullProjection() {
     List<Row> expected = logs.orderBy("id").collectAsList();
     List<Row> actual = spark.read().format("iceberg")
         .option("vectorization-enabled", String.valueOf(vectorized))
-        .load(table.location()).orderBy("id").collectAsList();
+        .load(table.location()).orderBy("id")
+        .select("id", "date", "level", "message")

Review comment:
       I spent some time digging into this.
   When you call saveAsTable, it ends up in this bit of code in DataFrameWriter:
   
   ```scala
       val tableDesc = CatalogTable(
         identifier = tableIdent,
         tableType = tableType,
         storage = storage,
         schema = new StructType,
         provider = Some(source),
         partitionColumnNames = partitioningColumns.getOrElse(Nil),
         bucketSpec = getBucketSpec)
   ```
   
   This strips out whatever incoming schema you have, so the new table is created without any information about the column ordering you used in the create.
   
   Then, when the Relation is resolved, the attributes are looked up again and the schema is created from the Attribute output. Long story short, saveAsTable doesn't care about your field ordering as far as I can tell. This is all in Spark, and I'm not sure we can do anything about it here.
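
To illustrate why the explicit `select` in the test restores the expected order, here is a small model of the reordering. It does not call Spark. It assumes the resolved relation lists non-partition columns first and partition columns last, which is an assumption about Spark's behavior for partitioned file-based tables and is not stated in this thread. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper names; this models the ordering only and does not call Spark.
class ColumnOrderSketch {

  /** Assumed resolved order: non-partition columns first, partition columns last. */
  static List<String> resolvedOrder(List<String> written, List<String> partitionColumns) {
    List<String> resolved = new ArrayList<>();
    for (String column : written) {
      if (!partitionColumns.contains(column)) {
        resolved.add(column);
      }
    }
    resolved.addAll(partitionColumns);
    return resolved;
  }

  /** Reorders one row from the resolved schema into the requested column order. */
  static List<Object> select(List<String> schema, List<Object> row, List<String> wanted) {
    List<Object> projected = new ArrayList<>();
    for (String column : wanted) {
      int index = schema.indexOf(column);
      if (index < 0) {
        throw new IllegalArgumentException("Unknown column: " + column);
      }
      projected.add(row.get(index));
    }
    return projected;
  }

  public static void main(String[] args) {
    List<String> written = Arrays.asList("id", "date", "level", "message");
    List<String> resolved = resolvedOrder(written, Arrays.asList("date", "level"));
    System.out.println(resolved);

    // A row laid out in the resolved order, projected back to the written order.
    List<Object> row = Arrays.<Object>asList(1, "hello", "2020-08-03", "INFO");
    System.out.println(select(resolved, row, written));
  }
}
```

Under that assumption, comparing rows positionally without the projection would pair `date` with `message`, which is why the test selects columns by name before collecting.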






[GitHub] [iceberg] samarthjain commented on pull request #1287: Vectorized Reads of Parquet with Identity Partitions

Posted by GitBox <gi...@apache.org>.
samarthjain commented on pull request #1287:
URL: https://github.com/apache/iceberg/pull/1287#issuecomment-668186516


   One minor comment, but generally looks good to me.




[GitHub] [iceberg] RussellSpitzer commented on pull request #1287: Vectorized Reads of Parquet with Identity Partitions

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on pull request #1287:
URL: https://github.com/apache/iceberg/pull/1287#issuecomment-668163296


   I wasn't really happy about doing the instance checking in Java (if dummy,
   then cast); it makes me long for Scala :P
   I do think this is probably a minimal set of changes to get this in without
   breaking too much open.
   
   On Mon, Aug 3, 2020 at 12:59 PM Ryan Blue <no...@github.com> wrote:
   
   > Looks mostly good to me!
   




[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1287: Vectorized Reads of Parquet with Identity Partitions

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1287:
URL: https://github.com/apache/iceberg/pull/1287#discussion_r464607549



##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
##########
@@ -356,5 +356,36 @@ public String toString() {
     public void setBatchSize(int batchSize) {}
   }
 
+  /**
+   * A Dummy Vector Reader which doesn't actually read files, instead it returns a dummy
+   * VectorHolder which indicates the constant value which should be used for this column.
+   * @param <T> The constant value to use
+   */
+  public static class ConstantVectorReader<T> extends VectorizedArrowReader {
+    private final T value;
+
+    public ConstantVectorReader(T value) {
+      this.value = value;
+    }
+
+    @Override
+    public VectorHolder read(VectorHolder reuse, int numValsToRead) {
+      return VectorHolder.constantHolder(numValsToRead, value);

Review comment:
       Of course. I was thinking about that, but I got a little worried about the "null" version, since it felt a little weird to create a parameterized type with null. I forget how Java handles that, but I'll double-check and change the class if possible. That is, what happens with
   
   `<T> void foo(T bar)` when you invoke `foo(null)`?
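
For reference, a minimal sketch of how Java treats the call in question. The class `NullGenericDemo` and the method body are hypothetical and only stand in for the `foo` mentioned above; they are not part of the PR. The call compiles because `null` is assignable to any reference type, and `T` is erased at runtime, so the method simply receives a null reference.

```java
final class NullGenericDemo {
  // Hypothetical stand-in for the foo(T bar) in the question above.
  static <T> String foo(T bar) {
    // T is erased at runtime, so a null argument is just a null reference here.
    return bar == null ? "constant is null" : "constant is " + bar;
  }

  public static void main(String[] args) {
    // With no other constraints, T is inferred as Object and the call compiles.
    System.out.println(foo(null));
    // An explicit type argument accepts null as well.
    System.out.println(NullGenericDemo.<Integer>foo(null));
    // A non-null argument fixes T from the argument type (Integer after boxing).
    System.out.println(foo(34));
  }
}
```

So a parameterized reader can be constructed with a null constant without special handling; the only open choice is which type argument to declare for it.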






[GitHub] [iceberg] rdblue commented on pull request #1287: Vectorized Reads of Parquet with Identity Partitions

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1287:
URL: https://github.com/apache/iceberg/pull/1287#issuecomment-668161352


   Looks mostly good to me!




[GitHub] [iceberg] rdblue commented on a change in pull request #1287: Vectorized Reads of Parquet with Identity Partitions

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1287:
URL: https://github.com/apache/iceberg/pull/1287#discussion_r464643993



##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
##########
@@ -356,5 +356,36 @@ public String toString() {
     public void setBatchSize(int batchSize) {}
   }
 
+  /**
+   * A Dummy Vector Reader which doesn't actually read files, instead it returns a dummy
+   * VectorHolder which indicates the constant value which should be used for this column.
+   * @param <T> The constant value to use
+   */
+  public static class ConstantVectorReader<T> extends VectorizedArrowReader {
+    private final T value;
+
+    public ConstantVectorReader(T value) {
+      this.value = value;
+    }
+
+    @Override
+    public VectorHolder read(VectorHolder reuse, int numValsToRead) {
+      return VectorHolder.constantHolder(numValsToRead, value);

Review comment:
       If it is always null, you can use the `Void` type.
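
A minimal sketch of that suggestion, assuming a simplified reader. `ConstantReader` and `nulls()` are hypothetical names used for illustration and are not the Iceberg classes. `java.lang.Void` has no accessible constructor, so `null` is the only value a `Void` reference can hold, which makes `ConstantReader<Void>` a natural type for the all-null case.

```java
final class VoidConstantDemo {
  // Hypothetical, simplified stand-in for a constant reader; not the Iceberg class.
  static final class ConstantReader<T> {
    private final T value;

    ConstantReader(T value) {
      this.value = value;
    }

    T value() {
      return value;
    }

    boolean isNull() {
      return value == null;
    }
  }

  // The Void type argument documents that this reader can only ever yield null.
  static ConstantReader<Void> nulls() {
    return new ConstantReader<>(null);
  }

  public static void main(String[] args) {
    ConstantReader<Void> allNull = nulls();
    ConstantReader<String> level = new ConstantReader<>("INFO");
    System.out.println(allNull.isNull());
    System.out.println(level.value());
  }
}
```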






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1287: Vectorized Reads of Parquet with Identity Partitions

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1287:
URL: https://github.com/apache/iceberg/pull/1287#discussion_r464683905



##########
File path: spark/src/test/java/org/apache/iceberg/spark/source/TestIdentityPartitionData.java
##########
@@ -110,24 +113,52 @@ public static void stopSpark() {
   private Table table = null;
   private Dataset<Row> logs = null;
 
-  @Before
-  public void setupTable() throws Exception {
+  /*
+  Use the Hive Based table to make Identity Partition Columns with no duplication of the data in the underlying
+  parquet files. This makes sure that if the identity mapping fails, the test will also fail.

Review comment:
       I was just leaving a multiline comment, but I'll make it a Javadoc.






[GitHub] [iceberg] rdblue merged pull request #1287: Vectorized Reads of Parquet with Identity Partitions

Posted by GitBox <gi...@apache.org>.
rdblue merged pull request #1287:
URL: https://github.com/apache/iceberg/pull/1287


   




[GitHub] [iceberg] samarthjain commented on a change in pull request #1287: Vectorized Reads of Parquet with Identity Partitions

Posted by GitBox <gi...@apache.org>.
samarthjain commented on a change in pull request #1287:
URL: https://github.com/apache/iceberg/pull/1287#discussion_r464599289



##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
##########
@@ -356,5 +356,36 @@ public String toString() {
     public void setBatchSize(int batchSize) {}
   }
 
+  /**
+   * A Dummy Vector Reader which doesn't actually read files, instead it returns a dummy
+   * VectorHolder which indicates the constant value which should be used for this column.
+   * @param <T> The constant value to use
+   */
+  public static class ConstantVectorReader<T> extends VectorizedArrowReader {
+    private final T value;
+
+    public ConstantVectorReader(T value) {
+      this.value = value;
+    }
+
+    @Override
+    public VectorHolder read(VectorHolder reuse, int numValsToRead) {
+      return VectorHolder.constantHolder(numValsToRead, value);

Review comment:
       `VectorHolder.constantHolder` and `DummyVectorHolder` aren't parameterized. It would make sense to have them accept a value of type `T` as well.
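
A rough sketch of what a parameterized holder could look like. The classes `Holder` and `ConstantHolder<T>` below are hypothetical simplifications, not the actual `VectorHolder` code, which carries more state. The point is only that a generic factory lets the constant's type flow from the reader to the holder instead of widening to `Object`.

```java
final class TypedHolderDemo {
  // Hypothetical, simplified base holder; the real VectorHolder has more fields.
  static class Holder {
    private final int numRows;

    Holder(int numRows) {
      this.numRows = numRows;
    }

    int numValues() {
      return numRows;
    }
  }

  // Hypothetical typed holder for a column whose every row has the same value.
  static final class ConstantHolder<T> extends Holder {
    private final T constant;

    ConstantHolder(int numRows, T constant) {
      super(numRows);
      this.constant = constant;
    }

    T constant() {
      return constant;
    }
  }

  // Generic factory: T is inferred from the argument, so callers need no cast.
  static <T> ConstantHolder<T> constantHolder(int numRows, T constant) {
    return new ConstantHolder<>(numRows, constant);
  }

  public static void main(String[] args) {
    ConstantHolder<String> holder = constantHolder(3, "2020-08-03");
    String date = holder.constant(); // no cast needed
    System.out.println(holder.numValues() + " rows of " + date);
  }
}
```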






[GitHub] [iceberg] rdblue commented on a change in pull request #1287: Vectorized Reads of Parquet with Identity Partitions

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1287:
URL: https://github.com/apache/iceberg/pull/1287#discussion_r464569489



##########
File path: spark/src/test/java/org/apache/iceberg/spark/source/TestIdentityPartitionData.java
##########
@@ -110,24 +113,52 @@ public static void stopSpark() {
   private Table table = null;
   private Dataset<Row> logs = null;
 
-  @Before
-  public void setupTable() throws Exception {
+  /*
+  Use the Hive Based table to make Identity Partition Columns with no duplication of the data in the underlying
+  parquet files. This makes sure that if the identity mapping fails, the test will also fail.
+   */
+  private void setupParquet() throws Exception {
     File location = temp.newFolder("logs");
+    File hiveLocation = temp.newFolder("hive");
+    String hiveTable = "hivetable";
     Assert.assertTrue("Temp folder should exist", location.exists());
 
     Map<String, String> properties = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format);
-    this.table = TABLES.create(LOG_SCHEMA, spec, properties, location.toString());
     this.logs = spark.createDataFrame(LOGS, LogMessage.class).select("id", "date", "level", "message");
+    spark.sql(String.format("DROP TABLE IF EXISTS %s", hiveTable));
+    logs.orderBy("date", "level", "id").write().partitionBy("date", "level").format("parquet")
+        .option("path", hiveLocation.toString()).saveAsTable(hiveTable);
+
+    this.table = TABLES.create(SparkSchemaUtil.schemaForTable(spark, hiveTable),
+        SparkSchemaUtil.specForTable(spark, hiveTable), properties, location.toString());
+
+    SparkTableUtil.importSparkTable(spark, new TableIdentifier(hiveTable), table, location.toString());
+  }
 
-    logs.orderBy("date", "level", "id").write().format("iceberg").mode("append").save(location.toString());
+  @Before
+  public void setupTable() throws Exception {
+    if (format.equals("parquet")) {
+      setupParquet();
+    } else {
+      File location = temp.newFolder("logs");
+      Assert.assertTrue("Temp folder should exist", location.exists());
+
+      Map<String, String> properties = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format);
+      this.table = TABLES.create(LOG_SCHEMA, spec, properties, location.toString());
+      this.logs = spark.createDataFrame(LOGS, LogMessage.class).select("id", "date", "level", "message");
+
+      logs.orderBy("date", "level", "id").write().format("iceberg").mode("append").save(location.toString());
+    }
   }
 
   @Test
   public void testFullProjection() {
     List<Row> expected = logs.orderBy("id").collectAsList();
     List<Row> actual = spark.read().format("iceberg")
         .option("vectorization-enabled", String.valueOf(vectorized))
-        .load(table.location()).orderBy("id").collectAsList();
+        .load(table.location()).orderBy("id")
+        .select("id", "date", "level", "message")

Review comment:
       Isn't this the default? Why was it necessary to add `select`?






[GitHub] [iceberg] rdblue commented on a change in pull request #1287: Vectorized Reads of Parquet with Identity Partitions

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1287:
URL: https://github.com/apache/iceberg/pull/1287#discussion_r465413132



##########
File path: spark/src/test/java/org/apache/iceberg/spark/source/TestIdentityPartitionData.java
##########
@@ -110,24 +113,52 @@ public static void stopSpark() {
   private Table table = null;
   private Dataset<Row> logs = null;
 
-  @Before
-  public void setupTable() throws Exception {
+  /*
+  Use the Hive Based table to make Identity Partition Columns with no duplication of the data in the underlying
+  parquet files. This makes sure that if the identity mapping fails, the test will also fail.
+   */
+  private void setupParquet() throws Exception {
     File location = temp.newFolder("logs");
+    File hiveLocation = temp.newFolder("hive");
+    String hiveTable = "hivetable";
     Assert.assertTrue("Temp folder should exist", location.exists());
 
     Map<String, String> properties = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format);
-    this.table = TABLES.create(LOG_SCHEMA, spec, properties, location.toString());
     this.logs = spark.createDataFrame(LOGS, LogMessage.class).select("id", "date", "level", "message");
+    spark.sql(String.format("DROP TABLE IF EXISTS %s", hiveTable));
+    logs.orderBy("date", "level", "id").write().partitionBy("date", "level").format("parquet")
+        .option("path", hiveLocation.toString()).saveAsTable(hiveTable);
+
+    this.table = TABLES.create(SparkSchemaUtil.schemaForTable(spark, hiveTable),
+        SparkSchemaUtil.specForTable(spark, hiveTable), properties, location.toString());
+
+    SparkTableUtil.importSparkTable(spark, new TableIdentifier(hiveTable), table, location.toString());
+  }
 
-    logs.orderBy("date", "level", "id").write().format("iceberg").mode("append").save(location.toString());
+  @Before
+  public void setupTable() throws Exception {
+    if (format.equals("parquet")) {
+      setupParquet();
+    } else {
+      File location = temp.newFolder("logs");
+      Assert.assertTrue("Temp folder should exist", location.exists());
+
+      Map<String, String> properties = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format);
+      this.table = TABLES.create(LOG_SCHEMA, spec, properties, location.toString());
+      this.logs = spark.createDataFrame(LOGS, LogMessage.class).select("id", "date", "level", "message");
+
+      logs.orderBy("date", "level", "id").write().format("iceberg").mode("append").save(location.toString());
+    }
   }
 
   @Test
   public void testFullProjection() {
     List<Row> expected = logs.orderBy("id").collectAsList();
     List<Row> actual = spark.read().format("iceberg")
         .option("vectorization-enabled", String.valueOf(vectorized))
-        .load(table.location()).orderBy("id").collectAsList();
+        .load(table.location()).orderBy("id")
+        .select("id", "date", "level", "message")

Review comment:
       I'm fine with this, then. Thanks for looking into it!






[GitHub] [iceberg] rdblue commented on a change in pull request #1287: Vectorized Reads of Parquet with Identity Partitions

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1287:
URL: https://github.com/apache/iceberg/pull/1287#discussion_r464579759



##########
File path: spark/src/test/java/org/apache/iceberg/spark/source/TestIdentityPartitionData.java
##########
@@ -110,24 +113,52 @@ public static void stopSpark() {
   private Table table = null;
   private Dataset<Row> logs = null;
 
-  @Before
-  public void setupTable() throws Exception {
+  /*
+  Use the Hive Based table to make Identity Partition Columns with no duplication of the data in the underlying
+  parquet files. This makes sure that if the identity mapping fails, the test will also fail.
+   */
+  private void setupParquet() throws Exception {
     File location = temp.newFolder("logs");
+    File hiveLocation = temp.newFolder("hive");
+    String hiveTable = "hivetable";
     Assert.assertTrue("Temp folder should exist", location.exists());
 
     Map<String, String> properties = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format);
-    this.table = TABLES.create(LOG_SCHEMA, spec, properties, location.toString());
     this.logs = spark.createDataFrame(LOGS, LogMessage.class).select("id", "date", "level", "message");
+    spark.sql(String.format("DROP TABLE IF EXISTS %s", hiveTable));
+    logs.orderBy("date", "level", "id").write().partitionBy("date", "level").format("parquet")
+        .option("path", hiveLocation.toString()).saveAsTable(hiveTable);
+
+    this.table = TABLES.create(SparkSchemaUtil.schemaForTable(spark, hiveTable),
+        SparkSchemaUtil.specForTable(spark, hiveTable), properties, location.toString());
+
+    SparkTableUtil.importSparkTable(spark, new TableIdentifier(hiveTable), table, location.toString());
+  }
 
-    logs.orderBy("date", "level", "id").write().format("iceberg").mode("append").save(location.toString());
+  @Before
+  public void setupTable() throws Exception {
+    if (format.equals("parquet")) {
+      setupParquet();
+    } else {
+      File location = temp.newFolder("logs");
+      Assert.assertTrue("Temp folder should exist", location.exists());
+
+      Map<String, String> properties = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format);
+      this.table = TABLES.create(LOG_SCHEMA, spec, properties, location.toString());
+      this.logs = spark.createDataFrame(LOGS, LogMessage.class).select("id", "date", "level", "message");
+
+      logs.orderBy("date", "level", "id").write().format("iceberg").mode("append").save(location.toString());
+    }
   }
 
   @Test
   public void testFullProjection() {
     List<Row> expected = logs.orderBy("id").collectAsList();
     List<Row> actual = spark.read().format("iceberg")
         .option("vectorization-enabled", String.valueOf(vectorized))
-        .load(table.location()).orderBy("id").collectAsList();
+        .load(table.location()).orderBy("id")
+        .select("id", "date", "level", "message")

Review comment:
       That's suspicious. We'll have to look into why the schema has the wrong order. I see `select` before all the writes, so it shouldn't need the reorder here.
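
One possible cause, which this comment does not confirm: Spark's `DataFrameWriter.partitionBy` places partition columns after the data columns in the saved table's schema, so a schema derived from the partitioned Hive-style table would differ in order from the `select` used before the write. The plain-Java sketch below only models that reordering; the class and method names are hypothetical and no Spark API is called.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class PartitionColumnOrderDemo {
  // Hypothetical model of a writer that moves partition columns to the end of the schema.
  static List<String> schemaAfterPartitionedWrite(List<String> columns, List<String> partitionColumns) {
    List<String> result = new ArrayList<>();
    for (String column : columns) {
      if (!partitionColumns.contains(column)) {
        result.add(column); // data columns keep their relative order
      }
    }
    result.addAll(partitionColumns); // partition columns are appended last
    return result;
  }

  public static void main(String[] args) {
    List<String> written = Arrays.asList("id", "date", "level", "message");
    List<String> partitions = Arrays.asList("date", "level");
    System.out.println(schemaAfterPartitionedWrite(written, partitions));
  }
}
```

Under that assumption, the imported table would report `id, message, date, level`, and an explicit `select("id", "date", "level", "message")` on the read side would restore the order the test expects.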



