You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ap...@apache.org on 2021/07/15 19:33:08 UTC
[gobblin] branch master updated: [GOBBLIN-1486] Documentation
improvement for Gobblin Metadata Ingestion pipeline (#3325)
This is an automated email from the ASF dual-hosted git repository.
aplex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 5fb30cb [GOBBLIN-1486] Documentation improvement for Gobblin Metadata Ingestion pipeline (#3325)
5fb30cb is described below
commit 5fb30cb16f80ce6ea848e61c36e3c1d48b8bdd0f
Author: Lei <au...@users.noreply.github.com>
AuthorDate: Thu Jul 15 12:33:00 2021 -0700
[GOBBLIN-1486] Documentation improvement for Gobblin Metadata Ingestion pipeline (#3325)
Fix Javadoc and several typos found while reading the metadata ingestion pipeline code. This PR mostly covers HiveMetadataWriter and GobblinMCEWriter; a follow-up will cover IcebergMetadataWriter.
---
.../gobblin/hive/writer/HiveMetadataWriter.java | 35 ++++++---
.../apache/gobblin/hive/writer/MetadataWriter.java | 18 +++--
.../apache/gobblin/iceberg/GobblinMCEProducer.java | 3 +-
.../gobblin/iceberg/writer/GobblinMCEWriter.java | 72 ++++++++++--------
.../iceberg/writer/IcebergMetadataWriter.java | 86 +++++++++++-----------
5 files changed, 125 insertions(+), 89 deletions(-)
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
index b7ad453..55fcd2b 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
@@ -49,6 +49,8 @@ import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.util.AvroUtils;
+
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
@@ -58,8 +60,11 @@ import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
* and then register the partition if needed
* For rewrite_files operation, this writer will directly register the new hive spec and try to de-register the old hive spec if oldFilePrefixes is set
* For drop_files operation, this writer will de-register the hive partition only if oldFilePrefixes is set in the GMCE
+ *
+ * Added warning suppression for all references of {@link Cache}.
*/
@Slf4j
+@SuppressWarnings("UnstableApiUsage")
public class HiveMetadataWriter implements MetadataWriter {
private static final String HIVE_REGISTRATION_WHITELIST = "hive.registration.whitelist";
@@ -69,27 +74,32 @@ public class HiveMetadataWriter implements MetadataWriter {
private final Joiner tableNameJoiner = Joiner.on('.');
private final Closer closer = Closer.create();
protected final HiveRegister hiveRegister;
- private final WhitelistBlacklist whiteistBlacklist;
+ private final WhitelistBlacklist whitelistBlacklist;
@Getter
private final KafkaSchemaRegistry schemaRegistry;
private final HashMap<String, HashMap<List<String>, ListenableFuture<Void>>> currentExecutionMap;
+ /* Mapping from tableIdentifier to a cache, key'ed by timestamp and value is not in use. */
private final HashMap<String, Cache<String, String>> schemaCreationTimeMap;
+
+ /* Mapping from tableIdentifier to a cache, key'ed by a list of partitions with value as HiveSpec object. */
private final HashMap<String, Cache<List<String>, HiveSpec>> specMaps;
- private final HashMap<String, String> lastestSchemaMap;
+
+ /* Mapping from tableIdentifier to latest schema observed. */
+ private final HashMap<String, String> latestSchemaMap;
private final long timeOutSeconds;
private State state;
public HiveMetadataWriter(State state) throws IOException {
this.state = state;
this.hiveRegister = this.closer.register(HiveRegister.get(state));
- this.whiteistBlacklist = new WhitelistBlacklist(state.getProp(HIVE_REGISTRATION_WHITELIST, ""),
+ this.whitelistBlacklist = new WhitelistBlacklist(state.getProp(HIVE_REGISTRATION_WHITELIST, ""),
state.getProp(HIVE_REGISTRATION_BLACKLIST, ""));
this.schemaRegistry = KafkaSchemaRegistry.get(state.getProperties());
this.currentExecutionMap = new HashMap<>();
this.schemaCreationTimeMap = new HashMap<>();
this.specMaps = new HashMap<>();
- this.lastestSchemaMap = new HashMap<>();
+ this.latestSchemaMap = new HashMap<>();
this.timeOutSeconds =
state.getPropAsLong(HIVE_REGISTRATION_TIMEOUT_IN_SECONDS, DEFAULT_HIVE_REGISTRATION_TIMEOUT_IN_SECONDS);
}
@@ -124,9 +134,9 @@ public class HiveMetadataWriter implements MetadataWriter {
}
//ToDo: after making sure all spec has topic.name set, we should use topicName as key for schema
- if (!lastestSchemaMap.containsKey(tableKey)) {
+ if (!latestSchemaMap.containsKey(tableKey)) {
HiveTable existingTable = this.hiveRegister.getTable(dbName, tableName).get();
- lastestSchemaMap.put(tableKey,
+ latestSchemaMap.put(tableKey,
existingTable.getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()));
}
@@ -138,6 +148,7 @@ public class HiveMetadataWriter implements MetadataWriter {
//In case the topic name is not the table name or the topic name contains '-'
topicName = topicPartitionString.substring(0, topicPartitionString.lastIndexOf('-'));
}
+
switch (gmce.getOperationType()) {
case add_files: {
addFiles(gmce, newSpecsMap, dbName, tableName, topicName);
@@ -247,18 +258,18 @@ public class HiveMetadataWriter implements MetadataWriter {
.build());
if (gmce.getSchemaSource() == SchemaSource.EVENT) {
// Schema source is Event, update schema anyway
- lastestSchemaMap.put(tableKey, newSchemaString);
+ latestSchemaMap.put(tableKey, newSchemaString);
// Clear the schema versions cache so next time if we see schema source is schemaRegistry, we will contact schemaRegistry and update
existedSchemaCreationTimes.cleanUp();
} else if (gmce.getSchemaSource() == SchemaSource.SCHEMAREGISTRY && newSchemaCreationTime != null
&& existedSchemaCreationTimes.getIfPresent(newSchemaCreationTime) == null) {
// We haven't seen this schema before, so we query schemaRegistry to get latest schema
- if (topicName != null && !topicName.isEmpty()) {
+ if (StringUtils.isNoneEmpty(topicName)) {
Schema latestSchema = (Schema) this.schemaRegistry.getLatestSchemaByTopic(topicName);
String latestCreationTime = AvroUtils.getSchemaCreationTime(latestSchema);
if (latestCreationTime.equals(newSchemaCreationTime)) {
//new schema is the latest schema, we update our record
- lastestSchemaMap.put(tableKey, newSchemaString);
+ latestSchemaMap.put(tableKey, newSchemaString);
}
existedSchemaCreationTimes.put(newSchemaCreationTime, "");
}
@@ -283,7 +294,7 @@ public class HiveMetadataWriter implements MetadataWriter {
//Force to set the schema even there is no schema literal defined in the spec
spec.getTable()
.getSerDeProps()
- .setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), lastestSchemaMap.get(tableKey));
+ .setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), latestSchemaMap.get(tableKey));
}
private String fetchSchemaFromTable(String dbName, String tableName) throws IOException {
@@ -302,10 +313,10 @@ public class HiveMetadataWriter implements MetadataWriter {
GenericRecord genericRecord = recordEnvelope.getRecord();
GobblinMetadataChangeEvent gmce =
(GobblinMetadataChangeEvent) SpecificData.get().deepCopy(genericRecord.getSchema(), genericRecord);
- if (whiteistBlacklist.acceptTable(tableSpec.getTable().getDbName(), tableSpec.getTable().getTableName())) {
+ if (whitelistBlacklist.acceptTable(tableSpec.getTable().getDbName(), tableSpec.getTable().getTableName())) {
write(gmce, newSpecsMap, oldSpecsMap, tableSpec);
} else {
- log.debug(String.format("Skip table %s.%s since it's blacklisted", tableSpec.getTable().getDbName(),
+ log.debug(String.format("Skip table %s.%s since it's not selected", tableSpec.getTable().getDbName(),
tableSpec.getTable().getTableName()));
}
}
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/MetadataWriter.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/MetadataWriter.java
index e4a0406..054c8a9 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/MetadataWriter.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/MetadataWriter.java
@@ -21,13 +21,16 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
+
import org.apache.avro.generic.GenericRecord;
+
import org.apache.gobblin.hive.spec.HiveSpec;
+import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
import org.apache.gobblin.stream.RecordEnvelope;
/**
- * This is the interface of the writer which is used to calculate and accumulate the desired metadata and register to the metadata store
+ * This is the interface that is used to calculate and accumulate the desired metadata and register to the metadata store
*/
public interface MetadataWriter extends Closeable {
String CACHE_EXPIRING_TIME = "GMCEWriter.cache.expiring.time.hours";
@@ -36,11 +39,16 @@ public interface MetadataWriter extends Closeable {
/*
Register the metadata of specific table to the metadata store
*/
- public void flush(String dbName, String tableName) throws IOException;
+ void flush(String dbName, String tableName) throws IOException;
- /*
- Compute and cache the metadata from the GMCE
+ /**
+ * Compute and cache the metadata from the GMCE
+ * @param recordEnvelope Containing {@link GobblinMetadataChangeEvent}
+ * @param newSpecsMap The container (as a map) for new specs.
+ * @param oldSpecsMap The container (as a map) for old specs.
+ * @param tableSpec A sample table spec representing one instance among all paths' {@link HiveSpec}s
+ * @throws IOException
*/
- public void writeEnvelope(RecordEnvelope<GenericRecord> recordEnvelope, Map<String, Collection<HiveSpec>> newSpecsMap,
+ void writeEnvelope(RecordEnvelope<GenericRecord> recordEnvelope, Map<String, Collection<HiveSpec>> newSpecsMap,
Map<String, Collection<HiveSpec>> oldSpecsMap, HiveSpec tableSpec) throws IOException;
}
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
index 0445653..130ee04 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
@@ -52,6 +52,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.Metrics;
+import static org.apache.gobblin.iceberg.writer.GobblinMCEWriter.HIVE_PARTITION_NAME;
+
/**
* A class running along with data ingestion pipeline for emitting GobblinMCE (Gobblin Metadata Change Event
@@ -65,7 +67,6 @@ public abstract class GobblinMCEProducer implements Closeable {
public static final String GMCE_PRODUCER_CLASS = "GobblinMCEProducer.class.name";
public static final String OLD_FILES_HIVE_REGISTRATION_KEY = "old.files.hive.registration.policy";
- public static final String HIVE_PARTITION_NAME = "hive.partition.name";
private static final String HDFS_PLATFORM_URN = "urn:li:dataPlatform:hdfs";
private static final String DATASET_ORIGIN_KEY = "dataset.origin";
private static final String DEFAULT_DATASET_ORIGIN = "PROD";
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
index b34e2a9..2d8a340 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
@@ -17,14 +17,6 @@
package org.apache.gobblin.iceberg.writer;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.io.Closer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -35,17 +27,34 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
+
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificData;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.io.Closer;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.dataset.Descriptor;
import org.apache.gobblin.hive.policy.HiveRegistrationPolicy;
import org.apache.gobblin.hive.policy.HiveRegistrationPolicyBase;
import org.apache.gobblin.hive.spec.HiveSpec;
+import org.apache.gobblin.hive.writer.MetadataWriter;
+import org.apache.gobblin.metadata.DataFile;
import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
import org.apache.gobblin.metadata.OperationType;
import org.apache.gobblin.stream.RecordEnvelope;
@@ -55,21 +64,19 @@ import org.apache.gobblin.util.ParallelRunner;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.gobblin.writer.DataWriter;
import org.apache.gobblin.writer.DataWriterBuilder;
-import org.apache.gobblin.hive.writer.MetadataWriter;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
/**
- * This is a Wrapper of all the MetadataWriters. This writer will manage a list of {@Link MetadataWriter} which do the actual
+ * This is a wrapper of all the MetadataWriters. This writer will manage a list of {@link MetadataWriter} which do the actual
* metadata registration for different metadata store.
* This writer is responsible for:
+ * 0. Consuming {@link GobblinMetadataChangeEvent} and execute metadata registration.
* 1. Managing a map of Iceberg tables that it is currently processing
* 2. Ensuring that the underlying metadata writers flush the metadata associated with each Iceberg table
* 3. Call flush method for a specific table on a change in operation type
* 4. Calculate {@Link HiveSpec}s and pass them to metadata writers to register metadata
*/
+@SuppressWarnings("UnstableApiUsage")
@Slf4j
public class GobblinMCEWriter implements DataWriter<GenericRecord> {
public static final String DEFAULT_HIVE_REGISTRATION_POLICY_KEY = "default.hive.registration.policy";
@@ -92,7 +99,7 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
private Closer closer = Closer.create();
protected final AtomicLong recordCount = new AtomicLong(0L);
- public GobblinMCEWriter(DataWriterBuilder<Schema, GenericRecord> builder, State properties) throws IOException {
+ GobblinMCEWriter(DataWriterBuilder<Schema, GenericRecord> builder, State properties) throws IOException {
newSpecsMaps = new HashMap<>();
oldSpecsMaps = new HashMap<>();
metadataWriters = new ArrayList<>();
@@ -116,13 +123,14 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
/**
* This method is used to get the specs map for a list of files. It will firstly look up in cache
- * to see if the specs has been calculated previously to recude the computing time
+ * to see if the specs has been calculated previously to reduce the computing time
* We have an assumption here: "for the same path and the same operation type, the specs should be the same"
- * @param files List of files' names
+ * @param files List of leaf-level files' names
* @param specsMap The specs map for the files
* @param cache Cache that store the pre-calculated specs information
* @param registerState State used to compute the specs
- * @param isPrefix if the name of file is prefix, if not we get the parent file name to calculate the hiveSpec
+ * @param isPrefix If false, we get the parent file name to calculate the hiveSpec. This is to comply with
+ * hive's convention on partition which is the parent folder for leaf-level files.
* @throws IOException
*/
private void computeSpecMap(List<String> files, ConcurrentHashMap<String, Collection<HiveSpec>> specsMap,
@@ -133,14 +141,10 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
@Override
public Void call() throws Exception {
try {
- Path regPath = new Path(file);
- if (!isPrefix) {
- regPath = regPath.getParent();
- }
- //Use raw path for federation purpose
+ Path regPath = isPrefix ? new Path(file) : new Path(file).getParent();
+ //Use raw path to comply with HDFS federation setting.
Path rawPath = new Path(regPath.toUri().getRawPath());
- cache.get(regPath.toString(), () -> policy.getHiveSpecs(rawPath));
- specsMap.put(regPath.toString(), cache.getIfPresent(regPath.toString()));
+ specsMap.put(regPath.toString(), cache.get(regPath.toString(), () -> policy.getHiveSpecs(rawPath)));
} catch (Exception e) {
log.warn("Cannot get Hive Spec for {} using policy {}", file, policy.toString());
}
@@ -196,13 +200,15 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
&& datasetOperationTypeMap.get(datasetName) != gmce.getOperationType()) {
datasetOperationTypeMap.put(datasetName, gmce.getOperationType());
}
- //We assume in one same operation interval, for same dataset, the table property will not change to reduce the time to compute hiveSpec.
+
+ // Mapping from URI of path of arrival files to the list of HiveSpec objects.
+ // We assume in one same operation interval, for same dataset, the table property will not change to reduce the time to compute hiveSpec.
ConcurrentHashMap<String, Collection<HiveSpec>> newSpecsMap = new ConcurrentHashMap<>();
ConcurrentHashMap<String, Collection<HiveSpec>> oldSpecsMap = new ConcurrentHashMap<>();
if (gmce.getNewFiles() != null) {
State registerState = setHiveRegProperties(state, gmce, true);
- computeSpecMap(Lists.newArrayList(Iterables.transform(gmce.getNewFiles(), dataFile -> dataFile.getFilePath())),
+ computeSpecMap(Lists.newArrayList(Iterables.transform(gmce.getNewFiles(), DataFile::getFilePath)),
newSpecsMap, newSpecsMaps.computeIfAbsent(datasetName, t -> CacheBuilder.newBuilder()
.expireAfterAccess(state.getPropAsInt(MetadataWriter.CACHE_EXPIRING_TIME,
MetadataWriter.DEFAULT_CACHE_EXPIRING_TIME), TimeUnit.HOURS)
@@ -226,6 +232,12 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
if (newSpecsMap.isEmpty() && oldSpecsMap.isEmpty()) {
return;
}
+
+ // Sample one entry among all "Path <--> List<HiveSpec>" pair is good enough, reasoning:
+ // 0. Objective here is to execute metadata registration for all target table destinations of a dataset,
+ // 1. GMCE guarantees all paths coming from single dataset (but not necessary single "partition" in Hive's layout),
+ // 2. HiveSpec of paths from a dataset should be targeting at the same set of table destinations,
+ // 3. therefore fetching one path's HiveSpec and iterate through it is good enough to cover all table destinations.
Collection<HiveSpec> specs =
newSpecsMap.isEmpty() ? oldSpecsMap.values().iterator().next() : newSpecsMap.values().iterator().next();
for (HiveSpec spec : specs) {
@@ -248,8 +260,8 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
/**
* Call the metadata writers to do flush each table metadata.
- * Flush of metadata writer is the place that do really metadata
- * registration (For iceberg, this method will generate a snapshot)
+ * Flush of metadata writer is the place that do real metadata
+ * registrations (e.g. for iceberg, this method will generate a snapshot)
* @throws IOException
*/
@Override
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index 650b511..59a8706 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -17,18 +17,6 @@
package org.apache.gobblin.iceberg.writer;
-import com.codahale.metrics.Timer;
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Range;
-import com.google.common.collect.Sets;
-import com.google.common.io.Closer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -48,37 +36,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.extern.slf4j.Slf4j;
+
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificData;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.data.management.copy.hive.WhitelistBlacklist;
-import org.apache.gobblin.hive.AutoCloseableHiveLock;
-import org.apache.gobblin.hive.HiveLock;
-import org.apache.gobblin.hive.HivePartition;
-import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
-import org.apache.gobblin.hive.spec.HiveSpec;
-import org.apache.gobblin.iceberg.Utils.IcebergUtils;
-import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
-import org.apache.gobblin.metadata.OperationType;
-import org.apache.gobblin.metrics.GobblinMetricsRegistry;
-import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.Tag;
-import org.apache.gobblin.metrics.event.EventSubmitter;
-import org.apache.gobblin.metrics.event.GobblinEventBuilder;
-import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
-import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
-import org.apache.gobblin.source.extractor.extract.kafka.KafkaStreamingExtractor;
-import org.apache.gobblin.stream.RecordEnvelope;
-import org.apache.gobblin.util.AvroUtils;
-import org.apache.gobblin.util.ClustersNames;
-import org.apache.gobblin.util.HadoopUtils;
-import org.apache.gobblin.util.ParallelRunner;
-import org.apache.gobblin.util.WriterUtils;
-import org.apache.gobblin.hive.writer.MetadataWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -108,6 +68,50 @@ import org.joda.time.DateTime;
import org.joda.time.format.PeriodFormatter;
import org.joda.time.format.PeriodFormatterBuilder;
+import com.codahale.metrics.Timer;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
+import com.google.common.io.Closer;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.data.management.copy.hive.WhitelistBlacklist;
+import org.apache.gobblin.hive.AutoCloseableHiveLock;
+import org.apache.gobblin.hive.HiveLock;
+import org.apache.gobblin.hive.HivePartition;
+import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
+import org.apache.gobblin.hive.spec.HiveSpec;
+import org.apache.gobblin.hive.writer.MetadataWriter;
+import org.apache.gobblin.iceberg.Utils.IcebergUtils;
+import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
+import org.apache.gobblin.metadata.OperationType;
+import org.apache.gobblin.metrics.GobblinMetricsRegistry;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
+import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaStreamingExtractor;
+import org.apache.gobblin.stream.RecordEnvelope;
+import org.apache.gobblin.util.AvroUtils;
+import org.apache.gobblin.util.ClustersNames;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.ParallelRunner;
+import org.apache.gobblin.util.WriterUtils;
+
/**
* This writer is used to calculate iceberg metadata from GMCE and register to iceberg