You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2022/09/13 18:40:13 UTC
[gobblin] branch master updated: Define basics for collecting Iceberg metadata for the current snapshot (#3559)
This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new da914e272 Define basics for collecting Iceberg metadata for the current snapshot (#3559)
da914e272 is described below
commit da914e272c935c519af9d60c1b35a1ab59f1165e
Author: Kip Kohn <ck...@linkedin.com>
AuthorDate: Tue Sep 13 11:40:07 2022 -0700
Define basics for collecting Iceberg metadata for the current snapshot (#3559)
* [GOBBLIN-1701] Replace jcenter with either maven central or gradle plugin portal (#3554)
* remove jcenter
* Use gradle plugin portal for shadow
* Use maven central in all other cases
* [GOBBLIN-1695] Fix: Failure to add spec executors doesn't block deployment (#3551)
* Allow first time failure to authenticate with Azkaban to fail silently
* Fix findbugs report
* Refactor azkaban authentication into function. Call on init and if session_id is null when adding a flow
* Add handling for fetchSession throwing an exception
* Add logging when fails on constructor and initialization, but continue to local deploy
* Revert changes for azkabanSpecProducer, but quiet log instead of throw in constructor
* Fixed vars
* Revert changes on azkabanSpecProducer
* clean up error throwing
* revert function checking changes
* Reformat file
* Clean up function
* Format file for try/catch
* Allow first time failure to authenticate with Azkaban to fail silently
* Fix findbugs report
* Refactor azkaban authentication into function. Call on init and if session_id is null when adding a flow
* Fixed rebase
* Fixed rebase
* Revert changes for azkabanSpecProducer, but quiet log instead of throw in constructor
* Add whitespace back
* fix helix job wait completion bug when job goes to STOPPING state (#3556)
address comments
update stoppingStateEndTime with currentTime
update test cases
* [GOBBLIN-1699] Log progress of reducer task for visibility with slow compaction jobs #3552
* before starting reduce
* after first record is reduced
* after reducing every 1000 records
Co-authored-by: Urmi Mustafi <um...@umustafi-mn1.linkedin.biz>
* Define basics for collecting Iceberg metadata for the current snapshot
* [GOBBLIN-1673][GOBBLIN-1683] Skeleton code for handling messages between task runner / application master for Dynamic work unit allocation (#3539)
* [GOBBLIN-1673] Schema for dynamic work unit message
* [GOBBLIN-1683] Dynamic Work Unit messaging abstractions
* Address review comments
* Correct import order
Co-authored-by: Matthew Ho <ho...@gmail.com>
Co-authored-by: Andy Jiang <20...@users.noreply.github.com>
Co-authored-by: Hanghang Nate Liu <na...@gmail.com>
Co-authored-by: umustafi <um...@gmail.com>
Co-authored-by: Urmi Mustafi <um...@umustafi-mn1.linkedin.biz>
Co-authored-by: William Lo <lo...@gmail.com>
---
.../management/copy/iceberg/IcebergCatalog.java | 26 +++++++
.../copy/iceberg/IcebergCatalogFactory.java | 31 ++++++++
.../copy/iceberg/IcebergHiveCatalog.java | 40 ++++++++++
.../copy/iceberg/IcebergSnapshotInfo.java | 61 +++++++++++++++
.../data/management/copy/iceberg/IcebergTable.java | 86 ++++++++++++++++++++++
5 files changed, 244 insertions(+)
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
new file mode 100644
index 000000000..70423e6a8
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+
+/**
+ * Any catalog from which to access {@link IcebergTable}s.
+ */
+public interface IcebergCatalog {
+ /**
+ * Opens the {@link IcebergTable} identified by a database name and a table name.
+ *
+ * NOTE(review): this interface does not specify what happens when no such table exists - confirm per implementation.
+ *
+ * @param dbName name of the database that holds the table
+ * @param tableName name of the table within {@code dbName}
+ * @return an {@link IcebergTable} for the identified table
+ */
+ IcebergTable openTable(String dbName, String tableName);
+}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalogFactory.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalogFactory.java
new file mode 100644
index 000000000..43dff9fc6
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalogFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.hive.HiveCatalogs;
+
+
+/**
+ * Provides an {@link IcebergCatalog}.
+ *
+ * The only implementation returned at present is the Hive-Metastore-based {@link IcebergHiveCatalog}.
+ */
+public class IcebergCatalogFactory {
+ /**
+ * Loads a Hive catalog via {@code HiveCatalogs.loadCatalog(configuration)} and wraps it in an {@link IcebergHiveCatalog}.
+ *
+ * @param configuration Hadoop configuration handed unchanged to {@code HiveCatalogs.loadCatalog}
+ * @return a new {@link IcebergHiveCatalog} around the loaded catalog
+ */
+ public static IcebergCatalog create(Configuration configuration) {
+ return new IcebergHiveCatalog(HiveCatalogs.loadCatalog(configuration));
+ }
+}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java
new file mode 100644
index 000000000..d8ffdb799
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hive.HiveCatalog;
+
+
+/**
+ * Hive-Metastore-based {@link IcebergCatalog}.
+ */
+@Slf4j
+@AllArgsConstructor
+public class IcebergHiveCatalog implements IcebergCatalog {
+ // NOTE: specifically necessitates `HiveCatalog`, as `BaseMetastoreCatalog.newTableOps` is `protected`!
+ private final HiveCatalog hc;
+
+ @Override
+ public IcebergTable openTable(String dbName, String tableName) {
+ return new IcebergTable(hc.newTableOps(TableIdentifier.of(dbName, tableName)));
+ }
+}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSnapshotInfo.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSnapshotInfo.java
new file mode 100644
index 000000000..c51c1a27d
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSnapshotInfo.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import lombok.Data;
+
+import com.google.common.collect.Lists;
+
+
+/**
+ * The metadata file paths and data file paths that belong to one Iceberg snapshot.
+ */
+@Data
+public class IcebergSnapshotInfo {
+
+  /** The path of one manifest file, paired with the file paths listed inside that manifest. */
+  @Data
+  public static class ManifestFileInfo {
+    private final String manifestFilePath;
+    private final List<String> listedFilePaths;
+  }
+
+  private final Long snapshotId;
+  private final Instant timestamp;
+  private final String metadataPath;
+  private final String manifestListPath;
+  private final List<ManifestFileInfo> manifestFiles;
+
+  /** @return the path of each manifest file, in the order of {@code manifestFiles} */
+  public List<String> getManifestFilePaths() {
+    return manifestFiles.stream()
+        .map(info -> info.getManifestFilePath())
+        .collect(Collectors.toList());
+  }
+
+  /** @return the file paths listed by each manifest, concatenated in the order of {@code manifestFiles} */
+  public List<String> getAllDataFilePaths() {
+    return manifestFiles.stream()
+        .flatMap(info -> info.getListedFilePaths().stream())
+        .collect(Collectors.toList());
+  }
+
+  /** @return the metadata path, then the manifest list path, then every manifest file path, then every listed file path */
+  public List<String> getAllPaths() {
+    List<String> allPaths = Lists.newArrayList();
+    allPaths.add(metadataPath);
+    allPaths.add(manifestListPath);
+    allPaths.addAll(getManifestFilePaths());
+    allPaths.addAll(getAllDataFilePaths());
+    return allPaths;
+  }
+}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
new file mode 100644
index 000000000..f6ff42698
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+
+import static org.apache.gobblin.data.management.copy.iceberg.IcebergSnapshotInfo.ManifestFileInfo;
+
+
+/**
+ * Exposes metadata information for a single Iceberg table.
+ */
+@Slf4j
+@AllArgsConstructor
+public class IcebergTable {
+  private final TableOperations tableOps;
+
+  /**
+   * Gathers the metadata file location, manifest list location, and - for every manifest of the current snapshot -
+   * the manifest's own path plus the file paths it lists.
+   *
+   * @return the {@link IcebergSnapshotInfo} describing the table's current snapshot
+   * @throws IOException if no table metadata can be loaded, if the table has no current snapshot, or if reading or
+   *         closing a manifest fails
+   */
+  public IcebergSnapshotInfo getCurrentSnapshotInfo() throws IOException {
+    TableMetadata current = tableOps.current();
+    if (current == null) {
+      // no metadata at all (e.g. the table does not exist): report it clearly, rather than via a bare NPE below
+      throw new IOException("Unable to load Iceberg table metadata - does the table exist?");
+    }
+    Snapshot snapshot = current.currentSnapshot();
+    if (snapshot == null) {
+      // a table that was created, but never written to, has metadata yet no snapshot
+      throw new IOException("Iceberg table has no current snapshot - metadata location: "
+          + current.metadataFileLocation());
+    }
+    List<ManifestFile> manifests = snapshot.allManifests();
+    return new IcebergSnapshotInfo(
+        snapshot.snapshotId(),
+        Instant.ofEpochMilli(snapshot.timestampMillis()),
+        current.metadataFileLocation(),
+        snapshot.manifestListLocation(),
+        // NOTE: unable to `.stream().map(m -> calcManifestFileInfo(m, tableOps.io()))` due to checked exception
+        calcAllManifestFileInfo(manifests, tableOps.io())
+    );
+  }
+
+  /**
+   * Computes the {@link ManifestFileInfo} of each manifest, preserving the order of `manifests`.
+   *
+   * @throws IOException if reading or closing any manifest fails
+   */
+  @VisibleForTesting
+  static List<ManifestFileInfo> calcAllManifestFileInfo(List<ManifestFile> manifests, FileIO io) throws IOException {
+    List<ManifestFileInfo> result = Lists.newArrayList();
+    for (ManifestFile manifest : manifests) {
+      result.add(calcManifestFileInfo(manifest, io));
+    }
+    return result;
+  }
+
+  /**
+   * Pairs the manifest's own path with the file paths read from within it.
+   *
+   * @throws IOException if reading or closing the manifest fails
+   */
+  @VisibleForTesting
+  static ManifestFileInfo calcManifestFileInfo(ManifestFile manifest, FileIO io) throws IOException {
+    return new ManifestFileInfo(manifest.path(), discoverDataFilePaths(manifest, io));
+  }
+
+  /**
+   * Reads every path listed in the manifest into a list, always closing the underlying iterable.
+   *
+   * @throws IOException if closing the manifest's path iterable fails
+   */
+  @VisibleForTesting
+  static List<String> discoverDataFilePaths(ManifestFile manifest, FileIO io) throws IOException {
+    try (CloseableIterable<String> manifestPathsIterable = ManifestFiles.readPaths(manifest, io)) {
+      return Lists.newArrayList(manifestPathsIterable);
+    }
+  }
+}