Posted to commits@iceberg.apache.org by ru...@apache.org on 2022/06/01 19:35:46 UTC

[iceberg] branch 0.13.x updated (7d15ab952 -> fae977a9f)

This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a change to branch 0.13.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git


 discard 7d15ab952 Dev: Fix Source Release Script (#4932)
    omit 82be15951 Spark: Extend commit unknown exception handling to SparkPositionDeltaWrite (#4893)
    omit 487b42858 Core: Fix query failure when using projection on top of partitions metadata table (#4720) (#4890)
    omit f7fd01364 Add version.txt for release 0.13.2
     new 73f91a360 Core: Fix query failure when using projection on top of partitions metadata table (#4720) (#4890)
     new 80c290da1 Spark: Extend commit unknown exception handling to SparkPositionDeltaWrite (#4893)
     new fae977a9f Dev: Fix Source Release Script (#4932)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (7d15ab952)
            \
             N -- N -- N   refs/heads/0.13.x (fae977a9f)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
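
For readers less familiar with how the situation diagrammed above arises, here is
a minimal sketch of a history rewrite followed by a force push, assuming a local
clone with the ASF repository configured under the remote name "apache" (the
remote name and the <common-base-B> placeholder are illustrative, not taken from
this notification):

    git checkout 0.13.x
    git rebase -i <common-base-B>    # drop/reorder commits; the O commits are replaced by new N commits
    git push --force apache 0.13.x   # the remote branch now points at the N history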


Summary of changes:
 version.txt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)


[iceberg] 03/03: Dev: Fix Source Release Script (#4932)

Posted by ru...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch 0.13.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit fae977a9f0a79266a04647b0df2ab540cf0dcff4
Author: Russell Spitzer <rs...@apple.com>
AuthorDate: Wed Jun 1 13:48:42 2022 -0500

    Dev: Fix Source Release Script (#4932)
---
 dev/source-release.sh | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/dev/source-release.sh b/dev/source-release.sh
index 67c2621cf..5f145f357 100755
--- a/dev/source-release.sh
+++ b/dev/source-release.sh
@@ -36,7 +36,7 @@ usage () {
 }
 
 # Default repository remote name
-remote="origin"
+remote="apache"
 
 while getopts "v:r:k:g:d" opt; do
   case "${opt}" in
@@ -112,9 +112,9 @@ tarball=$tag.tar.gz
 git archive $release_hash --worktree-attributes --prefix $tag/ -o $projectdir/$tarball
 
 echo "Signing the tarball..."
-[[ -z "$keyid" ]] && keyopt="-u $keyid"
-gpg --detach-sig $keyopt --armor --output ${projectdir}/${tarball}.asc ${projectdir}/$tarball
-shasum -a 512 ${projectdir}/$tarball > ${projectdir}/${tarball}.sha512
+[[ -n "$keyid" ]] && keyopt="-u $keyid"
+gpg $keyopt --armor --output ${projectdir}/${tarball}.asc --detach-sig ${projectdir}/$tarball
+shasum -a 512 $tarball > ${projectdir}/${tarball}.sha512
 
 
 echo "Checking out Iceberg RC subversion repo..."


[iceberg] 01/03: Core: Fix query failure when using projection on top of partitions metadata table (#4720) (#4890)

Posted by ru...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch 0.13.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 73f91a36079e7ee413a1202c0baf810103c086ed
Author: Szehon Ho <sz...@gmail.com>
AuthorDate: Sun May 29 13:39:19 2022 -0700

    Core: Fix query failure when using projection on top of partitions metadata table (#4720) (#4890)
---
 .../java/org/apache/iceberg/BaseTableScan.java     |   4 +
 .../java/org/apache/iceberg/PartitionsTable.java   |   2 +-
 .../org/apache/iceberg/TestMetadataTableScans.java |  20 +++
 .../spark/extensions/TestMetadataTables.java       | 151 +++++++++++++++++++++
 4 files changed, 176 insertions(+), 1 deletion(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
index 651bfa386..5e906d541 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
@@ -62,6 +62,10 @@ abstract class BaseTableScan implements TableScan {
     this.context = context;
   }
 
+  protected Schema tableSchema() {
+    return schema;
+  }
+
   protected TableOperations tableOps() {
     return ops;
   }
diff --git a/core/src/main/java/org/apache/iceberg/PartitionsTable.java b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
index dbf1d39d1..8a8740e7c 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionsTable.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
@@ -111,7 +111,7 @@ public class PartitionsTable extends BaseMetadataTable {
 
     // use an inclusive projection to remove the partition name prefix and filter out any non-partition expressions
     Expression partitionFilter = Projections
-        .inclusive(transformSpec(scan.schema(), table.spec()), caseSensitive)
+        .inclusive(transformSpec(scan.tableSchema(), table.spec()), caseSensitive)
         .project(scan.filter());
 
     ManifestGroup manifestGroup = new ManifestGroup(table.io(), snapshot.dataManifests(), snapshot.deleteManifests())
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
index 3bc4c4386..3e2ee1f1b 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
@@ -360,6 +360,26 @@ public class TestMetadataTableScans extends TableTestBase {
     validateIncludesPartitionScan(tasksNoFilter, 3);
   }
 
+  @Test
+  public void testPartitionsTableScanWithProjection() {
+    preparePartitionedTable();
+
+    Table partitionsTable = new PartitionsTable(table.ops(), table);
+    Types.StructType expected = new Schema(
+        required(3, "file_count", Types.IntegerType.get())
+    ).asStruct();
+
+    TableScan scanWithProjection = partitionsTable.newScan().select("file_count");
+    Assert.assertEquals(expected, scanWithProjection.schema().asStruct());
+    CloseableIterable<FileScanTask> tasksWithProjection =
+        PartitionsTable.planFiles((StaticTableScan) scanWithProjection);
+    Assert.assertEquals(4, Iterators.size(tasksWithProjection.iterator()));
+    validateIncludesPartitionScan(tasksWithProjection, 0);
+    validateIncludesPartitionScan(tasksWithProjection, 1);
+    validateIncludesPartitionScan(tasksWithProjection, 2);
+    validateIncludesPartitionScan(tasksWithProjection, 3);
+  }
+
   @Test
   public void testPartitionsTableScanNoStats() {
     table.newFastAppend()
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
new file mode 100644
index 000000000..a8ca17ca7
--- /dev/null
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.data.TestHelpers;
+import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestMetadataTables extends SparkExtensionsTestBase {
+
+  public TestMetadataTables(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testPartitionedTable() throws Exception {
+    sql("CREATE TABLE %s (id bigint, data string) " +
+        "USING iceberg " +
+        "PARTITIONED BY (data) " +
+        "TBLPROPERTIES" +
+        "('format-version'='2', 'write.delete.mode'='merge-on-read')", tableName);
+
+    List<SimpleRecord> recordsA = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "a")
+    );
+    spark.createDataset(recordsA, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+
+    List<SimpleRecord> recordsB = Lists.newArrayList(
+        new SimpleRecord(1, "b"),
+        new SimpleRecord(2, "b")
+    );
+    spark.createDataset(recordsB, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    Schema entriesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + ".entries").schema();
+
+    List<ManifestFile> expectedDataManifests = table.currentSnapshot().dataManifests();
+    Assert.assertEquals("Should have 2 data manifests", 2, expectedDataManifests.size());
+
+    Schema filesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + ".files").schema();
+
+    // Check data files table
+    List<Record> expectedDataFiles = expectedEntries(table, FileContent.DATA,
+        entriesTableSchema, expectedDataManifests, "a");
+    Assert.assertEquals("Should have one data file manifest entry", 1, expectedDataFiles.size());
+
+    List<Row> actualDataFiles = spark.sql("SELECT * FROM " + tableName + ".files " +
+        "WHERE partition.data='a'").collectAsList();
+    Assert.assertEquals("Metadata table should return one data file", 1, actualDataFiles.size());
+    TestHelpers.assertEqualsSafe(filesTableSchema.asStruct(), expectedDataFiles.get(0), actualDataFiles.get(0));
+
+    // Check partitions table
+    List<Row> actualPartitionsWithProjection =
+        spark.sql("SELECT file_count FROM " + tableName + ".partitions ").collectAsList();
+    Assert.assertEquals("Metadata table should return two partitions record", 2,
+        actualPartitionsWithProjection.size());
+    for (int i = 0; i < 2; ++i) {
+      Assert.assertEquals(1, actualPartitionsWithProjection.get(i).get(0));
+    }
+  }
+
+
+  /**
+   * Find matching manifest entries of an Iceberg table
+   * @param table iceberg table
+   * @param expectedContent file content to populate on entries
+   * @param entriesTableSchema schema of Manifest entries
+   * @param manifestsToExplore manifests to explore of the table
+   * @param partValue partition value that manifest entries must match, or null to skip filtering
+   */
+  private List<Record> expectedEntries(Table table, FileContent expectedContent, Schema entriesTableSchema,
+                                       List<ManifestFile> manifestsToExplore, String partValue) throws IOException {
+    List<Record> expected = Lists.newArrayList();
+    for (ManifestFile manifest : manifestsToExplore) {
+      InputFile in = table.io().newInputFile(manifest.path());
+      try (CloseableIterable<Record> rows = Avro.read(in).project(entriesTableSchema).build()) {
+        for (Record record : rows) {
+          if ((Integer) record.get("status") < 2 /* added or existing */) {
+            Record file = (Record) record.get("data_file");
+            if (partitionMatch(file, partValue)) {
+              asMetadataRecord(file, expectedContent);
+              expected.add(file);
+            }
+          }
+        }
+      }
+    }
+    return expected;
+  }
+
+  // Populate certain fields derived in the metadata tables
+  private void asMetadataRecord(Record file, FileContent content) {
+    file.put(0, content.id());
+    file.put(3, 0); // specId
+  }
+
+  private boolean partitionMatch(Record file, String partValue) {
+    if (partValue == null) {
+      return true;
+    }
+    Record partition = (Record) file.get(4);
+    return partValue.equals(partition.get(0).toString());
+  }
+}
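
The one-line change in PartitionsTable.java is the substance of this fix: the
inclusive projection used to derive the partition filter is now built from the
full table schema (scan.tableSchema()) rather than the scan's projected schema
(scan.schema()), so a scan that projects only some columns of the partitions
metadata table no longer fails while planning its partition filter. A minimal
sketch of the kind of query affected, with hypothetical catalog, database, and
table names, via the spark-sql CLI:

    # project a single column of the partitions metadata table
    spark-sql -e "SELECT file_count FROM spark_catalog.db.tbl.partitions"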


[iceberg] 02/03: Spark: Extend commit unknown exception handling to SparkPositionDeltaWrite (#4893)

Posted by ru...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch 0.13.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 80c290da1af12ac87f2cfd1f643a4d3c8431ee07
Author: Prashant Singh <35...@users.noreply.github.com>
AuthorDate: Wed Jun 1 20:07:52 2022 +0530

    Spark: Extend commit unknown exception handling to SparkPositionDeltaWrite (#4893)
    
    Co-authored-by: Prashant Singh <ps...@amazon.com>
---
 .../spark/extensions/TestMergeOnReadDelete.java    | 66 ++++++++++++++++++++++
 .../spark/source/SparkPositionDeltaWrite.java      | 20 +++++--
 .../iceberg/spark/source/TestSparkCatalog.java     | 17 ++++++
 3 files changed, 99 insertions(+), 4 deletions(-)

diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
index 2316adf66..ae9fac864 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
@@ -20,8 +20,23 @@
 package org.apache.iceberg.spark.extensions;
 
 import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.source.SparkTable;
+import org.apache.iceberg.spark.source.TestSparkCatalog;
+import org.apache.spark.SparkException;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 public class TestMergeOnReadDelete extends TestDelete {
 
@@ -37,4 +52,55 @@ public class TestMergeOnReadDelete extends TestDelete {
         TableProperties.DELETE_MODE, "merge-on-read"
     );
   }
+
+
+  @Parameterized.AfterParam
+  public static void clearTestSparkCatalogCache() {
+    TestSparkCatalog.clearTables();
+  }
+
+  @Test
+  public void testCommitUnknownException() {
+    createAndInitTable("id INT, dep STRING, category STRING");
+
+    // write unpartitioned files
+    append(tableName, "{ \"id\": 1, \"dep\": \"hr\", \"category\": \"c1\"}");
+    append(tableName, "{ \"id\": 2, \"dep\": \"hr\", \"category\": \"c1\" }\n" +
+        "{ \"id\": 3, \"dep\": \"hr\", \"category\": \"c1\" }");
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    RowDelta newRowDelta = table.newRowDelta();
+    RowDelta spyNewRowDelta = spy(newRowDelta);
+    doAnswer(invocation -> {
+      newRowDelta.commit();
+      throw new CommitStateUnknownException(new RuntimeException("Datacenter on Fire"));
+    }).when(spyNewRowDelta).commit();
+
+    Table spyTable = spy(table);
+    when(spyTable.newRowDelta()).thenReturn(spyNewRowDelta);
+    SparkTable sparkTable = new SparkTable(spyTable, false);
+
+    ImmutableMap<String, String> config = ImmutableMap.of(
+        "type", "hive",
+        "default-namespace", "default"
+    );
+    spark.conf().set("spark.sql.catalog.dummy_catalog", "org.apache.iceberg.spark.source.TestSparkCatalog");
+    config.forEach((key, value) -> spark.conf().set("spark.sql.catalog.dummy_catalog." + key, value));
+    Identifier ident = Identifier.of(new String[]{"default"}, "table");
+    TestSparkCatalog.setTable(ident, sparkTable);
+
+    // Although an exception is thrown here, write and commit have succeeded
+    AssertHelpers.assertThrowsWithCause("Should throw a Commit State Unknown Exception",
+        SparkException.class,
+        "Writing job aborted",
+        CommitStateUnknownException.class,
+        "Datacenter on Fire",
+        () -> sql("DELETE FROM %s WHERE id = 2", "dummy_catalog.default.table"));
+
+    // Since write and commit succeeded, the rows should be readable
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1, "hr", "c1"), row(3, "hr", "c1")),
+        sql("SELECT * FROM %s ORDER BY id", "dummy_catalog.default.table"));
+  }
 }
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index 8e26ad80b..558b678bf 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -38,6 +38,7 @@ import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.ClusteredPositionDeleteWriter;
@@ -92,6 +93,8 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
   private final Distribution requiredDistribution;
   private final SortOrder[] requiredOrdering;
 
+  private boolean cleanupOnAbort = true;
+
   SparkPositionDeltaWrite(SparkSession spark, Table table, Command command, SparkBatchQueryScan scan,
                           IsolationLevel isolationLevel, SparkWriteConf writeConf,
                           ExtendedLogicalWriteInfo info, Schema dataSchema,
@@ -215,6 +218,10 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
 
     @Override
     public void abort(WriterCommitMessage[] messages) {
+      if (!cleanupOnAbort) {
+        return;
+      }
+
       for (WriterCommitMessage message : messages) {
         if (message != null) {
           DeltaTaskCommit taskCommit = (DeltaTaskCommit) message;
@@ -239,10 +246,15 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
         operation.stageOnly();
       }
 
-      long start = System.currentTimeMillis();
-      operation.commit(); // abort is automatically called if this fails
-      long duration = System.currentTimeMillis() - start;
-      LOG.info("Committed in {} ms", duration);
+      try {
+        long start = System.currentTimeMillis();
+        operation.commit(); // abort is automatically called if this fails
+        long duration = System.currentTimeMillis() - start;
+        LOG.info("Committed in {} ms", duration);
+      } catch (CommitStateUnknownException commitStateUnknownException) {
+        cleanupOnAbort = false;
+        throw commitStateUnknownException;
+      }
     }
   }
 
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java
index 0e5280875..6b4362b98 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java
@@ -19,8 +19,11 @@
 
 package org.apache.iceberg.spark.source;
 
+import java.util.Map;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkSessionCatalog;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
@@ -31,8 +34,18 @@ import org.apache.spark.sql.connector.catalog.TableCatalog;
 
 public class TestSparkCatalog<T extends TableCatalog & SupportsNamespaces> extends SparkSessionCatalog<T> {
 
+  private static final Map<Identifier, Table> tableMap = Maps.newHashMap();
+
+  public static void setTable(Identifier ident, Table table) {
+    Preconditions.checkArgument(!tableMap.containsKey(ident), "Cannot set " + ident + ". It is already set");
+    tableMap.put(ident, table);
+  }
+
   @Override
   public Table loadTable(Identifier ident) throws NoSuchTableException {
+    if (tableMap.containsKey(ident)) {
+      return tableMap.get(ident);
+    }
     TableIdentifier tableIdentifier = Spark3Util.identifierToTableIdentifier(ident);
     Namespace namespace = tableIdentifier.namespace();
 
@@ -43,4 +56,8 @@ public class TestSparkCatalog<T extends TableCatalog & SupportsNamespaces> exten
 
     return new SparkTable(table, false);
   }
+
+  public static void clearTables() {
+    tableMap.clear();
+  }
 }