You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by sz...@apache.org on 2023/04/18 18:56:47 UTC

[iceberg] branch master updated: Core: Parameterize RewriteDataFile's CommitService (#7343)

This is an automated email from the ASF dual-hosted git repository.

szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 7301f38eaa Core: Parameterize RewriteDataFile's CommitService (#7343)
7301f38eaa is described below

commit 7301f38eaaca8095a208bc1df7b66f6f3280ea7a
Author: Szehon Ho <sz...@gmail.com>
AuthorDate: Tue Apr 18 11:56:41 2023 -0700

    Core: Parameterize RewriteDataFile's CommitService (#7343)
---
 .palantir/revapi.yml                               |   5 +
 .../apache/iceberg/actions/BaseCommitService.java  | 221 +++++++++++++++++++++
 .../actions/RewriteDataFilesCommitManager.java     | 162 +--------------
 3 files changed, 233 insertions(+), 155 deletions(-)

diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml
index b7afa49736..6e1d6aa826 100644
--- a/.palantir/revapi.yml
+++ b/.palantir/revapi.yml
@@ -426,6 +426,11 @@ acceptedBreaks:
     - code: "java.field.removedWithConstant"
       old: "field org.apache.iceberg.TableProperties.HMS_TABLE_OWNER"
       justification: "Removing deprecations for 1.3.0"
