Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/10/20 15:09:19 UTC

[iceberg] branch master updated: Avro: Fix file import with correct row count (#3273)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 303f925  Avro: Fix file import with correct row count (#3273)
303f925 is described below

commit 303f92547c344146a3ea500e315c285fa446d7b2
Author: Szehon Ho <sz...@gmail.com>
AuthorDate: Wed Oct 20 08:09:10 2021 -0700

    Avro: Fix file import with correct row count (#3273)
---
 .../main/java/org/apache/iceberg/avro/Avro.java    |  8 +++
 .../apache/iceberg/data/TableMigrationUtil.java    |  6 ++-
 spark/v3.0/build.gradle                            |  2 +
 .../spark/extensions/TestAddFilesProcedure.java    | 63 ++++++++++++++++++++++
 4 files changed, 78 insertions(+), 1 deletion(-)

diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java
index c6f9f0a..b467bea 100644
--- a/core/src/main/java/org/apache/iceberg/avro/Avro.java
+++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java
@@ -644,4 +644,12 @@ public class Avro {
     }
   }
 
+  /**
+   * Returns the number of rows in the specified Avro file.
+   * @param file an Avro file
+   * @return the number of rows in the file
+   */
+  public static long rowCount(InputFile file) {
+    return AvroIO.findStartingRowPos(file::newStream, Long.MAX_VALUE);
+  }
 }
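
The new Avro.rowCount helper exposes the row count of any readable Avro
file. Passing Long.MAX_VALUE as the target position makes
AvroIO.findStartingRowPos scan past every block in the file, so the
"starting row" it returns is the file's total row count. A minimal usage
sketch (the path and Configuration below are illustrative, not part of
this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.avro.Avro;
    import org.apache.iceberg.hadoop.HadoopInputFile;
    import org.apache.iceberg.io.InputFile;

    public class AvroRowCountExample {
      public static void main(String[] args) {
        // Hypothetical location; point this at a real Avro data file.
        InputFile file = HadoopInputFile.fromLocation(
            "/tmp/data.avro", new Configuration());

        // Returns the total number of rows in the file.
        long rows = Avro.rowCount(file);
        System.out.println("row count: " + rows);
      }
    }
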
diff --git a/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java b/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
index 2750847..a432c76 100644
--- a/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
+++ b/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
@@ -36,7 +36,9 @@ import org.apache.iceberg.Metrics;
 import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.mapping.NameMapping;
 import org.apache.iceberg.orc.OrcMetrics;
 import org.apache.iceberg.parquet.ParquetUtil;
@@ -91,7 +93,9 @@ public class TableMigrationUtil {
       return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
           .filter(FileStatus::isFile)
           .map(stat -> {
-            Metrics metrics = new Metrics(-1L, null, null, null);
+            InputFile file = HadoopInputFile.fromLocation(stat.getPath().toString(), conf);
+            long rowCount = Avro.rowCount(file);
+            Metrics metrics = new Metrics(rowCount, null, null, null);
             String partitionKey = spec.fields().stream()
                 .map(PartitionField::name)
                 .map(name -> String.format("%s=%s", name, partitionPath.get(name)))
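
Before this change, Avro files imported through TableMigrationUtil were
registered with a placeholder record count of -1; the import now opens
each file once and stores the real count in its Metrics. One way to
sanity-check that value independently is to count records with the plain
Avro reader API; an illustrative sketch (likely slower than Iceberg's
implementation, since it decodes every record):

    import java.io.File;
    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;

    static long countRowsByDecoding(File avroFile) throws Exception {
      try (DataFileReader<GenericRecord> reader =
          new DataFileReader<>(avroFile, new GenericDatumReader<>())) {
        long count = 0;
        while (reader.hasNext()) {
          reader.next();  // decode and discard one record
          count++;
        }
        return count;
      }
    }
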
diff --git a/spark/v3.0/build.gradle b/spark/v3.0/build.gradle
index ef9a99b..ec0b7c4 100644
--- a/spark/v3.0/build.gradle
+++ b/spark/v3.0/build.gradle
@@ -121,6 +121,8 @@ project(":iceberg-spark:iceberg-spark3-extensions") {
     testImplementation project(path: ':iceberg-spark', configuration: 'testArtifacts')
     testImplementation project(path: ':iceberg-spark:iceberg-spark3', configuration: 'testArtifacts')
 
+    testImplementation "org.apache.avro:avro"
+
     // Required because we remove antlr plugin dependencies from the compile configuration, see note above
     // We shade this in Spark3 Runtime to avoid issues with Spark's Antlr Runtime
     runtimeOnly "org.antlr:antlr4-runtime:4.7.1"
diff --git a/spark/v3.0/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java b/spark/v3.0/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
index 50ab8b9..7f5c7df 100644
--- a/spark/v3.0/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
+++ b/spark/v3.0/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
@@ -24,8 +24,17 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
 import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DataFile;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
@@ -35,6 +44,7 @@ import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.junit.After;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Rule;
@@ -106,6 +116,59 @@ public class TestAddFilesProcedure extends SparkExtensionsTestBase {
         sql("SELECT * FROM %s ORDER BY id", tableName));
   }
 
+  @Test
+  public void addAvroFile() throws Exception {
+    // The Spark session catalog cannot load metadata tables; it fails with
+    // "The namespace in session catalog must have exactly one name part"
+    Assume.assumeFalse(catalogName.equals("spark_catalog"));
+
+    // Create an Avro file
+
+    Schema schema = SchemaBuilder.record("record").fields()
+        .requiredInt("id")
+        .requiredString("data")
+        .endRecord();
+    GenericRecord record1 = new GenericData.Record(schema);
+    record1.put("id", 1L);
+    record1.put("data", "a");
+    GenericRecord record2 = new GenericData.Record(schema);
+    record2.put("id", 2L);
+    record2.put("data", "b");
+    File outputFile = temp.newFile("test.avro");
+
+    DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+    DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
+    dataFileWriter.create(schema, outputFile);
+    dataFileWriter.append(record1);
+    dataFileWriter.append(record2);
+    dataFileWriter.close();
+
+    String createIceberg =
+        "CREATE TABLE %s (id Long, data String) USING iceberg";
+    sql(createIceberg, tableName);
+
+    Object result = scalarSql("CALL %s.system.add_files('%s', '`avro`.`%s`')",
+        catalogName, tableName, outputFile.getPath());
+    Assert.assertEquals(1L, result);
+
+    List<Object[]> expected = Lists.newArrayList(
+        new Object[]{1L, "a"},
+        new Object[]{2L, "b"}
+    );
+
+    assertEquals("Iceberg table contains correct data",
+        expected,
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+
+    List<Object[]> actualRecordCount = sql("SELECT %s FROM %s.files",
+        DataFile.RECORD_COUNT.name(),
+        tableName);
+    List<Object[]> expectedRecordCount = Lists.newArrayList();
+    expectedRecordCount.add(new Object[]{2L});
+    assertEquals("Iceberg file metadata should have correct metadata count",
+        expectedRecordCount, actualRecordCount);
+  }
+
   // TODO Adding spark-avro doesn't work in tests
   @Ignore
   public void addDataUnpartitionedAvro() {
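
The user-visible effect of the fix is the new test's final assertion: the
files metadata table now reports record_count = 2 for the imported file
rather than the old -1 placeholder. The same check works against any
Iceberg table; a sketch, assuming an active SparkSession and an
illustrative table name db.tbl:

    import org.apache.spark.sql.SparkSession;

    SparkSession spark = SparkSession.active();
    // "record_count" is the files metadata column behind DataFile.RECORD_COUNT
    spark.sql("SELECT file_path, record_count FROM db.tbl.files").show();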