Posted to commits@gobblin.apache.org by wl...@apache.org on 2022/10/18 23:29:59 UTC

[gobblin] branch master updated: [GOBBLIN-1726] Avro 1.9 upgrade of Gobblin OSS (#3581)

This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 7eaa6e87f [GOBBLIN-1726] Avro 1.9 upgrade of Gobblin OSS (#3581)
7eaa6e87f is described below

commit 7eaa6e87ff091273311dcd4490d68dd96a61ae5c
Author: Abhishek Nath <an...@linkedin.com>
AuthorDate: Tue Oct 18 16:29:53 2022 -0700

    [GOBBLIN-1726] Avro 1.9 upgrade of Gobblin OSS (#3581)
    
    * [Branch avro_1_9] Avro 1.9 upgrade compatible change - replaced deprecated public APIs with the compatible APIs (#3349)
    
    * Avro 1.9 upgrade compatible change - replaced deprecated public APIs with the compatible APIs.
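    A minimal Java sketch of the replacement pattern described above, using the AvroCompatibilityHelper calls that appear later in this diff (the example schema and the "someProp" property are illustrative, not from the codebase):

    ```java
    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;

    public class DeprecatedApiReplacementSketch {
      public static void main(String[] args) {
        Schema record = SchemaBuilder.record("User").fields()
            .requiredString("name")
            .endRecord();
        Schema.Field source = record.getField("name");
        source.addProp("someProp", "someValue");

        // Avro 1.8 style, deprecated/removed in 1.9:
        //   new Schema.Field(source.name(), source.schema(), source.doc(), source.defaultValue());
        //   source.getJsonProp("someProp").getValueAsText();

        // Avro 1.9 compatible style via avro-util:
        Schema.Field copy = AvroCompatibilityHelper.createSchemaField(
            source.name(), source.schema(), source.doc(), null);
        String prop = AvroCompatibilityHelper.getFieldPropAsJsonString(
            source, "someProp", true, false);  // same flag values as used elsewhere in this diff

        System.out.println(copy.name() + " / " + prop);
      }
    }
    ```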
    
    * Avro 1.9 upgrade compatible change - Replaced guava library import from avro shaded with direct guava libraries
    
    * Applied Gobblin codestyle formatting.
    
    Co-authored-by: Lei <au...@users.noreply.github.com>
    
    * Avro 1.9 upgrade compatible change - upgraded jackson mapper to 2.x and kept jackson mapper 1.x for modules with hive, helix library dependency. (#3368)
    
    * Avro 1.9 upgrade compatible change - upgraded jackson mapper to 2.x and kept jackson mapper 1.x for modules with hive, helix library dependency.
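    For context, the two Jackson generations referred to above live in different Java packages and can coexist on the classpath, which is what makes the per-module split possible; a hedged sketch of the API split (module wiring aside):

    ```java
    public class JacksonVersionsSketch {
      public static void main(String[] args) throws Exception {
        // Jackson 2.x (jackson-databind, com.fasterxml packages) - the target of the upgrade:
        com.fasterxml.jackson.databind.ObjectMapper mapper2 =
            new com.fasterxml.jackson.databind.ObjectMapper();
        System.out.println(mapper2.readTree("{\"avro\":\"1.9\"}"));

        // Jackson 1.x (jackson-mapper-asl, org.codehaus packages) - kept only for modules that
        // still pull in Hive or Helix, which depend on the old package:
        org.codehaus.jackson.map.ObjectMapper mapper1 =
            new org.codehaus.jackson.map.ObjectMapper();
        System.out.println(mapper1.readTree("{\"avro\":\"1.9\"}"));
      }
    }
    ```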
    
    * Changes to upgrade to Avro 1.9.2 and leverage the Hive-with-Avro changes from https://linkedin.jfrog.io/artifactory/gobblin-hive (#3458)
    
    * Use helper-all v0.2.74 to solve issues around default values. (#3469)
    
    The latest version of helper-all fixes the issues seen before w.r.t.
    default values, so we can now revert the code and the *.avsc files back
    to how they used to be, with two minor exceptions:
    
    1. Check Schema equality using their .toString() representations. Doing
       it the old way works for two out of the three instances, but one of
       them fails, for reasons I haven't figured out yet.
    
    2. Add a `"default":null` piece to recursive_schema_1_converted.avsc.
       This is harmless, and is caused by the fact that the compatibility
       helper always adds it if it's a valid default for the schema. See
       the comments for FieldBuilder19.setDefault():
       https://github.com/linkedin/avro-util/blob/b9e89c55980ea8e5fd3c8d8da362d7195dd2a99c/helper/impls/helper-impl-19/src/main/java/com/linkedin/avroutil1/compatibility/avro19/FieldBuilder19.java#L69
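    A minimal sketch of the behavior described in point 2, assuming the FieldBuilder19 semantics linked above: when a field's schema is a union whose first branch is "null" and no default is supplied, the helper fills in an explicit null default, which then serializes as `"default":null` (the field and record names below are illustrative):

    ```java
    import java.util.Arrays;
    import java.util.Collections;
    import org.apache.avro.Schema;
    import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;

    public class NullDefaultSketch {
      public static void main(String[] args) {
        Schema nullableString = Schema.createUnion(
            Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)));

        // No default is passed here; per the FieldBuilder19 comment referenced above, the helper
        // is expected to add one because null is a valid default for this union.
        Schema.Field optionalField = AvroCompatibilityHelper.createSchemaField(
            "optionalField", nullableString, null, null);

        Schema record = Schema.createRecord("Example", null, "org.example", false);
        record.setFields(Collections.singletonList(optionalField));

        // The serialized schema is expected to include ...,"default":null} for the field,
        // which is the extra piece that showed up in recursive_schema_1_converted.avsc.
        System.out.println(record.toString());
      }
    }
    ```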
    
    To verify that the files are otherwise the same as before:
    ```
    $ for file in gobblin-core-base/src/test/resources/converter/*.avsc; do
    > git show 928e0180c471fc4b7a6caee041b001b5b34e1cc6:$file > /tmp/before
    > diff <(jq . </tmp/before) <(jq . <$file)
    > done
    ```
    
    * Merging apache/gobblin master with avro_1_9
    
    * Added deprecated json method using AvroCompatibilityHelper
    
    * Removed unused import and replaced Integer.valueOf with Integer.parseInt
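    A one-line illustration of the difference (purely illustrative): Integer.parseInt returns a primitive int directly, while Integer.valueOf boxes to an Integer that is then auto-unboxed when an int is what is actually needed.

    ```java
    public class ParseIntSketch {
      public static void main(String[] args) {
        int viaParseInt = Integer.parseInt("42");  // straight to a primitive int
        int viaValueOf = Integer.valueOf("42");    // boxes to an Integer, then auto-unboxes
        System.out.println(viaParseInt + viaValueOf);
      }
    }
    ```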
    
    * Exclude the com.linkedin.hive dependency from Gradle build files, similar to org.apache.hive
    
    * Replace direct Avro field creation with AvroCompatibilityHelper.createSchemaField
    
    * Removed extra dependency. Addressed review comment - removed jcenter() repository
    
    * Upgrade AvroCompatHelper version
    
    * Removed code that was moved to AvroHiveTypeUtils.java in the master branch
    
    * Addressed review comments: replaced getObjectProps/getObjectProp with AvroCompatibilityHelper methods
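    A hedged sketch of the getObjectProp-style replacement this item refers to, modeled on the AvroHiveTypeUtils change later in this diff (the inline schema and the maxLength value are illustrative):

    ```java
    import org.apache.avro.Schema;
    import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;

    public class SchemaPropSketch {
      public static void main(String[] args) {
        Schema schema = new Schema.Parser().parse(
            "{\"type\": \"string\", \"maxLength\": 20}");

        // Avro 1.8 style, removed in 1.9:
        //   schema.getJsonProp("maxLength").getValueAsInt();
        // Version-agnostic replacement (same flag values as in the AvroHiveTypeUtils change below):
        String maxLengthJson = AvroCompatibilityHelper.getSchemaPropAsJsonString(
            schema, "maxLength", false, false);
        System.out.println(Integer.parseInt(maxLengthJson));
      }
    }
    ```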
    
    * Fix for test failure
    
    Co-authored-by: Lei <au...@users.noreply.github.com>
    Co-authored-by: Sreeram Ramachandran <sr...@linkedin.com>
---
 gobblin-cluster/build.gradle                       |   1 +
 gobblin-compaction/build.gradle                    |   1 +
 .../FieldAttributeBasedDeltaFieldsProvider.java    |   9 +-
 .../avro/MRCompactorAvroKeyDedupJobRunner.java     |   4 +-
 ...FieldAttributeBasedDeltaFieldsProviderTest.java |   2 +-
 gobblin-core-base/build.gradle                     |   1 +
 .../converter/filter/AvroSchemaFieldRemover.java   |  22 +--
 ...GobblinTrackingEventFlattenFilterConverter.java |  10 +-
 .../apache/gobblin/test/SequentialTestSource.java  |   2 +-
 .../filter/AvroSchemaFieldRemoverTest.java         |   6 +-
 ...linTrackingEventFlattenFilterConverterTest.java |  10 +-
 .../converter/recursive_schema_1_converted.avsc    |   2 +-
 gobblin-core/build.gradle                          |   1 +
 .../converter/avro/FlattenNestedKeyConverter.java  |   7 +-
 .../avro/JsonElementConversionFactory.java         |   8 +-
 .../converter/filter/AvroFieldsPickConverter.java  |   9 +-
 .../filter/AvroFieldsPickConverterTest.java        |   2 +-
 .../AvroGenericRecordAccessorTest.java             |   6 +
 .../src/test/resources/converter/complex3.json     |  12 +-
 .../test/resources/converter/fieldPickInput.avsc   |   2 +-
 .../resources/converter/fieldPickInput_arrays.avro | Bin 552 -> 510 bytes
 gobblin-core/src/test/resources/serde/serde.avro   | Bin 277 -> 293 bytes
 gobblin-core/src/test/resources/serde/serde.avsc   |   4 +-
 gobblin-data-management/build.gradle               |   1 +
 .../hive/query/HiveAvroORCQueryGenerator.java      |   9 --
 .../conversion/hive/utils/AvroHiveTypeUtils.java   |   4 +-
 .../copy/replication/ConfigBasedMultiDatasets.java |   2 +-
 .../copy/RecursiveCopyableDatasetTest.java         |   4 +-
 gobblin-distribution/build.gradle                  |   1 +
 .../iceberg/Utils/TypeInfoToSchemaParser.java      |   4 +-
 .../reporter/FileFailureEventReporterTest.java     |   2 +-
 .../AvroStringFieldEncryptorConverterTest.java     |   6 +
 .../src/test/resources/fieldPickInput_arrays.avro  | Bin 552 -> 510 bytes
 gobblin-modules/gobblin-kafka-common/build.gradle  |   1 +
 .../converter/EnvelopePayloadConverter.java        |   8 +-
 gobblin-modules/gobblin-metadata/build.gradle      |   1 +
 gobblin-modules/gobblin-orc/build.gradle           |   1 +
 .../google/webmaster/GoogleWebmasterExtractor.java |   2 +-
 .../GoogleWebmasterExtractorIterator.java          |   2 +-
 .../extraDependencies.gradle                       |   2 +-
 .../restli/throttling/ThrottlingClientTest.java    |   2 +-
 .../throttling/ConfigStoreBasedPolicyTest.java     |   2 +-
 .../throttling/LimiterServerResourceTest.java      |   2 +-
 .../restli/throttling/PoliciesResourceTest.java    |   2 +-
 ...stHadoopKerberosKeytabAuthenticationPlugin.java |   2 +-
 gobblin-runtime/build.gradle                       |   1 +
 .../TestStandardGobblinInstanceDriver.java         |   2 +-
 .../instance/hadoop/TestHadoopConfigLoader.java    |   2 +-
 .../service/monitoring/KafkaJobStatusMonitor.java  |   2 +-
 gobblin-utility/build.gradle                       |   2 +
 .../org/apache/gobblin/util/AvroFlattener.java     |  32 +----
 .../org/apache/gobblin/util/AvroSchemaUtils.java   |  84 ++++++++++++
 .../java/org/apache/gobblin/util/AvroUtils.java    |  75 +++++------
 .../gobblin/util/orc/AvroOrcSchemaConverter.java   |   5 +-
 .../org/apache/gobblin/util/AvroFlattenerTest.java |   3 +-
 .../apache/gobblin/util/AvroSchemaUtilsTest.java   | 150 +++++++++++++++++++++
 .../org/apache/gobblin/util/AvroUtilsTest.java     |  46 ++++---
 .../optionWithinOptionWithinRecord_flattened.json  |   9 +-
 .../recordWithinOptionWithinRecord_flattened.json  |   6 +-
 .../resources/props/schema_with_field_props.json   |  23 ++++
 .../resources/props/schema_with_logical_field.json |  34 +++++
 .../test/resources/props/schema_with_props.json    |  22 +++
 .../props/schema_without_field_props.json          |  22 +++
 .../test/resources/props/schema_without_props.json |  20 +++
 gobblin-yarn/build.gradle                          |   1 +
 gradle/scripts/defaultBuildProperties.gradle       |   4 +-
 gradle/scripts/dependencyDefinitions.gradle        |  17 ++-
 gradle/scripts/repositories.gradle                 |   5 +-
 68 files changed, 569 insertions(+), 179 deletions(-)

diff --git a/gobblin-cluster/build.gradle b/gobblin-cluster/build.gradle
index 5b574eb95..3493c2b15 100644
--- a/gobblin-cluster/build.gradle
+++ b/gobblin-cluster/build.gradle
@@ -50,6 +50,7 @@ dependencies {
   compile (externalDependency.helix) {
     exclude group: 'io.dropwizard.metrics', module: 'metrics-core'
   }
+  compile externalDependency.jacksonMapperAsl
 
   runtimeOnly project(":gobblin-modules:gobblin-service-kafka")
 
diff --git a/gobblin-compaction/build.gradle b/gobblin-compaction/build.gradle
index 672bcbce9..64b5fdecd 100644
--- a/gobblin-compaction/build.gradle
+++ b/gobblin-compaction/build.gradle
@@ -46,6 +46,7 @@ dependencies {
 
   runtimeOnly(externalDependency.hiveService) {
     exclude group: 'org.apache.hive', module: 'hive-exec'
+    exclude group: 'com.linkedin.hive', module: 'hive-exec'
   }
   runtimeOnly externalDependency.hiveJdbc
   runtimeOnly externalDependency.hiveMetastore
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProvider.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProvider.java
index 5f2e9fd9a..6a01b3923 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProvider.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProvider.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.compaction.mapreduce.avro;
 
+import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -27,6 +28,7 @@ import org.apache.avro.Schema.Field;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonNode;
 import org.codehaus.jackson.JsonParser;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.node.ObjectNode;
@@ -79,7 +81,9 @@ public class FieldAttributeBasedDeltaFieldsProvider implements AvroDeltaFieldNam
   private List<String> getDeltaFieldNamesForNewSchema(Schema originalSchema) {
     List<String> deltaFields = new ArrayList<>();
     for (Field field : originalSchema.getFields()) {
-      String deltaAttributeField = field.getJsonProp(this.attributeField).getValueAsText();
+      // Avro 1.9 compatible change - replaced deprecated public api getJsonProp with AvroCompatibilityHelper methods
+      String deltaAttributeField = AvroCompatibilityHelper.getFieldPropAsJsonString(field, this.attributeField, 
+          true, false);
       ObjectNode objectNode = getDeltaPropValue(deltaAttributeField);
       if (objectNode == null || objectNode.get(this.deltaPropName) == null) {
         continue;
@@ -98,7 +102,8 @@ public class FieldAttributeBasedDeltaFieldsProvider implements AvroDeltaFieldNam
       JsonParser jp = jf.createJsonParser(json);
       ObjectMapper objMap = new ObjectMapper(jf);
       jp.setCodec(objMap);
-      return (ObjectNode) jp.readValueAsTree();
+      JsonNode jsonNode = jp.readValueAsTree();
+      return (ObjectNode) objMap.readTree(jsonNode.asText());
     } catch (IOException e) {
       return null;
     }
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/MRCompactorAvroKeyDedupJobRunner.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/MRCompactorAvroKeyDedupJobRunner.java
index 2a2ab6b20..82c891d84 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/MRCompactorAvroKeyDedupJobRunner.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/MRCompactorAvroKeyDedupJobRunner.java
@@ -37,6 +37,7 @@ import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;
 import org.apache.avro.mapred.AvroKey;
 import org.apache.avro.mapred.AvroValue;
 import org.apache.avro.mapreduce.AvroJob;
+import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.gobblin.compaction.dataset.Dataset;
 import org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner;
@@ -198,7 +199,8 @@ public class MRCompactorAvroKeyDedupJobRunner extends MRCompactorJobRunner {
     for (Field field : record.getFields()) {
       Optional<Schema> newFieldSchema = getKeySchema(field);
       if (newFieldSchema.isPresent()) {
-        fields.add(new Field(field.name(), newFieldSchema.get(), field.doc(), field.defaultValue()));
+        fields.add(AvroCompatibilityHelper.createSchemaField(field.name(), newFieldSchema.get(), field.doc(),
+            AvroUtils.getCompatibleDefaultValue(field)));
       }
     }
     if (!fields.isEmpty()) {
diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProviderTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProviderTest.java
index 8557a8c83..225f33980 100644
--- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProviderTest.java
+++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProviderTest.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import avro.shaded.com.google.common.collect.Lists;
+import com.google.common.collect.Lists;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
diff --git a/gobblin-core-base/build.gradle b/gobblin-core-base/build.gradle
index fb7a4a745..83d1c9deb 100644
--- a/gobblin-core-base/build.gradle
+++ b/gobblin-core-base/build.gradle
@@ -31,6 +31,7 @@ dependencies {
   compile externalDependency.avroMapredH2
   compile externalDependency.commonsCodec
   compile externalDependency.avro
+  compile externalDependency.avroCompatHelper
   compile externalDependency.guava
   compile externalDependency.slf4j
   compile externalDependency.typesafeConfig
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemover.java b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemover.java
index 87d410175..c2f20c295 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemover.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemover.java
@@ -22,14 +22,15 @@ import java.util.Map;
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
-import org.codehaus.jackson.JsonNode;
+import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
-import static org.apache.gobblin.util.AvroUtils.convertFieldToSchemaWithProps;
+import org.apache.gobblin.util.AvroSchemaUtils;
+import org.apache.gobblin.util.AvroUtils;
 
 
 /**
@@ -108,7 +109,7 @@ public class AvroSchemaFieldRemover {
   private Schema removeFieldsFromRecords(Schema schema, Map<String, Schema> schemaMap) {
 
     Schema newRecord = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), schema.isError());
-    convertFieldToSchemaWithProps(schema.getJsonProps(), newRecord);
+    AvroSchemaUtils.copySchemaProperties(schema, newRecord);
 
     // Put an incomplete schema into schemaMap to avoid re-processing a recursive field.
     // The fields in the incomplete schema will be populated once the current schema is completely processed.
@@ -119,15 +120,16 @@ public class AvroSchemaFieldRemover {
       if (!this.shouldRemove(field)) {
         Field newField;
         if (this.children.containsKey(field.name())) {
-          newField = new Field(field.name(), this.children.get(field.name()).removeFields(field.schema(), schemaMap),
-              field.doc(), field.defaultValue());
+          newField = AvroCompatibilityHelper.createSchemaField(field.name(),
+              this.children.get(field.name()).removeFields(field.schema(), schemaMap),
+              field.doc(), AvroUtils.getCompatibleDefaultValue(field));
         } else {
-          newField = new Field(field.name(), DO_NOTHING_INSTANCE.removeFields(field.schema(), schemaMap), field.doc(),
-              field.defaultValue());
-        }
-        for (Map.Entry<String, JsonNode> stringJsonNodeEntry : field.getJsonProps().entrySet()) {
-          newField.addProp(stringJsonNodeEntry.getKey(), stringJsonNodeEntry.getValue());
+          newField = AvroCompatibilityHelper.createSchemaField(field.name(),
+              DO_NOTHING_INSTANCE.removeFields(field.schema(), schemaMap), field.doc(),
+              AvroUtils.getCompatibleDefaultValue(field));
         }
+        // Avro 1.9 compatible change - replaced deprecated public api getJsonProps with AvroCompatibilityHelper methods
+        AvroSchemaUtils.copyFieldProperties(field, newField);
         newFields.add(newField);
       }
     }
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverter.java b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverter.java
index bcad4f9e1..c44e2f578 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverter.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverter.java
@@ -26,6 +26,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
+import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -85,10 +86,11 @@ public class GobblinTrackingEventFlattenFilterConverter extends AvroToAvroConver
       String curFieldName = field.name();
       if (!field.schema().getType().equals(Schema.Type.MAP)) {
         if (fieldsRenameMap.containsKey(curFieldName)) {
-          newFields.add(
-              new Schema.Field(fieldsRenameMap.get(curFieldName), field.schema(), field.doc(), field.defaultValue()));
+          newFields.add(AvroCompatibilityHelper.createSchemaField(fieldsRenameMap.get(curFieldName), field.schema(),
+              field.doc(), AvroUtils.getCompatibleDefaultValue(field)));
         } else {
-          newFields.add(new Schema.Field(curFieldName, field.schema(), field.doc(), field.defaultValue()));
+          newFields.add(AvroCompatibilityHelper.createSchemaField(curFieldName, field.schema(), field.doc(),
+              AvroUtils.getCompatibleDefaultValue(field)));
         }
         this.nonMapFields.add(curFieldName);
       } else {
@@ -102,7 +104,7 @@ public class GobblinTrackingEventFlattenFilterConverter extends AvroToAvroConver
     for (String fieldToFlatten : ConfigUtils.getStringList(config, FIELDS_TO_FLATTEN)) {
       String newFieldName =
           this.fieldsRenameMap.containsKey(fieldToFlatten) ? this.fieldsRenameMap.get(fieldToFlatten) : fieldToFlatten;
-      newFields.add(new Field(newFieldName, Schema.create(Schema.Type.STRING), "", null));
+      newFields.add(AvroCompatibilityHelper.createSchemaField(newFieldName, Schema.create(Schema.Type.STRING), "", null));
     }
 
     return this;
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/test/SequentialTestSource.java b/gobblin-core-base/src/main/java/org/apache/gobblin/test/SequentialTestSource.java
index 587280806..8aaf328c6 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/test/SequentialTestSource.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/test/SequentialTestSource.java
@@ -29,7 +29,7 @@ import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
-import avro.shaded.com.google.common.base.Throwables;
+import com.google.common.base.Throwables;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.SourceState;
diff --git a/gobblin-core-base/src/test/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemoverTest.java b/gobblin-core-base/src/test/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemoverTest.java
index 669d040e2..1e4d0129d 100644
--- a/gobblin-core-base/src/test/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemoverTest.java
+++ b/gobblin-core-base/src/test/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemoverTest.java
@@ -37,16 +37,16 @@ public class AvroSchemaFieldRemoverTest {
   public void testRemoveFields() throws IllegalArgumentException, IOException {
     Schema convertedSchema1 = convertSchema("/converter/recursive_schema_1.avsc", "YwchQiH.OjuzrLOtmqLW");
     Schema expectedSchema1 = parseSchema("/converter/recursive_schema_1_converted.avsc");
-    Assert.assertEquals(convertedSchema1, expectedSchema1);
+    Assert.assertEquals(convertedSchema1.toString(), expectedSchema1.toString());
 
     Schema convertedSchema2 =
         convertSchema("/converter/recursive_schema_2.avsc", "FBuKC.wIINqII.lvaerUEKxBQUWg,eFQjDj.TzuYZajb");
     Schema expectedSchema2 = parseSchema("/converter/recursive_schema_2_converted.avsc");
-    Assert.assertEquals(convertedSchema2, expectedSchema2);
+    Assert.assertEquals(convertedSchema2.toString(), expectedSchema2.toString());
 
     Schema convertedSchema3 = convertSchema("/converter/recursive_schema_2.avsc", "field.that.does.not.exist");
     Schema expectedSchema3 = parseSchema("/converter/recursive_schema_2_not_converted.avsc");
-    Assert.assertEquals(convertedSchema3, expectedSchema3);
+    Assert.assertEquals(convertedSchema3.toString(), expectedSchema3.toString());
   }
 
   private Schema parseSchema(String schemaFile) throws IOException {
diff --git a/gobblin-core-base/src/test/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverterTest.java b/gobblin-core-base/src/test/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverterTest.java
index 0989988c9..4c1577d1c 100644
--- a/gobblin-core-base/src/test/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverterTest.java
+++ b/gobblin-core-base/src/test/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverterTest.java
@@ -45,12 +45,13 @@ public class GobblinTrackingEventFlattenFilterConverterTest {
     Schema output = converter.convertSchema(
         new Schema.Parser().parse(getClass().getClassLoader().getResourceAsStream("GobblinTrackingEvent.avsc")),
         workUnitState);
-    Assert.assertEquals(output, new Schema.Parser().parse(
+    Schema parsedSchema = new Schema.Parser().parse(
         "{\"type\":\"record\",\"name\":\"GobblinTrackingEvent\",\"namespace\":\"org.apache.gobblin.metrics\",\"fields\":"
             + "[{\"name\":\"timestamp\",\"type\":\"long\",\"doc\":\"Time at which event was created.\",\"default\":0},"
             + "{\"name\":\"namespace\",\"type\":[\"string\",\"null\"],\"doc\":\"Namespace used for filtering of events.\"},"
             + "{\"name\":\"name\",\"type\":\"string\",\"doc\":\"Event name.\"},{\"name\":\"field1\",\"type\":\"string\",\"doc\":\"\"},"
-            + "{\"name\":\"field2\",\"type\":\"string\",\"doc\":\"\"}]}"));
+            + "{\"name\":\"field2\",\"type\":\"string\",\"doc\":\"\"}]}");
+    Assert.assertEquals(output.toString(), parsedSchema.toString());
 
     props.put(GobblinTrackingEventFlattenFilterConverter.class.getSimpleName() + "."
         + GobblinTrackingEventFlattenFilterConverter.FIELDS_RENAME_MAP, "name:eventName,field1:field3");
@@ -61,11 +62,12 @@ public class GobblinTrackingEventFlattenFilterConverterTest {
     Schema output2 = converter.convertSchema(
         new Schema.Parser().parse(getClass().getClassLoader().getResourceAsStream("GobblinTrackingEvent.avsc")),
         workUnitState2);
-    Assert.assertEquals(output2, new Schema.Parser().parse(
+    parsedSchema = new Schema.Parser().parse(
         "{\"type\":\"record\",\"name\":\"GobblinTrackingEvent\",\"namespace\":\"org.apache.gobblin.metrics\",\"fields\":"
             + "[{\"name\":\"timestamp\",\"type\":\"long\",\"doc\":\"Time at which event was created.\",\"default\":0},"
             + "{\"name\":\"namespace\",\"type\":[\"string\",\"null\"],\"doc\":\"Namespace used for filtering of events.\"},"
             + "{\"name\":\"eventName\",\"type\":\"string\",\"doc\":\"Event name.\"},{\"name\":\"field3\",\"type\":\"string\",\"doc\":\"\"},"
-            + "{\"name\":\"field2\",\"type\":\"string\",\"doc\":\"\"}]}"));
+            + "{\"name\":\"field2\",\"type\":\"string\",\"doc\":\"\"}]}");
+    Assert.assertEquals(output2.toString(), parsedSchema.toString());
   }
 }
diff --git a/gobblin-core-base/src/test/resources/converter/recursive_schema_1_converted.avsc b/gobblin-core-base/src/test/resources/converter/recursive_schema_1_converted.avsc
index 66a8fae8c..02d243296 100644
--- a/gobblin-core-base/src/test/resources/converter/recursive_schema_1_converted.avsc
+++ b/gobblin-core-base/src/test/resources/converter/recursive_schema_1_converted.avsc
@@ -1 +1 @@
-{"type":"record","name":"VyPswKoukcXEZshQrXnE","namespace":"PKA.POshikUo.flXRgM.aBxSQzgOe","fields":[{"name":"jRqjDF","type":{"type":"record","name":"ZDlOMWcUTCk","namespace":"PKA.POshikUo.flXRgM","fields":[{"name":"GJiZXGQc","type":"int","doc":"Vdj NXTFcrls GsWTlJ Lw WS oWm ZbYt OxeGXFqHeG oWm gQpsyC NXTFcrls GsWTlJ ISn yqw tVlMiJQo XECvKHj JUyZ BGwP PMEQNW yqw qfzNChDuzDx GwBGpC jM BGwP Ox r xOrwhzll lIVyeB","meta":"field_meta"},{"name":"NZUOnQgci","type":["null","string"],"doc":"Vdj N [...]
+{"type":"record","name":"VyPswKoukcXEZshQrXnE","namespace":"PKA.POshikUo.flXRgM.aBxSQzgOe","fields":[{"name":"jRqjDF","type":{"type":"record","name":"ZDlOMWcUTCk","namespace":"PKA.POshikUo.flXRgM","fields":[{"name":"GJiZXGQc","type":"int","doc":"Vdj NXTFcrls GsWTlJ Lw WS oWm ZbYt OxeGXFqHeG oWm gQpsyC NXTFcrls GsWTlJ ISn yqw tVlMiJQo XECvKHj JUyZ BGwP PMEQNW yqw qfzNChDuzDx GwBGpC jM BGwP Ox r xOrwhzll lIVyeB","meta":"field_meta"},{"name":"NZUOnQgci","type":["null","string"],"doc":"Vdj N [...]
diff --git a/gobblin-core/build.gradle b/gobblin-core/build.gradle
index 9a0bd26bd..9ca24678d 100644
--- a/gobblin-core/build.gradle
+++ b/gobblin-core/build.gradle
@@ -36,6 +36,7 @@ dependencies {
   compile externalDependency.commonsMath
   compile externalDependency.commonsHttpClient
   compile externalDependency.avro
+  compile externalDependency.avroCompatHelper
   compile externalDependency.guava
   compile externalDependency.gson
   compile externalDependency.slf4j
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/FlattenNestedKeyConverter.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/FlattenNestedKeyConverter.java
index ccf081933..ec5309ba5 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/FlattenNestedKeyConverter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/FlattenNestedKeyConverter.java
@@ -25,6 +25,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
+import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
 
 import com.google.common.base.CaseFormat;
 import com.google.common.base.Optional;
@@ -75,7 +76,8 @@ public class FlattenNestedKeyConverter extends Converter<Schema, Schema, Generic
     List<Field> fields = new ArrayList<>();
     // Clone the existing fields
     for (Field field : inputSchema.getFields()) {
-      fields.add(new Field(field.name(), field.schema(), field.doc(), field.defaultValue(), field.order()));
+      fields.add(AvroCompatibilityHelper.createSchemaField(field.name(), field.schema(), field.doc(),
+          AvroUtils.getCompatibleDefaultValue(field), field.order()));
     }
 
     // Convert each of nested keys into a top level field
@@ -102,7 +104,8 @@ public class FlattenNestedKeyConverter extends Converter<Schema, Schema, Generic
       Field field = optional.get();
 
       // Make a copy under a new name
-      Field copy = new Field(name, field.schema(), field.doc(), field.defaultValue(), field.order());
+      Field copy = AvroCompatibilityHelper.createSchemaField(name, field.schema(), field.doc(),
+          AvroUtils.getCompatibleDefaultValue(field), field.order());
       fields.add(copy);
     }
 
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionFactory.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionFactory.java
index 0d6707d18..fbd05a607 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionFactory.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionFactory.java
@@ -35,7 +35,6 @@ import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.converter.EmptyIterable;
 import org.apache.gobblin.converter.json.JsonSchema;
-import org.codehaus.jackson.node.JsonNodeFactory;
 import org.joda.time.DateTimeZone;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
@@ -45,6 +44,7 @@ import org.slf4j.LoggerFactory;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
+import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
 
 import lombok.extern.java.Log;
 import sun.util.calendar.ZoneInfo;
@@ -618,8 +618,10 @@ public class JsonElementConversionFactory {
           throw new UnsupportedOperationException(e);
         }
 
-        Schema.Field fld = new Schema.Field(map.getColumnName(), fldSchema, map.getComment(),
-            map.isNullable() ? JsonNodeFactory.instance.nullNode() : null);
+        // [Avro 1.9.2 upgrade] No need to pass JsonNodeFactory.instance.nullNode() if map is nullable.
+        // AvroCompatibilityHelper will take care of this.
+        Schema.Field fld = AvroCompatibilityHelper.createSchemaField(map.getColumnName(), fldSchema, map.getComment(),
+            null);
         fld.addProp(SOURCE_TYPE, sourceType);
         fields.add(fld);
       }
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverter.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverter.java
index 6f341fea5..e1499abad 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverter.java
@@ -26,6 +26,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericRecord;
+import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -144,14 +145,14 @@ public class AvroFieldsPickConverter extends AvroToAvroConverterBase {
       Preconditions.checkNotNull(innerSrcField, child.val + " does not exist under " + recordSchema);
 
       if (child.children.isEmpty()) { //Leaf
-        newFields.add(
-            new Field(innerSrcField.name(), innerSrcField.schema(), innerSrcField.doc(), innerSrcField.defaultValue()));
+        newFields.add(AvroCompatibilityHelper.createSchemaField(innerSrcField.name(), innerSrcField.schema(),
+            innerSrcField.doc(), AvroUtils.getCompatibleDefaultValue(innerSrcField)));
       } else {
         Schema innerSrcSchema = innerSrcField.schema();
 
         Schema innerDestSchema = createSchemaHelper(innerSrcSchema, child); //Recurse of schema
-        Field innerDestField =
-            new Field(innerSrcField.name(), innerDestSchema, innerSrcField.doc(), innerSrcField.defaultValue());
+        Field innerDestField = AvroCompatibilityHelper.createSchemaField(innerSrcField.name(), innerDestSchema,
+            innerSrcField.doc(), AvroUtils.getCompatibleDefaultValue(innerSrcField));
         newFields.add(innerDestField);
       }
     }
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverterTest.java b/gobblin-core/src/test/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverterTest.java
index d1244e8ab..1bff418cb 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverterTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverterTest.java
@@ -88,7 +88,7 @@ public class AvroFieldsPickConverterTest {
         while (expectedDataFileReader.hasNext()) {
           GenericRecord expected = expectedDataFileReader.next();
           GenericRecord actual = converter.convertRecord(convertedSchema, srcDataFileReader.next(), workUnitState).iterator().next();
-          Assert.assertEquals(actual, expected);
+          Assert.assertEquals(actual.toString(), expected.toString());
         }
         Assert.assertTrue(!srcDataFileReader.hasNext());
       }
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessorTest.java b/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessorTest.java
index 17e46b08b..396928b3d 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessorTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessorTest.java
@@ -139,6 +139,12 @@ public class AvroGenericRecordAccessorTest {
 
   @Test
   public void testGetMultiConvertsStrings() throws IOException {
+    // The error below is caused by invalid Avro data: per the Avro spec, the default value must match the first
+    // type in the union. Since the default value here is null, a union that includes "null" must list "null" first,
+    // followed by the actual type. This is corrected in fieldPickInput.avsc and fieldPickInput_arrays.avro.
+    // Error: org.apache.avro.AvroTypeException: Invalid default for field favorite_quotes: null
+    // not a [{"type":"array","items":"string"},"null"]
+    // Correct data: "type": ["null", {"type": "array", "items": "string"}], "default": null
     updateRecordFromTestResource("converter/fieldPickInput", "converter/fieldPickInput_arrays.avro");
     Map<String, Object> ret = accessor.getMultiGeneric("favorite_quotes");
     Object val = ret.get("favorite_quotes");
diff --git a/gobblin-core/src/test/resources/converter/complex3.json b/gobblin-core/src/test/resources/converter/complex3.json
index 6f57998b4..9872d5140 100644
--- a/gobblin-core/src/test/resources/converter/complex3.json
+++ b/gobblin-core/src/test/resources/converter/complex3.json
@@ -13,8 +13,8 @@
             "isNullable": true,
             "dataType": {
               "type": [
-                "int",
-                "null"
+                "null",
+                "int"
               ]
             }
           },
@@ -262,13 +262,13 @@
             {
               "name": "id",
               "type": [
-                {
-                  "type": "int",
-                  "source.type": "int"
-                },
                 {
                   "type": "null",
                   "source.type": "null"
+                },
+                {
+                  "type": "int",
+                  "source.type": "int"
                 }
               ],
               "doc": "System-assigned numeric user ID. Cannot be changed by the user.",
diff --git a/gobblin-core/src/test/resources/converter/fieldPickInput.avsc b/gobblin-core/src/test/resources/converter/fieldPickInput.avsc
index 4da3fdf03..17e7fdaec 100644
--- a/gobblin-core/src/test/resources/converter/fieldPickInput.avsc
+++ b/gobblin-core/src/test/resources/converter/fieldPickInput.avsc
@@ -8,6 +8,6 @@
      {"name": "date_of_birth", "type": "long"},
      {"name": "last_modified", "type": "long"},
      {"name": "created", "type": "long"},
-     {"name": "favorite_quotes", "type": [{ "type": "array", "items": "string"}, "null"], "default": null}
+     {"name": "favorite_quotes", "type": ["null", { "type": "array", "items": "string"}], "default": null}
  ]
 }
diff --git a/gobblin-core/src/test/resources/converter/fieldPickInput_arrays.avro b/gobblin-core/src/test/resources/converter/fieldPickInput_arrays.avro
index c10a607f2..13130a89d 100644
Binary files a/gobblin-core/src/test/resources/converter/fieldPickInput_arrays.avro and b/gobblin-core/src/test/resources/converter/fieldPickInput_arrays.avro differ
diff --git a/gobblin-core/src/test/resources/serde/serde.avro b/gobblin-core/src/test/resources/serde/serde.avro
index fe23a8daf..977e380d2 100644
Binary files a/gobblin-core/src/test/resources/serde/serde.avro and b/gobblin-core/src/test/resources/serde/serde.avro differ
diff --git a/gobblin-core/src/test/resources/serde/serde.avsc b/gobblin-core/src/test/resources/serde/serde.avsc
index 470827aa0..c7f3b5512 100644
--- a/gobblin-core/src/test/resources/serde/serde.avsc
+++ b/gobblin-core/src/test/resources/serde/serde.avsc
@@ -3,7 +3,7 @@
  "name": "User",
  "fields": [
      {"name": "name", "type": "string"},
-     {"name": "favorite_number",  "type": ["int", "null"]},
-     {"name": "favorite_color", "type": ["string", "null"]}
+     {"name": "favorite_number",  "type": ["null", "int"]},
+     {"name": "favorite_color", "type": ["null", "string"]}
  ]
 }
\ No newline at end of file
diff --git a/gobblin-data-management/build.gradle b/gobblin-data-management/build.gradle
index e075c2811..47b8fa6b5 100644
--- a/gobblin-data-management/build.gradle
+++ b/gobblin-data-management/build.gradle
@@ -45,6 +45,7 @@ dependencies {
   compile externalDependency.findBugsAnnotations
   compile externalDependency.testng
   compile externalDependency.junit
+  compile externalDependency.jacksonMapperAsl
 
   testCompile(group: 'org.apache.iceberg', name: 'iceberg-hive-metastore', version: '0.11.1', classifier: 'tests') {
     transitive = false
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java
index f983c02cb..a90cc33f1 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java
@@ -368,15 +368,6 @@ public class HiveAvroORCQueryGenerator {
     return String.format("DROP TABLE IF EXISTS `%s`.`%s`", dbName, tableName);
   }
 
-  /***
-   * Adapt Avro schema / types to Hive column types
-   * @param schema Schema to adapt and generate Hive columns with corresponding types
-   * @param hiveColumns Optional Map to populate with the generated hive columns for reference of caller
-   * @param topLevel If this is first level
-   * @return Generate Hive columns with types for given Avro schema
-   */
-
-
   /***
    * Use destination table schema to generate column mapping
    * @param hiveColumns Optional Map to populate with the generated hive columns for reference of caller
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/utils/AvroHiveTypeUtils.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/utils/AvroHiveTypeUtils.java
index 5134af142..39fb3c206 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/utils/AvroHiveTypeUtils.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/utils/AvroHiveTypeUtils.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.data.management.conversion.hive.utils;
 
+import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
 import java.util.List;
 import java.util.Map;
 
@@ -205,7 +206,8 @@ public class AvroHiveTypeUtils {
         .equalsIgnoreCase(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
       int maxLength = 0;
       try {
-        maxLength = schema.getJsonProp(AvroSerDe.AVRO_PROP_MAX_LENGTH).getValueAsInt();
+        maxLength = Integer.parseInt(AvroCompatibilityHelper.getSchemaPropAsJsonString(schema, 
+            AvroSerDe.AVRO_PROP_MAX_LENGTH, false, false));
       } catch (Exception ex) {
         throw new AvroSerdeException("Failed to obtain maxLength value from file schema: " + schema, ex);
       }
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedMultiDatasets.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedMultiDatasets.java
index 3f9a57ffe..076b27924 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedMultiDatasets.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedMultiDatasets.java
@@ -17,7 +17,7 @@
 
 package org.apache.gobblin.data.management.copy.replication;
 
-import avro.shaded.com.google.common.annotations.VisibleForTesting;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.gobblin.dataset.Dataset;
 import java.io.IOException;
 import java.net.URI;
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDatasetTest.java
index 14011d214..d624f8250 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDatasetTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDatasetTest.java
@@ -42,8 +42,8 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.data.management.copy.entities.CommitStepCopyEntity;
 import org.apache.gobblin.util.commit.DeleteFileCommitStep;
 
-import avro.shaded.com.google.common.base.Predicate;
-import avro.shaded.com.google.common.collect.Iterables;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
 import javax.annotation.Nullable;
 import lombok.Data;
 
diff --git a/gobblin-distribution/build.gradle b/gobblin-distribution/build.gradle
index c7844babb..43cbf3272 100644
--- a/gobblin-distribution/build.gradle
+++ b/gobblin-distribution/build.gradle
@@ -24,6 +24,7 @@ configurations {
     }
     if (rootProject.hasProperty('excludeHiveDeps')) {
       exclude group: "org.apache.hive"
+      exclude group: "com.linkedin.hive"
     }
     exclude group: 'org.slf4j', module: 'log4j-over-slf4j'
   }
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/Utils/TypeInfoToSchemaParser.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/Utils/TypeInfoToSchemaParser.java
index a14c237ff..514bb68e4 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/Utils/TypeInfoToSchemaParser.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/Utils/TypeInfoToSchemaParser.java
@@ -37,7 +37,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
 import org.codehaus.jackson.node.JsonNodeFactory;
-import org.codehaus.jackson.node.NullNode;
+import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
 
 
 
@@ -78,7 +78,7 @@ public final class TypeInfoToSchemaParser {
       fieldName = (String) this._downToUpCaseMap.getOrDefault(fieldName, fieldName);
       Schema schema = this.parseSchemaFromTypeInfo(fieldTypeInfo, recordNamespace + "." + recordName.toLowerCase(),
           StringUtils.capitalize(fieldName));
-      Field f = new Field(fieldName, schema, (String) null, this._mkFieldsOptional ? NullNode.instance : null);
+      Field f = AvroCompatibilityHelper.createSchemaField(fieldName, schema, null, null);
       fields.add(f);
     }
 
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/reporter/FileFailureEventReporterTest.java b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/reporter/FileFailureEventReporterTest.java
index 389e6abde..07f01729d 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/reporter/FileFailureEventReporterTest.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/reporter/FileFailureEventReporterTest.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.testng.annotations.Test;
 
-import avro.shaded.com.google.common.collect.Maps;
+import com.google.common.collect.Maps;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.*;
diff --git a/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/AvroStringFieldEncryptorConverterTest.java b/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/AvroStringFieldEncryptorConverterTest.java
index d16766c77..d599a0466 100644
--- a/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/AvroStringFieldEncryptorConverterTest.java
+++ b/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/AvroStringFieldEncryptorConverterTest.java
@@ -97,6 +97,12 @@ public class AvroStringFieldEncryptorConverterTest {
     wuState.getJobState().setProp("converter.encrypt.algorithm", "insecure_shift");
 
     converter.init(wuState);
+    // The error below is caused by invalid Avro data: per the Avro spec, the default value must match the first
+    // type in the union. Since the default value here is null, a union that includes "null" must list "null" first,
+    // followed by the actual type. This is corrected in fieldPickInput.avsc and fieldPickInput_arrays.avro.
+    // Error: org.apache.avro.AvroTypeException: Invalid default for field favorite_quotes: null
+    // not a [{"type":"array","items":"string"},"null"]
+    // Correct data: "type": ["null", {"type": "array", "items": "string"}], "default": null
     GenericRecord inputRecord =
         getRecordFromFile(getClass().getClassLoader().getResource("fieldPickInput_arrays.avro").getPath());
     GenericArray origValues = (GenericArray) inputRecord.get("favorite_quotes");
diff --git a/gobblin-modules/gobblin-crypto-provider/src/test/resources/fieldPickInput_arrays.avro b/gobblin-modules/gobblin-crypto-provider/src/test/resources/fieldPickInput_arrays.avro
index c10a607f2..13130a89d 100644
Binary files a/gobblin-modules/gobblin-crypto-provider/src/test/resources/fieldPickInput_arrays.avro and b/gobblin-modules/gobblin-crypto-provider/src/test/resources/fieldPickInput_arrays.avro differ
diff --git a/gobblin-modules/gobblin-kafka-common/build.gradle b/gobblin-modules/gobblin-kafka-common/build.gradle
index 00954b91b..d9a83e606 100644
--- a/gobblin-modules/gobblin-kafka-common/build.gradle
+++ b/gobblin-modules/gobblin-kafka-common/build.gradle
@@ -27,6 +27,7 @@ dependencies {
   compile project(":gobblin-core-base")
 
   compile externalDependency.avro
+  compile externalDependency.avroCompatHelper
   compile externalDependency.confluentSchemaRegistryClient
   compile externalDependency.commonsCodec
   compile externalDependency.commonsHttpClient
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopePayloadConverter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopePayloadConverter.java
index 6408a4c20..198490a4f 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopePayloadConverter.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopePayloadConverter.java
@@ -24,7 +24,9 @@ import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
+import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
 import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.util.AvroUtils;
 
 
 /**
@@ -70,7 +72,8 @@ public class EnvelopePayloadConverter extends BaseEnvelopeSchemaConverter<Generi
       return createLatestPayloadField(field);
     }
     // Make a copy of the field to the output schema
-    return new Field(field.name(), field.schema(), field.doc(), field.defaultValue(), field.order());
+    return AvroCompatibilityHelper.createSchemaField(field.name(), field.schema(), field.doc(),
+        AvroUtils.getCompatibleDefaultValue(field), field.order());
   }
 
   /**
@@ -83,7 +86,8 @@ public class EnvelopePayloadConverter extends BaseEnvelopeSchemaConverter<Generi
       throws SchemaConversionException {
     try {
       Schema payloadSchema = fetchLatestPayloadSchema();
-      return new Field(field.name(), payloadSchema, DECORATED_PAYLOAD_DOC, field.defaultValue(), field.order());
+      return AvroCompatibilityHelper.createSchemaField(field.name(), payloadSchema, DECORATED_PAYLOAD_DOC,
+          AvroUtils.getCompatibleDefaultValue(field), field.order());
     } catch (Exception e) {
       throw new SchemaConversionException(e);
     }
diff --git a/gobblin-modules/gobblin-metadata/build.gradle b/gobblin-modules/gobblin-metadata/build.gradle
index e982f9392..11ca161be 100644
--- a/gobblin-modules/gobblin-metadata/build.gradle
+++ b/gobblin-modules/gobblin-metadata/build.gradle
@@ -30,6 +30,7 @@ dependencies {
   compile externalDependency.gson
   compile externalDependency.jacksonCore
   compile externalDependency.jacksonMapper
+  compile externalDependency.jacksonMapperAsl
   compile externalDependency.slf4j
 
   testCompile project(":gobblin-test-utils")
diff --git a/gobblin-modules/gobblin-orc/build.gradle b/gobblin-modules/gobblin-orc/build.gradle
index 603446e85..5495cc25c 100644
--- a/gobblin-modules/gobblin-orc/build.gradle
+++ b/gobblin-modules/gobblin-orc/build.gradle
@@ -22,6 +22,7 @@ dependencies {
   // but to use HiveStorageAPI 2.x specified below.
   compile (project(':gobblin-core')) {
     exclude group: 'org.apache.hive', module: 'hive-exec'
+    exclude group: 'com.linkedin.hive', module: 'hive-exec'
   }
 
   // Cannot use compileOnly as it cannot cover testCompile
diff --git a/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebmasterExtractor.java b/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebmasterExtractor.java
index 557c337dd..1410f00b6 100644
--- a/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebmasterExtractor.java
+++ b/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebmasterExtractor.java
@@ -31,7 +31,7 @@ import com.google.api.services.webmasters.model.ApiDimensionFilter;
 import com.google.common.base.Splitter;
 import com.google.gson.JsonArray;
 
-import avro.shaded.com.google.common.collect.Iterables;
+import com.google.common.collect.Iterables;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.annotation.Alpha;
diff --git a/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebmasterExtractorIterator.java b/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebmasterExtractorIterator.java
index 042bbae5c..df3e47902 100644
--- a/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebmasterExtractorIterator.java
+++ b/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebmasterExtractorIterator.java
@@ -41,7 +41,7 @@ import com.google.api.services.webmasters.model.ApiDimensionFilter;
 import com.google.api.services.webmasters.model.SearchAnalyticsQueryResponse;
 import com.google.common.base.Optional;
 
-import avro.shaded.com.google.common.base.Joiner;
+import com.google.common.base.Joiner;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.WorkUnitState;
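
Note: this import swap (and the matching fully-qualified references in the throttling and runtime tests below) is purely mechanical — Avro 1.9 no longer ships a shaded Guava under `avro.shaded.com.google.common`, so callers depend on the real Guava artifact instead. A tiny illustrative sketch (hypothetical class name), assuming Guava is already a direct dependency of the module:

```
import java.util.Arrays;
import java.util.List;

import com.google.common.collect.Iterables; // was: avro.shaded.com.google.common.collect.Iterables

public class GuavaImportExample {
  public static void main(String[] args) {
    List<String> dimensions = Arrays.asList("country", "device", "query");
    // Call sites are unchanged; only the import (and the build dependency) moves.
    System.out.println(Iterables.getLast(dimensions));
  }
}
```
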
diff --git a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/extraDependencies.gradle b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/extraDependencies.gradle
index e15ad26b3..5e00791da 100644
--- a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/extraDependencies.gradle
+++ b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/extraDependencies.gradle
@@ -24,7 +24,7 @@ shadowJar {
   zip64 true
   dependencies {
     exclude(dependency {
-      it.moduleGroup.startsWith("org.apache.hadoop") || it.moduleGroup.startsWith("org.apache.hive")
+      it.moduleGroup.startsWith("org.apache.hadoop") || it.moduleGroup.startsWith("org.apache.hive") || it.moduleGroup.startsWith("com.linkedin.hive")
     })
   }
   mergeServiceFiles()
diff --git a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/org/apache/gobblin/restli/throttling/ThrottlingClientTest.java b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/org/apache/gobblin/restli/throttling/ThrottlingClientTest.java
index 135f4624d..10be1711e 100644
--- a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/org/apache/gobblin/restli/throttling/ThrottlingClientTest.java
+++ b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/org/apache/gobblin/restli/throttling/ThrottlingClientTest.java
@@ -43,7 +43,7 @@ import org.apache.gobblin.broker.BrokerConfigurationKeyGenerator;
 import org.apache.gobblin.restli.EmbeddedRestliServer;
 import org.apache.gobblin.util.limiter.broker.SharedLimiterKey;
 
-import avro.shaded.com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap;
 
 
 public class ThrottlingClientTest {
diff --git a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/ConfigStoreBasedPolicyTest.java b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/ConfigStoreBasedPolicyTest.java
index b0e681289..92e33365e 100644
--- a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/ConfigStoreBasedPolicyTest.java
+++ b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/ConfigStoreBasedPolicyTest.java
@@ -29,7 +29,7 @@ import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
 import org.apache.gobblin.broker.iface.SharedResourcesBroker;
 import org.apache.gobblin.util.limiter.broker.SharedLimiterKey;
 
-import avro.shaded.com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap;
 
 
 public class ConfigStoreBasedPolicyTest {
diff --git a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/LimiterServerResourceTest.java b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/LimiterServerResourceTest.java
index bc538eca4..15fcae4bc 100644
--- a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/LimiterServerResourceTest.java
+++ b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/LimiterServerResourceTest.java
@@ -38,7 +38,7 @@ import org.apache.gobblin.broker.BrokerConfigurationKeyGenerator;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.util.limiter.broker.SharedLimiterKey;
 
-import avro.shaded.com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap;
 
 
 public class LimiterServerResourceTest {
diff --git a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/PoliciesResourceTest.java b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/PoliciesResourceTest.java
index c22062cbc..71629afec 100644
--- a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/PoliciesResourceTest.java
+++ b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/PoliciesResourceTest.java
@@ -38,7 +38,7 @@ public class PoliciesResourceTest {
     ThrottlingPolicyFactory factory = new ThrottlingPolicyFactory();
     SharedLimiterKey res1key = new SharedLimiterKey("res1");
 
-    Map<String, String> configMap = avro.shaded.com.google.common.collect.ImmutableMap.<String, String>builder()
+    Map<String, String> configMap = com.google.common.collect.ImmutableMap.<String, String>builder()
         .put(BrokerConfigurationKeyGenerator.generateKey(factory, res1key, null, ThrottlingPolicyFactory.POLICY_KEY),
             CountBasedPolicy.FACTORY_ALIAS)
         .put(BrokerConfigurationKeyGenerator.generateKey(factory, res1key, null, CountBasedPolicy.COUNT_KEY), "100")
diff --git a/gobblin-runtime-hadoop/src/test/java/org/apache/gobblin/runtime/instance/plugin/hadoop/TestHadoopKerberosKeytabAuthenticationPlugin.java b/gobblin-runtime-hadoop/src/test/java/org/apache/gobblin/runtime/instance/plugin/hadoop/TestHadoopKerberosKeytabAuthenticationPlugin.java
index 0a567a1e1..bf29d052a 100644
--- a/gobblin-runtime-hadoop/src/test/java/org/apache/gobblin/runtime/instance/plugin/hadoop/TestHadoopKerberosKeytabAuthenticationPlugin.java
+++ b/gobblin-runtime-hadoop/src/test/java/org/apache/gobblin/runtime/instance/plugin/hadoop/TestHadoopKerberosKeytabAuthenticationPlugin.java
@@ -27,7 +27,7 @@ import com.typesafe.config.ConfigFactory;
 import org.apache.gobblin.runtime.api.GobblinInstanceDriver;
 import org.apache.gobblin.runtime.std.DefaultConfigurableImpl;
 
-import avro.shaded.com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap;
 
 /**
  * Unit tests for {@link HadoopKerberosKeytabAuthenticationPlugin}
diff --git a/gobblin-runtime/build.gradle b/gobblin-runtime/build.gradle
index 8149129e1..df86c5cec 100644
--- a/gobblin-runtime/build.gradle
+++ b/gobblin-runtime/build.gradle
@@ -74,6 +74,7 @@ dependencies {
   compile externalDependency.httpcore
   compile externalDependency.jacksonCore
   compile externalDependency.jacksonMapper
+  compile externalDependency.jacksonMapperAsl
   compile externalDependency.javaxInject
   compile externalDependency.jodaTime
   compile externalDependency.metricsCore
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/instance/TestStandardGobblinInstanceDriver.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/instance/TestStandardGobblinInstanceDriver.java
index 8ad909390..a56410e61 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/instance/TestStandardGobblinInstanceDriver.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/instance/TestStandardGobblinInstanceDriver.java
@@ -36,7 +36,7 @@ import org.apache.gobblin.runtime.api.GobblinInstancePluginFactory;
 import org.apache.gobblin.runtime.plugins.email.EmailNotificationPlugin;
 import org.apache.gobblin.runtime.std.DefaultConfigurableImpl;
 
-import avro.shaded.com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/instance/hadoop/TestHadoopConfigLoader.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/instance/hadoop/TestHadoopConfigLoader.java
index ce9aa508f..a85c85eb3 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/instance/hadoop/TestHadoopConfigLoader.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/instance/hadoop/TestHadoopConfigLoader.java
@@ -23,7 +23,7 @@ import org.testng.annotations.Test;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
-import avro.shaded.com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap;
 
 /**
  * Unit tests for {@link HadoopConfigLoader}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 272c94977..1a19e755e 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -40,7 +40,7 @@ import com.google.common.collect.ImmutableMap;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
-import avro.shaded.com.google.common.annotations.VisibleForTesting;
+import com.google.common.annotations.VisibleForTesting;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
diff --git a/gobblin-utility/build.gradle b/gobblin-utility/build.gradle
index 7c85ccbe9..c80ea8346 100644
--- a/gobblin-utility/build.gradle
+++ b/gobblin-utility/build.gradle
@@ -30,6 +30,8 @@ dependencies {
   compile externalDependency.guava
   compile externalDependency.slf4j
   compile externalDependency.avro
+  compile externalDependency.avroCompiler
+  compile externalDependency.avroCompatHelper
   compile externalDependency.orcCore
   compile externalDependency.hiveMetastore
   compile externalDependency.jodaTime
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroFlattener.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroFlattener.java
index 771026cc5..f81ce3878 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroFlattener.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroFlattener.java
@@ -20,13 +20,12 @@ package org.apache.gobblin.util;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Schema;
+import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Logger;
-import org.codehaus.jackson.JsonNode;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
@@ -412,14 +411,14 @@ public class AvroFlattener {
             }
           }
         }
-        Schema.Field field = new Schema.Field(flattenName, flattenedFieldSchema, f.doc(), f.defaultValue(), f.order());
+        Schema.Field field = AvroCompatibilityHelper.createSchemaField(flattenName, flattenedFieldSchema, f.doc(),
+            AvroUtils.getCompatibleDefaultValue(f), f.order());
 
         if (StringUtils.isNotBlank(flattenSource)) {
           field.addProp(FLATTENED_SOURCE_KEY, flattenSource);
         }
-        for (Map.Entry<String, JsonNode> entry : f.getJsonProps().entrySet()) {
-          field.addProp(entry.getKey(), entry.getValue());
-        }
+        // Avro 1.9 compatible change - replaced deprecated public API getJsonProps with AvroCompatibilityHelper methods
+        AvroSchemaUtils.copyFieldProperties(f, field);
         flattenedFields.add(field);
       }
     }
@@ -466,24 +465,7 @@ public class AvroFlattener {
   private static void copyProperties(Schema oldSchema, Schema newSchema) {
     Preconditions.checkNotNull(oldSchema);
     Preconditions.checkNotNull(newSchema);
-
-    Map<String, JsonNode> props = oldSchema.getJsonProps();
-    copyProperties(props, newSchema);
-  }
-
-  /***
-   * Copy properties to an Avro Schema
-   * @param props Properties to copy to Avro Schema
-   * @param schema Avro Schema to copy properties to
-   */
-  private static void copyProperties(Map<String, JsonNode> props, Schema schema) {
-    Preconditions.checkNotNull(schema);
-
-    // (if null, don't copy but do not throw exception)
-    if (null != props) {
-      for (Map.Entry<String, JsonNode> prop : props.entrySet()) {
-        schema.addProp(prop.getKey(), prop.getValue());
-      }
-    }
+    // Avro 1.9 compatible change - replaced deprecated public API getJsonProps with AvroCompatibilityHelper methods
+    AvroSchemaUtils.copySchemaProperties(oldSchema, newSchema);
   }
 }
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroSchemaUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroSchemaUtils.java
new file mode 100644
index 000000000..fc751d9d9
--- /dev/null
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroSchemaUtils.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util;
+
+import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Schema;
+
+
+/**
+ * Avro schema utility class to perform schema property conversion to the appropriate data types
+ */
+@Slf4j
+public class AvroSchemaUtils {
+
+  private AvroSchemaUtils() {
+
+  }
+
+  /**
+   * Get a schema property value as an integer.
+   * @param schema Avro schema to read the property from
+   * @param prop name of the property to read
+   * @return the property value parsed as an Integer
+   */
+  public static Integer getValueAsInteger(final Schema schema, String prop) {
+    String value = AvroCompatibilityHelper.getSchemaPropAsJsonString(schema, prop, 
+        false, false);
+    try {
+      return Integer.parseInt(value);
+    } catch (NumberFormatException ex) {
+      log.error("Exception while converting schema property " + prop + " to integer", ex);
+      throw new IllegalArgumentException(ex);
+    }
+  }
+
+  /***
+   * Copy properties to an Avro Schema field
+   * @param fromField Avro Schema Field to copy properties from
+   * @param toField Avro Schema Field to copy properties to
+   */
+  public static void copyFieldProperties(final Schema.Field fromField, final Schema.Field toField) {
+    List<String> allPropNames = AvroCompatibilityHelper.getAllPropNames(fromField);
+    if (null != allPropNames) {
+      for (String propName : allPropNames) {
+        String propValue = AvroCompatibilityHelper.getFieldPropAsJsonString(fromField, propName, 
+            true, false);
+        AvroCompatibilityHelper.setFieldPropFromJsonString(toField, propName, propValue, false);
+      }
+    }
+  }
+
+  /***
+   * Copy properties to an Avro Schema
+   * @param fromSchema Avro Schema to copy properties from
+   * @param toSchema Avro Schema to copy properties to
+   */
+  public static void copySchemaProperties(final Schema fromSchema, final Schema toSchema) {
+    List<String> allPropNames = AvroCompatibilityHelper.getAllPropNames(fromSchema);
+    if (null != allPropNames) {
+      for (String propName : allPropNames) {
+        String propValue = AvroCompatibilityHelper.getSchemaPropAsJsonString(fromSchema, propName, 
+            true, false);
+        AvroCompatibilityHelper.setSchemaPropFromJsonString(toSchema, propName, propValue, false);
+      }
+    }
+  }
+}
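
Note: the new `AvroSchemaUtils` above centralizes the property copying that previously went through `Schema#getJsonProps()`. A short usage sketch (hypothetical class and property names), assuming `helper-all` is on the classpath:

```
import org.apache.avro.Schema;

import org.apache.gobblin.util.AvroSchemaUtils;

public class AvroSchemaUtilsUsage {
  public static void main(String[] args) {
    Schema source = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"r\",\"owner\":\"ingest\",\"fields\":["
            + "{\"name\":\"key\",\"type\":\"long\",\"default\":0}]}");
    source.getField("key").addProp("primaryKey", "true");

    Schema target = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"r\",\"fields\":["
            + "{\"name\":\"key\",\"type\":\"long\",\"default\":0}]}");

    // Props are round-tripped as JSON strings by AvroCompatibilityHelper, so schema-level
    // and field-level values survive the copy unchanged.
    AvroSchemaUtils.copySchemaProperties(source, target);
    AvroSchemaUtils.copyFieldProperties(source.getField("key"), target.getField("key"));

    System.out.println(target.getProp("owner"));                      // ingest
    System.out.println(target.getField("key").getProp("primaryKey")); // true
  }
}
```
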
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
index 0ac154497..838318828 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
@@ -51,6 +51,7 @@ import org.apache.avro.io.Encoder;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.mapred.FsInput;
 import org.apache.avro.util.Utf8;
+import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.math3.util.Pair;
 import org.apache.hadoop.conf.Configuration;
@@ -62,7 +63,6 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
-import org.codehaus.jackson.JsonNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -78,6 +78,7 @@ import com.google.common.collect.Maps;
 import com.google.common.io.Closer;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import lombok.Builder;
 import lombok.Getter;
 import lombok.ToString;
@@ -135,24 +136,14 @@ public class AvroUtils {
   public static List<Field> deepCopySchemaFields(Schema readerSchema) {
     return readerSchema.getFields().stream()
         .map(field -> {
-          Field f = new Field(field.name(), field.schema(), field.doc(), field.defaultValue(), field.order());
-          field.getProps().forEach((key, value) -> f.addProp(key, value));
+          Field f = AvroCompatibilityHelper.createSchemaField(field.name(), field.schema(), field.doc(),
+              getCompatibleDefaultValue(field), field.order());
+          AvroSchemaUtils.copyFieldProperties(field, f);
           return f;
         })
         .collect(Collectors.toList());
   }
 
-  /**
-   * Generate a {@link Schema} object from {@link Schema.Field} with Field's properties carried over to the new object.
-   * Common use cases for this method is in traversing {@link Schema} object into nested level and create {@link Schema}
-   * object for non-root level.
-   */
-  public static void convertFieldToSchemaWithProps(Map<String,JsonNode> fieldProps, Schema targetSchemaObj) {
-    for (Map.Entry<String, JsonNode> stringJsonNodeEntry : fieldProps.entrySet()) {
-      targetSchemaObj.addProp(stringJsonNodeEntry.getKey(), stringJsonNodeEntry.getValue());
-    }
-  }
-
 
   public static class AvroPathFilter implements PathFilter {
     @Override
@@ -604,7 +595,8 @@ public class AvroUtils {
 
     List<Field> combinedFields = Lists.newArrayList();
     for (Field newFld : newSchema.getFields()) {
-      combinedFields.add(new Field(newFld.name(), newFld.schema(), newFld.doc(), newFld.defaultValue()));
+      combinedFields.add(AvroCompatibilityHelper.createSchemaField(newFld.name(), newFld.schema(), newFld.doc(),
+          getCompatibleDefaultValue(newFld)));
     }
 
     for (Field oldFld : oldSchema.getFields()) {
@@ -619,12 +611,15 @@ public class AvroUtils {
             }
           }
           Schema newFldSchema = Schema.createUnion(union);
-          combinedFields.add(new Field(oldFld.name(), newFldSchema, oldFld.doc(), oldFld.defaultValue()));
+          combinedFields.add(AvroCompatibilityHelper.createSchemaField(oldFld.name(), newFldSchema, oldFld.doc(),
+              getCompatibleDefaultValue(oldFld)));
         } else {
           union.add(Schema.create(Type.NULL));
           union.add(oldFldSchema);
           Schema newFldSchema = Schema.createUnion(union);
-          combinedFields.add(new Field(oldFld.name(), newFldSchema, oldFld.doc(), oldFld.defaultValue()));
+          Object compatibleDefault = getCompatibleDefaultValue(oldFld);
+          combinedFields.add(AvroCompatibilityHelper.createSchemaField(oldFld.name(), newFldSchema, oldFld.doc(),
+              compatibleDefault));
         }
       }
     }
@@ -673,7 +668,8 @@ public class AvroUtils {
     for (Field field : record.getFields()) {
       Optional<Schema> newFieldSchema = removeUncomparableFields(field.schema(), processed);
       if (newFieldSchema.isPresent()) {
-        fields.add(new Field(field.name(), newFieldSchema.get(), field.doc(), field.defaultValue()));
+        fields.add(AvroCompatibilityHelper.createSchemaField(field.name(), newFieldSchema.get(), field.doc(),
+            getCompatibleDefaultValue(field)));
       }
     }
 
@@ -733,7 +729,8 @@ public class AvroUtils {
         if (null == input) {
           return null;
         }
-        Field field = new Field(input.name(), input.schema(), input.doc(), input.defaultValue(), input.order());
+        Field field = AvroCompatibilityHelper.createSchemaField(input.name(), input.schema(), input.doc(),
+            getCompatibleDefaultValue(input), input.order());
         return field;
       }
     });
@@ -776,8 +773,8 @@ public class AvroUtils {
         List<Schema.Field> newFields = new ArrayList<>();
         if (schema.getFields().size() > 0) {
           for (Schema.Field oldField : schema.getFields()) {
-            Field newField = new Field(oldField.name(), switchNamespace(oldField.schema(), namespaceOverride), oldField.doc(),
-                oldField.defaultValue(), oldField.order());
+            Field newField = AvroCompatibilityHelper.createSchemaField(oldField.name(), switchNamespace(oldField.schema(),
+                namespaceOverride), oldField.doc(), getCompatibleDefaultValue(oldField), oldField.order());
             // Copy field level properties
             copyFieldProperties(oldField, newField);
             newFields.add(newField);
@@ -830,25 +827,8 @@ public class AvroUtils {
   private static void copyProperties(Schema oldSchema, Schema newSchema) {
     Preconditions.checkNotNull(oldSchema);
     Preconditions.checkNotNull(newSchema);
-
-    Map<String, JsonNode> props = oldSchema.getJsonProps();
-    copyProperties(props, newSchema);
-  }
-
-  /***
-   * Copy properties to an Avro Schema
-   * @param props Properties to copy to Avro Schema
-   * @param schema Avro Schema to copy properties to
-   */
-  private static void copyProperties(Map<String, JsonNode> props, Schema schema) {
-    Preconditions.checkNotNull(schema);
-
-    // (if null, don't copy but do not throw exception)
-    if (null != props) {
-      for (Map.Entry<String, JsonNode> prop : props.entrySet()) {
-        schema.addProp(prop.getKey(), prop.getValue());
-      }
-    }
+    // Avro 1.9 compatible change - replaced deprecated public API getJsonProps with AvroCompatibilityHelper methods
+    AvroSchemaUtils.copySchemaProperties(oldSchema, newSchema);
   }
 
   /**
@@ -1088,8 +1068,8 @@ public class AvroUtils {
           Schema copiedFieldSchema = dropRecursive(fieldSchemaEntry, newParents, fieldsWithRecursion);
           if (copiedFieldSchema == null) {
           } else {
-            Schema.Field copiedField =
-                new Schema.Field(field.name(), copiedFieldSchema, field.doc(), field.defaultValue(), field.order());
+            Schema.Field copiedField = AvroCompatibilityHelper.createSchemaField(field.name(), copiedFieldSchema,
+                field.doc(), getCompatibleDefaultValue(field), field.order());
             copyFieldProperties(field, copiedField);
             copiedSchemaFields.add(copiedField);
           }
@@ -1142,7 +1122,16 @@ public class AvroUtils {
    * @param copiedField
    */
   private static void copyFieldProperties(Schema.Field sourceField, Schema.Field copiedField) {
-    sourceField.getProps().forEach((key, value) -> copiedField.addProp(key, value));
+    AvroSchemaUtils.copyFieldProperties(sourceField, copiedField);
   }
 
+  @Nullable
+  public static Object getCompatibleDefaultValue(Schema.Field field) {
+    return AvroCompatibilityHelper.fieldHasDefault(field)
+        ? AvroCompatibilityHelper.getGenericDefaultValue(field)
+        : null;
+  }
+
+
+
 }
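
Note: the merge logic above wraps old fields in a `["null", <type>]` union before re-adding them, which is why the expected schemas in the tests below now carry an explicit `"default": null`. A simplified sketch of that nullify step (hypothetical helper name; a null default is used here because "null" is the first union branch):

```
import java.util.Arrays;

import org.apache.avro.Schema;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;

public class NullifyFieldSketch {
  /** Re-creates a field as ["null", originalType] so records missing it can default to null. */
  static Schema.Field nullify(Schema.Field oldField) {
    Schema nullableSchema = Schema.createUnion(
        Arrays.asList(Schema.create(Schema.Type.NULL), oldField.schema()));
    // null is a valid default because NULL is the first branch of the union.
    return AvroCompatibilityHelper.createSchemaField(
        oldField.name(), nullableSchema, oldField.doc(), null);
  }

  public static void main(String[] args) {
    Schema old = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"old\",\"fields\":[{\"name\":\"color\",\"type\":\"string\"}]}");
    System.out.println(nullify(old.getField("color")).schema()); // ["null","string"]
  }
}
```
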
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/orc/AvroOrcSchemaConverter.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/orc/AvroOrcSchemaConverter.java
index 1e9a6a0b1..9f227ca5d 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/orc/AvroOrcSchemaConverter.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/orc/AvroOrcSchemaConverter.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.util.orc;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.avro.Schema;
+import org.apache.gobblin.util.AvroSchemaUtils;
 import org.apache.orc.TypeDescription;
 
 
@@ -101,8 +102,8 @@ public class AvroOrcSchemaConverter {
    */
   private static TypeDescription getTypeDescriptionForBinarySchema(Schema avroSchema) {
     if ("decimal".equalsIgnoreCase(avroSchema.getProp("logicalType"))) {
-      int scale = avroSchema.getJsonProp("scale").asInt(0);
-      int precision = avroSchema.getJsonProp("precision").asInt();
+      int scale = AvroSchemaUtils.getValueAsInteger(avroSchema, "scale");
+      int precision = AvroSchemaUtils.getValueAsInteger(avroSchema, "precision");
 
       return TypeDescription.createDecimal().withScale(scale).withPrecision(precision);
     }
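
Note: `Schema#getJsonProp(...)` no longer exists in Avro 1.9, so the decimal precision and scale are now read through the helper-backed accessor added in `AvroSchemaUtils`. A minimal sketch of what that accessor returns for a decimal schema (hypothetical class name):

```
import org.apache.avro.Schema;

import org.apache.gobblin.util.AvroSchemaUtils;

public class DecimalPropsExample {
  public static void main(String[] args) {
    Schema decimalSchema = new Schema.Parser().parse(
        "{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}");
    // getValueAsInteger reads the prop as a JSON string via AvroCompatibilityHelper and parses it,
    // throwing IllegalArgumentException if the prop is missing or non-numeric.
    int precision = AvroSchemaUtils.getValueAsInteger(decimalSchema, "precision");
    int scale = AvroSchemaUtils.getValueAsInteger(decimalSchema, "scale");
    System.out.println("decimal(" + precision + ", " + scale + ")"); // decimal(4, 2)
  }
}
```
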
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroFlattenerTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroFlattenerTest.java
index 521bda277..c6889da19 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroFlattenerTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroFlattenerTest.java
@@ -114,8 +114,7 @@ public class AvroFlattenerTest {
 
     Schema originalSchema = readSchemaFromJsonFile("optionWithinOptionWithinRecord_original.json");
     Schema expectedSchema = readSchemaFromJsonFile("optionWithinOptionWithinRecord_flattened.json");
-
-    Assert.assertEquals(new AvroFlattener().flatten(originalSchema, false), expectedSchema);
+    Assert.assertEquals(new AvroFlattener().flatten(originalSchema, false).toString(), expectedSchema.toString());
   }
 
   /**
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroSchemaUtilsTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroSchemaUtilsTest.java
new file mode 100644
index 000000000..825195343
--- /dev/null
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroSchemaUtilsTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util;
+
+import com.google.common.collect.Lists;
+import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class AvroSchemaUtilsTest {
+  
+  @Test
+  public void testGetValueAsInteger() throws IOException {
+    Schema schema = readSchemaFromJsonFile("props/schema_with_logical_field.json");
+    Schema.Field field = schema.getField("logicalFieldDecimal");
+    Schema fieldSchema = field.schema();
+    Assert.assertEquals(AvroSchemaUtils.getValueAsInteger(fieldSchema, "precision").intValue(), 4);
+    Assert.assertEquals(AvroSchemaUtils.getValueAsInteger(fieldSchema, "scale").intValue(), 2);
+  }
+  
+  @Test
+  public void testCopySchemaProperties() throws IOException {
+
+    Schema fromSchema = Schema.createRecord("name", "", "namespace", false);
+    fromSchema.addProp("prop1", "val1");
+    fromSchema.addProp("prop2", "val2");
+    List<Schema.Field> fieldList = Lists.newArrayList();
+    Schema.Field field1 =
+        AvroCompatibilityHelper.createSchemaField("key", Schema.create(Schema.Type.LONG), "", 0L);
+    fieldList.add(field1);
+    Schema.Field field2 =
+        AvroCompatibilityHelper.createSchemaField("double", Schema.create(Schema.Type.DOUBLE), "", 0.0);
+    fieldList.add(field2);
+
+    fromSchema.setFields(Lists.newArrayList(fieldList));
+
+    Schema toSchema = readSchemaFromJsonFile("props/schema_without_props.json");
+    AvroSchemaUtils.copySchemaProperties(fromSchema, toSchema);
+
+    
+    Assert.assertEquals(fromSchema.toString(), toSchema.toString());
+    for(Schema.Field field : toSchema.getFields()) {
+      Schema.Field oldField = fromSchema.getField(field.name());
+      Assert.assertEquals(field.toString(), oldField.toString());
+    }
+
+    Assert.assertTrue(fromSchema.getObjectProps().equals(toSchema.getObjectProps()));
+  }
+
+  @Test
+  public void testCopySchemaPropertiesWithAdditionalProps() throws IOException {
+
+    Schema fromSchema = readSchemaFromJsonFile("props/schema_with_props.json");
+    fromSchema.addProp("prop3", "val3");
+    fromSchema.addProp("prop4", "val4");
+
+    Schema toSchema = readSchemaFromJsonFile("props/schema_without_props.json");
+    AvroSchemaUtils.copySchemaProperties(fromSchema, toSchema);
+
+
+    Assert.assertEquals(fromSchema.toString(), toSchema.toString());
+    for(Schema.Field field : toSchema.getFields()) {
+      Schema.Field oldField = fromSchema.getField(field.name());
+      Assert.assertEquals(field.toString(), oldField.toString());
+    }
+
+    Assert.assertTrue(fromSchema.getObjectProps().equals(toSchema.getObjectProps()));
+  }
+
+  @Test
+  public void testCopyFieldProperties() throws IOException {
+
+    Schema fromSchema = Schema.createRecord("name", "", "namespace", false);
+    fromSchema.addProp("prop1", "val1");
+    fromSchema.addProp("prop2", "val2");
+    List<Schema.Field> fieldList = Lists.newArrayList();
+    Schema.Field field1 =
+        AvroCompatibilityHelper.createSchemaField("key", Schema.create(Schema.Type.LONG), "", 0L);
+    field1.addProp("primaryKey", "true");
+    fieldList.add(field1);
+    Schema.Field field2 =
+        AvroCompatibilityHelper.createSchemaField("double", Schema.create(Schema.Type.DOUBLE), "", 0.0);
+    fieldList.add(field2);
+
+    fromSchema.setFields(Lists.newArrayList(fieldList));
+
+    Schema toSchema = readSchemaFromJsonFile("props/schema_without_field_props.json");
+    AvroSchemaUtils.copyFieldProperties(fromSchema.getField("key"), toSchema.getField("key"));
+
+
+    Assert.assertEquals(fromSchema.toString(), toSchema.toString());
+    for(Schema.Field field : toSchema.getFields()) {
+      Schema.Field oldField = fromSchema.getField(field.name());
+      Assert.assertEquals(field.toString(), oldField.toString());
+    }
+
+    Assert.assertTrue(fromSchema.getObjectProps().equals(toSchema.getObjectProps()));
+  }
+
+  @Test
+  public void testCopyFieldPropertiesWithAdditionalProps() throws IOException {
+
+    Schema fromSchema = readSchemaFromJsonFile("props/schema_with_field_props.json");
+    Schema.Field keyField = fromSchema.getField("key");
+    keyField.addProp("key1", "val1");
+    Schema.Field doubleField = fromSchema.getField("double");
+    doubleField.addProp("key1", "val1");
+    doubleField.addProp("key2", "val2");
+
+    Schema toSchema = readSchemaFromJsonFile("props/schema_without_field_props.json");
+    Schema.Field toKeyField = toSchema.getField("key");
+    Schema.Field toDoubleField = toSchema.getField("double");
+    AvroSchemaUtils.copyFieldProperties(keyField, toKeyField);
+    AvroSchemaUtils.copyFieldProperties(doubleField, toDoubleField);
+
+    Assert.assertEquals(fromSchema.toString(), toSchema.toString());
+    for(Schema.Field field : toSchema.getFields()) {
+      Schema.Field oldField = fromSchema.getField(field.name());
+      Assert.assertEquals(field.toString(), oldField.toString());
+    }
+
+    Assert.assertTrue(fromSchema.getObjectProps().equals(toSchema.getObjectProps()));
+  }
+
+  private static Schema readSchemaFromJsonFile(String filename)
+      throws IOException {
+    return new Schema.Parser()
+        .parse(AvroSchemaUtilsTest.class.getClassLoader().getResourceAsStream(filename));
+  }
+
+}
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
index 11274afe3..c60bc7db2 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.util;
 
+import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -45,12 +46,13 @@ import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.Encoder;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.mapred.FsInput;
+import org.apache.avro.util.internal.JacksonUtils;
 import org.apache.commons.math3.util.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.codehaus.jackson.JsonNode;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.node.ArrayNode;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -100,7 +102,7 @@ public class AvroUtilsTest {
 
     Schema expectedOutputSchema1 =
         new Schema.Parser().parse("{\"type\":\"record\", \"name\":\"test\", " + "\"fields\":["
-            + "{\"name\": \"name\", \"type\": \"string\"}, " + "{\"name\": \"number\", \"type\": [\"null\", \"int\"]}"
+            + "{\"name\": \"name\", \"type\": \"string\"}, " + "{\"name\": \"number\", \"type\": [\"null\", \"int\"],\"default\":null}]}"
             + "]}");
 
     Assert.assertEquals(expectedOutputSchema1, AvroUtils.nullifyFieldsForSchemaMerge(oldSchema1, newSchema1));
@@ -117,7 +119,7 @@ public class AvroUtilsTest {
     Schema expectedOutputSchema2 =
         new Schema.Parser().parse("{\"type\":\"record\", \"name\":\"test\", " + "\"fields\":["
             + "{\"name\": \"name\", \"type\": \"string\"}, "
-            + "{\"name\": \"number\", \"type\": [\"null\", {\"type\": \"array\", \"items\": \"string\"}]}" + "]}");
+            + "{\"name\": \"number\", \"type\": [\"null\", {\"type\": \"array\", \"items\": \"string\"}],\"default\":null}]}" + "]}");
 
     Assert.assertEquals(expectedOutputSchema2, AvroUtils.nullifyFieldsForSchemaMerge(oldSchema2, newSchema2));
   }
@@ -145,10 +147,10 @@ public class AvroUtilsTest {
             .parse("{\"type\":\"record\", \"name\":\"test\", "
                 + "\"fields\":["
                 + "{\"name\": \"name\", \"type\": \"string\"}, "
-                + "{\"name\": \"number\", \"type\": [\"null\", {\"type\": \"string\"}, {\"type\": \"array\", \"items\": \"string\"}]}"
+                + "{\"name\": \"number\", \"type\": [\"null\", {\"type\": \"string\"}, {\"type\": \"array\", \"items\": \"string\"}], \"default\": null}]}"
                 + "]}");
 
-    Assert.assertEquals(expectedOutputSchema1, AvroUtils.nullifyFieldsForSchemaMerge(oldSchema1, newSchema1));
+    Assert.assertEquals(expectedOutputSchema1.toString(), AvroUtils.nullifyFieldsForSchemaMerge(oldSchema1, newSchema1).toString());
 
     Schema oldSchema2 =
         new Schema.Parser().parse("{\"type\":\"record\", \"name\":\"test\", " + "\"fields\":["
@@ -162,9 +164,9 @@ public class AvroUtilsTest {
     Schema expectedOutputSchema2 =
         new Schema.Parser().parse("{\"type\":\"record\", \"name\":\"test\", " + "\"fields\":["
             + "{\"name\": \"name\", \"type\": \"string\"}, "
-            + "{\"name\": \"number\", \"type\": [\"null\", {\"type\": \"array\", \"items\": \"string\"}]}" + "]}");
+            + "{\"name\": \"number\", \"type\": [\"null\", {\"type\": \"array\", \"items\": \"string\"}], \"default\": null}" + "]}");
 
-    Assert.assertEquals(expectedOutputSchema2, AvroUtils.nullifyFieldsForSchemaMerge(oldSchema2, newSchema2));
+    Assert.assertEquals(expectedOutputSchema2.toString(), AvroUtils.nullifyFieldsForSchemaMerge(oldSchema2, newSchema2).toString());
   }
 
   /**
@@ -189,10 +191,10 @@ public class AvroUtilsTest {
             .parse("{\"type\":\"record\", \"name\":\"test\", "
                 + "\"fields\":["
                 + "{\"name\": \"name\", \"type\": \"string\"}, "
-                + "{\"name\": \"color\", \"type\": [\"null\", \"string\"]}, "
-                + "{\"name\": \"number\", \"type\": [\"null\", {\"type\": \"string\"}, {\"type\": \"array\", \"items\": \"string\"}]}"
+                + "{\"name\": \"color\", \"type\": [\"null\", \"string\"], \"default\": null}, "
+                + "{\"name\": \"number\", \"type\": [\"null\", {\"type\": \"string\"}, {\"type\": \"array\", \"items\": \"string\"}], \"default\": null}]}"
                 + "]}");
-    Assert.assertEquals(expectedOutputSchema, AvroUtils.nullifyFieldsForSchemaMerge(oldSchema, newSchema));
+    Assert.assertEquals(expectedOutputSchema.toString(), AvroUtils.nullifyFieldsForSchemaMerge(oldSchema, newSchema).toString());
   }
 
   /**
@@ -242,10 +244,11 @@ public class AvroUtilsTest {
     schema.addProp("prop2", "val2");
     List<Schema.Field> fieldList = Lists.newArrayList();
     Schema.Field field1 =
-        new Schema.Field("key", Schema.create(Schema.Type.LONG), "", 0L);
+        AvroCompatibilityHelper.createSchemaField("key", Schema.create(Schema.Type.LONG), "", 0L);
     field1.addProp("primaryKey", "true");
     fieldList.add(field1);
-    Schema.Field field2 = new Schema.Field("double", Schema.create(Schema.Type.DOUBLE), "", 0.0);
+    Schema.Field field2 = 
+        AvroCompatibilityHelper.createSchemaField("double", Schema.create(Schema.Type.DOUBLE), "", 0.0);
     fieldList.add(field2);
 
     schema.setFields(Lists.newArrayList(fieldList));
@@ -256,8 +259,9 @@ public class AvroUtilsTest {
 
     Assert.assertEquals(newSchema.getNamespace(), newNamespace);
     Assert.assertEquals(newSchema.getName(), originalName);
     for(Schema.Field field : newSchema.getFields()) {
-      Assert.assertEquals(field, schema.getField(field.name()));
+      Schema.Field oldField = schema.getField(field.name());
+      Assert.assertEquals(field.toString(), oldField.toString());
     }
 
     Assert.assertTrue(schema.getObjectProps().equals(newSchema.getObjectProps()));
@@ -620,10 +629,13 @@ public class AvroUtilsTest {
           .getResourceAsStream("recursive_schemas/recursive_" + scenario + "_solution.avsc"));
 
       // get the answer from the input schema (test author needs to provide this)
-      ArrayNode foo = (ArrayNode) inputSchema.getJsonProp("recursive_fields");
+      // Avro 1.9 compatible change - replaced deprecated public API getJsonProp with getObjectProp
+      // Use internal JacksonUtils to convert object to the corresponding JsonNode (ArrayNode)
+      ArrayNode foo = (ArrayNode) JacksonUtils.toJsonNode(inputSchema.getObjectProp(
+          "recursive_fields"));
       HashSet<String> answers = new HashSet<>();
       for (JsonNode fieldsWithRecursion: foo) {
-        answers.add(fieldsWithRecursion.getTextValue());
+        answers.add(fieldsWithRecursion.asText());
       }
 
       Pair<Schema, List<AvroUtils.SchemaEntry>> results = AvroUtils.dropRecursiveFields(inputSchema);
diff --git a/gobblin-utility/src/test/resources/flattenAvro/optionWithinOptionWithinRecord_flattened.json b/gobblin-utility/src/test/resources/flattenAvro/optionWithinOptionWithinRecord_flattened.json
index dc4e37dc3..c2dd3a6fc 100644
--- a/gobblin-utility/src/test/resources/flattenAvro/optionWithinOptionWithinRecord_flattened.json
+++ b/gobblin-utility/src/test/resources/flattenAvro/optionWithinOptionWithinRecord_flattened.json
@@ -4,15 +4,18 @@
   "fields" : [ {
     "name" : "parentFieldUnion__unionRecordMemberFieldUnion__superNestedFieldString1",
     "type" : [ "null", "string" ],
-    "flatten_source" : "parentFieldUnion.unionRecordMemberFieldUnion.superNestedFieldString1"
+    "flatten_source" : "parentFieldUnion.unionRecordMemberFieldUnion.superNestedFieldString1",
+    "default": null
   }, {
     "name" : "parentFieldUnion__unionRecordMemberFieldUnion__superNestedFieldString2",
     "type" : [ "null", "string" ],
-    "flatten_source" : "parentFieldUnion.unionRecordMemberFieldUnion.superNestedFieldString2"
+    "flatten_source" : "parentFieldUnion.unionRecordMemberFieldUnion.superNestedFieldString2",
+    "default": null
   }, {
     "name" : "parentFieldUnion__unionRecordMemberFieldString",
     "type" : [ "null", "string" ],
-    "flatten_source" : "parentFieldUnion.unionRecordMemberFieldString"
+    "flatten_source" : "parentFieldUnion.unionRecordMemberFieldString",
+    "default": null
   }, {
     "name" : "parentFieldInt",
     "type" : "int"
diff --git a/gobblin-utility/src/test/resources/flattenAvro/recordWithinOptionWithinRecord_flattened.json b/gobblin-utility/src/test/resources/flattenAvro/recordWithinOptionWithinRecord_flattened.json
index f6ca3cf09..9b6c8b9ca 100644
--- a/gobblin-utility/src/test/resources/flattenAvro/recordWithinOptionWithinRecord_flattened.json
+++ b/gobblin-utility/src/test/resources/flattenAvro/recordWithinOptionWithinRecord_flattened.json
@@ -4,11 +4,13 @@
   "fields" : [ {
     "name" : "parentFieldUnion__unionRecordMemberFieldLong",
     "type" : [ "null", "long" ],
-    "flatten_source" : "parentFieldUnion.unionRecordMemberFieldLong"
+    "flatten_source" : "parentFieldUnion.unionRecordMemberFieldLong",
+    "default": null
   }, {
     "name" : "parentFieldUnion__unionRecordMemberFieldString",
     "type" : [ "null", "string" ],
-    "flatten_source" : "parentFieldUnion.unionRecordMemberFieldString"
+    "flatten_source" : "parentFieldUnion.unionRecordMemberFieldString",
+    "default": null
   }, {
     "name" : "parentFieldInt",
     "type" : "int"
diff --git a/gobblin-utility/src/test/resources/props/schema_with_field_props.json b/gobblin-utility/src/test/resources/props/schema_with_field_props.json
new file mode 100644
index 000000000..c57adf639
--- /dev/null
+++ b/gobblin-utility/src/test/resources/props/schema_with_field_props.json
@@ -0,0 +1,23 @@
+{
+  "type": "record",
+  "name": "name",
+  "namespace": "namespace",
+  "doc": "",
+  "fields": [
+    {
+      "name": "key",
+      "type": "long",
+      "doc": "",
+      "default": 0,
+      "primaryKey": true
+    },
+    {
+      "name": "double",
+      "type": "double",
+      "doc": "",
+      "default": 0.0
+    }
+  ],
+  "prop1": "val1",
+  "prop2": "val2"
+}
\ No newline at end of file
diff --git a/gobblin-utility/src/test/resources/props/schema_with_logical_field.json b/gobblin-utility/src/test/resources/props/schema_with_logical_field.json
new file mode 100644
index 000000000..e382771f9
--- /dev/null
+++ b/gobblin-utility/src/test/resources/props/schema_with_logical_field.json
@@ -0,0 +1,34 @@
+{
+  "type": "record",
+  "name": "SchemaWithLogicalType",
+  "fields": [
+    {
+      "name": "fieldInt",
+      "type": "int"
+    },
+    {
+      "name": "logicalFieldVarchar",
+      "type": {
+        "type": "string",
+        "logicalType": "varchar",
+        "maxLength": "256"
+      }
+    },
+    {
+      "name": "logicalFieldDecimal",
+      "type": {
+        "type": "bytes",
+        "logicalType": "decimal",
+        "precision": 4,
+        "scale": 2
+      }
+    },
+    {
+      "name": "logicalFieldDate",
+      "type": {
+        "type": "int",
+        "logicalType": "date"
+      }
+    }
+  ]
+}
\ No newline at end of file
diff --git a/gobblin-utility/src/test/resources/props/schema_with_props.json b/gobblin-utility/src/test/resources/props/schema_with_props.json
new file mode 100644
index 000000000..1fc14852d
--- /dev/null
+++ b/gobblin-utility/src/test/resources/props/schema_with_props.json
@@ -0,0 +1,22 @@
+{
+  "type": "record",
+  "name": "name",
+  "namespace": "namespace",
+  "doc": "",
+  "fields": [
+    {
+      "name": "key",
+      "type": "long",
+      "doc": "",
+      "default": 0
+    },
+    {
+      "name": "double",
+      "type": "double",
+      "doc": "",
+      "default": 0.0
+    }
+  ],
+  "prop1": "val1",
+  "prop2": "val2"
+}
\ No newline at end of file
diff --git a/gobblin-utility/src/test/resources/props/schema_without_field_props.json b/gobblin-utility/src/test/resources/props/schema_without_field_props.json
new file mode 100644
index 000000000..1fc14852d
--- /dev/null
+++ b/gobblin-utility/src/test/resources/props/schema_without_field_props.json
@@ -0,0 +1,22 @@
+{
+  "type": "record",
+  "name": "name",
+  "namespace": "namespace",
+  "doc": "",
+  "fields": [
+    {
+      "name": "key",
+      "type": "long",
+      "doc": "",
+      "default": 0
+    },
+    {
+      "name": "double",
+      "type": "double",
+      "doc": "",
+      "default": 0.0
+    }
+  ],
+  "prop1": "val1",
+  "prop2": "val2"
+}
\ No newline at end of file
diff --git a/gobblin-utility/src/test/resources/props/schema_without_props.json b/gobblin-utility/src/test/resources/props/schema_without_props.json
new file mode 100644
index 000000000..4384975b6
--- /dev/null
+++ b/gobblin-utility/src/test/resources/props/schema_without_props.json
@@ -0,0 +1,20 @@
+{
+  "type": "record",
+  "name": "name",
+  "namespace": "namespace",
+  "doc": "",
+  "fields": [
+    {
+      "name": "key",
+      "type": "long",
+      "doc": "",
+      "default": 0
+    },
+    {
+      "name": "double",
+      "type": "double",
+      "doc": "",
+      "default": 0.0
+    }
+  ]
+}
\ No newline at end of file
diff --git a/gobblin-yarn/build.gradle b/gobblin-yarn/build.gradle
index 820350213..9f83dee47 100644
--- a/gobblin-yarn/build.gradle
+++ b/gobblin-yarn/build.gradle
@@ -100,6 +100,7 @@ shadowJar {
     exclude dependency('org.mockito:.*')
     exclude dependency('org.datanucleus:.*')
     exclude dependency('org.apache.hive:.*')
+    exclude dependency('com.linkedin.hive:.*')
     exclude dependency('org.scala-lang:scala-library:.*')
     exclude dependency('org.apache.derby:.*')
   }
diff --git a/gradle/scripts/defaultBuildProperties.gradle b/gradle/scripts/defaultBuildProperties.gradle
index 2d922168f..db90d1091 100644
--- a/gradle/scripts/defaultBuildProperties.gradle
+++ b/gradle/scripts/defaultBuildProperties.gradle
@@ -23,14 +23,14 @@ def BuildProperties BUILD_PROPERTIES = new BuildProperties(project)
     .register(new BuildProperty("sonatypeArtifactSnapshotRepository", "https://oss.sonatype.org/content/repositories/snapshots/", "Maven repository to publish artifacts"))
     .register(new BuildProperty("nexusArtifactRepository", "https://repository.apache.org/service/local/staging/deploy/maven2", "Maven repository to publish artifacts"))
     .register(new BuildProperty("nexusArtifactSnapshotRepository", "https://repository.apache.org/content/repositories/snapshots", "Maven repository to publish artifacts"))
-    .register(new BuildProperty("avroVersion", "1.8.1", "Avro dependencies version"))
+    .register(new BuildProperty("avroVersion", "1.9.2", "Avro dependencies version"))
     .register(new BuildProperty("awsVersion", "1.11.8", "AWS dependencies version"))
     .register(new BuildProperty("bytemanVersion", "4.0.5", "Byteman dependencies version"))
     .register(new BuildProperty("confluentVersion", "2.0.1", "confluent dependencies version"))
     .register(new BuildProperty("doNotSignArtifacts", false, "Do not sight Maven artifacts"))
     .register(new BuildProperty("gobblinFlavor", "standard", "Build flavor (see http://gobblin.readthedocs.io/en/latest/developer-guide/GobblinModules/)"))
     .register(new BuildProperty("hadoopVersion", "2.3.0", "Hadoop dependencies version"))
-    .register(new BuildProperty("hiveVersion", "1.0.1", "Hive dependencies version"))
+    .register(new BuildProperty("hiveVersion", "1.0.1-avro", "Hive dependencies version"))
     .register(new BuildProperty("icebergVersion", "0.11.1", "Iceberg dependencies version"))
     .register(new BuildProperty("jdkVersion", JavaVersion.VERSION_1_8.toString(),
     "Java languange compatibility; supported versions: " + JavaVersion.VERSION_1_8))
diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle
index 3bda269ab..1ffbae691 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -26,6 +26,8 @@ ext.externalDependency = [
     "assertj": "org.assertj:assertj-core:3.20.2",
     "avro": "org.apache.avro:avro:" + avroVersion,
     "avroMapredH2": "org.apache.avro:avro-mapred:" + avroVersion,
+    "avroCompatHelper": "com.linkedin.avroutil1:helper-all:0.2.117",
+    "avroCompiler": "org.apache.avro:avro-compiler:" + avroVersion,
     "awsCore": "com.amazonaws:aws-java-sdk-core:" + awsVersion,
     "awsAsg": "com.amazonaws:aws-java-sdk-autoscaling:" + awsVersion,
     "awsAppAsg": "com.amazonaws:aws-java-sdk-applicationautoscaling:" + awsVersion,
@@ -68,12 +70,12 @@ ext.externalDependency = [
     "hadoopAws": "org.apache.hadoop:hadoop-aws:2.6.0",
     "hdrHistogram": "org.hdrhistogram:HdrHistogram:2.1.11",
     "helix": "org.apache.helix:helix-core:1.0.2",
-    "hiveCommon": "org.apache.hive:hive-common:" + hiveVersion,
-    "hiveService": "org.apache.hive:hive-service:" + hiveVersion,
-    "hiveJdbc": "org.apache.hive:hive-jdbc:" + hiveVersion,
-    "hiveMetastore": "org.apache.hive:hive-metastore:" + hiveVersion,
-    "hiveExec": "org.apache.hive:hive-exec:" + hiveVersion + ":core",
-    "hiveSerDe": "org.apache.hive:hive-serde:" + hiveVersion,
+    "hiveCommon": "com.linkedin.hive:hive-common:" + hiveVersion,
+    "hiveService": "com.linkedin.hive:hive-service:" + hiveVersion,
+    "hiveJdbc": "com.linkedin.hive:hive-jdbc:" + hiveVersion,
+    "hiveMetastore": "com.linkedin.hive:hive-metastore:" + hiveVersion,
+    "hiveExec": "com.linkedin.hive:hive-exec:" + hiveVersion + ":core",
+    "hiveSerDe": "com.linkedin.hive:hive-serde:" + hiveVersion,
     "hiveStorageApi": "org.apache.hive:hive-storage-api:2.4.0",
     "httpclient": "org.apache.httpcomponents:httpclient:4.5.2",
     "httpmime": "org.apache.httpcomponents:httpmime:4.5.2",
@@ -103,7 +105,8 @@ ext.externalDependency = [
     "junit": "junit:junit:4.13.2",
     "mockserver":"org.mock-server:mockserver-netty:3.10.4",
     "jacksonCore": "org.codehaus.jackson:jackson-core-asl:1.9.13",
-    "jacksonMapper": "org.codehaus.jackson:jackson-mapper-asl:1.9.13",
+    "jacksonMapperAsl": "org.codehaus.jackson:jackson-mapper-asl:1.9.13",
+    "jacksonMapper": "com.fasterxml.jackson.core:jackson-databind:2.10.2",
     "jasypt": "org.jasypt:jasypt:1.9.2",
     "jodaTime": "joda-time:joda-time:2.9.3",
     "jgrapht": "org.jgrapht:jgrapht-core:0.9.2",
diff --git a/gradle/scripts/repositories.gradle b/gradle/scripts/repositories.gradle
index 599906ee6..90657222b 100644
--- a/gradle/scripts/repositories.gradle
+++ b/gradle/scripts/repositories.gradle
@@ -29,7 +29,10 @@ repositories {
   maven {
     url "https://linkedin.jfrog.io/artifactory/open-source/"
   }
-  mavenCentral()
+  maven {
+    url "https://linkedin.jfrog.io/artifactory/gobblin-hive/"
+  }
+  mavenLocal()
 }
 
 try {