+    - code: "java.method.parameterTypeChanged"
+      old: "parameter void org.apache.iceberg.actions.RewriteDataFilesCommitManager.CommitService::offer(===org.apache.iceberg.actions.RewriteFileGroup===)"
+      new: "parameter void org.apache.iceberg.actions.BaseCommitService<T>::offer(===T===)\
+        \ @ org.apache.iceberg.actions.RewriteDataFilesCommitManager.CommitService"
+      justification: "Backwards compatible parameterization of argument"
     - code: "java.method.removed"
       old: "method ThisT org.apache.iceberg.BaseScan<ThisT, T extends org.apache.iceberg.ScanTask,\
         \ G extends org.apache.iceberg.ScanTaskGroup<T extends org.apache.iceberg.ScanTask>>::newRefinedScan(org.apache.iceberg.TableOperations,\
diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseCommitService.java b/core/src/main/java/org/apache/iceberg/actions/BaseCommitService.java
new file mode 100644
index 0000000000..1f36f133d0
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/actions/BaseCommitService.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import java.io.Closeable;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Queues;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An async service which allows for committing multiple file groups as their rewrites complete. The
+ * service also allows for partial-progress since commits can fail. Once the service has been closed
+ * no new file groups should not be offered.
+ *
+ * <p>Specific implementations provide implementations for {@link #commitOrClean(Set)} and {@link
+ * #abortFileGroup(Object)}
+ *
+ * @param <T> abstract type of file group
+ */
+abstract class BaseCommitService<T> implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(BaseCommitService.class);
+
+  private final Table table;
+  private final ExecutorService committerService;
+  private final ConcurrentLinkedQueue<T> completedRewrites;
+  private final ConcurrentLinkedQueue<String> inProgressCommits;
+  private final List<T> committedRewrites;
+  private final int rewritesPerCommit;
+  private final AtomicBoolean running = new AtomicBoolean(false);
+
+  /**
+   * Constructs a {@link BaseCommitService}
+   *
+   * @param table table to perform commit on
+   * @param rewritesPerCommit number of file groups to include in a commit
+   */
+  BaseCommitService(Table table, int rewritesPerCommit) {
+    this.table = table;
+    LOG.info(
+        "Creating commit service for table {} with {} groups per commit", table, rewritesPerCommit);
+    this.rewritesPerCommit = rewritesPerCommit;
+
+    committerService =
+        Executors.newSingleThreadExecutor(
+            new ThreadFactoryBuilder().setNameFormat("Committer-Service").build());
+
+    completedRewrites = Queues.newConcurrentLinkedQueue();
+    committedRewrites = Lists.newArrayList();
+    inProgressCommits = Queues.newConcurrentLinkedQueue();
+  }
+
+  /**
+   * Perform a commit operation on the table for the set of file groups, should cleanup failed file
+   * groups.
+   *
+   * @param batch set of file groups
+   */
+  protected abstract void commitOrClean(Set<T> batch);
+
+  /**
+   * Clean up a specified file set by removing any files created for that operation, should not
+   * throw any exceptions
+   *
+   * @param group group of files which are not yet committed
+   */
+  protected abstract void abortFileGroup(T group);
+
+  /** Starts a single threaded executor service for handling file group commits. */
+  public void start() {
+    Preconditions.checkState(running.compareAndSet(false, true), "Commit service already started");
+    LOG.info("Starting commit service for {}", table);
+    committerService.execute(
+        () -> {
+          while (running.get() || completedRewrites.size() > 0 || inProgressCommits.size() > 0) {
+            try {
+              if (completedRewrites.size() == 0 && inProgressCommits.size() == 0) {
+                // give other threads a chance to make progress
+                Thread.sleep(100);
+              }
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              throw new RuntimeException("Interrupted while processing commits", e);
+            }
+
+            // commit whatever is left once done with writing.
+            if (!running.get() && completedRewrites.size() > 0) {
+              commitReadyCommitGroups();
+            }
+          }
+        });
+  }
+
+  /**
+   * Places a file group in the queue and commits a batch of file groups if {@link
+   * #rewritesPerCommit} number of file groups are present in the queue.
+   *
+   * @param group file group to eventually be committed
+   */
+  public void offer(T group) {
+    LOG.debug("Offered to commit service: {}", group);
+    Preconditions.checkState(
+        running.get(), "Cannot add rewrites to a service which has already been closed");
+    completedRewrites.add(group);
+    commitReadyCommitGroups();
+  }
+
+  /** Returns all File groups which have been committed */
+  public List<T> results() {
+    Preconditions.checkState(
+        committerService.isShutdown(),
+        "Cannot get results from a service which has not been closed");
+    return committedRewrites;
+  }
+
+  @Override
+  public void close() {
+    Preconditions.checkState(
+        running.compareAndSet(true, false), "Cannot close already closed commit service");
+    LOG.info("Closing commit service for {} waiting for all commits to finish", table);
+    committerService.shutdown();
+
+    boolean timeout = false;
+    try {
+      // All rewrites have completed and all new files have been created, we are now waiting for
+      // the commit pool to finish doing its commits to Iceberg State. In the case of partial
+      // progress this should have been occurring simultaneously with rewrites, if not there should
+      // be only a single commit operation.
+      if (!committerService.awaitTermination(120, TimeUnit.MINUTES)) {
+        LOG.warn(
+            "Commit operation did not complete within 120 minutes of the all files "
+                + "being rewritten. This may mean that some changes were not successfully committed to the "
+                + "table.");
+        timeout = true;
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(
+          "Cannot complete commit for rewrite, commit service interrupted", e);
+    }
+
+    if (!completedRewrites.isEmpty() && timeout) {
+      LOG.error("Attempting to cleanup uncommitted file groups");
+      completedRewrites.forEach(this::abortFileGroup);
+    }
+
+    Preconditions.checkArgument(
+        !timeout && completedRewrites.isEmpty(),
+        "Timeout occurred when waiting for commits to complete. "
+            + "{} file groups committed. {} file groups remain uncommitted. "
+            + "Retry this operation to attempt rewriting the failed groups.",
+        committedRewrites.size(),
+        completedRewrites.size());
+
+    Preconditions.checkState(
+        completedRewrites.isEmpty(),
+        "File groups offered after service was closed, " + "they were not successfully committed.");
+  }
+
+  private void commitReadyCommitGroups() {
+    Set<T> batch = null;
+    if (canCreateCommitGroup()) {
+      synchronized (completedRewrites) {
+        if (canCreateCommitGroup()) {
+          batch = Sets.newHashSetWithExpectedSize(rewritesPerCommit);
+          for (int i = 0; i < rewritesPerCommit && !completedRewrites.isEmpty(); i++) {
+            batch.add(completedRewrites.poll());
+          }
+        }
+      }
+    }
+
+    if (batch != null) {
+      String inProgressCommitToken = UUID.randomUUID().toString();
+      inProgressCommits.add(inProgressCommitToken);
+      try {
+        commitOrClean(batch);
+        committedRewrites.addAll(batch);
+      } catch (Exception e) {
+        LOG.error("Failure during rewrite commit process, partial progress enabled. Ignoring", e);
+      }
+      inProgressCommits.remove(inProgressCommitToken);
+    }
+  }
+
+  private boolean canCreateCommitGroup() {
+    // Either we have a full commit group, or we have completed writing and need to commit
+    // what is left over
+    boolean fullCommitGroup = completedRewrites.size() >= rewritesPerCommit;
+    boolean writingComplete = !running.get() && completedRewrites.size() > 0;
+    return fullCommitGroup || writingComplete;
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java b/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java
index 11e1ecc5e2..265b5c5c27 100644
--- a/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java
+++ b/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java
@@ -18,24 +18,13 @@
  */
 package org.apache.iceberg.actions;
 
-import java.io.Closeable;
-import java.util.List;
 import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.RewriteFiles;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Queues;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.iceberg.util.Tasks;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -134,157 +123,20 @@ public class RewriteDataFilesCommitManager {
     return new CommitService(rewritesPerCommit);
   }
 
-  public class CommitService implements Closeable {
-    private final ExecutorService committerService;
-    private final ConcurrentLinkedQueue<RewriteFileGroup> completedRewrites;
-    private final ConcurrentLinkedQueue<String> inProgressCommits;
-    private final List<RewriteFileGroup> committedRewrites;
-    private final int rewritesPerCommit;
-    private final AtomicBoolean running = new AtomicBoolean(false);
+  public class CommitService extends BaseCommitService<RewriteFileGroup> {
 
     CommitService(int rewritesPerCommit) {
-      LOG.info(
-          "Creating commit service for table {} with {} groups per commit",
-          table,
-          rewritesPerCommit);
-      this.rewritesPerCommit = rewritesPerCommit;
-
-      committerService =
-          Executors.newSingleThreadExecutor(
-              new ThreadFactoryBuilder().setNameFormat("Committer-Service").build());
-
-      completedRewrites = Queues.newConcurrentLinkedQueue();
-      committedRewrites = Lists.newArrayList();
-      inProgressCommits = Queues.newConcurrentLinkedQueue();
-    }
-
-    /** Starts a single threaded executor service for handling file group commits. */
-    public void start() {
-      Preconditions.checkState(
-          running.compareAndSet(false, true), "Rewrite Commit service already started");
-      LOG.info("Starting commit service for {}", table);
-      // Partial progress commit service
-      committerService.execute(
-          () -> {
-            while (running.get() || completedRewrites.size() > 0 || inProgressCommits.size() > 0) {
-              try {
-                if (completedRewrites.size() == 0 && inProgressCommits.size() == 0) {
-                  // Give other threads a chance to make progress
-                  Thread.sleep(100);
-                }
-              } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new RuntimeException("Interrupted while processing commits", e);
-              }
-
-              // commit whatever is left once done with writing.
-              if (!running.get() && completedRewrites.size() > 0) {
-                commitReadyCommitGroups();
-              }
-            }
-          });
-    }
-
-    /**
-     * Places a file group in the queue and commits a batch of file groups if {@link
-     * #rewritesPerCommit} number of file groups are present in the queue.
-     *
-     * @param group file group to eventually be committed
-     */
-    public void offer(RewriteFileGroup group) {
-      LOG.debug("Offered to commit service: {}", group);
-      Preconditions.checkState(
-          running.get(), "Cannot add rewrites to a service which has already been closed");
-      completedRewrites.add(group);
-      commitReadyCommitGroups();
-    }
-
-    /** Returns all File groups which have been committed */
-    public List<RewriteFileGroup> results() {
-      Preconditions.checkState(
-          committerService.isShutdown(),
-          "Cannot get results from a service which has not been closed");
-      return committedRewrites;
+      super(table, rewritesPerCommit);
     }
 
     @Override
-    public void close() {
-      Preconditions.checkState(
-          running.compareAndSet(true, false), "Cannot close already closed RewriteService");
-      LOG.info("Closing commit service for {} waiting for all commits to finish", table);
-      committerService.shutdown();
-
-      boolean timeout = false;
-      try {
-        // All rewrites have completed and all new files have been created, we are now waiting for
-        // the commit
-        // pool to finish doing its commits to Iceberg State. In the case of partial progress this
-        // should
-        // have been occurring simultaneously with rewrites, if not there should be only a single
-        // commit operation.
-        if (!committerService.awaitTermination(120, TimeUnit.MINUTES)) {
-          LOG.warn(
-              "Commit operation did not complete within 120 minutes of the all files "
-                  + "being rewritten. This may mean that some changes were not successfully committed to the "
-                  + "table.");
-          timeout = true;
-        }
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new RuntimeException(
-            "Cannot complete commit for rewrite, commit service interrupted", e);
-      }
-
-      if (!completedRewrites.isEmpty() && timeout) {
-        LOG.error("Attempting to cleanup uncommitted file groups");
-        completedRewrites.forEach(RewriteDataFilesCommitManager.this::abortFileGroup);
-      }
-
-      Preconditions.checkArgument(
-          !timeout && completedRewrites.isEmpty(),
-          "Timeout occurred when waiting for commits to complete. "
-              + "{} file groups committed. {} file groups remain uncommitted. "
-              + "Retry this operation to attempt rewriting the failed groups.",
-          committedRewrites.size(),
-          completedRewrites.size());
-
-      Preconditions.checkState(
-          completedRewrites.isEmpty(),
-          "File groups offered after service was closed, "
-              + "they were not successfully committed.");
+    protected void commitOrClean(Set<RewriteFileGroup> batch) {
+      RewriteDataFilesCommitManager.this.commitOrClean(batch);
     }
 
-    private void commitReadyCommitGroups() {
-      Set<RewriteFileGroup> batch = null;
-      if (canCreateCommitGroup()) {
-        synchronized (completedRewrites) {
-          if (canCreateCommitGroup()) {
-            batch = Sets.newHashSetWithExpectedSize(rewritesPerCommit);
-            for (int i = 0; i < rewritesPerCommit && !completedRewrites.isEmpty(); i++) {
-              batch.add(completedRewrites.poll());
-            }
-          }
-        }
-      }
-
-      if (batch != null) {
-        String inProgressCommitToken = UUID.randomUUID().toString();
-        inProgressCommits.add(inProgressCommitToken);
-        try {
-          commitOrClean(batch);
-          committedRewrites.addAll(batch);
-        } catch (Exception e) {
-          LOG.error("Failure during rewrite commit process, partial progress enabled. Ignoring", e);
-        }
-        inProgressCommits.remove(inProgressCommitToken);
-      }
-    }
-
-    private boolean canCreateCommitGroup() {
-      // Either we have a full commit group, or we have completed writing and need to commit
-      // what is left over
-      return (completedRewrites.size() >= rewritesPerCommit)
-          || (!running.get() && completedRewrites.size() > 0);
+    @Override
+    protected void abortFileGroup(RewriteFileGroup group) {
+      RewriteDataFilesCommitManager.this.abortFileGroup(group);
     }
   }
 }