You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/10/20 15:09:19 UTC
[iceberg] branch master updated: Avro: Fix file import with correct row count (#3273)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 303f925 Avro: Fix file import with correct row count (#3273)
303f925 is described below
commit 303f92547c344146a3ea500e315c285fa446d7b2
Author: Szehon Ho <sz...@gmail.com>
AuthorDate: Wed Oct 20 08:09:10 2021 -0700
Avro: Fix file import with correct row count (#3273)
---
.../main/java/org/apache/iceberg/avro/Avro.java | 8 +++
.../apache/iceberg/data/TableMigrationUtil.java | 6 ++-
spark/v3.0/build.gradle | 2 +
.../spark/extensions/TestAddFilesProcedure.java | 63 ++++++++++++++++++++++
4 files changed, 78 insertions(+), 1 deletion(-)
diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java
index c6f9f0a..b467bea 100644
--- a/core/src/main/java/org/apache/iceberg/avro/Avro.java
+++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java
@@ -644,4 +644,12 @@ public class Avro {
}
}
+ /**
+ * Returns number of rows in specified Avro file
+ * @param file Avro file
+ * @return number of rows in file
+ */
+ public static long rowCount(InputFile file) {
+ return AvroIO.findStartingRowPos(file::newStream, Long.MAX_VALUE);
+ }
}
diff --git a/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java b/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
index 2750847..a432c76 100644
--- a/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
+++ b/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
@@ -36,7 +36,9 @@ import org.apache.iceberg.Metrics;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.orc.OrcMetrics;
import org.apache.iceberg.parquet.ParquetUtil;
@@ -91,7 +93,9 @@ public class TableMigrationUtil {
return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
.filter(FileStatus::isFile)
.map(stat -> {
- Metrics metrics = new Metrics(-1L, null, null, null);
+ InputFile file = HadoopInputFile.fromLocation(stat.getPath().toString(), conf);
+ long rowCount = Avro.rowCount(file);
+ Metrics metrics = new Metrics(rowCount, null, null, null);
String partitionKey = spec.fields().stream()
.map(PartitionField::name)
.map(name -> String.format("%s=%s", name, partitionPath.get(name)))
diff --git a/spark/v3.0/build.gradle b/spark/v3.0/build.gradle
index ef9a99b..ec0b7c4 100644
--- a/spark/v3.0/build.gradle
+++ b/spark/v3.0/build.gradle
@@ -121,6 +121,8 @@ project(":iceberg-spark:iceberg-spark3-extensions") {
testImplementation project(path: ':iceberg-spark', configuration: 'testArtifacts')
testImplementation project(path: ':iceberg-spark:iceberg-spark3', configuration: 'testArtifacts')
+ testImplementation "org.apache.avro:avro"
+
// Required because we remove antlr plugin dependencies from the compile configuration, see note above
// We shade this in Spark3 Runtime to avoid issues with Spark's Antlr Runtime
runtimeOnly "org.antlr:antlr4-runtime:4.7.1"
diff --git a/spark/v3.0/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java b/spark/v3.0/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
index 50ab8b9..7f5c7df 100644
--- a/spark/v3.0/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
+++ b/spark/v3.0/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
@@ -24,8 +24,17 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DataFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
@@ -35,6 +44,7 @@ import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.After;
import org.junit.Assert;
+import org.junit.Assume;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
@@ -106,6 +116,59 @@ public class TestAddFilesProcedure extends SparkExtensionsTestBase {
sql("SELECT * FROM %s ORDER BY id", tableName));
}
+ @Test
+ public void addAvroFile() throws Exception {
+ // Spark Session Catalog cannot load metadata tables
+ // with "The namespace in session catalog must have exactly one name part"
+ Assume.assumeFalse(catalogName.equals("spark_catalog"));
+
+ // Create an Avro file
+
+ Schema schema = SchemaBuilder.record("record").fields()
+ .requiredInt("id")
+ .requiredString("data")
+ .endRecord();
+ GenericRecord record1 = new GenericData.Record(schema);
+ record1.put("id", 1L);
+ record1.put("data", "a");
+ GenericRecord record2 = new GenericData.Record(schema);
+ record2.put("id", 2L);
+ record2.put("data", "b");
+ File outputFile = temp.newFile("test.avro");
+
+ DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter(schema);
+ DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter(datumWriter);
+ dataFileWriter.create(schema, outputFile);
+ dataFileWriter.append(record1);
+ dataFileWriter.append(record2);
+ dataFileWriter.close();
+
+ String createIceberg =
+ "CREATE TABLE %s (id Long, data String) USING iceberg";
+ sql(createIceberg, tableName);
+
+ Object result = scalarSql("CALL %s.system.add_files('%s', '`avro`.`%s`')",
+ catalogName, tableName, outputFile.getPath());
+ Assert.assertEquals(1L, result);
+
+ List<Object[]> expected = Lists.newArrayList(
+ new Object[]{1L, "a"},
+ new Object[]{2L, "b"}
+ );
+
+ assertEquals("Iceberg table contains correct data",
+ expected,
+ sql("SELECT * FROM %s ORDER BY id", tableName));
+
+ List<Object[]> actualRecordCount = sql("select %s from %s.files",
+ DataFile.RECORD_COUNT.name(),
+ tableName);
+ List<Object[]> expectedRecordCount = Lists.newArrayList();
+ expectedRecordCount.add(new Object[]{2L});
+ assertEquals("Iceberg file metadata should have correct metadata count",
+ expectedRecordCount, actualRecordCount);
+ }
+
// TODO Adding spark-avro doesn't work in tests
@Ignore
public void addDataUnpartitionedAvro() {