You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original message and is not reproduced in this plain-text copy.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/04/01 01:38:24 UTC

[iceberg] branch master updated: Core: Implement HasTableOperations in metadata and txn tables (#2398)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new d9133f4  Core: Implement HasTableOperations in metadata and txn tables (#2398)
d9133f4 is described below

commit d9133f46df31c0d4b0cac33b23b9ba256264ed2d
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Wed Mar 31 18:38:09 2021 -0700

    Core: Implement HasTableOperations in metadata and txn tables (#2398)
---
 .../java/org/apache/iceberg/AllDataFilesTable.java | 28 +++---------------
 .../java/org/apache/iceberg/AllEntriesTable.java   | 28 +++---------------
 .../java/org/apache/iceberg/AllManifestsTable.java | 25 ++--------------
 .../java/org/apache/iceberg/BaseMetadataTable.java | 34 +++++++++++++++++-----
 .../java/org/apache/iceberg/BaseTransaction.java   |  7 ++++-
 .../java/org/apache/iceberg/DataFilesTable.java    | 28 +++---------------
 .../main/java/org/apache/iceberg/HistoryTable.java | 30 ++++---------------
 .../org/apache/iceberg/ManifestEntriesTable.java   | 28 +++---------------
 .../java/org/apache/iceberg/ManifestsTable.java    | 27 +++--------------
 .../java/org/apache/iceberg/PartitionsTable.java   | 34 +++++-----------------
 .../java/org/apache/iceberg/SnapshotsTable.java    | 28 +++---------------
 11 files changed, 72 insertions(+), 225 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java b/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
index 86b227a..13963b8 100644
--- a/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
@@ -40,39 +40,24 @@ import org.apache.iceberg.util.ThreadPools;
  * This table may return duplicate rows.
  */
 public class AllDataFilesTable extends BaseMetadataTable {
-  private final TableOperations ops;
-  private final Table table;
-  private final String name;
 
   AllDataFilesTable(TableOperations ops, Table table) {
     this(ops, table, table.name() + ".all_data_files");
   }
 
   AllDataFilesTable(TableOperations ops, Table table, String name) {
-    this.ops = ops;
-    this.table = table;
-    this.name = name;
-  }
-
-  @Override
-  Table table() {
-    return table;
-  }
-
-  @Override
-  public String name() {
-    return name;
+    super(ops, table, name);
   }
 
   @Override
   public TableScan newScan() {
-    return new AllDataFilesTableScan(ops, table, schema());
+    return new AllDataFilesTableScan(operations(), table(), schema());
   }
 
   @Override
   public Schema schema() {
-    Schema schema = new Schema(DataFile.getType(table.spec().partitionType()).fields());
-    if (table.spec().fields().size() < 1) {
+    Schema schema = new Schema(DataFile.getType(table().spec().partitionType()).fields());
+    if (table().spec().fields().size() < 1) {
       // avoid returning an empty struct, which is not always supported. instead, drop the partition field (id 102)
       return TypeUtil.selectNot(schema, Sets.newHashSet(102));
     } else {
@@ -81,11 +66,6 @@ public class AllDataFilesTable extends BaseMetadataTable {
   }
 
   @Override
-  String metadataLocation() {
-    return ops.current().metadataFileLocation();
-  }
-
-  @Override
   MetadataTableType metadataTableType() {
     return MetadataTableType.ALL_DATA_FILES;
   }
diff --git a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
index b8f5747..66f8b49 100644
--- a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
@@ -40,39 +40,24 @@ import org.apache.iceberg.util.ThreadPools;
  * use {@link DataFilesTable}.
  */
 public class AllEntriesTable extends BaseMetadataTable {
-  private final TableOperations ops;
-  private final Table table;
-  private final String name;
 
   AllEntriesTable(TableOperations ops, Table table) {
     this(ops, table, table.name() + ".all_entries");
   }
 
   AllEntriesTable(TableOperations ops, Table table, String name) {
-    this.ops = ops;
-    this.table = table;
-    this.name = name;
-  }
-
-  @Override
-  Table table() {
-    return table;
-  }
-
-  @Override
-  public String name() {
-    return name;
+    super(ops, table, name);
   }
 
   @Override
   public TableScan newScan() {
-    return new Scan(ops, table, schema());
+    return new Scan(operations(), table(), schema());
   }
 
   @Override
   public Schema schema() {
-    Schema schema = ManifestEntry.getSchema(table.spec().partitionType());
-    if (table.spec().fields().size() < 1) {
+    Schema schema = ManifestEntry.getSchema(table().spec().partitionType());
+    if (table().spec().fields().size() < 1) {
       // avoid returning an empty struct, which is not always supported. instead, drop the partition field (id 102)
       return TypeUtil.selectNot(schema, Sets.newHashSet(102));
     } else {
@@ -81,11 +66,6 @@ public class AllEntriesTable extends BaseMetadataTable {
   }
 
   @Override
-  String metadataLocation() {
-    return ops.current().metadataFileLocation();
-  }
-
-  @Override
   MetadataTableType metadataTableType() {
     return MetadataTableType.ALL_ENTRIES;
   }
diff --git a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
index 1a71fd9..dde7849 100644
--- a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
@@ -56,33 +56,17 @@ public class AllManifestsTable extends BaseMetadataTable {
       )))
   );
 
-  private final TableOperations ops;
-  private final Table table;
-  private final String name;
-
   AllManifestsTable(TableOperations ops, Table table) {
     this(ops, table, table.name() + ".all_manifests");
   }
 
   AllManifestsTable(TableOperations ops, Table table, String name) {
-    this.ops = ops;
-    this.table = table;
-    this.name = name;
-  }
-
-  @Override
-  Table table() {
-    return table;
-  }
-
-  @Override
-  public String name() {
-    return name;
+    super(ops, table, name);
   }
 
   @Override
   public TableScan newScan() {
-    return new AllManifestsTableScan(ops, table, MANIFEST_FILE_SCHEMA);
+    return new AllManifestsTableScan(operations(), table(), MANIFEST_FILE_SCHEMA);
   }
 
   @Override
@@ -91,11 +75,6 @@ public class AllManifestsTable extends BaseMetadataTable {
   }
 
   @Override
-  String metadataLocation() {
-    return ops.current().metadataFileLocation();
-  }
-
-  @Override
   MetadataTableType metadataTableType() {
     return MetadataTableType.ALL_MANIFESTS;
   }
diff --git a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
index d67a214..1800636 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
@@ -34,11 +34,34 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
  * using a {@link StaticTableOperations}. This way no Catalog related calls are needed when reading the table data after
  * deserialization.
  */
-abstract class BaseMetadataTable implements Table, Serializable {
+abstract class BaseMetadataTable implements Table, HasTableOperations, Serializable {
   private final PartitionSpec spec = PartitionSpec.unpartitioned();
   private final SortOrder sortOrder = SortOrder.unsorted();
+  private final TableOperations ops;
+  private final Table table;
+  private final String name;
 
-  abstract Table table();
+  protected BaseMetadataTable(TableOperations ops, Table table, String name) {
+    this.ops = ops;
+    this.table = table;
+    this.name = name;
+  }
+
+  abstract MetadataTableType metadataTableType();
+
+  protected Table table() {
+    return table;
+  }
+
+  @Override
+  public TableOperations operations() {
+    return ops;
+  }
+
+  @Override
+  public String name() {
+    return name;
+  }
 
   @Override
   public FileIO io() {
@@ -195,12 +218,9 @@ abstract class BaseMetadataTable implements Table, Serializable {
     return name();
   }
 
-  abstract String metadataLocation();
-
-  abstract MetadataTableType metadataTableType();
-
   final Object writeReplace() {
-    return new TableProxy(io(), table().name(), name(), metadataLocation(), metadataTableType(), locationProvider());
+    String metadataLocation = ops.current().metadataFileLocation();
+    return new TableProxy(io(), table().name(), name(), metadataLocation, metadataTableType(), locationProvider());
   }
 
   static class TableProxy implements Serializable {
diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
index a004854..925f37f 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
@@ -499,7 +499,12 @@ class BaseTransaction implements Transaction {
     }
   }
 
-  public class TransactionTable implements Table {
+  public class TransactionTable implements Table, HasTableOperations {
+
+    @Override
+    public TableOperations operations() {
+      return transactionOps;
+    }
 
     @Override
     public String name() {
diff --git a/core/src/main/java/org/apache/iceberg/DataFilesTable.java b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
index 8a13cbf..ded26c0 100644
--- a/core/src/main/java/org/apache/iceberg/DataFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
@@ -32,39 +32,24 @@ import org.apache.iceberg.types.TypeUtil;
  * A {@link Table} implementation that exposes a table's data files as rows.
  */
 public class DataFilesTable extends BaseMetadataTable {
-  private final TableOperations ops;
-  private final Table table;
-  private final String name;
 
   DataFilesTable(TableOperations ops, Table table) {
     this(ops, table, table.name() + ".files");
   }
 
   DataFilesTable(TableOperations ops, Table table, String name) {
-    this.ops = ops;
-    this.table = table;
-    this.name = name;
-  }
-
-  @Override
-  Table table() {
-    return table;
-  }
-
-  @Override
-  public String name() {
-    return name;
+    super(ops, table, name);
   }
 
   @Override
   public TableScan newScan() {
-    return new FilesTableScan(ops, table, schema());
+    return new FilesTableScan(operations(), table(), schema());
   }
 
   @Override
   public Schema schema() {
-    Schema schema = new Schema(DataFile.getType(table.spec().partitionType()).fields());
-    if (table.spec().fields().size() < 1) {
+    Schema schema = new Schema(DataFile.getType(table().spec().partitionType()).fields());
+    if (table().spec().fields().size() < 1) {
       // avoid returning an empty struct, which is not always supported. instead, drop the partition field
       return TypeUtil.selectNot(schema, Sets.newHashSet(DataFile.PARTITION_ID));
     } else {
@@ -73,11 +58,6 @@ public class DataFilesTable extends BaseMetadataTable {
   }
 
   @Override
-  String metadataLocation() {
-    return ops.current().metadataFileLocation();
-  }
-
-  @Override
   MetadataTableType metadataTableType() {
     return MetadataTableType.FILES;
   }
diff --git a/core/src/main/java/org/apache/iceberg/HistoryTable.java b/core/src/main/java/org/apache/iceberg/HistoryTable.java
index e8cfa60..b731ebb 100644
--- a/core/src/main/java/org/apache/iceberg/HistoryTable.java
+++ b/core/src/main/java/org/apache/iceberg/HistoryTable.java
@@ -41,33 +41,17 @@ public class HistoryTable extends BaseMetadataTable {
       Types.NestedField.required(4, "is_current_ancestor", Types.BooleanType.get())
   );
 
-  private final TableOperations ops;
-  private final Table table;
-  private final String name;
-
   HistoryTable(TableOperations ops, Table table) {
     this(ops, table, table.name() + ".history");
   }
 
   HistoryTable(TableOperations ops, Table table, String name) {
-    this.ops = ops;
-    this.table = table;
-    this.name = name;
-  }
-
-  @Override
-  Table table() {
-    return table;
-  }
-
-  @Override
-  public String name() {
-    return name;
+    super(ops, table, name);
   }
 
   @Override
   public TableScan newScan() {
-    return new HistoryScan();
+    return new HistoryScan(operations(), table());
   }
 
   @Override
@@ -76,24 +60,20 @@ public class HistoryTable extends BaseMetadataTable {
   }
 
   @Override
-  String metadataLocation() {
-    return ops.current().metadataFileLocation();
-  }
-
-  @Override
   MetadataTableType metadataTableType() {
     return MetadataTableType.HISTORY;
   }
 
   private DataTask task(TableScan scan) {
+    TableOperations ops = operations();
     return StaticDataTask.of(
         ops.io().newInputFile(ops.current().metadataFileLocation()),
         ops.current().snapshotLog(),
-        convertHistoryEntryFunc(table));
+        convertHistoryEntryFunc(table()));
   }
 
   private class HistoryScan extends StaticTableScan {
-    HistoryScan() {
+    HistoryScan(TableOperations ops, Table table) {
       super(ops, table, HISTORY_SCHEMA, HistoryTable.this::task);
     }
 
diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
index 9851d72..5e006e0 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
@@ -37,39 +37,24 @@ import org.apache.iceberg.types.TypeUtil;
  * use {@link DataFilesTable}.
  */
 public class ManifestEntriesTable extends BaseMetadataTable {
-  private final TableOperations ops;
-  private final Table table;
-  private final String name;
 
   ManifestEntriesTable(TableOperations ops, Table table) {
     this(ops, table, table.name() + ".entries");
   }
 
   ManifestEntriesTable(TableOperations ops, Table table, String name) {
-    this.ops = ops;
-    this.table = table;
-    this.name = name;
-  }
-
-  @Override
-  Table table() {
-    return table;
-  }
-
-  @Override
-  public String name() {
-    return name;
+    super(ops, table, name);
   }
 
   @Override
   public TableScan newScan() {
-    return new EntriesTableScan(ops, table, schema());
+    return new EntriesTableScan(operations(), table(), schema());
   }
 
   @Override
   public Schema schema() {
-    Schema schema = ManifestEntry.getSchema(table.spec().partitionType());
-    if (table.spec().fields().size() < 1) {
+    Schema schema = ManifestEntry.getSchema(table().spec().partitionType());
+    if (table().spec().fields().size() < 1) {
       // avoid returning an empty struct, which is not always supported. instead, drop the partition field (id 102)
       return TypeUtil.selectNot(schema, Sets.newHashSet(102));
     } else {
@@ -78,11 +63,6 @@ public class ManifestEntriesTable extends BaseMetadataTable {
   }
 
   @Override
-  String metadataLocation() {
-    return ops.current().metadataFileLocation();
-  }
-
-  @Override
   MetadataTableType metadataTableType() {
     return MetadataTableType.ENTRIES;
   }
diff --git a/core/src/main/java/org/apache/iceberg/ManifestsTable.java b/core/src/main/java/org/apache/iceberg/ManifestsTable.java
index d31828e..e7b9222 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestsTable.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestsTable.java
@@ -44,35 +44,20 @@ public class ManifestsTable extends BaseMetadataTable {
       )))
   );
 
-  private final TableOperations ops;
-  private final Table table;
   private final PartitionSpec spec;
-  private final String name;
 
   ManifestsTable(TableOperations ops, Table table) {
     this(ops, table, table.name() + ".manifests");
   }
 
   ManifestsTable(TableOperations ops, Table table, String name) {
-    this.ops = ops;
-    this.table = table;
+    super(ops, table, name);
     this.spec = table.spec();
-    this.name = name;
-  }
-
-  @Override
-  Table table() {
-    return table;
-  }
-
-  @Override
-  public String name() {
-    return name;
   }
 
   @Override
   public TableScan newScan() {
-    return new ManifestsTableScan();
+    return new ManifestsTableScan(operations(), table());
   }
 
   @Override
@@ -81,16 +66,12 @@ public class ManifestsTable extends BaseMetadataTable {
   }
 
   @Override
-  String metadataLocation() {
-    return ops.current().metadataFileLocation();
-  }
-
-  @Override
   MetadataTableType metadataTableType() {
     return MetadataTableType.MANIFESTS;
   }
 
   protected DataTask task(TableScan scan) {
+    TableOperations ops = operations();
     String location = scan.snapshot().manifestListLocation();
     return StaticDataTask.of(
         ops.io().newInputFile(location != null ? location : ops.current().metadataFileLocation()),
@@ -99,7 +80,7 @@ public class ManifestsTable extends BaseMetadataTable {
   }
 
   private class ManifestsTableScan extends StaticTableScan {
-    ManifestsTableScan() {
+    ManifestsTableScan(TableOperations ops, Table table) {
       super(ops, table, SNAPSHOT_SCHEMA, ManifestsTable.this::task);
     }
   }
diff --git a/core/src/main/java/org/apache/iceberg/PartitionsTable.java b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
index 5cd6290..1410924 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionsTable.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
@@ -29,62 +29,44 @@ import org.apache.iceberg.util.StructLikeWrapper;
  */
 public class PartitionsTable extends BaseMetadataTable {
 
-  private final TableOperations ops;
-  private final Table table;
   private final Schema schema;
-  private final String name;
 
   PartitionsTable(TableOperations ops, Table table) {
     this(ops, table, table.name() + ".partitions");
   }
 
   PartitionsTable(TableOperations ops, Table table, String name) {
-    this.ops = ops;
-    this.table = table;
+    super(ops, table, name);
+
     this.schema = new Schema(
         Types.NestedField.required(1, "partition", table.spec().partitionType()),
         Types.NestedField.required(2, "record_count", Types.LongType.get()),
         Types.NestedField.required(3, "file_count", Types.IntegerType.get())
     );
-    this.name = name;
-  }
-
-  @Override
-  Table table() {
-    return table;
-  }
-
-  @Override
-  public String name() {
-    return name;
   }
 
   @Override
   public TableScan newScan() {
-    return new PartitionsScan();
+    return new PartitionsScan(operations(), table());
   }
 
   @Override
   public Schema schema() {
-    if (table.spec().fields().size() < 1) {
+    if (table().spec().fields().size() < 1) {
       return schema.select("record_count", "file_count");
     }
     return schema;
   }
 
   @Override
-  String metadataLocation() {
-    return ops.current().metadataFileLocation();
-  }
-
-  @Override
   MetadataTableType metadataTableType() {
     return MetadataTableType.PARTITIONS;
   }
 
   private DataTask task(TableScan scan) {
-    Iterable<Partition> partitions = partitions(table, scan.snapshot().snapshotId());
-    if (table.spec().fields().size() < 1) {
+    TableOperations ops = operations();
+    Iterable<Partition> partitions = partitions(table(), scan.snapshot().snapshotId());
+    if (table().spec().fields().size() < 1) {
       // the table is unpartitioned, partitions contains only the root partition
       return StaticDataTask.of(io().newInputFile(ops.current().metadataFileLocation()), partitions,
           root -> StaticDataTask.Row.of(root.recordCount, root.fileCount));
@@ -114,7 +96,7 @@ public class PartitionsTable extends BaseMetadataTable {
   }
 
   private class PartitionsScan extends StaticTableScan {
-    PartitionsScan() {
+    PartitionsScan(TableOperations ops, Table table) {
       super(ops, table, PartitionsTable.this.schema(), PartitionsTable.this::task);
     }
   }
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotsTable.java b/core/src/main/java/org/apache/iceberg/SnapshotsTable.java
index 51d668c..338b7aa 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotsTable.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotsTable.java
@@ -38,33 +38,17 @@ public class SnapshotsTable extends BaseMetadataTable {
           Types.MapType.ofRequired(7, 8, Types.StringType.get(), Types.StringType.get()))
   );
 
-  private final TableOperations ops;
-  private final Table table;
-  private final String name;
-
   SnapshotsTable(TableOperations ops, Table table) {
     this(ops, table, table.name() + ".snapshots");
   }
 
   SnapshotsTable(TableOperations ops, Table table, String name) {
-    this.ops = ops;
-    this.table = table;
-    this.name = name;
-  }
-
-  @Override
-  Table table() {
-    return table;
-  }
-
-  @Override
-  public String name() {
-    return name;
+    super(ops, table, name);
   }
 
   @Override
   public TableScan newScan() {
-    return new SnapshotsTableScan();
+    return new SnapshotsTableScan(operations(), table());
   }
 
   @Override
@@ -73,6 +57,7 @@ public class SnapshotsTable extends BaseMetadataTable {
   }
 
   private DataTask task(BaseTableScan scan) {
+    TableOperations ops = operations();
     return StaticDataTask.of(
         ops.io().newInputFile(ops.current().metadataFileLocation()),
         ops.current().snapshots(),
@@ -80,17 +65,12 @@ public class SnapshotsTable extends BaseMetadataTable {
   }
 
   @Override
-  String metadataLocation() {
-    return ops.current().metadataFileLocation();
-  }
-
-  @Override
   MetadataTableType metadataTableType() {
     return MetadataTableType.SNAPSHOTS;
   }
 
   private class SnapshotsTableScan extends StaticTableScan {
-    SnapshotsTableScan() {
+    SnapshotsTableScan(TableOperations ops, Table table) {
       super(ops, table, SNAPSHOT_SCHEMA, SnapshotsTable.this::task);
     }