You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by GitBox <gi...@apache.org> on 2019/01/14 21:45:58 UTC

[incubator-iceberg] Diff for: [GitHub] rdblue merged pull request #68: Support customizing table locations

diff --git a/api/src/main/java/com/netflix/iceberg/Table.java b/api/src/main/java/com/netflix/iceberg/Table.java
index 9acb50d4..1b61b847 100644
--- a/api/src/main/java/com/netflix/iceberg/Table.java
+++ b/api/src/main/java/com/netflix/iceberg/Table.java
@@ -97,6 +97,13 @@
    */
   UpdateProperties updateProperties();
 
+  /**
+   * Create a new {@link UpdateLocation} to update table location and commit the changes.
+   *
+   * @return a new {@link UpdateLocation}
+   */
+  UpdateLocation updateLocation();
+
   /**
    * Create a new {@link AppendFiles append API} to add files to this table and commit.
    *
diff --git a/api/src/main/java/com/netflix/iceberg/Transaction.java b/api/src/main/java/com/netflix/iceberg/Transaction.java
index 7d097238..bbbc03be 100644
--- a/api/src/main/java/com/netflix/iceberg/Transaction.java
+++ b/api/src/main/java/com/netflix/iceberg/Transaction.java
@@ -40,6 +40,13 @@
    */
   UpdateProperties updateProperties();
 
+  /**
+   * Create a new {@link UpdateLocation} to update table location.
+   *
+   * @return a new {@link UpdateLocation}
+   */
+  UpdateLocation updateLocation();
+
   /**
    * Create a new {@link AppendFiles append API} to add files to this table.
    *
diff --git a/api/src/main/java/com/netflix/iceberg/UpdateLocation.java b/api/src/main/java/com/netflix/iceberg/UpdateLocation.java
new file mode 100644
index 00000000..e5085313
--- /dev/null
+++ b/api/src/main/java/com/netflix/iceberg/UpdateLocation.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+/**
+ * API for setting a table's base location.
+ */
+public interface UpdateLocation extends PendingUpdate<String> {
+  /**
+   * Set the table's location.
+   *
+   * @param location a String location
+   * @return this for method chaining
+   */
+  UpdateLocation setLocation(String location);
+}
diff --git a/core/src/main/java/com/netflix/iceberg/BaseMetastoreTableOperations.java b/core/src/main/java/com/netflix/iceberg/BaseMetastoreTableOperations.java
index 3e9b420d..5228e80a 100644
--- a/core/src/main/java/com/netflix/iceberg/BaseMetastoreTableOperations.java
+++ b/core/src/main/java/com/netflix/iceberg/BaseMetastoreTableOperations.java
@@ -49,7 +49,6 @@
 
   private static final String METADATA_FOLDER_NAME = "metadata";
   private static final String DATA_FOLDER_NAME = "data";
-  private static final String HIVE_LOCATION_FOLDER_NAME = "empty";
 
   private final Configuration conf;
   private final FileIO fileIo;
@@ -57,7 +56,6 @@
   private TableMetadata currentMetadata = null;
   private String currentMetadataLocation = null;
   private boolean shouldRefresh = true;
