You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by GitBox <gi...@apache.org> on 2019/01/14 21:45:58 UTC
[incubator-iceberg] Diff for: [GitHub] rdblue merged pull request #68:
Support customizing table locations
diff --git a/api/src/main/java/com/netflix/iceberg/Table.java b/api/src/main/java/com/netflix/iceberg/Table.java
index 9acb50d4..1b61b847 100644
--- a/api/src/main/java/com/netflix/iceberg/Table.java
+++ b/api/src/main/java/com/netflix/iceberg/Table.java
@@ -97,6 +97,13 @@
*/
UpdateProperties updateProperties();
+ /**
+ * Create a new {@link UpdateLocation} to update table location and commit the changes.
+ *
+ * @return a new {@link UpdateLocation}
+ */
+ UpdateLocation updateLocation();
+
/**
* Create a new {@link AppendFiles append API} to add files to this table and commit.
*
diff --git a/api/src/main/java/com/netflix/iceberg/Transaction.java b/api/src/main/java/com/netflix/iceberg/Transaction.java
index 7d097238..bbbc03be 100644
--- a/api/src/main/java/com/netflix/iceberg/Transaction.java
+++ b/api/src/main/java/com/netflix/iceberg/Transaction.java
@@ -40,6 +40,13 @@
*/
UpdateProperties updateProperties();
+ /**
+ * Create a new {@link UpdateLocation} to update table location.
+ *
+ * @return a new {@link UpdateLocation}
+ */
+ UpdateLocation updateLocation();
+
/**
* Create a new {@link AppendFiles append API} to add files to this table.
*
diff --git a/api/src/main/java/com/netflix/iceberg/UpdateLocation.java b/api/src/main/java/com/netflix/iceberg/UpdateLocation.java
new file mode 100644
index 00000000..e5085313
--- /dev/null
+++ b/api/src/main/java/com/netflix/iceberg/UpdateLocation.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+/**
+ * API for setting a table's base location.
+ */
+public interface UpdateLocation extends PendingUpdate<String> {
+ /**
+ * Set the table's location.
+ *
+ * @param location a String location
+ * @return this for method chaining
+ */
+ UpdateLocation setLocation(String location);
+}
diff --git a/core/src/main/java/com/netflix/iceberg/BaseMetastoreTableOperations.java b/core/src/main/java/com/netflix/iceberg/BaseMetastoreTableOperations.java
index 3e9b420d..5228e80a 100644
--- a/core/src/main/java/com/netflix/iceberg/BaseMetastoreTableOperations.java
+++ b/core/src/main/java/com/netflix/iceberg/BaseMetastoreTableOperations.java
@@ -49,7 +49,6 @@
private static final String METADATA_FOLDER_NAME = "metadata";
private static final String DATA_FOLDER_NAME = "data";
- private static final String HIVE_LOCATION_FOLDER_NAME = "empty";
private final Configuration conf;
private final FileIO fileIo;
@@ -57,7 +56,6 @@
private TableMetadata currentMetadata = null;
private String currentMetadataLocation = null;
private boolean shouldRefresh = true;
- private String baseLocation = null;
private int version = -1;
protected BaseMetastoreTableOperations(Configuration conf) {
@@ -85,16 +83,8 @@ protected void requestRefresh() {
this.shouldRefresh = true;
}
- public String hiveTableLocation() {
- return String.format("%s/%s", baseLocation, HIVE_LOCATION_FOLDER_NAME);
- }
-
protected String writeNewMetadata(TableMetadata metadata, int version) {
- if (baseLocation == null) {
- baseLocation = metadata.location();
- }
-
- String newTableMetadataFilePath = newTableMetadataFilePath(baseLocation, version);
+ String newTableMetadataFilePath = newTableMetadataFilePath(metadata, version);
OutputFile newMetadataLocation = fileIo.newOutputFile(newTableMetadataFilePath);
// write the new metadata
@@ -115,19 +105,29 @@ protected void refreshFromMetadataLocation(String newLocation, int numRetries) {
Tasks.foreach(newLocation)
.retry(numRetries).exponentialBackoff(100, 5000, 600000, 4.0 /* 100, 400, 1600, ... */ )
.suppressFailureWhenFinished()
- .run(location -> {
- this.currentMetadata = read(this, fromLocation(location, conf));
- this.currentMetadataLocation = location;
- this.baseLocation = currentMetadata.location();
- this.version = parseVersion(location);
+ .run(metadataLocation -> {
+ this.currentMetadata = read(this, fromLocation(metadataLocation, conf));
+ this.currentMetadataLocation = metadataLocation;
+ this.version = parseVersion(metadataLocation);
});
}
this.shouldRefresh = false;
}
+ private String metadataFileLocation(TableMetadata metadata, String filename) {
+ String metadataLocation = metadata.properties()
+ .get(TableProperties.WRITE_METADATA_LOCATION);
+
+ if (metadataLocation != null) {
+ return String.format("%s/%s", metadataLocation, filename);
+ } else {
+ return String.format("%s/%s/%s", metadata.location(), METADATA_FOLDER_NAME, filename);
+ }
+ }
+
@Override
- public String metadataFileLocation(String fileName) {
- return String.format("%s/%s/%s", baseLocation, METADATA_FOLDER_NAME, fileName);
+ public String metadataFileLocation(String filename) {
+ return metadataFileLocation(current(), filename);
}
@Override
@@ -135,13 +135,9 @@ public FileIO io() {
return fileIo;
}
- private String newTableMetadataFilePath(String baseLocation, int newVersion) {
- return String.format("%s/%s/%05d-%s%s",
- baseLocation,
- METADATA_FOLDER_NAME,
- newVersion,
- UUID.randomUUID(),
- getFileExtension(this.conf));
+ private String newTableMetadataFilePath(TableMetadata meta, int newVersion) {
+ return metadataFileLocation(meta,
+ String.format("%05d-%s%s", newVersion, UUID.randomUUID(), getFileExtension(this.conf)));
}
private static int parseVersion(String metadataLocation) {
diff --git a/core/src/main/java/com/netflix/iceberg/BaseTable.java b/core/src/main/java/com/netflix/iceberg/BaseTable.java
index 7d48ef2b..ad0d3245 100644
--- a/core/src/main/java/com/netflix/iceberg/BaseTable.java
+++ b/core/src/main/java/com/netflix/iceberg/BaseTable.java
@@ -91,6 +91,11 @@ public UpdateProperties updateProperties() {
return new PropertiesUpdate(ops);
}
+ @Override
+ public UpdateLocation updateLocation() {
+ return new SetLocation(ops);
+ }
+
@Override
public AppendFiles newAppend() {
return new MergeAppend(ops);
diff --git a/core/src/main/java/com/netflix/iceberg/BaseTransaction.java b/core/src/main/java/com/netflix/iceberg/BaseTransaction.java
index b7c3a32d..49724945 100644
--- a/core/src/main/java/com/netflix/iceberg/BaseTransaction.java
+++ b/core/src/main/java/com/netflix/iceberg/BaseTransaction.java
@@ -107,6 +107,14 @@ public UpdateProperties updateProperties() {
return props;
}
+ @Override
+ public UpdateLocation updateLocation() {
+ checkLastOperationCommitted("UpdateLocation");
+ UpdateLocation setLocation = new SetLocation(transactionOps);
+ updates.add(setLocation);
+ return setLocation;
+ }
+
@Override
public AppendFiles newAppend() {
checkLastOperationCommitted("AppendFiles");
@@ -327,6 +335,11 @@ public UpdateProperties updateProperties() {
return BaseTransaction.this.updateProperties();
}
+ @Override
+ public UpdateLocation updateLocation() {
+ return BaseTransaction.this.updateLocation();
+ }
+
@Override
public AppendFiles newAppend() {
return BaseTransaction.this.newAppend();
diff --git a/core/src/main/java/com/netflix/iceberg/SetLocation.java b/core/src/main/java/com/netflix/iceberg/SetLocation.java
new file mode 100644
index 00000000..be649715
--- /dev/null
+++ b/core/src/main/java/com/netflix/iceberg/SetLocation.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+import com.netflix.iceberg.exceptions.CommitFailedException;
+import com.netflix.iceberg.util.Tasks;
+
+import static com.netflix.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS;
+import static com.netflix.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT;
+import static com.netflix.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS;
+import static com.netflix.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT;
+import static com.netflix.iceberg.TableProperties.COMMIT_NUM_RETRIES;
+import static com.netflix.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT;
+import static com.netflix.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS;
+import static com.netflix.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
+
+public class SetLocation implements UpdateLocation {
+ private final TableOperations ops;
+ private String newLocation;
+
+ public SetLocation(TableOperations ops) {
+ this.ops = ops;
+ this.newLocation = null;
+ }
+
+ @Override
+ public UpdateLocation setLocation(String location) {
+ this.newLocation = location;
+ return this;
+ }
+
+ @Override
+ public String apply() {
+ return newLocation;
+ }
+
+ @Override
+ public void commit() {
+ TableMetadata base = ops.refresh();
+ Tasks.foreach(ops)
+ .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
+ .exponentialBackoff(
+ base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+ base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+ base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+ 2.0 /* exponential */ )
+ .onlyRetryOn(CommitFailedException.class)
+ .run(ops -> ops.commit(base, base.updateLocation(newLocation)));
+ }
+}
diff --git a/core/src/main/java/com/netflix/iceberg/TableMetadata.java b/core/src/main/java/com/netflix/iceberg/TableMetadata.java
index c949f13a..645dff29 100644
--- a/core/src/main/java/com/netflix/iceberg/TableMetadata.java
+++ b/core/src/main/java/com/netflix/iceberg/TableMetadata.java
@@ -428,6 +428,12 @@ public TableMetadata buildReplacement(Schema schema, PartitionSpec partitionSpec
-1, snapshots, ImmutableList.of());
}
+ public TableMetadata updateLocation(String newLocation) {
+ return new TableMetadata(ops, null, newLocation,
+ System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+ currentSnapshotId, snapshots, snapshotLog);
+ }
+
private static PartitionSpec freshSpec(int specId, Schema schema, PartitionSpec partitionSpec) {
PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(schema)
.withSpecId(specId);
diff --git a/core/src/main/java/com/netflix/iceberg/TableProperties.java b/core/src/main/java/com/netflix/iceberg/TableProperties.java
index 69bfcf29..925900a1 100644
--- a/core/src/main/java/com/netflix/iceberg/TableProperties.java
+++ b/core/src/main/java/com/netflix/iceberg/TableProperties.java
@@ -67,11 +67,16 @@
public static final String OBJECT_STORE_PATH = "write.object-storage.path";
- // This only applies to files written after this property is set. Files previously written aren't relocated to
- // reflect this parameter.
+ // This only applies to files written after this property is set. Files previously written aren't
+ // relocated to reflect this parameter.
// If not set, defaults to a "data" folder underneath the root path of the table.
public static final String WRITE_NEW_DATA_LOCATION = "write.folder-storage.path";
+ // This only applies to files written after this property is set. Files previously written aren't
+ // relocated to reflect this parameter.
+ // If not set, defaults to a "metadata" folder underneath the root path of the table.
+ public static final String WRITE_METADATA_LOCATION = "write.metadata.path";
+
public static final String MANIFEST_LISTS_ENABLED = "write.manifest-lists.enabled";
public static final boolean MANIFEST_LISTS_ENABLED_DEFAULT = true;
}
diff --git a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java
index 1a3b0fd4..e8c98dd7 100644
--- a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java
+++ b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java
@@ -19,10 +19,12 @@
package com.netflix.iceberg.hadoop;
+import com.google.common.base.Preconditions;
import com.netflix.iceberg.io.FileIO;
import com.netflix.iceberg.TableMetadata;
import com.netflix.iceberg.TableMetadataParser;
import com.netflix.iceberg.TableOperations;
+import com.netflix.iceberg.TableProperties;
import com.netflix.iceberg.exceptions.CommitFailedException;
import com.netflix.iceberg.exceptions.RuntimeIOException;
import com.netflix.iceberg.exceptions.ValidationException;
@@ -107,6 +109,12 @@ public void commit(TableMetadata base, TableMetadata metadata) {
return;
}
+ Preconditions.checkArgument(base == null || base.location().equals(metadata.location()),
+ "Hadoop path-based tables cannot be relocated");
+ Preconditions.checkArgument(
+ !metadata.properties().containsKey(TableProperties.WRITE_METADATA_LOCATION),
+ "Hadoop path-based tables cannot relocate metadata");
+
Path tempMetadataFile = metadataPath(UUID.randomUUID().toString() + getFileExtension(conf));
TableMetadataParser.write(metadata, io().newOutputFile(tempMetadataFile.toString()));
diff --git a/hive/src/main/java/com/netflix/iceberg/hive/HiveTableOperations.java b/hive/src/main/java/com/netflix/iceberg/hive/HiveTableOperations.java
index 37093707..e0e2e192 100644
--- a/hive/src/main/java/com/netflix/iceberg/hive/HiveTableOperations.java
+++ b/hive/src/main/java/com/netflix/iceberg/hive/HiveTableOperations.java
@@ -136,7 +136,7 @@ public void commit(TableMetadata base, TableMetadata metadata) {
(int) currentTimeMillis / 1000,
(int) currentTimeMillis / 1000,
Integer.MAX_VALUE,
- storageDescriptor(metadata.schema()),
+ storageDescriptor(metadata),
Collections.emptyList(),
new HashMap<>(),
null,
@@ -144,7 +144,7 @@ public void commit(TableMetadata base, TableMetadata metadata) {
ICEBERG_TABLE_TYPE_VALUE);
}
- tbl.setSd(storageDescriptor(metadata.schema())); // set to pickup any schema changes
+ tbl.setSd(storageDescriptor(metadata)); // set to pickup any schema changes
final String metadataLocation = tbl.getParameters().get(METADATA_LOCATION_PROP);
if (!Objects.equals(currentMetadataLocation(), metadataLocation)) {
throw new CommitFailedException(format("metadataLocation = %s is not same as table metadataLocation %s for %s.%s",
@@ -189,11 +189,11 @@ private void setParameters(String newMetadataLocation, Table tbl) {
tbl.setParameters(parameters);
}
- private StorageDescriptor storageDescriptor(Schema schema) {
+ private StorageDescriptor storageDescriptor(TableMetadata metadata) {
final StorageDescriptor storageDescriptor = new StorageDescriptor();
- storageDescriptor.setCols(columns(schema));
- storageDescriptor.setLocation(hiveTableLocation());
+ storageDescriptor.setCols(columns(metadata.schema()));
+ storageDescriptor.setLocation(metadata.location());
storageDescriptor.setOutputFormat("org.apache.hadoop.mapred.FileInputFormat");
storageDescriptor.setInputFormat("org.apache.hadoop.mapred.FileOutputFormat");
SerDeInfo serDeInfo = new SerDeInfo();
diff --git a/hive/src/test/java/com/netflix/iceberg/hive/HiveTableBaseTest.java b/hive/src/test/java/com/netflix/iceberg/hive/HiveTableBaseTest.java
index 8c2bfdcb..f3955e25 100644
--- a/hive/src/test/java/com/netflix/iceberg/hive/HiveTableBaseTest.java
+++ b/hive/src/test/java/com/netflix/iceberg/hive/HiveTableBaseTest.java
@@ -206,7 +206,7 @@ String getTableBasePath(String tableName) {
}
String getTableLocation(String tableName) {
- return new Path("file", null, Paths.get(getTableBasePath(tableName), "empty").toString()).toString();
+ return new Path("file", null, Paths.get(getTableBasePath(tableName)).toString()).toString();
}
String metadataLocation(String tableName) {
With regards,
Apache Git Services