You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/04/01 01:38:24 UTC
[iceberg] branch master updated: Core: Implement HasTableOperations in metadata and txn tables (#2398)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new d9133f4 Core: Implement HasTableOperations in metadata and txn tables (#2398)
d9133f4 is described below
commit d9133f46df31c0d4b0cac33b23b9ba256264ed2d
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Wed Mar 31 18:38:09 2021 -0700
Core: Implement HasTableOperations in metadata and txn tables (#2398)
---
.../java/org/apache/iceberg/AllDataFilesTable.java | 28 +++---------------
.../java/org/apache/iceberg/AllEntriesTable.java | 28 +++---------------
.../java/org/apache/iceberg/AllManifestsTable.java | 25 ++--------------
.../java/org/apache/iceberg/BaseMetadataTable.java | 34 +++++++++++++++++-----
.../java/org/apache/iceberg/BaseTransaction.java | 7 ++++-
.../java/org/apache/iceberg/DataFilesTable.java | 28 +++---------------
.../main/java/org/apache/iceberg/HistoryTable.java | 30 ++++---------------
.../org/apache/iceberg/ManifestEntriesTable.java | 28 +++---------------
.../java/org/apache/iceberg/ManifestsTable.java | 27 +++--------------
.../java/org/apache/iceberg/PartitionsTable.java | 34 +++++-----------------
.../java/org/apache/iceberg/SnapshotsTable.java | 28 +++---------------
11 files changed, 72 insertions(+), 225 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java b/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
index 86b227a..13963b8 100644
--- a/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
@@ -40,39 +40,24 @@ import org.apache.iceberg.util.ThreadPools;
* This table may return duplicate rows.
*/
public class AllDataFilesTable extends BaseMetadataTable {
- private final TableOperations ops;
- private final Table table;
- private final String name;
AllDataFilesTable(TableOperations ops, Table table) {
this(ops, table, table.name() + ".all_data_files");
}
AllDataFilesTable(TableOperations ops, Table table, String name) {
- this.ops = ops;
- this.table = table;
- this.name = name;
- }
-
- @Override
- Table table() {
- return table;
- }
-
- @Override
- public String name() {
- return name;
+ super(ops, table, name);
}
@Override
public TableScan newScan() {
- return new AllDataFilesTableScan(ops, table, schema());
+ return new AllDataFilesTableScan(operations(), table(), schema());
}
@Override
public Schema schema() {
- Schema schema = new Schema(DataFile.getType(table.spec().partitionType()).fields());
- if (table.spec().fields().size() < 1) {
+ Schema schema = new Schema(DataFile.getType(table().spec().partitionType()).fields());
+ if (table().spec().fields().size() < 1) {
// avoid returning an empty struct, which is not always supported. instead, drop the partition field (id 102)
return TypeUtil.selectNot(schema, Sets.newHashSet(102));
} else {
@@ -81,11 +66,6 @@ public class AllDataFilesTable extends BaseMetadataTable {
}
@Override
- String metadataLocation() {
- return ops.current().metadataFileLocation();
- }
-
- @Override
MetadataTableType metadataTableType() {
return MetadataTableType.ALL_DATA_FILES;
}
diff --git a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
index b8f5747..66f8b49 100644
--- a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
@@ -40,39 +40,24 @@ import org.apache.iceberg.util.ThreadPools;
* use {@link DataFilesTable}.
*/
public class AllEntriesTable extends BaseMetadataTable {
- private final TableOperations ops;
- private final Table table;
- private final String name;
AllEntriesTable(TableOperations ops, Table table) {
this(ops, table, table.name() + ".all_entries");
}
AllEntriesTable(TableOperations ops, Table table, String name) {
- this.ops = ops;
- this.table = table;
- this.name = name;
- }
-
- @Override
- Table table() {
- return table;
- }
-
- @Override
- public String name() {
- return name;
+ super(ops, table, name);
}
@Override
public TableScan newScan() {
- return new Scan(ops, table, schema());
+ return new Scan(operations(), table(), schema());
}
@Override
public Schema schema() {
- Schema schema = ManifestEntry.getSchema(table.spec().partitionType());
- if (table.spec().fields().size() < 1) {
+ Schema schema = ManifestEntry.getSchema(table().spec().partitionType());
+ if (table().spec().fields().size() < 1) {
// avoid returning an empty struct, which is not always supported. instead, drop the partition field (id 102)
return TypeUtil.selectNot(schema, Sets.newHashSet(102));
} else {
@@ -81,11 +66,6 @@ public class AllEntriesTable extends BaseMetadataTable {
}
@Override
- String metadataLocation() {
- return ops.current().metadataFileLocation();
- }
-
- @Override
MetadataTableType metadataTableType() {
return MetadataTableType.ALL_ENTRIES;
}
diff --git a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
index 1a71fd9..dde7849 100644
--- a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
@@ -56,33 +56,17 @@ public class AllManifestsTable extends BaseMetadataTable {
)))
);
- private final TableOperations ops;
- private final Table table;
- private final String name;
-
AllManifestsTable(TableOperations ops, Table table) {
this(ops, table, table.name() + ".all_manifests");
}
AllManifestsTable(TableOperations ops, Table table, String name) {
- this.ops = ops;
- this.table = table;
- this.name = name;
- }
-
- @Override
- Table table() {
- return table;
- }
-
- @Override
- public String name() {
- return name;
+ super(ops, table, name);
}
@Override
public TableScan newScan() {
- return new AllManifestsTableScan(ops, table, MANIFEST_FILE_SCHEMA);
+ return new AllManifestsTableScan(operations(), table(), MANIFEST_FILE_SCHEMA);
}
@Override
@@ -91,11 +75,6 @@ public class AllManifestsTable extends BaseMetadataTable {
}
@Override
- String metadataLocation() {
- return ops.current().metadataFileLocation();
- }
-
- @Override
MetadataTableType metadataTableType() {
return MetadataTableType.ALL_MANIFESTS;
}
diff --git a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
index d67a214..1800636 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
@@ -34,11 +34,34 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
* using a {@link StaticTableOperations}. This way no Catalog related calls are needed when reading the table data after
* deserialization.
*/
-abstract class BaseMetadataTable implements Table, Serializable {
+abstract class BaseMetadataTable implements Table, HasTableOperations, Serializable {
private final PartitionSpec spec = PartitionSpec.unpartitioned();
private final SortOrder sortOrder = SortOrder.unsorted();
+ private final TableOperations ops;
+ private final Table table;
+ private final String name;
- abstract Table table();
+ protected BaseMetadataTable(TableOperations ops, Table table, String name) {
+ this.ops = ops;
+ this.table = table;
+ this.name = name;
+ }
+
+ abstract MetadataTableType metadataTableType();
+
+ protected Table table() {
+ return table;
+ }
+
+ @Override
+ public TableOperations operations() {
+ return ops;
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
@Override
public FileIO io() {
@@ -195,12 +218,9 @@ abstract class BaseMetadataTable implements Table, Serializable {
return name();
}
- abstract String metadataLocation();
-
- abstract MetadataTableType metadataTableType();
-
final Object writeReplace() {
- return new TableProxy(io(), table().name(), name(), metadataLocation(), metadataTableType(), locationProvider());
+ String metadataLocation = ops.current().metadataFileLocation();
+ return new TableProxy(io(), table().name(), name(), metadataLocation, metadataTableType(), locationProvider());
}
static class TableProxy implements Serializable {
diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
index a004854..925f37f 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
@@ -499,7 +499,12 @@ class BaseTransaction implements Transaction {
}
}
- public class TransactionTable implements Table {
+ public class TransactionTable implements Table, HasTableOperations {
+
+ @Override
+ public TableOperations operations() {
+ return transactionOps;
+ }
@Override
public String name() {
diff --git a/core/src/main/java/org/apache/iceberg/DataFilesTable.java b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
index 8a13cbf..ded26c0 100644
--- a/core/src/main/java/org/apache/iceberg/DataFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
@@ -32,39 +32,24 @@ import org.apache.iceberg.types.TypeUtil;
* A {@link Table} implementation that exposes a table's data files as rows.
*/
public class DataFilesTable extends BaseMetadataTable {
- private final TableOperations ops;
- private final Table table;
- private final String name;
DataFilesTable(TableOperations ops, Table table) {
this(ops, table, table.name() + ".files");
}
DataFilesTable(TableOperations ops, Table table, String name) {
- this.ops = ops;
- this.table = table;
- this.name = name;
- }
-
- @Override
- Table table() {
- return table;
- }
-
- @Override
- public String name() {
- return name;
+ super(ops, table, name);
}
@Override
public TableScan newScan() {
- return new FilesTableScan(ops, table, schema());
+ return new FilesTableScan(operations(), table(), schema());
}
@Override
public Schema schema() {
- Schema schema = new Schema(DataFile.getType(table.spec().partitionType()).fields());
- if (table.spec().fields().size() < 1) {
+ Schema schema = new Schema(DataFile.getType(table().spec().partitionType()).fields());
+ if (table().spec().fields().size() < 1) {
// avoid returning an empty struct, which is not always supported. instead, drop the partition field
return TypeUtil.selectNot(schema, Sets.newHashSet(DataFile.PARTITION_ID));
} else {
@@ -73,11 +58,6 @@ public class DataFilesTable extends BaseMetadataTable {
}
@Override
- String metadataLocation() {
- return ops.current().metadataFileLocation();
- }
-
- @Override
MetadataTableType metadataTableType() {
return MetadataTableType.FILES;
}
diff --git a/core/src/main/java/org/apache/iceberg/HistoryTable.java b/core/src/main/java/org/apache/iceberg/HistoryTable.java
index e8cfa60..b731ebb 100644
--- a/core/src/main/java/org/apache/iceberg/HistoryTable.java
+++ b/core/src/main/java/org/apache/iceberg/HistoryTable.java
@@ -41,33 +41,17 @@ public class HistoryTable extends BaseMetadataTable {
Types.NestedField.required(4, "is_current_ancestor", Types.BooleanType.get())
);
- private final TableOperations ops;
- private final Table table;
- private final String name;
-
HistoryTable(TableOperations ops, Table table) {
this(ops, table, table.name() + ".history");
}
HistoryTable(TableOperations ops, Table table, String name) {
- this.ops = ops;
- this.table = table;
- this.name = name;
- }
-
- @Override
- Table table() {
- return table;
- }
-
- @Override
- public String name() {
- return name;
+ super(ops, table, name);
}
@Override
public TableScan newScan() {
- return new HistoryScan();
+ return new HistoryScan(operations(), table());
}
@Override
@@ -76,24 +60,20 @@ public class HistoryTable extends BaseMetadataTable {
}
@Override
- String metadataLocation() {
- return ops.current().metadataFileLocation();
- }
-
- @Override
MetadataTableType metadataTableType() {
return MetadataTableType.HISTORY;
}
private DataTask task(TableScan scan) {
+ TableOperations ops = operations();
return StaticDataTask.of(
ops.io().newInputFile(ops.current().metadataFileLocation()),
ops.current().snapshotLog(),
- convertHistoryEntryFunc(table));
+ convertHistoryEntryFunc(table()));
}
private class HistoryScan extends StaticTableScan {
- HistoryScan() {
+ HistoryScan(TableOperations ops, Table table) {
super(ops, table, HISTORY_SCHEMA, HistoryTable.this::task);
}
diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
index 9851d72..5e006e0 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
@@ -37,39 +37,24 @@ import org.apache.iceberg.types.TypeUtil;
* use {@link DataFilesTable}.
*/
public class ManifestEntriesTable extends BaseMetadataTable {
- private final TableOperations ops;
- private final Table table;
- private final String name;
ManifestEntriesTable(TableOperations ops, Table table) {
this(ops, table, table.name() + ".entries");
}
ManifestEntriesTable(TableOperations ops, Table table, String name) {
- this.ops = ops;
- this.table = table;
- this.name = name;
- }
-
- @Override
- Table table() {
- return table;
- }
-
- @Override
- public String name() {
- return name;
+ super(ops, table, name);
}
@Override
public TableScan newScan() {
- return new EntriesTableScan(ops, table, schema());
+ return new EntriesTableScan(operations(), table(), schema());
}
@Override
public Schema schema() {
- Schema schema = ManifestEntry.getSchema(table.spec().partitionType());
- if (table.spec().fields().size() < 1) {
+ Schema schema = ManifestEntry.getSchema(table().spec().partitionType());
+ if (table().spec().fields().size() < 1) {
// avoid returning an empty struct, which is not always supported. instead, drop the partition field (id 102)
return TypeUtil.selectNot(schema, Sets.newHashSet(102));
} else {
@@ -78,11 +63,6 @@ public class ManifestEntriesTable extends BaseMetadataTable {
}
@Override
- String metadataLocation() {
- return ops.current().metadataFileLocation();
- }
-
- @Override
MetadataTableType metadataTableType() {
return MetadataTableType.ENTRIES;
}
diff --git a/core/src/main/java/org/apache/iceberg/ManifestsTable.java b/core/src/main/java/org/apache/iceberg/ManifestsTable.java
index d31828e..e7b9222 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestsTable.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestsTable.java
@@ -44,35 +44,20 @@ public class ManifestsTable extends BaseMetadataTable {
)))
);
- private final TableOperations ops;
- private final Table table;
private final PartitionSpec spec;
- private final String name;
ManifestsTable(TableOperations ops, Table table) {
this(ops, table, table.name() + ".manifests");
}
ManifestsTable(TableOperations ops, Table table, String name) {
- this.ops = ops;
- this.table = table;
+ super(ops, table, name);
this.spec = table.spec();
- this.name = name;
- }
-
- @Override
- Table table() {
- return table;
- }
-
- @Override
- public String name() {
- return name;
}
@Override
public TableScan newScan() {
- return new ManifestsTableScan();
+ return new ManifestsTableScan(operations(), table());
}
@Override
@@ -81,16 +66,12 @@ public class ManifestsTable extends BaseMetadataTable {
}
@Override
- String metadataLocation() {
- return ops.current().metadataFileLocation();
- }
-
- @Override
MetadataTableType metadataTableType() {
return MetadataTableType.MANIFESTS;
}
protected DataTask task(TableScan scan) {
+ TableOperations ops = operations();
String location = scan.snapshot().manifestListLocation();
return StaticDataTask.of(
ops.io().newInputFile(location != null ? location : ops.current().metadataFileLocation()),
@@ -99,7 +80,7 @@ public class ManifestsTable extends BaseMetadataTable {
}
private class ManifestsTableScan extends StaticTableScan {
- ManifestsTableScan() {
+ ManifestsTableScan(TableOperations ops, Table table) {
super(ops, table, SNAPSHOT_SCHEMA, ManifestsTable.this::task);
}
}
diff --git a/core/src/main/java/org/apache/iceberg/PartitionsTable.java b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
index 5cd6290..1410924 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionsTable.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
@@ -29,62 +29,44 @@ import org.apache.iceberg.util.StructLikeWrapper;
*/
public class PartitionsTable extends BaseMetadataTable {
- private final TableOperations ops;
- private final Table table;
private final Schema schema;
- private final String name;
PartitionsTable(TableOperations ops, Table table) {
this(ops, table, table.name() + ".partitions");
}
PartitionsTable(TableOperations ops, Table table, String name) {
- this.ops = ops;
- this.table = table;
+ super(ops, table, name);
+
this.schema = new Schema(
Types.NestedField.required(1, "partition", table.spec().partitionType()),
Types.NestedField.required(2, "record_count", Types.LongType.get()),
Types.NestedField.required(3, "file_count", Types.IntegerType.get())
);
- this.name = name;
- }
-
- @Override
- Table table() {
- return table;
- }
-
- @Override
- public String name() {
- return name;
}
@Override
public TableScan newScan() {
- return new PartitionsScan();
+ return new PartitionsScan(operations(), table());
}
@Override
public Schema schema() {
- if (table.spec().fields().size() < 1) {
+ if (table().spec().fields().size() < 1) {
return schema.select("record_count", "file_count");
}
return schema;
}
@Override
- String metadataLocation() {
- return ops.current().metadataFileLocation();
- }
-
- @Override
MetadataTableType metadataTableType() {
return MetadataTableType.PARTITIONS;
}
private DataTask task(TableScan scan) {
- Iterable<Partition> partitions = partitions(table, scan.snapshot().snapshotId());
- if (table.spec().fields().size() < 1) {
+ TableOperations ops = operations();
+ Iterable<Partition> partitions = partitions(table(), scan.snapshot().snapshotId());
+ if (table().spec().fields().size() < 1) {
// the table is unpartitioned, partitions contains only the root partition
return StaticDataTask.of(io().newInputFile(ops.current().metadataFileLocation()), partitions,
root -> StaticDataTask.Row.of(root.recordCount, root.fileCount));
@@ -114,7 +96,7 @@ public class PartitionsTable extends BaseMetadataTable {
}
private class PartitionsScan extends StaticTableScan {
- PartitionsScan() {
+ PartitionsScan(TableOperations ops, Table table) {
super(ops, table, PartitionsTable.this.schema(), PartitionsTable.this::task);
}
}
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotsTable.java b/core/src/main/java/org/apache/iceberg/SnapshotsTable.java
index 51d668c..338b7aa 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotsTable.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotsTable.java
@@ -38,33 +38,17 @@ public class SnapshotsTable extends BaseMetadataTable {
Types.MapType.ofRequired(7, 8, Types.StringType.get(), Types.StringType.get()))
);
- private final TableOperations ops;
- private final Table table;
- private final String name;
-
SnapshotsTable(TableOperations ops, Table table) {
this(ops, table, table.name() + ".snapshots");
}
SnapshotsTable(TableOperations ops, Table table, String name) {
- this.ops = ops;
- this.table = table;
- this.name = name;
- }
-
- @Override
- Table table() {
- return table;
- }
-
- @Override
- public String name() {
- return name;
+ super(ops, table, name);
}
@Override
public TableScan newScan() {
- return new SnapshotsTableScan();
+ return new SnapshotsTableScan(operations(), table());
}
@Override
@@ -73,6 +57,7 @@ public class SnapshotsTable extends BaseMetadataTable {
}
private DataTask task(BaseTableScan scan) {
+ TableOperations ops = operations();
return StaticDataTask.of(
ops.io().newInputFile(ops.current().metadataFileLocation()),
ops.current().snapshots(),
@@ -80,17 +65,12 @@ public class SnapshotsTable extends BaseMetadataTable {
}
@Override
- String metadataLocation() {
- return ops.current().metadataFileLocation();
- }
-
- @Override
MetadataTableType metadataTableType() {
return MetadataTableType.SNAPSHOTS;
}
private class SnapshotsTableScan extends StaticTableScan {
- SnapshotsTableScan() {
+ SnapshotsTableScan(TableOperations ops, Table table) {
super(ops, table, SNAPSHOT_SCHEMA, SnapshotsTable.this::task);
}