-  private String baseLocation = null;
   private int version = -1;
 
   protected BaseMetastoreTableOperations(Configuration conf) {
@@ -85,16 +83,8 @@ protected void requestRefresh() {
     this.shouldRefresh = true;
   }
 
-  public String hiveTableLocation() {
-    return String.format("%s/%s", baseLocation, HIVE_LOCATION_FOLDER_NAME);
-  }
-
   protected String writeNewMetadata(TableMetadata metadata, int version) {
-    if (baseLocation == null) {
-      baseLocation = metadata.location();
-    }
-
-    String newTableMetadataFilePath = newTableMetadataFilePath(baseLocation, version);
+    String newTableMetadataFilePath = newTableMetadataFilePath(metadata, version);
     OutputFile newMetadataLocation = fileIo.newOutputFile(newTableMetadataFilePath);
 
     // write the new metadata
@@ -115,19 +105,29 @@ protected void refreshFromMetadataLocation(String newLocation, int numRetries) {
       Tasks.foreach(newLocation)
           .retry(numRetries).exponentialBackoff(100, 5000, 600000, 4.0 /* 100, 400, 1600, ... */ )
           .suppressFailureWhenFinished()
-          .run(location -> {
-            this.currentMetadata = read(this, fromLocation(location, conf));
-            this.currentMetadataLocation = location;
-            this.baseLocation = currentMetadata.location();
-            this.version = parseVersion(location);
+          .run(metadataLocation -> {
+            this.currentMetadata = read(this, fromLocation(metadataLocation, conf));
+            this.currentMetadataLocation = metadataLocation;
+            this.version = parseVersion(metadataLocation);
           });
     }
     this.shouldRefresh = false;
   }
 
+  private String metadataFileLocation(TableMetadata metadata, String filename) {
+    String metadataLocation = metadata.properties()
+        .get(TableProperties.WRITE_METADATA_LOCATION);
+
+    if (metadataLocation != null) {
+      return String.format("%s/%s", metadataLocation, filename);
+    } else {
+      return String.format("%s/%s/%s", metadata.location(), METADATA_FOLDER_NAME, filename);
+    }
+  }
+
   @Override
-  public String metadataFileLocation(String fileName) {
-    return String.format("%s/%s/%s", baseLocation, METADATA_FOLDER_NAME, fileName);
+  public String metadataFileLocation(String filename) {
+    return metadataFileLocation(current(), filename);
   }
 
   @Override
@@ -135,13 +135,9 @@ public FileIO io() {
     return fileIo;
   }
 
-  private String newTableMetadataFilePath(String baseLocation, int newVersion) {
-    return String.format("%s/%s/%05d-%s%s",
-            baseLocation,
-            METADATA_FOLDER_NAME,
-            newVersion,
-            UUID.randomUUID(),
-            getFileExtension(this.conf));
+  private String newTableMetadataFilePath(TableMetadata meta, int newVersion) {
+    return metadataFileLocation(meta,
+        String.format("%05d-%s%s", newVersion, UUID.randomUUID(), getFileExtension(this.conf)));
   }
 
   private static int parseVersion(String metadataLocation) {
diff --git a/core/src/main/java/com/netflix/iceberg/BaseTable.java b/core/src/main/java/com/netflix/iceberg/BaseTable.java
index 7d48ef2b..ad0d3245 100644
--- a/core/src/main/java/com/netflix/iceberg/BaseTable.java
+++ b/core/src/main/java/com/netflix/iceberg/BaseTable.java
@@ -91,6 +91,11 @@ public UpdateProperties updateProperties() {
     return new PropertiesUpdate(ops);
   }
 
+  @Override
+  public UpdateLocation updateLocation() {
+    return new SetLocation(ops);
+  }
+
   @Override
   public AppendFiles newAppend() {
     return new MergeAppend(ops);
diff --git a/core/src/main/java/com/netflix/iceberg/BaseTransaction.java b/core/src/main/java/com/netflix/iceberg/BaseTransaction.java
index b7c3a32d..49724945 100644
--- a/core/src/main/java/com/netflix/iceberg/BaseTransaction.java
+++ b/core/src/main/java/com/netflix/iceberg/BaseTransaction.java
@@ -107,6 +107,14 @@ public UpdateProperties updateProperties() {
     return props;
   }
 
+  @Override
+  public UpdateLocation updateLocation() {
+    checkLastOperationCommitted("UpdateLocation");
+    UpdateLocation setLocation = new SetLocation(transactionOps);
+    updates.add(setLocation);
+    return setLocation;
+  }
+
   @Override
   public AppendFiles newAppend() {
     checkLastOperationCommitted("AppendFiles");
@@ -327,6 +335,11 @@ public UpdateProperties updateProperties() {
       return BaseTransaction.this.updateProperties();
     }
 
+    @Override
+    public UpdateLocation updateLocation() {
+      return BaseTransaction.this.updateLocation();
+    }
+
     @Override
     public AppendFiles newAppend() {
       return BaseTransaction.this.newAppend();
diff --git a/core/src/main/java/com/netflix/iceberg/SetLocation.java b/core/src/main/java/com/netflix/iceberg/SetLocation.java
new file mode 100644
index 00000000..be649715
--- /dev/null
+++ b/core/src/main/java/com/netflix/iceberg/SetLocation.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+import com.netflix.iceberg.exceptions.CommitFailedException;
+import com.netflix.iceberg.util.Tasks;
+
+import static com.netflix.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS;
+import static com.netflix.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT;
+import static com.netflix.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS;
+import static com.netflix.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT;
+import static com.netflix.iceberg.TableProperties.COMMIT_NUM_RETRIES;
+import static com.netflix.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT;
+import static com.netflix.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS;
+import static com.netflix.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
+
+public class SetLocation implements UpdateLocation {
+  private final TableOperations ops;
+  private String newLocation;
+
+  public SetLocation(TableOperations ops) {
+    this.ops = ops;
+    this.newLocation = null;
+  }
+
+  @Override
+  public UpdateLocation setLocation(String location) {
+    this.newLocation = location;
+    return this;
+  }
+
+  @Override
+  public String apply() {
+    return newLocation;
+  }
+
+  @Override
+  public void commit() {
+    TableMetadata base = ops.refresh();
+    Tasks.foreach(ops)
+        .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
+        .exponentialBackoff(
+            base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+            base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+            base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+            2.0 /* exponential */ )
+        .onlyRetryOn(CommitFailedException.class)
+        .run(ops -> ops.commit(base, base.updateLocation(newLocation)));
+  }
+}
diff --git a/core/src/main/java/com/netflix/iceberg/TableMetadata.java b/core/src/main/java/com/netflix/iceberg/TableMetadata.java
index c949f13a..645dff29 100644
--- a/core/src/main/java/com/netflix/iceberg/TableMetadata.java
+++ b/core/src/main/java/com/netflix/iceberg/TableMetadata.java
@@ -428,6 +428,12 @@ public TableMetadata buildReplacement(Schema schema, PartitionSpec partitionSpec
         -1, snapshots, ImmutableList.of());
   }
 
+  public TableMetadata updateLocation(String newLocation) {
+    return new TableMetadata(ops, null, newLocation,
+        System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+        currentSnapshotId, snapshots, snapshotLog);
+  }
+
   private static PartitionSpec freshSpec(int specId, Schema schema, PartitionSpec partitionSpec) {
     PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(schema)
         .withSpecId(specId);
diff --git a/core/src/main/java/com/netflix/iceberg/TableProperties.java b/core/src/main/java/com/netflix/iceberg/TableProperties.java
index 69bfcf29..925900a1 100644
--- a/core/src/main/java/com/netflix/iceberg/TableProperties.java
+++ b/core/src/main/java/com/netflix/iceberg/TableProperties.java
@@ -67,11 +67,16 @@
 
   public static final String OBJECT_STORE_PATH = "write.object-storage.path";
 
-  // This only applies to files written after this property is set. Files previously written aren't relocated to
-  // reflect this parameter.
+  // This only applies to files written after this property is set. Files previously written aren't
+  // relocated to reflect this parameter.
   // If not set, defaults to a "data" folder underneath the root path of the table.
   public static final String WRITE_NEW_DATA_LOCATION = "write.folder-storage.path";
 
+  // This only applies to files written after this property is set. Files previously written aren't
+  // relocated to reflect this parameter.
+  // If not set, defaults to a "metadata" folder underneath the root path of the table.
+  public static final String WRITE_METADATA_LOCATION = "write.metadata.path";
+
   public static final String MANIFEST_LISTS_ENABLED = "write.manifest-lists.enabled";
   public static final boolean MANIFEST_LISTS_ENABLED_DEFAULT = true;
 }
diff --git a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java
index 1a3b0fd4..e8c98dd7 100644
--- a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java
+++ b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java
@@ -19,10 +19,12 @@
 
 package com.netflix.iceberg.hadoop;
 
+import com.google.common.base.Preconditions;
 import com.netflix.iceberg.io.FileIO;
 import com.netflix.iceberg.TableMetadata;
 import com.netflix.iceberg.TableMetadataParser;
 import com.netflix.iceberg.TableOperations;
+import com.netflix.iceberg.TableProperties;
 import com.netflix.iceberg.exceptions.CommitFailedException;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
 import com.netflix.iceberg.exceptions.ValidationException;
@@ -107,6 +109,12 @@ public void commit(TableMetadata base, TableMetadata metadata) {
       return;
     }
 
+    Preconditions.checkArgument(base == null || base.location().equals(metadata.location()),
+        "Hadoop path-based tables cannot be relocated");
+    Preconditions.checkArgument(
+        !metadata.properties().containsKey(TableProperties.WRITE_METADATA_LOCATION),
+        "Hadoop path-based tables cannot relocate metadata");
+
     Path tempMetadataFile = metadataPath(UUID.randomUUID().toString() + getFileExtension(conf));
     TableMetadataParser.write(metadata, io().newOutputFile(tempMetadataFile.toString()));
 
diff --git a/hive/src/main/java/com/netflix/iceberg/hive/HiveTableOperations.java b/hive/src/main/java/com/netflix/iceberg/hive/HiveTableOperations.java
index 37093707..e0e2e192 100644
--- a/hive/src/main/java/com/netflix/iceberg/hive/HiveTableOperations.java
+++ b/hive/src/main/java/com/netflix/iceberg/hive/HiveTableOperations.java
@@ -136,7 +136,7 @@ public void commit(TableMetadata base, TableMetadata metadata) {
                 (int) currentTimeMillis / 1000,
                 (int) currentTimeMillis / 1000,
                 Integer.MAX_VALUE,
-                storageDescriptor(metadata.schema()),
+                storageDescriptor(metadata),
                 Collections.emptyList(),
                 new HashMap<>(),
                 null,
@@ -144,7 +144,7 @@ public void commit(TableMetadata base, TableMetadata metadata) {
                 ICEBERG_TABLE_TYPE_VALUE);
       }
 
-      tbl.setSd(storageDescriptor(metadata.schema())); // set to pickup any schema changes
+      tbl.setSd(storageDescriptor(metadata)); // set to pickup any schema changes
       final String metadataLocation = tbl.getParameters().get(METADATA_LOCATION_PROP);
       if (!Objects.equals(currentMetadataLocation(), metadataLocation)) {
         throw new CommitFailedException(format("metadataLocation = %s is not same as table metadataLocation %s for %s.%s",
@@ -189,11 +189,11 @@ private void setParameters(String newMetadataLocation, Table tbl) {
     tbl.setParameters(parameters);
   }
 
-  private StorageDescriptor storageDescriptor(Schema schema) {
+  private StorageDescriptor storageDescriptor(TableMetadata metadata) {
 
     final StorageDescriptor storageDescriptor = new StorageDescriptor();
-    storageDescriptor.setCols(columns(schema));
-    storageDescriptor.setLocation(hiveTableLocation());
+    storageDescriptor.setCols(columns(metadata.schema()));
+    storageDescriptor.setLocation(metadata.location());
     storageDescriptor.setOutputFormat("org.apache.hadoop.mapred.FileInputFormat");
     storageDescriptor.setInputFormat("org.apache.hadoop.mapred.FileOutputFormat");
     SerDeInfo serDeInfo = new SerDeInfo();
diff --git a/hive/src/test/java/com/netflix/iceberg/hive/HiveTableBaseTest.java b/hive/src/test/java/com/netflix/iceberg/hive/HiveTableBaseTest.java
index 8c2bfdcb..f3955e25 100644
--- a/hive/src/test/java/com/netflix/iceberg/hive/HiveTableBaseTest.java
+++ b/hive/src/test/java/com/netflix/iceberg/hive/HiveTableBaseTest.java
@@ -206,7 +206,7 @@ String getTableBasePath(String tableName) {
   }
 
   String getTableLocation(String tableName) {
-    return new Path("file", null, Paths.get(getTableBasePath(tableName), "empty").toString()).toString();
+    return new Path("file", null, Paths.get(getTableBasePath(tableName)).toString()).toString();
   }
 
   String metadataLocation(String tableName) {


With regards,
Apache Git Services