Posted to commits@gobblin.apache.org by ap...@apache.org on 2021/07/15 19:33:08 UTC

[gobblin] branch master updated: [GOBBLIN-1486] Documentation improvement for Gobblin Metadata Ingestion pipeline (#3325)

This is an automated email from the ASF dual-hosted git repository.

aplex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 5fb30cb  [GOBBLIN-1486] Documentation improvement for Gobblin Metadata Ingestion pipeline (#3325)
5fb30cb is described below

commit 5fb30cb16f80ce6ea848e61c36e3c1d48b8bdd0f
Author: Lei <au...@users.noreply.github.com>
AuthorDate: Thu Jul 15 12:33:00 2021 -0700

    [GOBBLIN-1486] Documentation improvement for Gobblin Metadata Ingestion pipeline (#3325)
    
    Fixing javadoc and some typos found while reading the code of the metadata ingestion pipeline. This PR mostly covers HiveMetadataWriter and GobblinMCEWriter; there will be a follow-up on IcebergMetadataWriter.
---
 .../gobblin/hive/writer/HiveMetadataWriter.java    | 35 ++++++---
 .../apache/gobblin/hive/writer/MetadataWriter.java | 18 +++--
 .../apache/gobblin/iceberg/GobblinMCEProducer.java |  3 +-
 .../gobblin/iceberg/writer/GobblinMCEWriter.java   | 72 ++++++++++--------
 .../iceberg/writer/IcebergMetadataWriter.java      | 86 +++++++++++-----------
 5 files changed, 125 insertions(+), 89 deletions(-)

diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
index b7ad453..55fcd2b 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
@@ -49,6 +49,8 @@ import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
 import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
 import org.apache.gobblin.stream.RecordEnvelope;
 import org.apache.gobblin.util.AvroUtils;
+
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
 
 
@@ -58,8 +60,11 @@ import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
  * and then register the partition if needed
  * For rewrite_files operation, this writer will directly register the new hive spec and try to de-register the old hive spec if oldFilePrefixes is set
  * For drop_files operation, this writer will de-register the hive partition only if oldFilePrefixes is set in the GMCE
+ *
+ * Added warning suppression for all references to {@link Cache}.
  */
 @Slf4j
+@SuppressWarnings("UnstableApiUsage")
 public class HiveMetadataWriter implements MetadataWriter {
 
   private static final String HIVE_REGISTRATION_WHITELIST = "hive.registration.whitelist";
@@ -69,27 +74,32 @@ public class HiveMetadataWriter implements MetadataWriter {
   private final Joiner tableNameJoiner = Joiner.on('.');
   private final Closer closer = Closer.create();
   protected final HiveRegister hiveRegister;
-  private final WhitelistBlacklist whiteistBlacklist;
+  private final WhitelistBlacklist whitelistBlacklist;
   @Getter
   private final KafkaSchemaRegistry schemaRegistry;
   private final HashMap<String, HashMap<List<String>, ListenableFuture<Void>>> currentExecutionMap;
 
+  /* Mapping from tableIdentifier to a cache keyed by schema creation timestamp; the value is not used. */
   private final HashMap<String, Cache<String, String>> schemaCreationTimeMap;
+
+  /* Mapping from tableIdentifier to a cache keyed by a list of partitions, with a HiveSpec object as the value. */
   private final HashMap<String, Cache<List<String>, HiveSpec>> specMaps;
-  private final HashMap<String, String> lastestSchemaMap;
+
+  /* Mapping from tableIdentifier to latest schema observed. */
+  private final HashMap<String, String> latestSchemaMap;
   private final long timeOutSeconds;
   private State state;
 
   public HiveMetadataWriter(State state) throws IOException {
     this.state = state;
     this.hiveRegister = this.closer.register(HiveRegister.get(state));
-    this.whiteistBlacklist = new WhitelistBlacklist(state.getProp(HIVE_REGISTRATION_WHITELIST, ""),
+    this.whitelistBlacklist = new WhitelistBlacklist(state.getProp(HIVE_REGISTRATION_WHITELIST, ""),
         state.getProp(HIVE_REGISTRATION_BLACKLIST, ""));
     this.schemaRegistry = KafkaSchemaRegistry.get(state.getProperties());
     this.currentExecutionMap = new HashMap<>();
     this.schemaCreationTimeMap = new HashMap<>();
     this.specMaps = new HashMap<>();
-    this.lastestSchemaMap = new HashMap<>();
+    this.latestSchemaMap = new HashMap<>();
     this.timeOutSeconds =
         state.getPropAsLong(HIVE_REGISTRATION_TIMEOUT_IN_SECONDS, DEFAULT_HIVE_REGISTRATION_TIMEOUT_IN_SECONDS);
   }
@@ -124,9 +134,9 @@ public class HiveMetadataWriter implements MetadataWriter {
     }
 
     //ToDo: after making sure all spec has topic.name set, we should use topicName as key for schema
-    if (!lastestSchemaMap.containsKey(tableKey)) {
+    if (!latestSchemaMap.containsKey(tableKey)) {
       HiveTable existingTable = this.hiveRegister.getTable(dbName, tableName).get();
-      lastestSchemaMap.put(tableKey,
+      latestSchemaMap.put(tableKey,
           existingTable.getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()));
     }
 
@@ -138,6 +148,7 @@ public class HiveMetadataWriter implements MetadataWriter {
       //In case the topic name is not the table name or the topic name contains '-'
       topicName = topicPartitionString.substring(0, topicPartitionString.lastIndexOf('-'));
     }
+
     switch (gmce.getOperationType()) {
       case add_files: {
         addFiles(gmce, newSpecsMap, dbName, tableName, topicName);
@@ -247,18 +258,18 @@ public class HiveMetadataWriter implements MetadataWriter {
                 .build());
         if (gmce.getSchemaSource() == SchemaSource.EVENT) {
           // Schema source is Event, update schema anyway
-          lastestSchemaMap.put(tableKey, newSchemaString);
+          latestSchemaMap.put(tableKey, newSchemaString);
           // Clear the schema versions cache so next time if we see schema source is schemaRegistry, we will contact schemaRegistry and update
           existedSchemaCreationTimes.cleanUp();
         } else if (gmce.getSchemaSource() == SchemaSource.SCHEMAREGISTRY && newSchemaCreationTime != null
             && existedSchemaCreationTimes.getIfPresent(newSchemaCreationTime) == null) {
           // We haven't seen this schema before, so we query schemaRegistry to get latest schema
-          if (topicName != null && !topicName.isEmpty()) {
+          if (StringUtils.isNoneEmpty(topicName)) {
             Schema latestSchema = (Schema) this.schemaRegistry.getLatestSchemaByTopic(topicName);
             String latestCreationTime = AvroUtils.getSchemaCreationTime(latestSchema);
             if (latestCreationTime.equals(newSchemaCreationTime)) {
               //new schema is the latest schema, we update our record
-              lastestSchemaMap.put(tableKey, newSchemaString);
+              latestSchemaMap.put(tableKey, newSchemaString);
             }
             existedSchemaCreationTimes.put(newSchemaCreationTime, "");
           }
@@ -283,7 +294,7 @@ public class HiveMetadataWriter implements MetadataWriter {
     //Force to set the schema even there is no schema literal defined in the spec
     spec.getTable()
         .getSerDeProps()
-        .setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), lastestSchemaMap.get(tableKey));
+        .setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), latestSchemaMap.get(tableKey));
   }
 
   private String fetchSchemaFromTable(String dbName, String tableName) throws IOException {
@@ -302,10 +313,10 @@ public class HiveMetadataWriter implements MetadataWriter {
     GenericRecord genericRecord = recordEnvelope.getRecord();
     GobblinMetadataChangeEvent gmce =
         (GobblinMetadataChangeEvent) SpecificData.get().deepCopy(genericRecord.getSchema(), genericRecord);
-    if (whiteistBlacklist.acceptTable(tableSpec.getTable().getDbName(), tableSpec.getTable().getTableName())) {
+    if (whitelistBlacklist.acceptTable(tableSpec.getTable().getDbName(), tableSpec.getTable().getTableName())) {
       write(gmce, newSpecsMap, oldSpecsMap, tableSpec);
     } else {
-      log.debug(String.format("Skip table %s.%s since it's blacklisted", tableSpec.getTable().getDbName(),
+      log.debug(String.format("Skip table %s.%s since it's not selected", tableSpec.getTable().getDbName(),
           tableSpec.getTable().getTableName()));
     }
   }
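
As background for the schema handling in the hunk above: HiveMetadataWriter keeps one Guava Cache per table, keyed by schema creation time, to decide when to contact the schema registry. A minimal standalone sketch of that pattern follows; the demo class and the timestamp value are illustrative only and are not part of this commit.

    import java.util.concurrent.TimeUnit;

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;

    public class SchemaCreationTimeCacheDemo {
      public static void main(String[] args) {
        // Per-table cache keyed by schema creation time; the value is an unused placeholder.
        Cache<String, String> existedSchemaCreationTimes = CacheBuilder.newBuilder()
            .expireAfterAccess(1, TimeUnit.HOURS)
            .build();

        String newSchemaCreationTime = "1626380000000";  // illustrative timestamp
        if (existedSchemaCreationTimes.getIfPresent(newSchemaCreationTime) == null) {
          // Unseen creation time: this is the point where HiveMetadataWriter would
          // query the schema registry for the latest schema of the topic.
          existedSchemaCreationTimes.put(newSchemaCreationTime, "");
        }

        // When the schema source is the event itself, the writer calls cleanUp(), which
        // runs pending cache maintenance (such as evicting expired entries).
        existedSchemaCreationTimes.cleanUp();
      }
    }
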
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/MetadataWriter.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/MetadataWriter.java
index e4a0406..054c8a9 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/MetadataWriter.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/MetadataWriter.java
@@ -21,13 +21,16 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Map;
+
 import org.apache.avro.generic.GenericRecord;
+
 import org.apache.gobblin.hive.spec.HiveSpec;
+import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
 import org.apache.gobblin.stream.RecordEnvelope;
 
 
 /**
- * This is the interface of the writer which is used to calculate and accumulate the desired metadata and register to the metadata store
+ * This is the interface that is used to calculate and accumulate the desired metadata and register it with the metadata store
  */
 public interface MetadataWriter extends Closeable {
   String CACHE_EXPIRING_TIME = "GMCEWriter.cache.expiring.time.hours";
@@ -36,11 +39,16 @@ public interface MetadataWriter extends Closeable {
   /*
   Register the metadata of specific table to the metadata store
    */
-  public void flush(String dbName, String tableName) throws IOException;
+  void flush(String dbName, String tableName) throws IOException;
 
-  /*
-  Compute and cache the metadata from the GMCE
+  /**
+   * Compute and cache the metadata from the GMCE
+   * @param recordEnvelope The record envelope containing the {@link GobblinMetadataChangeEvent}
+   * @param newSpecsMap The container (as a map) for new specs.
+   * @param oldSpecsMap The container (as a map) for old specs.
+   * @param tableSpec A sample table spec representing one instance among all paths' {@link HiveSpec}s
+   * @throws IOException
    */
-  public void writeEnvelope(RecordEnvelope<GenericRecord> recordEnvelope, Map<String, Collection<HiveSpec>> newSpecsMap,
+  void writeEnvelope(RecordEnvelope<GenericRecord> recordEnvelope, Map<String, Collection<HiveSpec>> newSpecsMap,
       Map<String, Collection<HiveSpec>> oldSpecsMap, HiveSpec tableSpec) throws IOException;
 }
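
To make the interface above concrete, a no-op implementation might look like the sketch below. The class is hypothetical and assumes only the members visible in this diff; the real interface may declare additional members not shown here.

    import java.io.IOException;
    import java.util.Collection;
    import java.util.Map;

    import org.apache.avro.generic.GenericRecord;

    import org.apache.gobblin.hive.spec.HiveSpec;
    import org.apache.gobblin.hive.writer.MetadataWriter;
    import org.apache.gobblin.stream.RecordEnvelope;

    public class NoOpMetadataWriter implements MetadataWriter {

      @Override
      public void writeEnvelope(RecordEnvelope<GenericRecord> recordEnvelope,
          Map<String, Collection<HiveSpec>> newSpecsMap,
          Map<String, Collection<HiveSpec>> oldSpecsMap, HiveSpec tableSpec) throws IOException {
        // Compute and cache metadata from the GMCE carried by recordEnvelope (no-op here).
      }

      @Override
      public void flush(String dbName, String tableName) throws IOException {
        // Register the accumulated metadata of dbName.tableName with the metadata store (no-op here).
      }

      @Override
      public void close() throws IOException {
        // Release any held resources (no-op here).
      }
    }
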
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
index 0445653..130ee04 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
@@ -52,6 +52,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.Metrics;
 
+import static org.apache.gobblin.iceberg.writer.GobblinMCEWriter.HIVE_PARTITION_NAME;
+
 
 /**
  * A class running along with data ingestion pipeline for emitting GobblinMCE (Gobblin Metadata Change Event
@@ -65,7 +67,6 @@ public abstract class GobblinMCEProducer implements Closeable {
 
   public static final String GMCE_PRODUCER_CLASS = "GobblinMCEProducer.class.name";
   public static final String OLD_FILES_HIVE_REGISTRATION_KEY = "old.files.hive.registration.policy";
-  public static final String HIVE_PARTITION_NAME = "hive.partition.name";
   private static final String HDFS_PLATFORM_URN = "urn:li:dataPlatform:hdfs";
   private static final String DATASET_ORIGIN_KEY = "dataset.origin";
   private static final String DEFAULT_DATASET_ORIGIN = "PROD";
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
index b34e2a9..2d8a340 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
@@ -17,14 +17,6 @@
 
 package org.apache.gobblin.iceberg.writer;
 
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.io.Closer;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -35,17 +27,34 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
+
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.specific.SpecificData;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.io.Closer;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.dataset.Descriptor;
 import org.apache.gobblin.hive.policy.HiveRegistrationPolicy;
 import org.apache.gobblin.hive.policy.HiveRegistrationPolicyBase;
 import org.apache.gobblin.hive.spec.HiveSpec;
+import org.apache.gobblin.hive.writer.MetadataWriter;
+import org.apache.gobblin.metadata.DataFile;
 import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
 import org.apache.gobblin.metadata.OperationType;
 import org.apache.gobblin.stream.RecordEnvelope;
@@ -55,21 +64,19 @@ import org.apache.gobblin.util.ParallelRunner;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 import org.apache.gobblin.writer.DataWriter;
 import org.apache.gobblin.writer.DataWriterBuilder;
-import org.apache.gobblin.hive.writer.MetadataWriter;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
 
 
 /**
- * This is a Wrapper of all the MetadataWriters. This writer will manage a list of {@Link MetadataWriter} which do the actual
+ * This is a wrapper of all the MetadataWriters. This writer will manage a list of {@link MetadataWriter}s which do the actual
  * metadata registration for different metadata store.
  * This writer is responsible for:
+ * 0. Consuming {@link GobblinMetadataChangeEvent} and executing metadata registration.
  * 1. Managing a map of Iceberg tables that it is currently processing
  * 2. Ensuring that the underlying metadata writers flush the metadata associated with each Iceberg table
  * 3. Call flush method for a specific table on a change in operation type
  * 4. Calculate {@Link HiveSpec}s and pass them to metadata writers to register metadata
  */
+@SuppressWarnings("UnstableApiUsage")
 @Slf4j
 public class GobblinMCEWriter implements DataWriter<GenericRecord> {
   public static final String DEFAULT_HIVE_REGISTRATION_POLICY_KEY = "default.hive.registration.policy";
@@ -92,7 +99,7 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
   private Closer closer = Closer.create();
   protected final AtomicLong recordCount = new AtomicLong(0L);
 
-  public GobblinMCEWriter(DataWriterBuilder<Schema, GenericRecord> builder, State properties) throws IOException {
+  GobblinMCEWriter(DataWriterBuilder<Schema, GenericRecord> builder, State properties) throws IOException {
     newSpecsMaps = new HashMap<>();
     oldSpecsMaps = new HashMap<>();
     metadataWriters = new ArrayList<>();
@@ -116,13 +123,14 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
 
   /**
    * This method is used to get the specs map for a list of files. It will firstly look up in cache
-   * to see if the specs has been calculated previously to recude the computing time
+   * to see if the specs have been calculated previously to reduce the computing time
    * We have an assumption here: "for the same path and the same operation type, the specs should be the same"
-   * @param files  List of files' names
+   * @param files  List of leaf-level files' names
    * @param specsMap The specs map for the files
    * @param cache  Cache that store the pre-calculated specs information
    * @param registerState State used to compute the specs
-   * @param isPrefix if the name of file is prefix, if not we get the parent file name to calculate the hiveSpec
+   * @param isPrefix If false, we use the parent directory name to calculate the hiveSpec. This is to comply with
+   *                 Hive's convention that a partition is the parent folder of its leaf-level files.
    * @throws IOException
    */
   private void computeSpecMap(List<String> files, ConcurrentHashMap<String, Collection<HiveSpec>> specsMap,
@@ -133,14 +141,10 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
         @Override
         public Void call() throws Exception {
           try {
-            Path regPath = new Path(file);
-            if (!isPrefix) {
-              regPath = regPath.getParent();
-            }
-            //Use raw path for federation purpose
+            Path regPath = isPrefix ? new Path(file) : new Path(file).getParent();
+            //Use raw path to comply with HDFS federation setting.
             Path rawPath = new Path(regPath.toUri().getRawPath());
-            cache.get(regPath.toString(), () -> policy.getHiveSpecs(rawPath));
-            specsMap.put(regPath.toString(), cache.getIfPresent(regPath.toString()));
+            specsMap.put(regPath.toString(), cache.get(regPath.toString(), () -> policy.getHiveSpecs(rawPath)));
           } catch (Exception e) {
             log.warn("Cannot get Hive Spec for {} using policy {}", file, policy.toString());
           }
@@ -196,13 +200,15 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
         && datasetOperationTypeMap.get(datasetName) != gmce.getOperationType()) {
       datasetOperationTypeMap.put(datasetName, gmce.getOperationType());
     }
-    //We assume in one same operation interval, for same dataset, the table property will not change to reduce the time to compute hiveSpec.
+
+    // Mapping from the URI of each arriving file's path to the list of HiveSpec objects.
+    // To reduce the time to compute hiveSpecs, we assume that within the same operation interval the table properties of a dataset do not change.
     ConcurrentHashMap<String, Collection<HiveSpec>> newSpecsMap = new ConcurrentHashMap<>();
     ConcurrentHashMap<String, Collection<HiveSpec>> oldSpecsMap = new ConcurrentHashMap<>();
 
     if (gmce.getNewFiles() != null) {
       State registerState = setHiveRegProperties(state, gmce, true);
-      computeSpecMap(Lists.newArrayList(Iterables.transform(gmce.getNewFiles(), dataFile -> dataFile.getFilePath())),
+      computeSpecMap(Lists.newArrayList(Iterables.transform(gmce.getNewFiles(), DataFile::getFilePath)),
           newSpecsMap, newSpecsMaps.computeIfAbsent(datasetName, t -> CacheBuilder.newBuilder()
               .expireAfterAccess(state.getPropAsInt(MetadataWriter.CACHE_EXPIRING_TIME,
                   MetadataWriter.DEFAULT_CACHE_EXPIRING_TIME), TimeUnit.HOURS)
@@ -226,6 +232,12 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
     if (newSpecsMap.isEmpty() && oldSpecsMap.isEmpty()) {
       return;
     }
+
+    // Sampling one entry among all "Path <--> List<HiveSpec>" pairs is good enough, reasoning:
+    // 0. The objective here is to execute metadata registration for all target table destinations of a dataset,
+    // 1. GMCE guarantees that all paths come from a single dataset (though not necessarily a single "partition" in Hive's layout),
+    // 2. HiveSpecs of paths from one dataset should target the same set of table destinations,
+    // 3. therefore fetching one path's HiveSpec and iterating through it is enough to cover all table destinations.
     Collection<HiveSpec> specs =
         newSpecsMap.isEmpty() ? oldSpecsMap.values().iterator().next() : newSpecsMap.values().iterator().next();
     for (HiveSpec spec : specs) {
@@ -248,8 +260,8 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
 
   /**
    * Call the metadata writers to do flush each table metadata.
-   * Flush of metadata writer is the place that do really metadata
-   * registration (For iceberg, this method will generate a snapshot)
+   * Flushing a metadata writer is where the real metadata
+   * registration happens (e.g. for Iceberg, this method will generate a snapshot)
    * @throws IOException
    */
   @Override
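
The computeSpecMap change above folds the previous two-step cache.get(...) / cache.getIfPresent(...) sequence into a single atomic cache.get(key, loader) call and keeps the parent-directory handling for leaf-level files. A rough standalone sketch of that pattern follows; the file path and the placeholder strings standing in for HiveSpec collections are made up for illustration and are not part of this commit.

    import java.util.Collection;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;
    import com.google.common.collect.ImmutableList;

    import org.apache.hadoop.fs.Path;

    public class SpecCacheDemo {
      public static void main(String[] args) throws ExecutionException {
        // Cache of previously computed specs, expiring after a period of no access.
        Cache<String, Collection<String>> cache = CacheBuilder.newBuilder()
            .expireAfterAccess(6, TimeUnit.HOURS)
            .build();

        String file = "/data/tracking/events/datepartition=2021-07-15/part-000.avro";  // illustrative
        boolean isPrefix = false;

        // For leaf-level files the registration path is the parent folder, matching
        // Hive's convention that a partition is a directory.
        Path regPath = isPrefix ? new Path(file) : new Path(file).getParent();

        // cache.get(key, loader) computes and caches the value atomically on a miss,
        // so the separate get(...) and getIfPresent(...) calls are no longer needed.
        Collection<String> specs =
            cache.get(regPath.toString(), () -> ImmutableList.of("hiveSpecFor:" + regPath));
        System.out.println(specs);
      }
    }
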
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index 650b511..59a8706 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -17,18 +17,6 @@
 
 package org.apache.gobblin.iceberg.writer;
 
-import com.codahale.metrics.Timer;
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Range;
-import com.google.common.collect.Sets;
-import com.google.common.io.Closer;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -48,37 +36,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.extern.slf4j.Slf4j;
+
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.specific.SpecificData;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.data.management.copy.hive.WhitelistBlacklist;
-import org.apache.gobblin.hive.AutoCloseableHiveLock;
-import org.apache.gobblin.hive.HiveLock;
-import org.apache.gobblin.hive.HivePartition;
-import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
-import org.apache.gobblin.hive.spec.HiveSpec;
-import org.apache.gobblin.iceberg.Utils.IcebergUtils;
-import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
-import org.apache.gobblin.metadata.OperationType;
-import org.apache.gobblin.metrics.GobblinMetricsRegistry;
-import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.Tag;
-import org.apache.gobblin.metrics.event.EventSubmitter;
-import org.apache.gobblin.metrics.event.GobblinEventBuilder;
-import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
-import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
-import org.apache.gobblin.source.extractor.extract.kafka.KafkaStreamingExtractor;
-import org.apache.gobblin.stream.RecordEnvelope;
-import org.apache.gobblin.util.AvroUtils;
-import org.apache.gobblin.util.ClustersNames;
-import org.apache.gobblin.util.HadoopUtils;
-import org.apache.gobblin.util.ParallelRunner;
-import org.apache.gobblin.util.WriterUtils;
-import org.apache.gobblin.hive.writer.MetadataWriter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -108,6 +68,50 @@ import org.joda.time.DateTime;
 import org.joda.time.format.PeriodFormatter;
 import org.joda.time.format.PeriodFormatterBuilder;
 
+import com.codahale.metrics.Timer;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
+import com.google.common.io.Closer;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.data.management.copy.hive.WhitelistBlacklist;
+import org.apache.gobblin.hive.AutoCloseableHiveLock;
+import org.apache.gobblin.hive.HiveLock;
+import org.apache.gobblin.hive.HivePartition;
+import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
+import org.apache.gobblin.hive.spec.HiveSpec;
+import org.apache.gobblin.hive.writer.MetadataWriter;
+import org.apache.gobblin.iceberg.Utils.IcebergUtils;
+import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
+import org.apache.gobblin.metadata.OperationType;
+import org.apache.gobblin.metrics.GobblinMetricsRegistry;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
+import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaStreamingExtractor;
+import org.apache.gobblin.stream.RecordEnvelope;
+import org.apache.gobblin.util.AvroUtils;
+import org.apache.gobblin.util.ClustersNames;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.ParallelRunner;
+import org.apache.gobblin.util.WriterUtils;
+
 
 /**
  * This writer is used to calculate iceberg metadata from GMCE and register to iceberg