Posted to commits@gobblin.apache.org by le...@apache.org on 2021/08/12 16:51:11 UTC

[gobblin] branch avro_1_9 updated: [Branch avro_1_9] Avro 1.9 upgrade compatible change - replaced deprecated public APIs with the compatible APIs (#3349)

This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch avro_1_9
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/avro_1_9 by this push:
     new 77ef311  [Branch avro_1_9] Avro 1.9 upgrade compatible change - replaced deprecated public APIs with the compatible APIs (#3349)
77ef311 is described below

commit 77ef311cc8645e3049403bc511932b2fbc7ed0c3
Author: Abhishek Nath <an...@linkedin.com>
AuthorDate: Thu Aug 12 22:21:03 2021 +0530

    [Branch avro_1_9] Avro 1.9 upgrade compatible change - replaced deprecated public APIs with the compatible APIs (#3349)
    
    * Avro 1.9 upgrade compatible change - replaced deprecated public APIs with the compatible APIs.
    
    * Avro 1.9 upgrade compatible change - Replaced guava library import from avro shaded with direct guava libraries
    
    * Applied Gobblin codestyle formatting.
    
    Co-authored-by: Lei <au...@users.noreply.github.com>
---
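Note on the recurring Avro 1.9 pattern in this patch: Avro 1.9 removed the Jackson-backed getJsonProp/getJsonProps accessors, and the replacement getObjectProp/getObjectProps return plain java.lang.Object values. The new AvroSchemaUtils helper added by this commit (+67 lines per the diffstat below; full file not shown in this excerpt) wraps that conversion for the call sites in FieldAttributeBasedDeltaFieldsProvider and HiveAvroORCQueryGenerator. The following is only a minimal sketch of what such a helper can look like, assuming nothing beyond the two signatures visible at those call sites; the actual implementation may differ:

    package org.apache.gobblin.util;

    import org.apache.avro.Schema;

    /** Sketch only -- the real AvroSchemaUtils introduced by this commit may differ in detail. */
    public final class AvroSchemaUtils {
      private AvroSchemaUtils() {
      }

      /** Mirrors the old field.getJsonProp(name).getValueAsText() on top of Avro 1.9's Object-based props. */
      public static String getValueAsString(Schema.Field field, String propName) {
        Object value = field.getObjectProp(propName);
        return value == null ? null : value.toString();
      }

      /** Mirrors the old schema.getJsonProp(name).getValueAsInt() on top of Avro 1.9's Object-based props. */
      public static int getValueAsInteger(Schema schema, String propName) {
        Object value = schema.getObjectProp(propName);
        return value instanceof Number ? ((Number) value).intValue() : Integer.parseInt(String.valueOf(value));
      }
    }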
 .../FieldAttributeBasedDeltaFieldsProvider.java    |   5 +-
 ...FieldAttributeBasedDeltaFieldsProviderTest.java |   2 +-
 .../converter/filter/AvroSchemaFieldRemover.java   |   8 +-
 .../apache/gobblin/test/SequentialTestSource.java  |   2 +-
 .../hive/query/HiveAvroORCQueryGenerator.java      |   5 +-
 .../copy/replication/ConfigBasedMultiDatasets.java |   2 +-
 .../retention/dataset/CleanableIcebergDataset.java |   8 +-
 .../management/conversion/hive/HiveSourceTest.java |  38 +++
 .../copy/RecursiveCopyableDatasetTest.java         |   4 +-
 .../java/org/apache/gobblin/hive/HiveLock.java     |   2 +-
 .../hive/metastore/HiveMetaStoreBasedRegister.java |   4 +-
 .../apache/gobblin/hive/writer/MetadataWriter.java |  10 +-
 .../apache/gobblin/iceberg/Utils/IcebergUtils.java |   8 +-
 .../gobblin/iceberg/writer/GobblinMCEWriter.java   |   6 +
 .../iceberg/writer/IcebergMetadataWriter.java      | 260 ++++++++++++++-------
 .../iceberg/writer/IcebergMetadataWriterTest.java  |  18 +-
 .../reporter/FileFailureEventReporterTest.java     |   2 +-
 .../azkaban/AzkabanGobblinYarnAppLauncher.java     |   8 +
 .../gobblin/writer/AvroOrcSchemaConverter.java     |   5 +-
 .../google/webmaster/GoogleWebmasterExtractor.java |   2 +-
 .../GoogleWebmasterExtractorIterator.java          |   2 +-
 .../restli/throttling/ThrottlingClientTest.java    |   2 +-
 .../throttling/ConfigStoreBasedPolicyTest.java     |   2 +-
 .../throttling/LimiterServerResourceTest.java      |   2 +-
 ...stHadoopKerberosKeytabAuthenticationPlugin.java |   2 +-
 .../TestStandardGobblinInstanceDriver.java         |   2 +-
 .../instance/hadoop/TestHadoopConfigLoader.java    |   2 +-
 .../service/monitoring/KafkaJobStatusMonitor.java  |   2 +-
 .../org/apache/gobblin/util/AvroFlattener.java     |  12 +-
 .../org/apache/gobblin/util/AvroSchemaUtils.java   |  67 ++++++
 .../java/org/apache/gobblin/util/AvroUtils.java    |  16 +-
 .../org/apache/gobblin/util/AvroUtilsTest.java     |   8 +-
 .../gobblin/yarn/GobblinYarnAppLauncher.java       |  88 ++++---
 .../gobblin/yarn/GobblinYarnConfigurationKeys.java |   2 +
 34 files changed, 439 insertions(+), 169 deletions(-)

diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProvider.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProvider.java
index 5f2e9fd..0632045 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProvider.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProvider.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ExecutionException;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.gobblin.util.AvroSchemaUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.JsonParser;
@@ -79,7 +80,9 @@ public class FieldAttributeBasedDeltaFieldsProvider implements AvroDeltaFieldNam
   private List<String> getDeltaFieldNamesForNewSchema(Schema originalSchema) {
     List<String> deltaFields = new ArrayList<>();
     for (Field field : originalSchema.getFields()) {
-      String deltaAttributeField = field.getJsonProp(this.attributeField).getValueAsText();
+      // Avro 1.9 compatible change - replaced deprecated public API getJsonProp with getObjectProp
+      // Use AvroSchemaUtils to convert the object to a string value
+      String deltaAttributeField = AvroSchemaUtils.getValueAsString(field, this.attributeField);
       ObjectNode objectNode = getDeltaPropValue(deltaAttributeField);
       if (objectNode == null || objectNode.get(this.deltaPropName) == null) {
         continue;
diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProviderTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProviderTest.java
index 8557a8c..225f339 100644
--- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProviderTest.java
+++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProviderTest.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import avro.shaded.com.google.common.collect.Lists;
+import com.google.common.collect.Lists;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemover.java b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemover.java
index 87d4101..d8d227b 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemover.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemover.java
@@ -22,7 +22,6 @@ import java.util.Map;
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
-import org.codehaus.jackson.JsonNode;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
@@ -108,7 +107,7 @@ public class AvroSchemaFieldRemover {
   private Schema removeFieldsFromRecords(Schema schema, Map<String, Schema> schemaMap) {
 
     Schema newRecord = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), schema.isError());
-    convertFieldToSchemaWithProps(schema.getJsonProps(), newRecord);
+    convertFieldToSchemaWithProps(schema.getObjectProps(), newRecord);
 
     // Put an incomplete schema into schemaMap to avoid re-processing a recursive field.
     // The fields in the incomplete schema will be populated once the current schema is completely processed.
@@ -125,8 +124,9 @@ public class AvroSchemaFieldRemover {
           newField = new Field(field.name(), DO_NOTHING_INSTANCE.removeFields(field.schema(), schemaMap), field.doc(),
               field.defaultValue());
         }
-        for (Map.Entry<String, JsonNode> stringJsonNodeEntry : field.getJsonProps().entrySet()) {
-          newField.addProp(stringJsonNodeEntry.getKey(), stringJsonNodeEntry.getValue());
+        // Avro 1.9 compatible change - replaced deprecated public API getJsonProps with getObjectProps
+        for (Map.Entry<String, Object> objectEntry : field.getObjectProps().entrySet()) {
+          newField.addProp(objectEntry.getKey(), objectEntry.getValue());
         }
         newFields.add(newField);
       }
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/test/SequentialTestSource.java b/gobblin-core-base/src/main/java/org/apache/gobblin/test/SequentialTestSource.java
index 5872808..8aaf328 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/test/SequentialTestSource.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/test/SequentialTestSource.java
@@ -29,7 +29,7 @@ import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
-import avro.shaded.com.google.common.base.Throwables;
+import com.google.common.base.Throwables;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.SourceState;
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java
index fb2a0ec..5aaf73d 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java
@@ -29,6 +29,7 @@ import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.gobblin.util.AvroSchemaUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.serde.serdeConstants;
@@ -538,7 +539,9 @@ public class HiveAvroORCQueryGenerator {
         .equalsIgnoreCase(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
       int maxLength = 0;
       try {
-        maxLength = schema.getJsonProp(AvroSerDe.AVRO_PROP_MAX_LENGTH).getValueAsInt();
+        // Avro 1.9 compatible change - replaced deprecated public API getJsonProp with getObjectProp
+        // Use AvroSchemaUtils to convert the object to an integer value
+        maxLength = AvroSchemaUtils.getValueAsInteger(schema, AvroSerDe.AVRO_PROP_MAX_LENGTH);
       } catch (Exception ex) {
         throw new AvroSerdeException("Failed to obtain maxLength value from file schema: " + schema, ex);
       }
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedMultiDatasets.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedMultiDatasets.java
index 3f9a57f..076b279 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedMultiDatasets.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedMultiDatasets.java
@@ -17,7 +17,7 @@
 
 package org.apache.gobblin.data.management.copy.replication;
 
-import avro.shaded.com.google.common.annotations.VisibleForTesting;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.gobblin.dataset.Dataset;
 import java.io.IOException;
 import java.net.URI;
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableIcebergDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableIcebergDataset.java
index c351ef1..e4d4450 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableIcebergDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableIcebergDataset.java
@@ -48,8 +48,8 @@ import org.slf4j.LoggerFactory;
 
 /**
  * A subclass of {@link ConfigurableCleanableDataset} that overwrite the {@link ConfigurableCleanableDataset#cleanImpl(Collection)}
- * to firstly send gmce to delete dataset logically from iceberg and then
- * call {@link org.apache.iceberg.Table#expireSnapshots()} to do physically data and metadata retention
+ * to first send a gmce to logically delete the dataset from iceberg, and then process the GMCEs within the metadata-ingestion pipeline
+ * by calling {@link org.apache.iceberg.Table#expireSnapshots()} to materialize data/metadata retention
  */
 public class CleanableIcebergDataset<T extends FileSystemDatasetVersion> extends ConfigurableCleanableDataset<T> {
   private final static String RETENTION_INTERVAL_TIME = "retention.interval.time";
@@ -127,6 +127,10 @@ public class CleanableIcebergDataset<T extends FileSystemDatasetVersion> extends
     }
   }
 
+  /**
+   * Only in charge of emitting {@link org.apache.gobblin.metadata.GobblinMetadataChangeEvent}s.
+   * The processing of these events can be seen in {@link org.apache.gobblin.iceberg.writer.IcebergMetadataWriter}.
+   */
   protected void cleanImpl(Collection<T> deletableVersions, Config retentionConfig) throws IOException {
     List<String> deletablePrefix = new ArrayList<>();
     for (T version : deletableVersions) {
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/HiveSourceTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/HiveSourceTest.java
index 2351fd3..e6e4a72 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/HiveSourceTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/HiveSourceTest.java
@@ -22,6 +22,8 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.joda.time.DateTime;
 import org.testng.Assert;
@@ -87,6 +89,42 @@ public class HiveSourceTest {
   }
 
   @Test
+  public void testAlreadyExistsPartition() throws Exception {
+    String dbName = "testdb";
+    String tableSdLoc = new File(this.tmpDir, TEST_TABLE_1).getAbsolutePath();
+
+    this.hiveMetastoreTestUtils.getLocalMetastoreClient().dropDatabase(dbName, false, true, true);
+
+    Table tbl = this.hiveMetastoreTestUtils.createTestAvroTable(dbName, TEST_TABLE_1, tableSdLoc, Optional.of("field"));
+
+    this.hiveMetastoreTestUtils.addTestPartition(tbl, ImmutableList.of("f1"), (int) System.currentTimeMillis());
+
+    try {
+      this.hiveMetastoreTestUtils.addTestPartition(tbl, ImmutableList.of("f1"), (int) System.currentTimeMillis());
+    } catch (AlreadyExistsException e) {
+      return;
+    }
+    Assert.fail();
+  }
+
+  @Test
+  public void testPartitionNotExists() throws Exception {
+    String dbName = "testdb1";
+    String tableSdLoc = new File(this.tmpDir, TEST_TABLE_1).getAbsolutePath();
+
+    this.hiveMetastoreTestUtils.getLocalMetastoreClient().dropDatabase(dbName, false, true, true);
+
+    Table tbl = this.hiveMetastoreTestUtils.createTestAvroTable(dbName, TEST_TABLE_1, tableSdLoc, Optional.of("field"));
+
+    try {
+      this.hiveMetastoreTestUtils.getLocalMetastoreClient().getPartition(tbl.getDbName(), tbl.getTableName(), "field");
+    } catch (NoSuchObjectException e) {
+      return;
+    }
+    Assert.fail();
+  }
+
+  @Test
   public void testGetWorkUnitsForPartitions() throws Exception {
     String dbName = "testdb3";
     String tableSdLoc = new File(this.tmpDir, TEST_TABLE_3).getAbsolutePath();
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDatasetTest.java
index 14011d2..d624f82 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDatasetTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDatasetTest.java
@@ -42,8 +42,8 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.data.management.copy.entities.CommitStepCopyEntity;
 import org.apache.gobblin.util.commit.DeleteFileCommitStep;
 
-import avro.shaded.com.google.common.base.Predicate;
-import avro.shaded.com.google.common.collect.Iterables;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
 import javax.annotation.Nullable;
 import lombok.Data;
 
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveLock.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveLock.java
index 653ddcb..9442cd9 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveLock.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveLock.java
@@ -35,7 +35,7 @@ import org.apache.gobblin.util.AutoCloseableLock;
  *
  * <p>
  *   Obtaining a table lock does <em>not</em> lock the database, which permits concurrent operations on different
- *   tables in the same database. Similarly, obtianing a partition lock does not lock the table or the database.
+ *   tables in the same database. Similarly, obtaining a partition lock does not lock the table or the database.
  * </p>
  */
 public class HiveLock {
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
index 1a8711e..d5b2393 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
@@ -586,7 +586,7 @@ public class HiveMetaStoreBasedRegister extends HiveRegister {
         }
         log.info(String.format("Added partition %s to table %s with location %s", stringifyPartition(nativePartition),
             table.getTableName(), nativePartition.getSd().getLocation()));
-      } catch (TException e) {
+      } catch (AlreadyExistsException e) {
         try {
           if (this.skipDiffComputation) {
             onPartitionExistWithoutComputingDiff(table, nativePartition, e);
@@ -632,7 +632,7 @@ public class HiveMetaStoreBasedRegister extends HiveRegister {
             onPartitionExist(client, table, partition, nativePartition, existedPartition);
           }
         }
-      } catch (TException e) {
+      } catch (NoSuchObjectException e) {
         try (Timer.Context context = this.metricContext.timer(ADD_PARTITION_TIMER).time()) {
           client.add_partition(getPartitionWithCreateTimeNow(nativePartition));
         }
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/MetadataWriter.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/MetadataWriter.java
index 054c8a9..6333688 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/MetadataWriter.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/MetadataWriter.java
@@ -36,8 +36,14 @@ public interface MetadataWriter extends Closeable {
   String CACHE_EXPIRING_TIME = "GMCEWriter.cache.expiring.time.hours";
   int DEFAULT_CACHE_EXPIRING_TIME = 1;
 
-  /*
-  Register the metadata of specific table to the metadata store
+  /**
+   * Register the metadata of a specific table to the metadata store. This is a blocking method:
+   * once it returns, as long as the underlying metadata storage is transactional (e.g. MySQL backing HMS),
+   * one can expect the metadata registration to have gone through and been persisted already.
+   *
+   * @param dbName The db name of metadata-registration target.
+   * @param tableName The table name of metadata-registration target.
+   * @throws IOException
    */
   void flush(String dbName, String tableName) throws IOException;
 
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/Utils/IcebergUtils.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/Utils/IcebergUtils.java
index 53cec5f..65fb551 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/Utils/IcebergUtils.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/Utils/IcebergUtils.java
@@ -75,7 +75,7 @@ public class IcebergUtils {
   /**
    * Given a avro schema string and a hive table,
    * calculate the iceberg table schema and partition schema.
-   * (Since we use 'datepartition' as partition column, which is not included inside the data schema,
+   * (E.g. we use 'datepartition' as the partition column, which is not included inside the data schema,
    * we'll need to add that column to data schema to construct table schema
    */
   public static IcebergDataAndPartitionSchema getIcebergSchema(String schema,
@@ -212,15 +212,15 @@ public class IcebergUtils {
    * This method is mainly used to get the file to be deleted
    */
   public static DataFile getIcebergDataFileWithoutMetric(String file, PartitionSpec partitionSpec,
-      StructLike partition) {
+      StructLike partitionVal) {
     //Use raw Path to support federation.
     String rawPath = new Path(file).toUri().getRawPath();
     //Just want to remove the old files, so set the record number and file size to a random value
     DataFiles.Builder dataFileBuilder =
         DataFiles.builder(partitionSpec).withPath(rawPath).withFileSizeInBytes(0).withRecordCount(0);
 
-    if (partition != null) {
-      dataFileBuilder.withPartition(partition);
+    if (partitionVal != null) {
+      dataFileBuilder.withPartition(partitionVal);
     }
     return dataFileBuilder.build();
   }
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
index ebbf74b..a6036e3 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
@@ -266,6 +266,12 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
    * Call the metadata writers to do flush each table metadata.
    * Flush of metadata writer is the place that do real metadata
    * registrations (e.g. for iceberg, this method will generate a snapshot)
+   * Flush of the metadata writer is where the real metadata registration happens
+   * (for Iceberg, this method will generate a snapshot).
+   *
+   * Note that this is one of the places where the materialization of aggregated metadata happens.
+   * When there's a change of {@link OperationType}, it also interrupts metadata aggregation
+   * and triggers materialization of metadata.
    * @throws IOException
    */
   @Override
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index 59a8706..3abd491 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -104,7 +104,7 @@ import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.GobblinEventBuilder;
 import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
 import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
-import org.apache.gobblin.source.extractor.extract.kafka.KafkaStreamingExtractor;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaStreamingExtractor.KafkaWatermark;
 import org.apache.gobblin.stream.RecordEnvelope;
 import org.apache.gobblin.util.AvroUtils;
 import org.apache.gobblin.util.ClustersNames;
@@ -123,15 +123,16 @@ import org.apache.gobblin.util.WriterUtils;
 @Slf4j
 public class IcebergMetadataWriter implements MetadataWriter {
 
+  // Critical when there's dataset-level ACL enforced for both data and Iceberg metadata
   public static final String USE_DATA_PATH_AS_TABLE_LOCATION = "use.data.path.as.table.location";
   public static final String TABLE_LOCATION_SUFFIX = "/_iceberg_metadata/%s";
   public static final String GMCE_HIGH_WATERMARK_KEY = "gmce.high.watermark.%s";
   public static final String GMCE_LOW_WATERMARK_KEY = "gmce.low.watermark.%s";
   private final static String EXPIRE_SNAPSHOTS_LOOKBACK_TIME = "gobblin.iceberg.dataset.expire.snapshots.lookBackTime";
   private final static String DEFAULT_EXPIRE_SNAPSHOTS_LOOKBACK_TIME = "3d";
-  public static final String ICEBERG_REGISTRATION_BLACKLIST = "iceberg.registration.blacklist";
-  public static final String ICEBERG_REGISTRATION_WHITELIST = "iceberg.registration.whitelist";
-  public static final String ICEBERG_METADATA_FILE_PERMISSION = "iceberg.metadata.file.permission";
+  private static final String ICEBERG_REGISTRATION_BLACKLIST = "iceberg.registration.blacklist";
+  private static final String ICEBERG_REGISTRATION_WHITELIST = "iceberg.registration.whitelist";
+  private static final String ICEBERG_METADATA_FILE_PERMISSION = "iceberg.metadata.file.permission";
   private static final String CLUSTER_IDENTIFIER_KEY_NAME = "clusterIdentifier";
   private static final String CREATE_TABLE_TIME = "iceberg.create.table.time";
   private static final String SCHEMA_CREATION_TIME_KEY = "schema.creation.time";
@@ -139,17 +140,23 @@ public class IcebergMetadataWriter implements MetadataWriter {
   private static final int DEFAULT_ADDED_FILES_CACHE_EXPIRING_TIME = 1;
   private static final String OFFSET_RANGE_KEY_PREFIX = "offset.range.";
   private static final String OFFSET_RANGE_KEY_FORMAT = OFFSET_RANGE_KEY_PREFIX + "%s";
-  private static final String ICEBERG_FILE_PATH_COLUMN = "file_path";
+
   private static final String DEFAULT_CREATION_TIME = "0";
   private static final String SNAPSHOT_EXPIRE_THREADS = "snapshot.expire.threads";
   private static final long DEFAULT_WATERMARK = -1L;
+
+  /* one of the fields in DataFile entry to describe the location URI of a data file with FS Scheme */
+  private static final String ICEBERG_FILE_PATH_COLUMN = DataFile.FILE_PATH.name();
+
   protected final MetricContext metricContext;
   protected EventSubmitter eventSubmitter;
-  private final WhitelistBlacklist whiteistBlacklist;
+  private final WhitelistBlacklist whitelistBlacklist;
   private final Closer closer = Closer.create();
-  private final Map<TableIdentifier, Long> tableCurrentWaterMarkMap;
-  //Used to store the relationship between table and the gmce topicPartition
-  private final Map<TableIdentifier, String> tableTopicpartitionMap;
+
+  // Mapping between table-id and currently processed watermark
+  private final Map<TableIdentifier, Long> tableCurrentWatermarkMap;
+  // Used to store the relationship between table and the gmce topicPartition
+  private final Map<TableIdentifier, String> tableTopicPartitionMap;
   @Getter
   private final KafkaSchemaRegistry schemaRegistry;
   private final Map<TableIdentifier, TableMetadata> tableMetadataMap;
@@ -159,16 +166,16 @@ public class IcebergMetadataWriter implements MetadataWriter {
   protected final ReadWriteLock readWriteLock;
   private final HiveLock locks;
   private final ParallelRunner parallelRunner;
-  private final boolean useDataLoacationAsTableLocation;
+  private final boolean useDataLocationAsTableLocation;
   private FsPermission permission;
 
   public IcebergMetadataWriter(State state) throws IOException {
     this.schemaRegistry = KafkaSchemaRegistry.get(state.getProperties());
     conf = HadoopUtils.getConfFromState(state);
     initializeCatalog();
-    tableTopicpartitionMap = new HashMap<>();
+    tableTopicPartitionMap = new HashMap<>();
     tableMetadataMap = new HashMap<>();
-    tableCurrentWaterMarkMap = new HashMap<>();
+    tableCurrentWatermarkMap = new HashMap<>();
     List<Tag<?>> tags = Lists.newArrayList();
     String clusterIdentifier = ClustersNames.getInstance().getClusterName();
     tags.add(new Tag<>(CLUSTER_IDENTIFIER_KEY_NAME, clusterIdentifier));
@@ -176,15 +183,17 @@ public class IcebergMetadataWriter implements MetadataWriter {
         GobblinMetricsRegistry.getInstance().getMetricContext(state, IcebergMetadataWriter.class, tags));
     this.eventSubmitter =
         new EventSubmitter.Builder(this.metricContext, IcebergMCEMetadataKeys.METRICS_NAMESPACE_ICEBERG_WRITER).build();
-    this.whiteistBlacklist = new WhitelistBlacklist(state.getProp(ICEBERG_REGISTRATION_WHITELIST, ""),
+    this.whitelistBlacklist = new WhitelistBlacklist(state.getProp(ICEBERG_REGISTRATION_WHITELIST, ""),
         state.getProp(ICEBERG_REGISTRATION_BLACKLIST, ""));
-    // Use lock to make it safe when flush and write are called async
+
+    // Use a rw-lock to make it thread-safe when flush and write (which essentially aggregates and reads metadata)
+    // are called in separate threads.
     readWriteLock = new ReentrantReadWriteLock();
     this.locks = new HiveLock(state.getProperties());
     parallelRunner = closer.register(new ParallelRunner(state.getPropAsInt(SNAPSHOT_EXPIRE_THREADS, 20),
         FileSystem.get(HadoopUtils.getConfFromState(state))));
-    useDataLoacationAsTableLocation = state.getPropAsBoolean(USE_DATA_PATH_AS_TABLE_LOCATION, false);
-    if (useDataLoacationAsTableLocation) {
+    useDataLocationAsTableLocation = state.getPropAsBoolean(USE_DATA_PATH_AS_TABLE_LOCATION, false);
+    if (useDataLocationAsTableLocation) {
       permission =
           HadoopUtils.deserializeFsPermission(state, ICEBERG_METADATA_FILE_PERMISSION,
               FsPermission.getDefault());
@@ -204,12 +213,14 @@ public class IcebergMetadataWriter implements MetadataWriter {
   }
 
   /**
+   *  The method is used to get the current watermark of the gmce topic partition for a table, and to persist the value
+   *  in the {@link #tableMetadataMap} as a side effect.
+   *  in the {@link #tableMetadataMap} as a side effect.
+   *
    *  Make the watermark config name contains topicPartition in case we change the gmce topic name for some reason
    */
-  private Long getCurrentWaterMark(TableIdentifier tid, String topicPartition) {
-    if (tableCurrentWaterMarkMap.containsKey(tid)) {
-      return tableCurrentWaterMarkMap.get(tid);
+  private Long getAndPersistCurrentWatermark(TableIdentifier tid, String topicPartition) {
+    if (tableCurrentWatermarkMap.containsKey(tid)) {
+      return tableCurrentWatermarkMap.get(tid);
     }
     org.apache.iceberg.Table icebergTable;
     Long currentWatermark = DEFAULT_WATERMARK;
@@ -229,13 +240,15 @@ public class IcebergMetadataWriter implements MetadataWriter {
   }
 
   /**
-   *The write method will be responsible for process a given gmce and aggregate the metadata
+   * The write method will be responsible for processing a gmce and aggregating the metadata.
    * The logic of this function will be:
-   * 1. Check whether a table exists, if not then create the iceberg table
+   * 1. Check whether a table exists, if not then create an iceberg table
    * 2. Compute schema from the gmce and update the cache for candidate schemas
-   * 3. Do the required operation of the gmce, i.e. addFile, rewriteFile or dropFile. change_property means only
-   * update the table level property but no data modification.
-   * Note: this method only aggregate the metadata in cache without committing. The actual commit will be done in flush method
+   * 3. Do the required operation of the gmce, i.e. addFile, rewriteFile, dropFile or change_property.
+   *
+   * Note: this method only aggregates the metadata in the cache without committing,
+   * while the actual commit will be done in the flush method (except for rewrite and drop operations, where preserving older file
+   * information would increase the memory footprint, so we flush them eagerly).
    */
   public void write(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSpec>> newSpecsMap,
       Map<String, Collection<HiveSpec>> oldSpecsMap, HiveSpec tableSpec) throws IOException {
@@ -261,7 +274,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
       }
     }
     computeCandidateSchema(gmce, tid, tableSpec);
-    tableMetadata.transaction = Optional.of(tableMetadata.transaction.or(table::newTransaction));
+    tableMetadata.ensureTxnInit();
     tableMetadata.lowestGMCEEmittedTime = Long.min(tableMetadata.lowestGMCEEmittedTime, gmce.getGMCEmittedTime());
     switch (gmce.getOperationType()) {
       case add_files: {
@@ -314,7 +327,11 @@ public class IcebergMetadataWriter implements MetadataWriter {
     return offsets;
   }
 
-  private void mergeOffsets(GobblinMetadataChangeEvent gmce, TableIdentifier tid) throws IOException {
+  /**
+   * The side effect of this method is to update the offset-range of the table identified by
+   * the given {@link TableIdentifier} with the input {@link GobblinMetadataChangeEvent}
+   */
+  private void mergeOffsets(GobblinMetadataChangeEvent gmce, TableIdentifier tid) {
     TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t -> new TableMetadata());
     tableMetadata.dataOffsetRange = Optional.of(tableMetadata.dataOffsetRange.or(() -> getLastOffset(tableMetadata)));
     Map<String, List<Range>> offsets = tableMetadata.dataOffsetRange.get();
@@ -360,8 +377,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
    * @param gmce
    * @param tid
    */
-  private void computeCandidateSchema(GobblinMetadataChangeEvent gmce, TableIdentifier tid, HiveSpec spec)
-      throws IOException {
+  private void computeCandidateSchema(GobblinMetadataChangeEvent gmce, TableIdentifier tid, HiveSpec spec) {
     Table table = getIcebergTable(tid);
     TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t -> new TableMetadata());
     org.apache.hadoop.hive.metastore.api.Table hiveTable = HiveMetaStoreUtils.getTable(spec.getTable());
@@ -416,7 +432,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
     PartitionSpec partitionSpec = IcebergUtils.getPartitionSpec(tableSchema, schemas.partitionSchema);
     Table icebergTable = null;
     String tableLocation = null;
-    if (useDataLoacationAsTableLocation) {
+    if (useDataLocationAsTableLocation) {
       tableLocation = gmce.getDatasetIdentifier().getNativeName() + String.format(TABLE_LOCATION_SUFFIX, table.getDbName());
       //Set the path permission
       Path tablePath = new Path(tableLocation);
@@ -435,8 +451,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
   protected void rewriteFiles(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSpec>> newSpecsMap,
       Map<String, Collection<HiveSpec>> oldSpecsMap, Table table, TableMetadata tableMetadata) throws IOException {
     PartitionSpec partitionSpec = table.spec();
-    tableMetadata.transaction = Optional.of(tableMetadata.transaction.or(table::newTransaction));
-    Transaction transaction = tableMetadata.transaction.get();
+    tableMetadata.ensureTxnInit();
     Set<DataFile> newDataFiles = new HashSet<>();
     getIcebergDataFilesToBeAddedHelper(gmce, table, newSpecsMap, tableMetadata)
         .forEach(dataFile -> {
@@ -444,22 +459,26 @@ public class IcebergMetadataWriter implements MetadataWriter {
           tableMetadata.addedFiles.put(dataFile.path(), "");
         });
     Set<DataFile> oldDataFiles = getIcebergDataFilesToBeDeleted(gmce, table, newSpecsMap, oldSpecsMap, partitionSpec);
+
+    // Deal with the case when the old files do not exist, in which the rewrite is converted into either a no-op or an append.
     if (oldDataFiles.isEmpty() && !newDataFiles.isEmpty()) {
-      //We randomly check whether one of the new data files already exists in the db to avoid duplication of re-write events
-      DataFile file = newDataFiles.iterator().next();
-      Expression exp = Expressions.startsWith(ICEBERG_FILE_PATH_COLUMN, (String) file.path());
+      //We check whether an arbitrary one of the new data files already exists in the table to avoid reprocessing re-write events
+      DataFile dataFile = newDataFiles.iterator().next();
+      Expression exp = Expressions.startsWith(ICEBERG_FILE_PATH_COLUMN, dataFile.path().toString());
       if (FindFiles.in(table).withMetadataMatching(exp).collect().iterator().hasNext()) {
-        //This means this re-write event is duplicated with the one we already handled, so directly return
+        //This means this re-write event is duplicated with the one we already handled, so noop.
         return;
       }
-      //This is the case when the files to be deleted do not exist in table
-      //So we directly call addFiles interface to add new files into the table.
-      tableMetadata.appendFiles = Optional.of(tableMetadata.appendFiles.or(transaction::newAppend));
-      AppendFiles appendFiles = tableMetadata.appendFiles.get();
+      // This is the case when the files to be deleted do not exist in the table,
+      // so we directly call the addFiles interface to add new files into the table.
+      // Note that this AppendFiles won't be committed here, in contrast to a real rewrite operation
+      // where the commit is called at once to reduce the memory footprint.
+      AppendFiles appendFiles = tableMetadata.getOrInitAppendFiles();
       newDataFiles.forEach(appendFiles::appendFile);
       return;
     }
-    transaction.newRewrite().rewriteFiles(oldDataFiles, newDataFiles).commit();
+
+    tableMetadata.transaction.get().newRewrite().rewriteFiles(oldDataFiles, newDataFiles).commit();
   }
 
   /**
@@ -478,15 +497,22 @@ public class IcebergMetadataWriter implements MetadataWriter {
     return schemaWithOriginId;
   }
 
+  /**
+   * Deal with both regular file deletions manifested by GMCE (aggregation but no commit),
+   * and expiring older snapshots (commit).
+   */
   protected void dropFiles(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSpec>> oldSpecsMap, Table table,
       TableMetadata tableMetadata, TableIdentifier tid) throws IOException {
     PartitionSpec partitionSpec = table.spec();
-    Transaction transaction = tableMetadata.transaction.get();
+
+    // Update DeleteFiles in tableMetadata: This is regular file deletion
+    DeleteFiles deleteFiles = tableMetadata.getOrInitDeleteFiles();
     Set<DataFile> oldDataFiles =
         getIcebergDataFilesToBeDeleted(gmce, table, new HashMap<>(), oldSpecsMap, partitionSpec);
-    tableMetadata.deleteFiles = Optional.of(tableMetadata.deleteFiles.or(transaction::newDelete));
-    DeleteFiles deleteFiles = tableMetadata.deleteFiles.get();
-    oldDataFiles.stream().forEach(deleteFiles::deleteFile);
+    oldDataFiles.forEach(deleteFiles::deleteFile);
+
+    // Update ExpireSnapshots and commit the updates at once: This is for expiring snapshots that are
+    // beyond look-back allowance for time-travel.
     parallelRunner.submitCallable(new Callable<Void>() {
       @Override
       public Void call() throws Exception {
@@ -495,6 +521,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
           long start = System.currentTimeMillis();
           ExpireSnapshots expireSnapshots = table.expireSnapshots();
           final Table tmpTable = table;
+
           expireSnapshots.deleteWith(new Consumer<String>() {
             @Override
             public void accept(String file) {
@@ -533,10 +560,8 @@ public class IcebergMetadataWriter implements MetadataWriter {
   }
 
   protected void addFiles(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSpec>> newSpecsMap, Table table,
-      TableMetadata tableMetadata) throws IOException {
-    Transaction transaction = tableMetadata.transaction.get();
-    tableMetadata.appendFiles = Optional.of(tableMetadata.appendFiles.or(transaction::newAppend));
-    AppendFiles appendFiles = tableMetadata.appendFiles.get();
+      TableMetadata tableMetadata) {
+    AppendFiles appendFiles = tableMetadata.getOrInitAppendFiles();
     getIcebergDataFilesToBeAddedHelper(gmce, table, newSpecsMap, tableMetadata)
         .forEach(dataFile -> {
           appendFiles.appendFile(dataFile);
@@ -544,18 +569,19 @@ public class IcebergMetadataWriter implements MetadataWriter {
         });
   }
 
-  private Stream<DataFile> getIcebergDataFilesToBeAddedHelper(GobblinMetadataChangeEvent gmce, Table table, Map<String, Collection<HiveSpec>> newSpecsMap,
-      TableMetadata tableMetadata) throws IOException {
+  private Stream<DataFile> getIcebergDataFilesToBeAddedHelper(GobblinMetadataChangeEvent gmce, Table table,
+      Map<String, Collection<HiveSpec>> newSpecsMap,
+      TableMetadata tableMetadata) {
     return getIcebergDataFilesToBeAdded(gmce.getNewFiles(), table.spec(), newSpecsMap,
         IcebergUtils.getSchemaIdMap(getSchemaWithOriginId(gmce), table.schema())).stream()
         .filter(dataFile -> tableMetadata.addedFiles.getIfPresent(dataFile.path()) == null);
   }
 
   /**
-   * Method to get dataFiles without metrics information
+   * Method to get a {@link DataFile} collection without metrics information
    * This method is used to get files to be deleted from iceberg
    * If oldFilePrefixes is specified in gmce, this method will use those prefixes to find old file in iceberg,
-   * or the method will callmethod {IcebergUtils.getIcebergDataFileWithMetric} to get DataFile for specific file path
+   * or the method will call method {IcebergUtils.getIcebergDataFileWithoutMetric} to get a DataFile for a specific file path
    */
   private Set<DataFile> getIcebergDataFilesToBeDeleted(GobblinMetadataChangeEvent gmce, Table table,
       Map<String, Collection<HiveSpec>> newSpecsMap, Map<String, Collection<HiveSpec>> oldSpecsMap,
@@ -564,7 +590,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
     if (gmce.getOldFilePrefixes() != null) {
       Expression exp = Expressions.alwaysFalse();
       for (String prefix : gmce.getOldFilePrefixes()) {
-        //Use both full path and raw path to delete old files
+        // Use both full path and raw path to filter old files
         exp = Expressions.or(exp, Expressions.startsWith(ICEBERG_FILE_PATH_COLUMN, prefix));
         String rawPathPrefix = new Path(prefix).toUri().getRawPath();
         exp = Expressions.or(exp, Expressions.startsWith(ICEBERG_FILE_PATH_COLUMN, rawPathPrefix));
@@ -572,16 +598,16 @@ public class IcebergMetadataWriter implements MetadataWriter {
       long start = System.currentTimeMillis();
       oldDataFiles.addAll(Sets.newHashSet(FindFiles.in(table).withMetadataMatching(exp).collect().iterator()));
       //Use INFO level log here to get better estimate.
-      //This shouldn't overwhelm the log since we receive less than 3 rewrite_file gmces for one table in one day
+      //This shouldn't overwhelm the log since we receive a limited number of rewrite_file gmces for one table in a day
       log.info("Spent {}ms to query all old files in iceberg.", System.currentTimeMillis() - start);
     } else {
       for (String file : gmce.getOldFiles()) {
         String specPath = new Path(file).getParent().toString();
         // For the use case of recompaction, the old path may contains /daily path, in this case, we find the spec from newSpecsMap
-        StructLike partition = getIcebergPartition(
+        StructLike partitionVal = getIcebergPartitionVal(
             oldSpecsMap.containsKey(specPath) ? oldSpecsMap.get(specPath) : newSpecsMap.get(specPath), file,
             partitionSpec);
-        oldDataFiles.add(IcebergUtils.getIcebergDataFileWithoutMetric(file, partitionSpec, partition));
+        oldDataFiles.add(IcebergUtils.getIcebergDataFileWithoutMetric(file, partitionSpec, partitionVal));
       }
     }
     return oldDataFiles;
@@ -593,12 +619,11 @@ public class IcebergMetadataWriter implements MetadataWriter {
    * This method will call method {IcebergUtils.getIcebergDataFileWithMetric} to get DataFile for specific file path
    */
   private Set<DataFile> getIcebergDataFilesToBeAdded(List<org.apache.gobblin.metadata.DataFile> files,
-      PartitionSpec partitionSpec, Map<String, Collection<HiveSpec>> newSpecsMap, Map<Integer, Integer> schemaIdMap)
-      throws IOException {
+      PartitionSpec partitionSpec, Map<String, Collection<HiveSpec>> newSpecsMap, Map<Integer, Integer> schemaIdMap) {
     Set<DataFile> dataFiles = new HashSet<>();
     for (org.apache.gobblin.metadata.DataFile file : files) {
       try {
-        StructLike partition = getIcebergPartition(newSpecsMap.get(new Path(file.getFilePath()).getParent().toString()),
+        StructLike partition = getIcebergPartitionVal(newSpecsMap.get(new Path(file.getFilePath()).getParent().toString()),
             file.getFilePath(), partitionSpec);
         dataFiles.add(IcebergUtils.getIcebergDataFileWithMetric(file, partitionSpec, partition, conf, schemaIdMap));
       } catch (Exception e) {
@@ -608,15 +633,23 @@ public class IcebergMetadataWriter implements MetadataWriter {
     return dataFiles;
   }
 
-  private StructLike getIcebergPartition(Collection<HiveSpec> specs, String filePath, PartitionSpec partitionSpec)
+  /**
+   * Obtain the Iceberg partition value from a collection of {@link HiveSpec}s.
+   * @param specs A collection of {@link HiveSpec}s.
+   * @param filePath URI of the file, used for logging purposes in this method.
+   * @param partitionSpec The partitioning scheme.
+   * @return The partition value based on the given {@link PartitionSpec}.
+   * @throws IOException
+   */
+  private StructLike getIcebergPartitionVal(Collection<HiveSpec> specs, String filePath, PartitionSpec partitionSpec)
       throws IOException {
     if (specs == null || specs.isEmpty()) {
       throw new IOException("Cannot get hive spec for " + filePath);
     }
     HivePartition hivePartition = specs.iterator().next().getPartition().orNull();
-    StructLike partition = hivePartition == null ? null
+    StructLike partitionVal = hivePartition == null ? null
         : IcebergUtils.getPartition(partitionSpec.partitionType(), hivePartition.getValues());
-    return partition;
+    return partitionVal;
   }
 
   /**
@@ -647,10 +680,10 @@ public class IcebergMetadataWriter implements MetadataWriter {
         Map<String, String> props = tableMetadata.newProperties.or(
             Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
         //Set high waterMark
-        Long highWatermark = tableCurrentWaterMarkMap.get(tid);
-        props.put(String.format(GMCE_HIGH_WATERMARK_KEY, tableTopicpartitionMap.get(tid)), highWatermark.toString());
+        Long highWatermark = tableCurrentWatermarkMap.get(tid);
+        props.put(String.format(GMCE_HIGH_WATERMARK_KEY, tableTopicPartitionMap.get(tid)), highWatermark.toString());
         //Set low waterMark
-        props.put(String.format(GMCE_LOW_WATERMARK_KEY, tableTopicpartitionMap.get(tid)),
+        props.put(String.format(GMCE_LOW_WATERMARK_KEY, tableTopicPartitionMap.get(tid)),
             tableMetadata.lowWatermark.get().toString());
         //Set whether to delete metadata files after commit
         props.put(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, Boolean.toString(
@@ -667,8 +700,9 @@ public class IcebergMetadataWriter implements MetadataWriter {
           //In case the topic name is not the table name or the topic name contains '-'
           topicName = topicPartitionString.substring(0, topicPartitionString.lastIndexOf('-'));
         }
-        //Update schema
+        //Update schema (commit)
         updateSchema(tableMetadata, props, topicName);
+
         //Update properties
         UpdateProperties updateProperties = transaction.updateProperties();
         props.forEach(updateProperties::set);
@@ -676,12 +710,17 @@ public class IcebergMetadataWriter implements MetadataWriter {
         try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName, tableName)) {
           transaction.commitTransaction();
         }
+
+        // Emit GTE for snapshot commits
         Snapshot snapshot = tableMetadata.table.get().currentSnapshot();
         Map<String, String> currentProps = tableMetadata.table.get().properties();
         submitSnapshotCommitEvent(snapshot, tableMetadata, dbName, tableName, currentProps, highWatermark);
+
         //Reset the table metadata for next accumulation period
         tableMetadata.reset(currentProps, highWatermark);
-        log.info(String.format("Finish flushing for table %s", tid.toString()));
+        log.info(String.format("Finish commit of new snapshot %s for table %s", snapshot.snapshotId(), tid.toString()));
+      } else {
+        log.info("There's no transaction initiated for the table {}", tid.toString());
       }
     } catch (RuntimeException e) {
       throw new RuntimeException(String.format("Fail to flush table %s %s", dbName, tableName), e);
@@ -699,7 +738,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
     long currentSnapshotID = snapshot.snapshotId();
     long endToEndLag = System.currentTimeMillis() - tableMetadata.lowestGMCEEmittedTime;
     TableIdentifier tid = TableIdentifier.of(dbName, tableName);
-    String gmceTopicPartition = tableTopicpartitionMap.get(tid);
+    String gmceTopicPartition = tableTopicPartitionMap.get(tid);
 
     //Add information to automatically trigger repair jon when data loss happen
     gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_TOPIC_NAME, gmceTopicPartition.split("-")[0]);
@@ -787,28 +826,27 @@ public class IcebergMetadataWriter implements MetadataWriter {
       GenericRecord genericRecord = recordEnvelope.getRecord();
       GobblinMetadataChangeEvent gmce =
           (GobblinMetadataChangeEvent) SpecificData.get().deepCopy(genericRecord.getSchema(), genericRecord);
-      if (whiteistBlacklist.acceptTable(tableSpec.getTable().getDbName(), tableSpec.getTable().getTableName())) {
+      if (whitelistBlacklist.acceptTable(tableSpec.getTable().getDbName(), tableSpec.getTable().getTableName())) {
         TableIdentifier tid = TableIdentifier.of(tableSpec.getTable().getDbName(), tableSpec.getTable().getTableName());
-        String topicPartition = tableTopicpartitionMap.computeIfAbsent(tid,
-            t -> ((KafkaStreamingExtractor.KafkaWatermark) recordEnvelope.getWatermark()).getTopicPartition().toString());
-        Long currentWatermark = getCurrentWaterMark(tid, topicPartition);
-        Long currentOffset =
-            ((KafkaStreamingExtractor.KafkaWatermark) recordEnvelope.getWatermark()).getLwm().getValue();
+        String topicPartition = tableTopicPartitionMap.computeIfAbsent(tid,
+            t -> ((KafkaWatermark) recordEnvelope.getWatermark()).getTopicPartition().toString());
+        Long currentWatermark = getAndPersistCurrentWatermark(tid, topicPartition);
+        Long currentOffset = ((KafkaWatermark) recordEnvelope.getWatermark()).getLwm().getValue();
+
         if (currentOffset > currentWatermark) {
           if (currentWatermark == DEFAULT_WATERMARK) {
             //This means we haven't register this table or the GMCE topic partition changed, we need to reset the low watermark
             tableMetadataMap.computeIfAbsent(tid, t -> new TableMetadata()).lowWatermark =
                 Optional.of(currentOffset - 1);
           }
-          tableMetadataMap.get(tid).datasetName = gmce.getDatasetIdentifier().getNativeName();
+          tableMetadataMap.get(tid).setDatasetName(gmce.getDatasetIdentifier().getNativeName());
           write(gmce, newSpecsMap, oldSpecsMap, tableSpec);
-          tableCurrentWaterMarkMap.put(tid,
-              ((KafkaStreamingExtractor.KafkaWatermark) recordEnvelope.getWatermark()).getLwm().getValue());
+          tableCurrentWatermarkMap.put(tid, currentOffset);
         } else {
           log.warn(String.format("Skip processing record %s since it has lower watermark", genericRecord.toString()));
         }
       } else {
-        log.info(String.format("Skip table %s.%s since it's blacklisted", tableSpec.getTable().getDbName(),
+        log.info(String.format("Skip table %s.%s since it's not selected", tableSpec.getTable().getDbName(),
             tableSpec.getTable().getTableName()));
       }
     } finally {
@@ -821,32 +859,86 @@ public class IcebergMetadataWriter implements MetadataWriter {
     this.closer.close();
   }
 
+  /**
+   * A collection of Iceberg metadata including the {@link Table} itself as well as
+   * a set of buffered objects (reflecting the table's {@link org.apache.iceberg.PendingUpdate}s) within the flush interval
+   * that aggregate metadata such as the locations of arriving/deleted files and the schema,
+   * along with other table-level metadata like the watermark, offset-range, etc.
+   *
+   * Also note the difference with {@link org.apache.iceberg.TableMetadata}.
+   */
   private class TableMetadata {
-    Optional<Transaction> transaction = Optional.absent();
     Optional<Table> table = Optional.absent();
-    Optional<AppendFiles> appendFiles = Optional.absent();
-    Optional<DeleteFiles> deleteFiles = Optional.absent();
+
+    /**
+     * The {@link Transaction} object holds a reference to a {@link org.apache.iceberg.BaseTransaction.TransactionTableOperations}
+     * that is shared by all individual operations (e.g. {@link AppendFiles}) to ensure atomicity even if the commit method
+     * is invoked from an individual operation.
+     */
+    Optional<Transaction> transaction = Optional.absent();
+    private Optional<AppendFiles> appendFiles = Optional.absent();
+    private Optional<DeleteFiles> deleteFiles = Optional.absent();
+
     Optional<Map<String, String>> lastProperties = Optional.absent();
     Optional<Map<String, String>> newProperties = Optional.absent();
     Optional<Cache<String, Schema>> candidateSchemas = Optional.absent();
     Optional<Map<String, List<Range>>> dataOffsetRange = Optional.absent();
     Optional<String> lastSchemaVersion = Optional.absent();
     Optional<Long> lowWatermark = Optional.absent();
+
+    @Setter
     String datasetName;
+
     Cache<CharSequence, String> addedFiles = CacheBuilder.newBuilder()
         .expireAfterAccess(conf.getInt(ADDED_FILES_CACHE_EXPIRING_TIME, DEFAULT_ADDED_FILES_CACHE_EXPIRING_TIME),
             TimeUnit.HOURS)
         .build();
     long lowestGMCEEmittedTime = Long.MAX_VALUE;
 
-    public void reset(Map<String, String> props, Long lowWaterMark) {
+    /**
+     * Always use this method to obtain the {@link AppendFiles} object within the flush interval
+     * if clients want to have the {@link AppendFiles} committed along with other updates in a single transaction.
+     */
+    AppendFiles getOrInitAppendFiles() {
+      ensureTxnInit();
+      if (!this.appendFiles.isPresent()) {
+        this.appendFiles = Optional.of(this.transaction.get().newAppend());
+      }
+
+      return this.appendFiles.get();
+    }
+
+    DeleteFiles getOrInitDeleteFiles() {
+      ensureTxnInit();
+      if (!this.deleteFiles.isPresent()) {
+        this.deleteFiles = Optional.of(this.transaction.get().newDelete());
+      }
+
+      return this.deleteFiles.get();
+    }
+
+    /**
+     * Initialize the {@link Transaction} object within {@link TableMetadata} when needed.
+     */
+    void ensureTxnInit() {
+      if (!this.transaction.isPresent()) {
+        this.transaction = Optional.of(table.get().newTransaction());
+      }
+    }
+
+    void reset(Map<String, String> props, Long lowWaterMark) {
       this.lastProperties = Optional.of(props);
       this.lastSchemaVersion = Optional.of(props.get(SCHEMA_CREATION_TIME_KEY));
-      //clear cache
       this.transaction = Optional.absent();
       this.deleteFiles = Optional.absent();
       this.appendFiles = Optional.absent();
+
+      // Clean cache and reset to eagerly release unreferenced objects.
+      if (this.candidateSchemas.isPresent()) {
+        this.candidateSchemas.get().cleanUp();
+      }
       this.candidateSchemas = Optional.absent();
+
       this.dataOffsetRange = Optional.absent();
       this.newProperties = Optional.absent();
       this.lowestGMCEEmittedTime = Long.MAX_VALUE;
diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
index 066bcc28..2ae9859 100644
--- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
+++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
@@ -154,13 +154,17 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
 
   @Test ( priority = 0 )
   public void testWriteAddFileGMCE() throws IOException {
-    gobblinMCEWriterWithAcceptClusters.writeEnvelope(new RecordEnvelope<>(gmce,
+    // Create a copy of gmce statically typed as GenericRecord so it works with the writeEnvelope method
+    // without risking a type-cast runtime error.
+    GenericRecord genericGmce = GenericData.get().deepCopy(gmce.getSchema(), gmce);
+
+    gobblinMCEWriterWithAcceptClusters.writeEnvelope(new RecordEnvelope<>(genericGmce,
         new KafkaStreamingExtractor.KafkaWatermark(
             new KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
             new LongWatermark(10L))));
     //Test when accept clusters does not contain the gmce cluster, we will skip
     Assert.assertEquals(catalog.listTables(Namespace.of(dbName)).size(), 0);
-    gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(gmce,
+    gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(genericGmce,
         new KafkaStreamingExtractor.KafkaWatermark(
             new KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
             new LongWatermark(10L))));
@@ -169,8 +173,11 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
     Assert.assertFalse(table.properties().containsKey("offset.range.testTopic-1"));
     Assert.assertEquals(table.location(),
         new File(tmpDir, "data/tracking/testIcebergTable/_iceberg_metadata/").getAbsolutePath() + "/" + dbName);
+
     gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String, String>builder().put("testTopic-1", "1000-2000").build());
-    gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(gmce,
+    GenericRecord genericGmce_1000_2000 = GenericData.get().deepCopy(gmce.getSchema(), gmce);
+
+    gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(genericGmce_1000_2000,
         new KafkaStreamingExtractor.KafkaWatermark(
             new KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
             new LongWatermark(20L))));
@@ -189,7 +196,8 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
         .setFileFormat("avro")
         .setFileMetrics(DataMetrics.newBuilder().setRecordCount(10L).build())
         .build()));
-    gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(gmce,
+    GenericRecord genericGmce_2000_3000 = GenericData.get().deepCopy(gmce.getSchema(), gmce);
+    gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(genericGmce_2000_3000,
         new KafkaStreamingExtractor.KafkaWatermark(
             new KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
             new LongWatermark(30L))));
@@ -202,7 +210,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
 
     /* Test it will skip event with lower watermark*/
     gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String, String>builder().put("testTopic-1", "3000-4000").build());
-    gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(gmce,
+    gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(genericGmce,
         new KafkaStreamingExtractor.KafkaWatermark(
             new KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
             new LongWatermark(30L))));
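
    A note on the deepCopy calls introduced in this test: GenericData.get().deepCopy(...) rebuilds the record with Avro's generic data classes, so the copy can safely be handled as a GenericRecord regardless of the specific-record type of gmce. A small, hedged helper illustrating the same idea (names are illustrative):

        import org.apache.avro.generic.GenericData;
        import org.apache.avro.generic.GenericRecord;
        import org.apache.avro.generic.IndexedRecord;

        final class GenericCopies {
          private GenericCopies() { }

          /** Deep-copies any Avro record (specific or generic) into a generic-typed copy. */
          static GenericRecord toGenericCopy(IndexedRecord record) {
            return (GenericRecord) GenericData.get().deepCopy(record.getSchema(), (Object) record);
          }
        }
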
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/reporter/FileFailureEventReporterTest.java b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/reporter/FileFailureEventReporterTest.java
index 389e6ab..07f0172 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/reporter/FileFailureEventReporterTest.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/reporter/FileFailureEventReporterTest.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.testng.annotations.Test;
 
-import avro.shaded.com.google.common.collect.Maps;
+import com.google.common.collect.Maps;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.*;
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
index 0104f43..5278092 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
@@ -64,6 +64,8 @@ public class AzkabanGobblinYarnAppLauncher extends AbstractJob {
       throws IOException {
     super(jobId, LOGGER);
 
+    addRuntimeProperties(gobblinProps);
+
     Config gobblinConfig = ConfigUtils.propertiesToConfig(gobblinProps);
 
     //Suppress logs from classes that emit Yarn application Id that Azkaban uses to kill the application.
@@ -98,6 +100,12 @@ public class AzkabanGobblinYarnAppLauncher extends AbstractJob {
     return new YarnConfiguration();
   }
 
+  /**
+   * Extending classes can override this method to add runtime properties.
+   */
+  protected void addRuntimeProperties(Properties gobblinProps) {
+  }
+
   @Override
   public void run() throws Exception {
     this.gobblinYarnAppLauncher.launch();
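
    The new addRuntimeProperties hook is intentionally a no-op; subclasses can use it to inject properties that are only known at submission time, before they are converted into the launcher Config. A hypothetical subclass sketch (the constructor signature is assumed from the jobId/gobblinProps parameters visible in this diff; everything else is illustrative):

        import java.io.IOException;
        import java.util.Properties;

        import org.apache.gobblin.azkaban.AzkabanGobblinYarnAppLauncher;

        public class CustomGobblinYarnAppLauncher extends AzkabanGobblinYarnAppLauncher {

          public CustomGobblinYarnAppLauncher(String jobId, Properties gobblinProps) throws IOException {
            super(jobId, gobblinProps);
          }

          @Override
          protected void addRuntimeProperties(Properties gobblinProps) {
            // Hypothetical property resolved at submission time.
            gobblinProps.setProperty("gobblin.yarn.submitter.name", System.getProperty("user.name"));
          }
        }

    Since the hook is invoked from the superclass constructor, an override should not rely on subclass fields being initialized.
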
diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/AvroOrcSchemaConverter.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/AvroOrcSchemaConverter.java
index 2e1b113..a517f12 100644
--- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/AvroOrcSchemaConverter.java
+++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/AvroOrcSchemaConverter.java
@@ -20,6 +20,7 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 import org.apache.avro.Schema;
+import org.apache.gobblin.util.AvroSchemaUtils;
 import org.apache.orc.TypeDescription;
 
 
@@ -102,8 +103,8 @@ public class AvroOrcSchemaConverter {
    */
   private static TypeDescription getTypeDescriptionForBinarySchema(Schema avroSchema) {
     if ("decimal".equalsIgnoreCase(avroSchema.getProp("logicalType"))) {
-      int scale = avroSchema.getJsonProp("scale").asInt(0);
-      int precision = avroSchema.getJsonProp("precision").asInt();
+      int scale = AvroSchemaUtils.getValueAsInteger(avroSchema, "scale");
+      int precision = AvroSchemaUtils.getValueAsInteger(avroSchema, "precision");
 
       return TypeDescription.createDecimal().withScale(scale).withPrecision(precision);
     }
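
    The call-site change above reflects how property access changed between Avro versions: getJsonProp returned a Jackson JsonNode, while getObjectProp in Avro 1.9 returns a plain Java object (an Integer for numeric JSON values). A hedged sketch of the decimal case, assuming a bytes schema carrying the usual decimal properties:

        import org.apache.avro.Schema;
        import org.apache.orc.TypeDescription;

        public class DecimalConversionSketch {
          public static void main(String[] args) {
            Schema decimalSchema = new Schema.Parser().parse(
                "{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":10,\"scale\":2}");

            // Avro 1.8: getJsonProp("scale").asInt(0); Avro 1.9: getObjectProp returns a plain Integer.
            int precision = (Integer) decimalSchema.getObjectProp("precision");
            int scale = (Integer) decimalSchema.getObjectProp("scale");

            System.out.println(TypeDescription.createDecimal().withScale(scale).withPrecision(precision));
            // prints: decimal(10,2)
          }
        }
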
diff --git a/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebmasterExtractor.java b/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebmasterExtractor.java
index 557c337..1410f00 100644
--- a/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebmasterExtractor.java
+++ b/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebmasterExtractor.java
@@ -31,7 +31,7 @@ import com.google.api.services.webmasters.model.ApiDimensionFilter;
 import com.google.common.base.Splitter;
 import com.google.gson.JsonArray;
 
-import avro.shaded.com.google.common.collect.Iterables;
+import com.google.common.collect.Iterables;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.annotation.Alpha;
diff --git a/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebmasterExtractorIterator.java b/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebmasterExtractorIterator.java
index 042bbae..df3e479 100644
--- a/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebmasterExtractorIterator.java
+++ b/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebmasterExtractorIterator.java
@@ -41,7 +41,7 @@ import com.google.api.services.webmasters.model.ApiDimensionFilter;
 import com.google.api.services.webmasters.model.SearchAnalyticsQueryResponse;
 import com.google.common.base.Optional;
 
-import avro.shaded.com.google.common.base.Joiner;
+import com.google.common.base.Joiner;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.WorkUnitState;
diff --git a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/org/apache/gobblin/restli/throttling/ThrottlingClientTest.java b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/org/apache/gobblin/restli/throttling/ThrottlingClientTest.java
index 135f462..10be171 100644
--- a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/org/apache/gobblin/restli/throttling/ThrottlingClientTest.java
+++ b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/org/apache/gobblin/restli/throttling/ThrottlingClientTest.java
@@ -43,7 +43,7 @@ import org.apache.gobblin.broker.BrokerConfigurationKeyGenerator;
 import org.apache.gobblin.restli.EmbeddedRestliServer;
 import org.apache.gobblin.util.limiter.broker.SharedLimiterKey;
 
-import avro.shaded.com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap;
 
 
 public class ThrottlingClientTest {
diff --git a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/ConfigStoreBasedPolicyTest.java b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/ConfigStoreBasedPolicyTest.java
index b0e6812..92e3336 100644
--- a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/ConfigStoreBasedPolicyTest.java
+++ b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/ConfigStoreBasedPolicyTest.java
@@ -29,7 +29,7 @@ import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
 import org.apache.gobblin.broker.iface.SharedResourcesBroker;
 import org.apache.gobblin.util.limiter.broker.SharedLimiterKey;
 
-import avro.shaded.com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap;
 
 
 public class ConfigStoreBasedPolicyTest {
diff --git a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/LimiterServerResourceTest.java b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/LimiterServerResourceTest.java
index bc538ec..15fcae4 100644
--- a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/LimiterServerResourceTest.java
+++ b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/LimiterServerResourceTest.java
@@ -38,7 +38,7 @@ import org.apache.gobblin.broker.BrokerConfigurationKeyGenerator;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.util.limiter.broker.SharedLimiterKey;
 
-import avro.shaded.com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap;
 
 
 public class LimiterServerResourceTest {
diff --git a/gobblin-runtime-hadoop/src/test/java/org/apache/gobblin/runtime/instance/plugin/hadoop/TestHadoopKerberosKeytabAuthenticationPlugin.java b/gobblin-runtime-hadoop/src/test/java/org/apache/gobblin/runtime/instance/plugin/hadoop/TestHadoopKerberosKeytabAuthenticationPlugin.java
index 0a567a1..bf29d05 100644
--- a/gobblin-runtime-hadoop/src/test/java/org/apache/gobblin/runtime/instance/plugin/hadoop/TestHadoopKerberosKeytabAuthenticationPlugin.java
+++ b/gobblin-runtime-hadoop/src/test/java/org/apache/gobblin/runtime/instance/plugin/hadoop/TestHadoopKerberosKeytabAuthenticationPlugin.java
@@ -27,7 +27,7 @@ import com.typesafe.config.ConfigFactory;
 import org.apache.gobblin.runtime.api.GobblinInstanceDriver;
 import org.apache.gobblin.runtime.std.DefaultConfigurableImpl;
 
-import avro.shaded.com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap;
 
 /**
  * Unit tests for {@link HadoopKerberosKeytabAuthenticationPlugin}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/instance/TestStandardGobblinInstanceDriver.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/instance/TestStandardGobblinInstanceDriver.java
index 8ad9093..a56410e 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/instance/TestStandardGobblinInstanceDriver.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/instance/TestStandardGobblinInstanceDriver.java
@@ -36,7 +36,7 @@ import org.apache.gobblin.runtime.api.GobblinInstancePluginFactory;
 import org.apache.gobblin.runtime.plugins.email.EmailNotificationPlugin;
 import org.apache.gobblin.runtime.std.DefaultConfigurableImpl;
 
-import avro.shaded.com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/instance/hadoop/TestHadoopConfigLoader.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/instance/hadoop/TestHadoopConfigLoader.java
index ce9aa50..a85c85e 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/instance/hadoop/TestHadoopConfigLoader.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/instance/hadoop/TestHadoopConfigLoader.java
@@ -23,7 +23,7 @@ import org.testng.annotations.Test;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
-import avro.shaded.com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap;
 
 /**
  * Unit tests for {@link HadoopConfigLoader}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 9792b83..399bfb8 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -33,7 +33,7 @@ import com.google.common.collect.ImmutableMap;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
-import avro.shaded.com.google.common.annotations.VisibleForTesting;
+import com.google.common.annotations.VisibleForTesting;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroFlattener.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroFlattener.java
index 771026c..7a15357 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroFlattener.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroFlattener.java
@@ -26,7 +26,6 @@ import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Schema;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Logger;
-import org.codehaus.jackson.JsonNode;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
@@ -417,7 +416,8 @@ public class AvroFlattener {
         if (StringUtils.isNotBlank(flattenSource)) {
           field.addProp(FLATTENED_SOURCE_KEY, flattenSource);
         }
-        for (Map.Entry<String, JsonNode> entry : f.getJsonProps().entrySet()) {
+        // Avro 1.9 compatible change - replaced deprecated public API getJsonProps with getObjectProps
+        for (Map.Entry<String, Object> entry : f.getObjectProps().entrySet()) {
           field.addProp(entry.getKey(), entry.getValue());
         }
         flattenedFields.add(field);
@@ -466,8 +466,8 @@ public class AvroFlattener {
   private static void copyProperties(Schema oldSchema, Schema newSchema) {
     Preconditions.checkNotNull(oldSchema);
     Preconditions.checkNotNull(newSchema);
-
-    Map<String, JsonNode> props = oldSchema.getJsonProps();
+    // Avro 1.9 compatible change - replaced deprecated public API getJsonProps with getObjectProps
+    Map<String, Object> props = oldSchema.getObjectProps();
     copyProperties(props, newSchema);
   }
 
@@ -476,12 +476,12 @@ public class AvroFlattener {
    * @param props Properties to copy to Avro Schema
    * @param schema Avro Schema to copy properties to
    */
-  private static void copyProperties(Map<String, JsonNode> props, Schema schema) {
+  private static void copyProperties(Map<String, Object> props, Schema schema) {
     Preconditions.checkNotNull(schema);
 
     // (if null, don't copy but do not throw exception)
     if (null != props) {
-      for (Map.Entry<String, JsonNode> prop : props.entrySet()) {
+      for (Map.Entry<String, Object> prop : props.entrySet()) {
         schema.addProp(prop.getKey(), prop.getValue());
       }
     }
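
    The getJsonProps-to-getObjectProps migration above keeps the copy logic unchanged; only the value type moves from JsonNode to Object, which Schema.addProp(String, Object) accepts directly in Avro 1.9. A minimal standalone sketch of the same property-copy idiom (the schema contents are illustrative):

        import java.util.Map;

        import org.apache.avro.Schema;
        import org.apache.avro.SchemaBuilder;

        public class CopySchemaPropsSketch {
          public static void main(String[] args) {
            Schema oldSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Demo\",\"myProp\":\"v1\",\"fields\":[]}");
            Schema newSchema = SchemaBuilder.record("Demo").fields().endRecord();

            // Avro 1.9: getObjectProps() replaces the deprecated getJsonProps().
            for (Map.Entry<String, Object> prop : oldSchema.getObjectProps().entrySet()) {
              newSchema.addProp(prop.getKey(), prop.getValue());
            }
            System.out.println(newSchema.getObjectProp("myProp")); // v1
          }
        }
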
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroSchemaUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroSchemaUtils.java
new file mode 100644
index 0000000..e1ad552
--- /dev/null
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroSchemaUtils.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util;
+
+import org.apache.avro.Schema;
+
+
+/**
+ * Avro schema utility class to perform schema property conversion to the appropriate data types
+ */
+public class AvroSchemaUtils {
+
+  private AvroSchemaUtils() {
+
+  }
+
+  public static Integer getValueAsInteger(final Schema schema, String prop) {
+    Object value = schema.getObjectProp(prop);
+    if (value instanceof Integer) {
+      return (Integer) value;
+    } else if (value instanceof String) {
+      return Integer.parseInt((String) value);
+    }
+    return null;
+  }
+
+  public static Integer getValueAsInteger(final Schema.Field field, String prop) {
+    Object value = field.getObjectProp(prop);
+    if (value instanceof Integer) {
+      return (Integer) value;
+    } else if (value instanceof String) {
+      return Integer.parseInt((String) value);
+    }
+    return null;
+  }
+
+  public static String getValueAsString(final Schema schema, String prop) {
+    Object value = schema.getObjectProp(prop);
+    if (value instanceof String) {
+      return (String) value;
+    }
+    return null;
+  }
+
+  public static String getValueAsString(final Schema.Field field, String prop) {
+    Object value = field.getObjectProp(prop);
+    if (value instanceof String) {
+      return (String) value;
+    }
+    return null;
+  }
+}
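
    Because Avro 1.9 hands back raw JSON values as Objects, a numeric property can arrive as an Integer or, if quoted in the schema, as a String; the helper above normalizes both shapes. A hypothetical usage sketch (the custom properties sourceSystem and retentionDays are illustrative, not part of any real schema in this commit):

        import org.apache.avro.Schema;

        import org.apache.gobblin.util.AvroSchemaUtils;

        public class AvroSchemaUtilsSketch {
          public static void main(String[] args) {
            Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"R\",\"sourceSystem\":\"kafka\",\"retentionDays\":30,\"fields\":[]}");

            // Numeric JSON values come back as Integer; string values as String.
            Integer retention = AvroSchemaUtils.getValueAsInteger(schema, "retentionDays"); // 30
            String source = AvroSchemaUtils.getValueAsString(schema, "sourceSystem");       // kafka
            System.out.println(source + "," + retention);
          }
        }
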
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
index 3d2587f..d976ea7 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
@@ -62,7 +62,6 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
-import org.codehaus.jackson.JsonNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -147,9 +146,10 @@ public class AvroUtils {
    * Common use cases for this method is in traversing {@link Schema} object into nested level and create {@link Schema}
    * object for non-root level.
    */
-  public static void convertFieldToSchemaWithProps(Map<String,JsonNode> fieldProps, Schema targetSchemaObj) {
-    for (Map.Entry<String, JsonNode> stringJsonNodeEntry : fieldProps.entrySet()) {
-      targetSchemaObj.addProp(stringJsonNodeEntry.getKey(), stringJsonNodeEntry.getValue());
+  public static void convertFieldToSchemaWithProps(Map<String,Object> fieldProps,
+      Schema targetSchemaObj) {
+    for (Map.Entry<String, Object> objectEntry : fieldProps.entrySet()) {
+      targetSchemaObj.addProp(objectEntry.getKey(), objectEntry.getValue());
     }
   }
 
@@ -830,8 +830,8 @@ public class AvroUtils {
   private static void copyProperties(Schema oldSchema, Schema newSchema) {
     Preconditions.checkNotNull(oldSchema);
     Preconditions.checkNotNull(newSchema);
-
-    Map<String, JsonNode> props = oldSchema.getJsonProps();
+    // Avro 1.9 compatible change - replaced deprecated public API getJsonProps with getObjectProps
+    Map<String, Object> props = oldSchema.getObjectProps();
     copyProperties(props, newSchema);
   }
 
@@ -840,12 +840,12 @@ public class AvroUtils {
    * @param props Properties to copy to Avro Schema
    * @param schema Avro Schema to copy properties to
    */
-  private static void copyProperties(Map<String, JsonNode> props, Schema schema) {
+  private static void copyProperties(Map<String, Object> props, Schema schema) {
     Preconditions.checkNotNull(schema);
 
     // (if null, don't copy but do not throw exception)
     if (null != props) {
-      for (Map.Entry<String, JsonNode> prop : props.entrySet()) {
+      for (Map.Entry<String, Object> prop : props.entrySet()) {
         schema.addProp(prop.getKey(), prop.getValue());
       }
     }
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
index 11274af..6f18a35 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
@@ -45,6 +45,7 @@ import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.Encoder;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.mapred.FsInput;
+import org.apache.avro.util.internal.JacksonUtils;
 import org.apache.commons.math3.util.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -620,10 +621,13 @@ public class AvroUtilsTest {
           .getResourceAsStream("recursive_schemas/recursive_" + scenario + "_solution.avsc"));
 
       // get the answer from the input schema (test author needs to provide this)
-      ArrayNode foo = (ArrayNode) inputSchema.getJsonProp("recursive_fields");
+      // Avro 1.9 compatible change - replaced deprecated public API getJsonProp with getObjectProp
+      // Use internal JacksonUtils to convert object to the corresponding JsonNode (ArrayNode)
+      ArrayNode foo = (ArrayNode) JacksonUtils.toJsonNode(inputSchema.getObjectProp(
+          "recursive_fields"));
       HashSet<String> answers = new HashSet<>();
       for (JsonNode fieldsWithRecursion: foo) {
-        answers.add(fieldsWithRecursion.getTextValue());
+        answers.add(fieldsWithRecursion.asText());
       }
 
       Pair<Schema, List<AvroUtils.SchemaEntry>> results = AvroUtils.dropRecursiveFields(inputSchema);
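
    getObjectProp returns plain Java values (Strings, numbers, Lists, Maps), so test code that still wants Jackson's tree model can convert with Avro's JacksonUtils, as done above. Note that JacksonUtils lives in an internal Avro package, so the sketch below (with an illustrative schema) depends on implementation details:

        import org.apache.avro.Schema;
        import org.apache.avro.util.internal.JacksonUtils;

        import com.fasterxml.jackson.databind.JsonNode;

        public class ObjectPropToJsonNodeSketch {
          public static void main(String[] args) {
            Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"R\",\"recursive_fields\":[\"a\",\"b\"],\"fields\":[]}");

            // The array-valued prop comes back as a java.util.List; JacksonUtils rebuilds an ArrayNode.
            JsonNode node = JacksonUtils.toJsonNode(schema.getObjectProp("recursive_fields"));
            for (JsonNode element : node) {
              System.out.println(element.asText()); // a, b
            }
          }
        }
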
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 01cb964..c0cde51 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -23,6 +23,7 @@ import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -45,8 +46,11 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -191,7 +195,6 @@ public class GobblinYarnAppLauncher {
   private final HelixManager helixManager;
 
   private final Configuration yarnConfiguration;
-  private final YarnClient yarnClient;
   private final FileSystem fs;
 
   private final EventBus eventBus = new EventBus(GobblinYarnAppLauncher.class.getSimpleName());
@@ -235,6 +238,9 @@ public class GobblinYarnAppLauncher {
 
   private final String containerTimezone;
   private final String appLauncherMode;
+  private final String originalYarnRMAddress;
+  private final Map<String, YarnClient> potentialYarnClients;
+  private YarnClient yarnClient;
 
   public GobblinYarnAppLauncher(Config config, YarnConfiguration yarnConfiguration) throws IOException {
     this.config = config;
@@ -253,8 +259,16 @@ public class GobblinYarnAppLauncher {
     YarnHelixUtils.setYarnClassPath(config, this.yarnConfiguration);
     YarnHelixUtils.setAdditionalYarnClassPath(config, this.yarnConfiguration);
     this.yarnConfiguration.set("fs.automatic.close", "false");
-    this.yarnClient = YarnClient.createYarnClient();
-    this.yarnClient.init(this.yarnConfiguration);
+    this.originalYarnRMAddress = this.yarnConfiguration.get(GobblinYarnConfigurationKeys.YARN_RESOURCE_MANAGER_ADDRESS);
+    this.potentialYarnClients = new HashMap<>();
+    Set<String> potentialRMAddresses = new HashSet<>(ConfigUtils.getStringList(config, GobblinYarnConfigurationKeys.OTHER_YARN_RESOURCE_MANAGER_ADDRESSES));
+    potentialRMAddresses.add(originalYarnRMAddress);
+    for (String rmAddress : potentialRMAddresses) {
+      YarnClient tmpYarnClient = YarnClient.createYarnClient();
+      this.yarnConfiguration.set(GobblinYarnConfigurationKeys.YARN_RESOURCE_MANAGER_ADDRESS, rmAddress);
+      tmpYarnClient.init(new YarnConfiguration(this.yarnConfiguration));
+      potentialYarnClients.put(rmAddress, tmpYarnClient);
+    }
 
     this.fs = GobblinClusterUtils.buildFileSystem(config, this.yarnConfiguration);
     this.closer.register(this.fs);
@@ -340,20 +354,20 @@ public class GobblinYarnAppLauncher {
 
     connectHelixManager();
 
-    startYarnClient();
-
-    Optional<ApplicationId> reconnectableApplicationId = getReconnectableApplicationId();
-
-    boolean isReconnected = reconnectableApplicationId.isPresent();
-    // Before setup application, first login to make sure ugi has the right token.
+    // Before connecting with the yarn client, we need to log in to get the token
     if(ConfigUtils.getBoolean(config, GobblinYarnConfigurationKeys.ENABLE_KEY_MANAGEMENT, false)) {
-      this.securityManager = Optional.of(buildSecurityManager(isReconnected));
+      this.securityManager = Optional.of(buildSecurityManager());
       this.securityManager.get().loginAndScheduleTokenRenewal();
     }
 
-    if (!reconnectableApplicationId.isPresent()) {
+    startYarnClient();
+
+    this.applicationId = getReconnectableApplicationId();
+
+    if (!this.applicationId.isPresent()) {
       disableLiveHelixInstances();
       LOGGER.info("No reconnectable application found so submitting a new application");
+      this.yarnClient = potentialYarnClients.get(this.originalYarnRMAddress);
       this.applicationId = Optional.of(setupAndSubmitApplication());
     }
 
@@ -549,12 +563,16 @@ public class GobblinYarnAppLauncher {
 
   @VisibleForTesting
   void startYarnClient() {
-    this.yarnClient.start();
+    for (YarnClient yarnClient : potentialYarnClients.values()) {
+      yarnClient.start();
+    }
   }
 
   @VisibleForTesting
   void stopYarnClient() {
-    this.yarnClient.stop();
+    for (YarnClient yarnClient : potentialYarnClients.values()) {
+      yarnClient.stop();
+    }
   }
 
   /**
@@ -574,19 +592,21 @@ public class GobblinYarnAppLauncher {
 
   @VisibleForTesting
   Optional<ApplicationId> getReconnectableApplicationId() throws YarnException, IOException {
-    List<ApplicationReport> applicationReports =
-        this.yarnClient.getApplications(APPLICATION_TYPES, RECONNECTABLE_APPLICATION_STATES);
-    if (applicationReports == null || applicationReports.isEmpty()) {
-      return Optional.absent();
-    }
+    for (YarnClient yarnClient: potentialYarnClients.values()) {
+      List<ApplicationReport> applicationReports = yarnClient.getApplications(APPLICATION_TYPES, RECONNECTABLE_APPLICATION_STATES);
+      if (applicationReports == null || applicationReports.isEmpty()) {
+        continue;
+      }
 
-    // Try to find an application with a matching application name
-    for (ApplicationReport applicationReport : applicationReports) {
-      if (this.applicationName.equals(applicationReport.getName())) {
-        String applicationId = sanitizeApplicationId(applicationReport.getApplicationId().toString());
-        LOGGER.info("Found reconnectable application with application ID: " + applicationId);
-        LOGGER.info("Application Tracking URL: " + applicationReport.getTrackingUrl());
-        return Optional.of(applicationReport.getApplicationId());
+      // Try to find an application with a matching application name
+      for (ApplicationReport applicationReport : applicationReports) {
+        if (this.applicationName.equals(applicationReport.getName())) {
+          String applicationId = sanitizeApplicationId(applicationReport.getApplicationId().toString());
+          LOGGER.info("Found reconnectable application with application ID: " + applicationId);
+          LOGGER.info("Application Tracking URL: " + applicationReport.getTrackingUrl());
+          this.yarnClient = yarnClient;
+          return Optional.of(applicationReport.getApplicationId());
+        }
       }
     }
 
@@ -835,14 +855,22 @@ public class GobblinYarnAppLauncher {
 
     TokenUtils.getAllFSTokens(new Configuration(), credentials, renewerName,
         Optional.absent(), ConfigUtils.getStringList(this.config, TokenUtils.OTHER_NAMENODES));
-
+    // Only pass tokens here, no secrets (there is no simple way to remove a single token or extract secrets).
+    // For RM tokens, only pass the token for the current RM, or the RM will fail to update the token.
+    Credentials finalCredentials = new Credentials();
+    for (Token<? extends TokenIdentifier> token: credentials.getAllTokens()) {
+      if (token.getKind().equals(new Text("RM_DELEGATION_TOKEN")) && !token.getService().equals(new Text(this.originalYarnRMAddress))) {
+        continue;
+      }
+      finalCredentials.addToken(token.getService(), token);
+    }
     Closer closer = Closer.create();
     try {
       DataOutputBuffer dataOutputBuffer = closer.register(new DataOutputBuffer());
-      credentials.writeTokenStorageToStream(dataOutputBuffer);
+      finalCredentials.writeTokenStorageToStream(dataOutputBuffer);
       ByteBuffer fsTokens = ByteBuffer.wrap(dataOutputBuffer.getData(), 0, dataOutputBuffer.getLength());
       containerLaunchContext.setTokens(fsTokens);
-      LOGGER.info("Setting containerLaunchContext with All credential tokens: " + credentials.getAllTokens());
+      LOGGER.info("Setting containerLaunchContext with All credential tokens: " + finalCredentials.getAllTokens());
     } catch (Throwable t) {
       throw closer.rethrow(t);
     } finally {
@@ -872,7 +900,7 @@ public class GobblinYarnAppLauncher {
     return logRootDir;
   }
 
-  private AbstractYarnAppSecurityManager buildSecurityManager(boolean isReconnected) throws IOException {
+  private AbstractYarnAppSecurityManager buildSecurityManager() throws IOException {
     Path tokenFilePath = new Path(this.fs.getHomeDirectory(), this.applicationName + Path.SEPARATOR +
         GobblinYarnConfigurationKeys.TOKEN_FILE_NAME);
 
@@ -881,7 +909,7 @@ public class GobblinYarnAppLauncher {
     try {
      return (AbstractYarnAppSecurityManager) GobblinConstructorUtils.invokeLongestConstructor(Class.forName(aliasResolver.resolve(
           ConfigUtils.getString(config, GobblinYarnConfigurationKeys.SECURITY_MANAGER_CLASS, GobblinYarnConfigurationKeys.DEFAULT_SECURITY_MANAGER_CLASS))), this.config, this.helixManager, this.fs,
-          tokenFilePath, isReconnected);
+          tokenFilePath);
     } catch (ReflectiveOperationException e) {
       throw new IOException(e);
     }
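
    To summarize the ResourceManager changes above: the launcher now builds one YarnClient per candidate RM address (the original address plus any listed under other.yarn.resourcemanager.addresses), probes each for a reconnectable application, and falls back to the original RM only when submitting a fresh application; the container launch context likewise keeps only the RM delegation token for the current RM. A simplified, hedged sketch of the per-RM client construction (method and class names are illustrative):

        import java.util.HashMap;
        import java.util.Map;

        import org.apache.hadoop.yarn.client.api.YarnClient;
        import org.apache.hadoop.yarn.conf.YarnConfiguration;

        public class MultiRmClientSketch {
          static Map<String, YarnClient> buildClients(YarnConfiguration baseConf, Iterable<String> rmAddresses) {
            Map<String, YarnClient> clients = new HashMap<>();
            for (String rmAddress : rmAddresses) {
              // Each client gets its own copy of the configuration, pointed at a different RM.
              YarnConfiguration conf = new YarnConfiguration(baseConf);
              conf.set("yarn.resourcemanager.address", rmAddress);
              YarnClient client = YarnClient.createYarnClient();
              client.init(conf);
              clients.put(rmAddress, client);
            }
            return clients;
          }
        }
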
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
index 956848e..fcc30d3 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
@@ -37,6 +37,8 @@ public class GobblinYarnConfigurationKeys {
   public static final int DEFAULT_RELEASED_CONTAINERS_CACHE_EXPIRY_SECS = 300;
   public static final String APP_VIEW_ACL = GOBBLIN_YARN_PREFIX + "appViewAcl";
   public static final String DEFAULT_APP_VIEW_ACL = "*";
+  public static final String YARN_RESOURCE_MANAGER_ADDRESS = "yarn.resourcemanager.address";
+  public static final String OTHER_YARN_RESOURCE_MANAGER_ADDRESSES = "other.yarn.resourcemanager.addresses";
 
   // Gobblin Yarn ApplicationMaster configuration properties.
   public static final String APP_MASTER_MEMORY_MBS_KEY = GOBBLIN_YARN_PREFIX + "app.master.memory.mbs";