You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/01/06 22:24:31 UTC

[GitHub] [incubator-iceberg] prodeezy commented on a change in pull request #695: [WIP] Cherrypick snapshot feature

prodeezy commented on a change in pull request #695: [WIP] Cherrypick snapshot feature
URL: https://github.com/apache/incubator-iceberg/pull/695#discussion_r363512273
 
 

 ##########
 File path: core/src/main/java/org/apache/iceberg/CherryPickFromSnapshot.java
 ##########
 @@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+
+/**
+ * In an audit workflow, new data is written to an orphan snapshot that is not committed as the table's
+ * current state until it is audited. After auditing a change, it may need to be applied or cherry-picked
+ * on top of the latest snapshot instead of the one that was current when the audited changes were created.
+ *
+ * This class adds support for cherry-picking the changes from an orphan snapshot by applying them to
+ * the current snapshot. The output of the operation is a new snapshot with the changes from cherry-picked
+ * snapshot.
+ *
+ * Cherry-picking should apply the exact set of changes that were done in the original commit.
+ *  - All added files should be added to the new version.
+ *  - Todo: If files were deleted, then those files must still exist in the data set.
+ *  - Does not support Overwrite operations currently. Overwrites are considered as conflicts.
+ *
+ */
+class CherryPickFromSnapshot extends MergingSnapshotProducer<AppendFiles> implements CherryPick {
+  private final TableOperations ops;
+  private TableMetadata base = null;
+  private Long cherryPickSnapshotId = null;
+  private final AtomicInteger manifestCount = new AtomicInteger(0);
+
+  CherryPickFromSnapshot(TableOperations ops) {
+    super(ops);
+    this.ops = ops;
+    this.base = ops.refresh();
+  }
+
+  @Override
+  protected AppendFiles self() {
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  /**
+   * We only cherry pick for appends right now
+   * @return
+   */
+  @Override
+  protected String operation() {
+    return DataOperations.APPEND;
+  }
+
+  @Override
+  public CherryPickFromSnapshot fromSnapshotId(long snapshotId) {
+    Preconditions.checkArgument(base.snapshot(snapshotId) != null,
+        "Cannot cherry pick unknown snapshot id: %s", snapshotId);
+
+    this.cherryPickSnapshotId = snapshotId;
+    return this;
+  }
+
+  /**
+   * Apply the pending changes and return the uncommitted changes for validation.
+   * <p>
+   * This does not result in a permanent update.
+   *
+   * @return the uncommitted changes that would be committed by calling {@link #commit()}
+   * @throws ValidationException      If the pending changes cannot be applied to the current metadata
+   * @throws IllegalArgumentException If the pending changes are conflicting or invalid
+   */
+  @Override
+  public Snapshot apply() {
+    ValidationException.check(cherryPickSnapshotId != null,
+        "Cannot cherry pick unknown version: call fromSnapshotId");
+
+    Snapshot cherryPickSnapshot = base.snapshot(cherryPickSnapshotId);
+    String wapId = stagedWapId(cherryPickSnapshot);
+    ValidationException.check(wapId != null,
+        "Snapshot provided for cherrypick didn't have wap.id set on it. " +
+            "Only snapshots that were staged can be cherrypicked from.");
+    ValidationException.check(!base.isWapIdPublished(Long.parseLong(wapId)),
+        "Duplicate request to cherry pick wap id that was published already: %s", wapId);
 
 Review comment:
   > I think it's a good idea to have the check, but shouldn't we use Iceberg's snapshot ID? Then we can check whether that snapshot is in the current snapshot's ancestors, or whether any ancestor was picked from that snapshot.
   
   I think we are talking about two different validations. viz.
   1) check if a snapshot is already in the existing chain of snapshots so that same snapshot isn't cherrypicked again later down the chain (this, I think is what you are referring to), 
   2) check if the incoming snapshot has a `wap.id` that was already committed up the chain. Here the `wap.id` is a duplicate id across two or more unique snapshots that those writers are trying to cherry-pick. (this is what I'm trying to check for)
   
   
   I agree that the rollback usecase is a problem when checking using wap.id. In general I think we can make `wap.id` optional on the incoming snapshot. In validations, we can perform both checks 1 & 2. I can make wap.id optional and perform check 2 only if there is a `wap.id` on the snapshot.
   
   My point is, check 2 is essential to our usecase as it allows writers to write concurrently and protects against duplicate wap.ids from entering the system. Do you think this is something we can can incorporate into this workflow? Or do you suggest we do this another way.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org