You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2022/09/13 18:40:13 UTC

[gobblin] branch master updated: Define basics for collecting Iceberg metadata for the current snapshot (#3559)

This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new da914e272 Define basics for collecting Iceberg metadata for the current snapshot (#3559)
da914e272 is described below

commit da914e272c935c519af9d60c1b35a1ab59f1165e
Author: Kip Kohn <ck...@linkedin.com>
AuthorDate: Tue Sep 13 11:40:07 2022 -0700

    Define basics for collecting Iceberg metadata for the current snapshot (#3559)
    
    * [GOBBLIN-1701] Replace jcenter with either maven central or gradle plugin portal (#3554)
    
    * Remove jcenter
    * Use gradle plugin portal for shadow
    * Use maven central in all other cases
    
    * [GOBBLIN-1695] Fix: Failure to add spec executors doesn't block deployment (#3551)
    
    * Allow first time failure to authenticate with Azkaban to fail silently
    
    * Fix findbugs report
    
    * Refactor azkaban authentication into function. Call on init and if session_id is null when adding a flow
    
    * Add handling for fetchSession throwing an exception
    
    * Add logging when fails on constructor and initialization, but continue to local deploy
    
    * Revert changes for azkabanSpecProducer, but quiet log instead of throw in constructor
    
    * Fixed vars
    
    * Revert changes on azkabanSpecProducer
    
    * clean up error throwing
    
    * revert function checking changes
    
    * Reformat file
    
    * Clean up function
    
    * Format file for try/catch
    
    * Allow first time failure to authenticate with Azkaban to fail silently
    
    * Fix findbugs report
    
    * Refactor azkaban authentication into function. Call on init and if session_id is null when adding a flow
    
    * Fixed rebase
    
    * Fixed rebase
    
    * Revert changes for azkabanSpecProducer, but quiet log instead of throw in constructor
    
    * Add whitespace back
    
    * fix helix job wait completion bug when job goes to STOPPING state (#3556)
    
    address comments
    
    update stoppingStateEndTime with currentTime
    
    update test cases
    
    * [GOBBLIN-1699] Log progress of reducer task for visibility with slow compaction jobs #3552
    
    * before starting reduce
    * after first record is reduced
    * after reducing every 1000 records
    
    Co-authored-by: Urmi Mustafi <um...@umustafi-mn1.linkedin.biz>
    
    * Define basics for collecting Iceberg metadata for the current snapshot
    
    * [GOBBLIN-1673][GOBBLIN-1683] Skeleton code for handling messages between task runner / application master for Dynamic work unit allocation (#3539)
    
    * [GOBBLIN-1673] Schema for dynamic work unit message
    
    * [GOBBLIN-1683] Dynamic Work Unit messaging abstractions
    
    * Address review comments
    
    * Correct import order
    
    Co-authored-by: Matthew Ho <ho...@gmail.com>
    Co-authored-by: Andy Jiang <20...@users.noreply.github.com>
    Co-authored-by: Hanghang Nate Liu <na...@gmail.com>
    Co-authored-by: umustafi <um...@gmail.com>
    Co-authored-by: Urmi Mustafi <um...@umustafi-mn1.linkedin.biz>
    Co-authored-by: William Lo <lo...@gmail.com>
---
 .../management/copy/iceberg/IcebergCatalog.java    | 26 +++++++
 .../copy/iceberg/IcebergCatalogFactory.java        | 31 ++++++++
 .../copy/iceberg/IcebergHiveCatalog.java           | 40 ++++++++++
 .../copy/iceberg/IcebergSnapshotInfo.java          | 61 +++++++++++++++
 .../data/management/copy/iceberg/IcebergTable.java | 86 ++++++++++++++++++++++
 5 files changed, 244 insertions(+)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
new file mode 100644
index 000000000..70423e6a8
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+
+/**
+ * Any catalog from which to access {@link IcebergTable}s, each one identified by its database name and table name.
+ */
+public interface IcebergCatalog {
+  IcebergTable openTable(String dbName, String tableName);
+}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalogFactory.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalogFactory.java
new file mode 100644
index 000000000..43dff9fc6
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalogFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.hive.HiveCatalogs;
+
+
+/** Provides an {@link IcebergCatalog}; see {@link #create}. */
+public class IcebergCatalogFactory {
+  /** @return a new {@link IcebergHiveCatalog} around the catalog {@link HiveCatalogs#loadCatalog} supplies for {@code configuration} */
+  public static IcebergCatalog create(Configuration configuration) {
+    final IcebergCatalog hiveBackedCatalog = new IcebergHiveCatalog(HiveCatalogs.loadCatalog(configuration));
+    return hiveBackedCatalog;
+  }
+}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java
new file mode 100644
index 000000000..d8ffdb799
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hive.HiveCatalog;
+
+
+/**
+ * Hive-Metastore-based {@link IcebergCatalog}.
+ */
+@Slf4j
+@AllArgsConstructor
+public class IcebergHiveCatalog implements IcebergCatalog {
+  // NOTE: specifically necessitates `HiveCatalog`, as `BaseMetastoreCatalog.newTableOps` is `protected`!
+  private final HiveCatalog hc;
+
+  @Override
+  public IcebergTable openTable(String dbName, String tableName) {
+    return new IcebergTable(hc.newTableOps(TableIdentifier.of(dbName, tableName)));
+  }
+}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSnapshotInfo.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSnapshotInfo.java
new file mode 100644
index 000000000..c51c1a27d
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSnapshotInfo.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import lombok.Data;
+
+import com.google.common.collect.Lists;
+
+
+/** Paths of the metadata files and of the data files that belong to a single Iceberg Snapshot. */
+@Data
+public class IcebergSnapshotInfo {
+  @Data
+  public static class ManifestFileInfo {
+    private final String manifestFilePath; // path of the manifest file itself
+    private final List<String> listedFilePaths; // the file paths listed within that manifest
+  }
+
+  private final Long snapshotId;
+  private final Instant timestamp;
+  private final String metadataPath;
+  private final String manifestListPath;
+  private final List<ManifestFileInfo> manifestFiles;
+
+  /** @return the path of each manifest file, following the order of {@code manifestFiles} */
+  public List<String> getManifestFilePaths() {
+    return manifestFiles.stream().map(ManifestFileInfo::getManifestFilePath).collect(Collectors.toList());
+  }
+
+  /** @return the file paths listed by every manifest, concatenated manifest after manifest */
+  public List<String> getAllDataFilePaths() {
+    return manifestFiles.stream().flatMap(info -> info.getListedFilePaths().stream()).collect(Collectors.toList());
+  }
+
+  /** @return the metadata path, then the manifest list path, then all manifest file paths, then all data file paths */
+  public List<String> getAllPaths() {
+    List<String> allPaths = Lists.newArrayList(metadataPath, manifestListPath);
+    allPaths.addAll(getManifestFilePaths());
+    allPaths.addAll(getAllDataFilePaths());
+    return allPaths;
+  }
+}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
new file mode 100644
index 000000000..f6ff42698
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+
+import static org.apache.gobblin.data.management.copy.iceberg.IcebergSnapshotInfo.ManifestFileInfo;
+
+
+/** Exposes metadata information for a single Iceberg table, as read through its {@link TableOperations}. */
+@Slf4j
+@AllArgsConstructor
+public class IcebergTable {
+  private final TableOperations tableOps;
+
+  /** @return info on the current snapshot; throws {@link IllegalStateException} when the table has no metadata or no snapshot */
+  public IcebergSnapshotInfo getCurrentSnapshotInfo() throws IOException {
+    TableMetadata current = tableOps.current();
+    if (current == null) {
+      throw new IllegalStateException("no Iceberg table metadata available (does the table exist?)");
+    }
+    Snapshot snapshot = current.currentSnapshot();
+    if (snapshot == null) {
+      throw new IllegalStateException("no current snapshot for Iceberg table with metadata at: " + current.metadataFileLocation());
+    }
+    Instant timestamp = Instant.ofEpochMilli(snapshot.timestampMillis());
+    List<ManifestFileInfo> manifestFiles = calcAllManifestFileInfo(snapshot.allManifests(), tableOps.io());
+    return new IcebergSnapshotInfo(snapshot.snapshotId(), timestamp, current.metadataFileLocation(),
+        snapshot.manifestListLocation(), manifestFiles);
+  }
+
+  /** Gathers info on each manifest, in order; a plain loop, as {@code .stream().map(...)} can't carry the checked exception. */
+  @VisibleForTesting
+  static List<ManifestFileInfo> calcAllManifestFileInfo(List<ManifestFile> manifests, FileIO io) throws IOException {
+    List<ManifestFileInfo> result = Lists.newArrayList();
+    for (ManifestFile manifest : manifests) {
+      result.add(calcManifestFileInfo(manifest, io));
+    }
+    return result;
+  }
+
+  /** Pairs the manifest's own path with the file paths discovered by reading that manifest through {@code io}. */
+  @VisibleForTesting
+  static ManifestFileInfo calcManifestFileInfo(ManifestFile manifest, FileIO io) throws IOException {
+    return new ManifestFileInfo(manifest.path(), discoverDataFilePaths(manifest, io));
+  }
+
+  /** Reads every path the manifest lists; the iterable is always closed, without masking a failure raised while reading. */
+  @VisibleForTesting
+  static List<String> discoverDataFilePaths(ManifestFile manifest, FileIO io) throws IOException {
+    try (CloseableIterable<String> manifestPathsIterable = ManifestFiles.readPaths(manifest, io)) {
+      return Lists.newArrayList(manifestPathsIterable);
+    }
+  }
+}