You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/04/17 21:36:43 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1116] Avoid registering schema with schema registry during Me…

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bedefc  [GOBBLIN-1116] Avoid registering schema with schema registry during Me…
4bedefc is described below

commit 4bedefc13b6ba36a4f0169bbae3aae18b1ceaa39
Author: sv2000 <su...@gmail.com>
AuthorDate: Fri Apr 17 14:36:36 2020 -0700

    [GOBBLIN-1116] Avoid registering schema with schema registry during Me…
    
    Closes #2956 from sv2000/metricsReporterFailure
---
 .../gobblin/configuration/ConfigurationKeys.java   |   7 +-
 .../gobblin/metrics/reporter/util/EventUtils.java  |  34 ++++---
 .../metrics/reporter/util/MetricReportUtils.java   |  59 ++++++++++--
 .../azkaban/AzkabanGobblinYarnAppLauncher.java     |  52 ++--------
 .../azkaban/AzkabanGobblinYarnAppLauncherTest.java |  61 ------------
 ...fkaAvroEventReporterWithSchemaRegistryTest.java |  78 +++++++++------
 .../gobblin/converter/EnvelopeSchemaConverter.java |  27 +++---
 .../kafka/schemareg/LiKafkaSchemaRegistry.java     |   6 +-
 .../gobblin/metrics/KafkaReportingFormats.java     |  19 +++-
 .../kafka/KafkaAvroEventKeyValueReporter.java      |  32 ++++---
 .../metrics/kafka/KafkaAvroEventReporter.java      |  17 +++-
 .../gobblin/metrics/kafka/KafkaAvroReporter.java   |  17 +++-
 .../metrics/kafka/KafkaAvroSchemaRegistry.java     |   4 +-
 .../metrics/kafka/KafkaReporterFactory.java        |  29 +++---
 .../metrics/kafka/SchemaRegistryException.java     |   5 +-
 ...roReporterUtil.java => KafkaReporterUtils.java} |  55 ++++++++++-
 .../reporter/util/SchemaRegistryVersionWriter.java |  55 ++++++-----
 .../KafkaAvroEventKeyValueReporterTest.java        | 103 +++++++++++++-------
 .../reporter/KafkaAvroEventReporterTest.java       |   2 +-
 .../KafkaAvroReporterWithSchemaRegistryTest.java   | 105 +++++++++++++++++++++
 .../reporter/KeyValueEventObjectReporterTest.java  |   4 +-
 .../reporter/KeyValueMetricObjectReporterTest.java |   4 +-
 .../monitoring/KafkaAvroJobStatusMonitor.java      |   3 +-
 .../gobblin/yarn/GobblinYarnAppLauncher.java       |  90 +++++++++++++++++-
 .../gobblin/yarn/GobblinYarnAppLauncherTest.java   |  47 +++++++++
 .../org/apache/gobblin/yarn/YarnServiceTest.java   |  26 ++++-
 26 files changed, 651 insertions(+), 290 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 53bbe3c..5cb2613 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -716,11 +716,16 @@ public class ConfigurationKeys {
   public static final String DEFAULT_METRICS_REPORTING_KAFKA_FORMAT = "json";
   public static final String METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY =
       METRICS_CONFIGURATIONS_PREFIX + "reporting.kafka.avro.use.schema.registry";
+  public static final String METRICS_REPORTING_EVENTS_KAFKA_AVRO_SCHEMA_ID =
+      METRICS_CONFIGURATIONS_PREFIX + "reporting.events.kafka.avro.schemaId";
+  public static final String METRICS_REPORTING_METRICS_KAFKA_AVRO_SCHEMA_ID =
+      METRICS_CONFIGURATIONS_PREFIX + "reporting.metrics.kafka.avro.schemaId";
+
   public static final String DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY = Boolean.toString(false);
   public static final String METRICS_KAFKA_BROKERS = METRICS_CONFIGURATIONS_PREFIX + "reporting.kafka.brokers";
   // Topic used for both event and metric reporting.
   // Can be overriden by METRICS_KAFKA_TOPIC_METRICS and METRICS_KAFKA_TOPIC_EVENTS.
-  public static final String METRICS_KAFKA_TOPIC = METRICS_CONFIGURATIONS_PREFIX + "reporting.kafka.topic";
+  public static final String METRICS_KAFKA_TOPIC = METRICS_CONFIGURATIONS_PREFIX + "reporting.kafka.topic.common";
   // Topic used only for metric reporting.
   public static final String METRICS_KAFKA_TOPIC_METRICS =
       METRICS_CONFIGURATIONS_PREFIX + "reporting.kafka.topic.metrics";
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/EventUtils.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/EventUtils.java
index af2869b..4bb48cb 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/EventUtils.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/EventUtils.java
@@ -28,6 +28,8 @@ import org.apache.avro.specific.SpecificDatumReader;
 import com.google.common.base.Optional;
 import com.google.common.io.Closer;
 
+import javax.annotation.Nullable;
+
 import org.apache.gobblin.metrics.GobblinTrackingEvent;
 
 
@@ -75,13 +77,26 @@ public class EventUtils {
   }
 
   /**
-   * Parses a {@link org.apache.gobblin.metrics.MetricReport} from a byte array Avro serialization.
-   * @param reuse MetricReport to reuse.
+   * Parses a {@link org.apache.gobblin.metrics.GobblinTrackingEvent} from a byte array Avro serialization.
+   * @param reuse GobblinTrackingEvent to reuse.
    * @param bytes Input bytes.
-   * @return MetricReport.
+   * @return GobblinTrackingEvent.
    * @throws java.io.IOException
    */
-  public synchronized static GobblinTrackingEvent deserializeReportFromAvroSerialization(GobblinTrackingEvent reuse, byte[] bytes)
+  public synchronized static GobblinTrackingEvent deserializeEventFromAvroSerialization(GobblinTrackingEvent reuse, byte[] bytes)
+      throws IOException {
+    return deserializeEventFromAvroSerialization(reuse, bytes, null);
+  }
+
+  /**
+   * Parses a {@link org.apache.gobblin.metrics.GobblinTrackingEvent} from a byte array Avro serialization.
+   * @param reuse GobblinTrackingEvent to reuse.
+   * @param bytes Input bytes.
+   * @param schemaId Expected schemaId.
+   * @return GobblinTrackingEvent.
+   * @throws java.io.IOException
+   */
+  public synchronized static GobblinTrackingEvent deserializeEventFromAvroSerialization(GobblinTrackingEvent reuse, byte[] bytes, @Nullable String schemaId)
       throws IOException {
     if (!reader.isPresent()) {
       reader = Optional.of(new SpecificDatumReader<>(GobblinTrackingEvent.class));
@@ -92,12 +107,10 @@ public class EventUtils {
     try {
       DataInputStream inputStream = closer.register(new DataInputStream(new ByteArrayInputStream(bytes)));
 
-      // Check version byte
-      int versionNumber = inputStream.readInt();
-      if (versionNumber != SCHEMA_VERSION) {
-        throw new IOException(String
-            .format("MetricReport schema version not recognized. Found version %d, expected %d.", versionNumber,
-                SCHEMA_VERSION));
+      if (schemaId != null) {
+        MetricReportUtils.readAndVerifySchemaId(inputStream, schemaId);
+      } else {
+        MetricReportUtils.readAndVerifySchemaVersion(inputStream);
       }
 
       // Decode the rest
@@ -110,4 +123,3 @@ public class EventUtils {
     }
   }
 }
-
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/MetricReportUtils.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/MetricReportUtils.java
index 2d49606..b51dcc6 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/MetricReportUtils.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/MetricReportUtils.java
@@ -24,10 +24,13 @@ import java.io.IOException;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.commons.codec.binary.Hex;
 
 import com.google.common.base.Optional;
 import com.google.common.io.Closer;
 
+import javax.annotation.Nullable;
+
 import org.apache.gobblin.metrics.MetricReport;
 
 
@@ -64,8 +67,8 @@ public class MetricReportUtils {
       // Check version byte
       int versionNumber = inputStream.readInt();
       if (versionNumber != SCHEMA_VERSION) {
-        throw new IOException(
-            String.format("MetricReport schema version not recognized. Found version %d, expected %d.", versionNumber,
+        throw new IOException(String
+            .format("MetricReport schema version not recognized. Found version %d, expected %d.", versionNumber,
                 SCHEMA_VERSION));
       }
 
@@ -88,6 +91,20 @@ public class MetricReportUtils {
    */
   public synchronized static MetricReport deserializeReportFromAvroSerialization(MetricReport reuse, byte[] bytes)
       throws IOException {
+    return deserializeReportFromAvroSerialization(reuse, bytes, null);
+  }
+
+  /**
+   * Parses a {@link org.apache.gobblin.metrics.MetricReport} from a byte array Avro serialization.
+   * @param reuse MetricReport to reuse.
+   * @param bytes Input bytes.
+   * @param schemaId Expected schemaId.
+   * @return MetricReport.
+   * @throws java.io.IOException
+   */
+  public synchronized static MetricReport deserializeReportFromAvroSerialization(MetricReport reuse, byte[] bytes,
+      @Nullable String schemaId)
+      throws IOException {
     if (!READER.isPresent()) {
       READER = Optional.of(new SpecificDatumReader<>(MetricReport.class));
     }
@@ -96,15 +113,11 @@ public class MetricReportUtils {
 
     try {
       DataInputStream inputStream = closer.register(new DataInputStream(new ByteArrayInputStream(bytes)));
-
-      // Check version byte
-      int versionNumber = inputStream.readInt();
-      if (versionNumber != SCHEMA_VERSION) {
-        throw new IOException(
-            String.format("MetricReport schema version not recognized. Found version %d, expected %d.", versionNumber,
-                SCHEMA_VERSION));
+      if (schemaId != null) {
+        readAndVerifySchemaId(inputStream, schemaId);
+      } else {
+        readAndVerifySchemaVersion(inputStream);
       }
-
       // Decode the rest
       Decoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
       return READER.get().read(reuse, decoder);
@@ -114,4 +127,30 @@ public class MetricReportUtils {
       closer.close();
     }
   }
+
+  public static void readAndVerifySchemaId(DataInputStream inputStream, String schemaId)
+      throws IOException {
+    //Read the magic byte
+    inputStream.readByte();
+    int schemaIdLengthBytes = schemaId.length() / 2;
+    byte[] readId = new byte[schemaIdLengthBytes];
+    int numBytesRead = inputStream.read(readId, 0, schemaIdLengthBytes);
+    String readSchemaId = Hex.encodeHexString(readId);
+    if (numBytesRead != schemaIdLengthBytes || !schemaId.equals(readSchemaId)) {
+      throw new IOException(String
+          .format("Schema version not recognized. Found version %s, expected %s.", readSchemaId,
+              schemaId));
+    }
+  }
+
+  public static void readAndVerifySchemaVersion(DataInputStream inputStream)
+      throws IOException {
+    // Check version byte
+    int versionNumber = inputStream.readInt();
+    if (versionNumber != SCHEMA_VERSION) {
+      throw new IOException(String
+          .format("Schema version not recognized. Found version %d, expected %d.", versionNumber,
+              SCHEMA_VERSION));
+    }
+  }
 }
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
index 4747e89..97aa15a 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
@@ -17,26 +17,22 @@
 
 package org.apache.gobblin.azkaban;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.Properties;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.yarn.GobblinYarnAppLauncher;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.log4j.Logger;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Charsets;
 import com.typesafe.config.Config;
-import com.typesafe.config.ConfigRenderOptions;
 
 import azkaban.jobExecutor.AbstractJob;
 import lombok.Getter;
 
+import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.yarn.GobblinYarnAppLauncher;
+
 
 /**
  * A utility class for launching a Gobblin application on Yarn through Azkaban.
@@ -54,9 +50,6 @@ import lombok.Getter;
  * @author Yinan Li
  */
 public class AzkabanGobblinYarnAppLauncher extends AbstractJob {
-  // if this is set then the Azkaban config will be written to the specified file path
-  public static final String AZKABAN_CONFIG_OUTPUT_PATH = "gobblin.yarn.akabanConfigOutputPath";
-
   private static final Logger LOGGER = Logger.getLogger(AzkabanJobLauncher.class);
 
   private final GobblinYarnAppLauncher gobblinYarnAppLauncher;
@@ -64,12 +57,11 @@ public class AzkabanGobblinYarnAppLauncher extends AbstractJob {
   @Getter
   private final YarnConfiguration yarnConfiguration;
 
-  public AzkabanGobblinYarnAppLauncher(String jobId, Properties gobblinProps) throws IOException {
+  public AzkabanGobblinYarnAppLauncher(String jobId, Properties gobblinProps)
+      throws IOException, SchemaRegistryException {
     super(jobId, LOGGER);
     Config gobblinConfig = ConfigUtils.propertiesToConfig(gobblinProps);
 
-    outputConfigToFile(gobblinConfig);
-
     yarnConfiguration = initYarnConf(gobblinProps);
 
     this.gobblinYarnAppLauncher = new GobblinYarnAppLauncher(gobblinConfig, this.yarnConfiguration);
@@ -110,36 +102,4 @@ public class AzkabanGobblinYarnAppLauncher extends AbstractJob {
       super.cancel();
     }
   }
-
-  /**
-   * Write the config to the file specified with the config key {@value AZKABAN_CONFIG_OUTPUT_PATH} if it
-   * is configured.
-   * @param config the config to output
-   * @throws IOException
-   */
-  @VisibleForTesting
-  static void outputConfigToFile(Config config) throws IOException {
-    // If a file path is specified then write the Azkaban config to that path in HOCON format.
-    // This can be used to generate an application.conf file to pass to the yarn app master and containers.
-    if (config.hasPath(AZKABAN_CONFIG_OUTPUT_PATH)) {
-      File configFile = new File(config.getString(AZKABAN_CONFIG_OUTPUT_PATH));
-      File parentDir = configFile.getParentFile();
-
-      if (parentDir != null && !parentDir.exists()) {
-        if (!parentDir.mkdirs()) {
-          throw new IOException("Error creating directories for " + parentDir);
-        }
-      }
-
-      ConfigRenderOptions configRenderOptions = ConfigRenderOptions.defaults();
-      configRenderOptions = configRenderOptions.setComments(false);
-      configRenderOptions = configRenderOptions.setOriginComments(false);
-      configRenderOptions = configRenderOptions.setFormatted(true);
-      configRenderOptions = configRenderOptions.setJson(false);
-
-      String renderedConfig = config.root().render(configRenderOptions);
-
-      FileUtils.writeStringToFile(configFile, renderedConfig, Charsets.UTF_8);
-    }
-  }
 }
diff --git a/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncherTest.java b/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncherTest.java
deleted file mode 100644
index ccb49c0..0000000
--- a/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncherTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.azkaban;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-
-import org.apache.commons.io.FileUtils;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigValueFactory;
-
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-
-@Slf4j
-public class AzkabanGobblinYarnAppLauncherTest {
-
-  @Test
-  public void testOutputConfig() throws IOException {
-    File tmpTestDir = Files.createTempDir();
-
-    try {
-      Path outputPath = Paths.get(tmpTestDir.toString(), "application.conf");
-      Config config = ConfigFactory.empty()
-          .withValue(ConfigurationKeys.FS_URI_KEY, ConfigValueFactory.fromAnyRef("file:///"))
-          .withValue(AzkabanGobblinYarnAppLauncher.AZKABAN_CONFIG_OUTPUT_PATH,
-              ConfigValueFactory.fromAnyRef(outputPath.toString()));
-
-      AzkabanGobblinYarnAppLauncher.outputConfigToFile(config);
-
-      String configString = Files.toString(outputPath.toFile(), Charsets.UTF_8);
-      Assert.assertTrue(configString.contains("fs"));
-    } finally {
-      FileUtils.deleteDirectory(tmpTestDir);
-    }
-  }
-}
diff --git a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterWithSchemaRegistryTest.java b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterWithSchemaRegistryTest.java
index 9d418da..e65e4bf 100644
--- a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterWithSchemaRegistryTest.java
+++ b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterWithSchemaRegistryTest.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.metrics.reporter;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
+import java.io.IOException;
 import java.util.Map;
 
 import org.apache.avro.Schema;
@@ -37,7 +38,6 @@ import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.kafka.KafkaAvroEventReporter;
 import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
 import org.apache.gobblin.metrics.kafka.KafkaEventReporter;
-import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
 
 
 public class KafkaAvroEventReporterWithSchemaRegistryTest {
@@ -46,28 +46,51 @@ public class KafkaAvroEventReporterWithSchemaRegistryTest {
 
   @Test
   public void test() throws Exception {
+    testHelper(false);
+  }
+
+  @Test
+  public void testWithSchemaId() throws IOException {
+    testHelper(true);
+  }
+
+  private String register(Schema schema) {
+    String id = DigestUtils.sha1Hex(schema.toString().getBytes());
+    this.schemas.put(id, schema);
+    return id;
+  }
 
+  private void testHelper(boolean isSchemaIdEnabled) throws IOException {
     MetricContext context = MetricContext.builder("context").build();
 
     MockKafkaPusher pusher = new MockKafkaPusher();
     KafkaAvroSchemaRegistry registry = Mockito.mock(KafkaAvroSchemaRegistry.class);
-    Mockito.when(registry.register(Mockito.any(Schema.class))).thenAnswer(new Answer<String>() {
-      @Override
-      public String answer(InvocationOnMock invocation)
-          throws Throwable {
-        return register((Schema) invocation.getArguments()[0]);
-      }
-    });
-    Mockito.when(registry.register(Mockito.any(Schema.class), Mockito.anyString())).thenAnswer(new Answer<String>() {
-      @Override
-      public String answer(InvocationOnMock invocation)
-          throws Throwable {
-        return register((Schema) invocation.getArguments()[0]);
-      }
-    });
-    KafkaEventReporter kafkaReporter =
-        KafkaAvroEventReporter.forContext(context).withKafkaPusher(pusher)
-            .withSchemaRegistry(registry).build("localhost:0000", "topic");
+    KafkaAvroEventReporter.Builder builder = KafkaAvroEventReporter.forContext(context).withKafkaPusher(pusher).withSchemaRegistry(registry);
+
+    Schema schema =
+        new Schema.Parser().parse(getClass().getClassLoader().getResourceAsStream("GobblinTrackingEvent.avsc"));
+    String schemaId = DigestUtils.sha1Hex(schema.toString().getBytes());
+
+    if (!isSchemaIdEnabled) {
+      Mockito.when(registry.register(Mockito.any(Schema.class))).thenAnswer(new Answer<String>() {
+        @Override
+        public String answer(InvocationOnMock invocation)
+            throws Throwable {
+          return register((Schema) invocation.getArguments()[0]);
+        }
+      });
+      Mockito.when(registry.register(Mockito.any(Schema.class), Mockito.anyString())).thenAnswer(new Answer<String>() {
+        @Override
+        public String answer(InvocationOnMock invocation)
+            throws Throwable {
+          return register((Schema) invocation.getArguments()[0]);
+        }
+      });
+    } else {
+      builder.withSchemaId(schemaId);
+    }
+
+    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic");
 
     GobblinTrackingEvent event = new GobblinTrackingEvent(0l, "namespace", "name", Maps.<String, String>newHashMap());
 
@@ -93,19 +116,12 @@ public class KafkaAvroEventReporterWithSchemaRegistryTest {
     byte[] readId = new byte[20];
     Assert.assertEquals(is.read(readId), 20);
     String readStringId = Hex.encodeHexString(readId);
-    Assert.assertTrue(this.schemas.containsKey(readStringId));
-
-    Schema schema = this.schemas.get(readStringId);
-    Assert.assertFalse(schema.toString().contains("avro.java.string"));
-
+    if (!isSchemaIdEnabled) {
+      Assert.assertTrue(this.schemas.containsKey(readStringId));
+      Schema readSchema = this.schemas.get(readStringId);
+      Assert.assertFalse(readSchema.toString().contains("avro.java.string"));
+    }
+    Assert.assertEquals(readStringId, schemaId);
     is.close();
   }
-
-  private String register(Schema schema)
-      throws SchemaRegistryException {
-    String id = DigestUtils.sha1Hex(schema.toString().getBytes());
-    this.schemas.put(id, schema);
-    return id;
-  }
-
 }
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopeSchemaConverter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopeSchemaConverter.java
index 124fe4c..2caceb6 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopeSchemaConverter.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopeSchemaConverter.java
@@ -17,28 +17,31 @@
 
 package org.apache.gobblin.converter;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+
 import com.google.common.base.Optional;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+
+import javax.xml.bind.DatatypeConverter;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.converter.filter.AvroProjectionConverter;
 import org.apache.gobblin.converter.filter.AvroSchemaFieldRemover;
 import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
 import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistryFactory;
-import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
 import org.apache.gobblin.util.AvroUtils;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.ExecutionException;
-import javax.xml.bind.DatatypeConverter;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.Decoder;
-import org.apache.avro.io.DecoderFactory;
 
 /**
  * A converter for extracting schema/records from an envelope schema.
@@ -123,7 +126,7 @@ public class EnvelopeSchemaConverter extends Converter<Schema, String, GenericRe
         payloadSchema = this.fieldRemover.get().removeFields(payloadSchema);
       }
       return new SingleRecordIterable<>(AvroUtils.convertRecordSchema(outputRecord, payloadSchema));
-    } catch (IOException | SchemaRegistryException | ExecutionException e) {
+    } catch (IOException | ExecutionException e) {
       throw new DataConversionException(e);
     }
   }
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/LiKafkaSchemaRegistry.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/LiKafkaSchemaRegistry.java
index d046747..0e00f04 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/LiKafkaSchemaRegistry.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/LiKafkaSchemaRegistry.java
@@ -30,8 +30,6 @@ import org.apache.commons.httpclient.methods.GetMethod;
 import org.apache.commons.httpclient.methods.PostMethod;
 import org.apache.commons.pool2.impl.GenericObjectPool;
 import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
-import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil;
-import org.apache.gobblin.util.PropertiesUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,7 +38,9 @@ import com.google.common.base.Preconditions;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.kafka.serialize.MD5Digest;
+import org.apache.gobblin.metrics.reporter.util.KafkaReporterUtils;
 import org.apache.gobblin.util.AvroUtils;
+import org.apache.gobblin.util.PropertiesUtils;
 
 
 /**
@@ -70,7 +70,7 @@ public class LiKafkaSchemaRegistry implements KafkaSchemaRegistry<MD5Digest, Sch
         String.format("Property %s not provided.", KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_URL));
 
     this.url = props.getProperty(KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_URL);
-    this.namespaceOverride = KafkaAvroReporterUtil.extractOverrideNamespace(props);
+    this.namespaceOverride = KafkaReporterUtils.extractOverrideNamespace(props);
     this.switchTopicNames = PropertiesUtils.getPropAsBoolean(props, KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_SWITCH_NAME,
         KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_SWITCH_NAME_DEFAULT);
 
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
index eb409ff..094bb83 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
@@ -23,6 +23,7 @@ import java.util.Properties;
 
 import com.codahale.metrics.ScheduledReporter;
 import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
 import com.typesafe.config.Config;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -35,7 +36,7 @@ import org.apache.gobblin.metrics.reporter.KeyValueEventObjectReporter;
 import org.apache.gobblin.metrics.reporter.KeyValueMetricObjectReporter;
 import org.apache.gobblin.metrics.kafka.KafkaReporter;
 import org.apache.gobblin.metrics.kafka.PusherUtils;
-import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil;
+import org.apache.gobblin.metrics.reporter.util.KafkaReporterUtils;
 import org.apache.gobblin.util.ConfigUtils;
 
 
@@ -53,6 +54,10 @@ public enum KafkaReportingFormats {
       if (Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
           ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
         builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
+        String schemaId = properties.getProperty(ConfigurationKeys.METRICS_REPORTING_METRICS_KAFKA_AVRO_SCHEMA_ID);
+        if (!Strings.isNullOrEmpty(schemaId)) {
+          builder.withSchemaId(schemaId);
+        }
       }
       builder.build(brokers, topic, properties);
     }
@@ -66,6 +71,10 @@ public enum KafkaReportingFormats {
       if (Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
           ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
         builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
+        String schemaId = properties.getProperty(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKA_AVRO_SCHEMA_ID);
+        if (!Strings.isNullOrEmpty(schemaId)) {
+          builder.withSchemaId(schemaId);
+        }
       }
       String pusherClassName = properties.containsKey(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS) ? properties
           .getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS) : properties
@@ -104,6 +113,10 @@ public enum KafkaReportingFormats {
       if (Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
           ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
         builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
+        String schemaId = properties.getProperty(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKA_AVRO_SCHEMA_ID);
+        if (!Strings.isNullOrEmpty(schemaId)) {
+          builder.withSchemaId(schemaId);
+        }
       }
       String pusherClassName = properties.containsKey(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS) ? properties
           .getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS) : properties
@@ -155,7 +168,7 @@ public enum KafkaReportingFormats {
         throws IOException {
 
       KeyValueMetricObjectReporter.Builder builder = new KeyValueMetricObjectReporter.Builder();
-      builder.namespaceOverride(KafkaAvroReporterUtil.extractOverrideNamespace(properties));
+      builder.namespaceOverride(KafkaReporterUtils.extractOverrideNamespace(properties));
       Config allConfig = ConfigUtils.propertiesToConfig(properties);
       Config config = ConfigUtils.getConfigOrEmpty(allConfig, ConfigurationKeys.METRICS_REPORTING_CONFIGURATIONS_PREFIX)
           .withFallback(allConfig);
@@ -173,7 +186,7 @@ public enum KafkaReportingFormats {
           ConfigUtils.getConfigOrEmpty(allConfig, ConfigurationKeys.METRICS_REPORTING_EVENTS_CONFIGURATIONS_PREFIX)
               .withFallback(allConfig);
       builder.withConfig(config);
-      builder.namespaceOverride(KafkaAvroReporterUtil.extractOverrideNamespace(properties));
+      builder.namespaceOverride(KafkaReporterUtils.extractOverrideNamespace(properties));
       return builder.build(brokers, topic);
     }
   };
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventKeyValueReporter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventKeyValueReporter.java
index bf6a07e..2d447c2 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventKeyValueReporter.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventKeyValueReporter.java
@@ -17,23 +17,22 @@
 
 package org.apache.gobblin.metrics.kafka;
 
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+
+import com.google.common.base.Optional;
+
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.metrics.GobblinTrackingEvent;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.reporter.util.AvroBinarySerializer;
 import org.apache.gobblin.metrics.reporter.util.AvroSerializer;
+import org.apache.gobblin.metrics.reporter.util.KafkaReporterUtils;
 import org.apache.gobblin.metrics.reporter.util.SchemaRegistryVersionWriter;
 import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.avro.Schema;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-
 
 /**
  * Implement of {@link KafkaEventKeyValueReporter} for avro records.
@@ -44,10 +43,11 @@ public class KafkaAvroEventKeyValueReporter extends KafkaEventKeyValueReporter {
   protected KafkaAvroEventKeyValueReporter(Builder<?> builder) throws IOException {
     super(builder);
     if(builder.registry.isPresent()) {
-      Schema schema =
-          new Schema.Parser().parse(getClass().getClassLoader().getResourceAsStream("GobblinTrackingEvent.avsc"));
-      this.serializer.setSchemaVersionWriter(new SchemaRegistryVersionWriter(builder.registry.get(), builder.topic,
-          Optional.of(schema)));
+      Schema schema = KafkaReporterUtils.getGobblinTrackingEventSchema();
+      SchemaRegistryVersionWriter schemaVersionWriter =
+          builder.schemaId.isPresent() ? new SchemaRegistryVersionWriter(builder.registry.get(), builder.topic, schema,
+              builder.schemaId.get()) : new SchemaRegistryVersionWriter(builder.registry.get(), builder.topic, schema);
+      this.serializer.setSchemaVersionWriter(schemaVersionWriter);
     }
   }
 
@@ -86,6 +86,7 @@ public class KafkaAvroEventKeyValueReporter extends KafkaEventKeyValueReporter {
    */
   public static abstract class Builder<T extends Builder<T>> extends KafkaEventKeyValueReporter.Builder<T> {
     private Optional<KafkaAvroSchemaRegistry> registry = Optional.absent();
+    private Optional<String> schemaId = Optional.absent();
 
     private Builder(MetricContext context) {
       super(context);
@@ -96,6 +97,11 @@ public class KafkaAvroEventKeyValueReporter extends KafkaEventKeyValueReporter {
       return self();
     }
 
+    public T withSchemaId(String schemaId) {
+      this.schemaId = Optional.of(schemaId);
+      return self();
+    }
+
     /**
      * Builds and returns {@link KafkaAvroEventReporter}.
      *
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventReporter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventReporter.java
index ecb5d7d..dd1297b 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventReporter.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventReporter.java
@@ -27,6 +27,7 @@ import org.apache.gobblin.metrics.GobblinTrackingEvent;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.reporter.util.AvroBinarySerializer;
 import org.apache.gobblin.metrics.reporter.util.AvroSerializer;
+import org.apache.gobblin.metrics.reporter.util.KafkaReporterUtils;
 import org.apache.gobblin.metrics.reporter.util.SchemaRegistryVersionWriter;
 import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
 
@@ -39,10 +40,11 @@ public class KafkaAvroEventReporter extends KafkaEventReporter {
   protected KafkaAvroEventReporter(Builder<?> builder) throws IOException {
     super(builder);
     if(builder.registry.isPresent()) {
-      Schema schema =
-          new Schema.Parser().parse(getClass().getClassLoader().getResourceAsStream("GobblinTrackingEvent.avsc"));
-      this.serializer.setSchemaVersionWriter(new SchemaRegistryVersionWriter(builder.registry.get(), builder.topic,
-          Optional.of(schema)));
+      Schema schema = KafkaReporterUtils.getGobblinTrackingEventSchema();
+      SchemaRegistryVersionWriter schemaVersionWriter =
+          builder.schemaId.isPresent() ? new SchemaRegistryVersionWriter(builder.registry.get(), builder.topic, schema,
+              builder.schemaId.get()) : new SchemaRegistryVersionWriter(builder.registry.get(), builder.topic, schema);
+      this.serializer.setSchemaVersionWriter(schemaVersionWriter);
     }
   }
 
@@ -92,8 +94,8 @@ public class KafkaAvroEventReporter extends KafkaEventReporter {
    * Defaults to no filter, reporting rates in seconds and times in milliseconds.
    */
   public static abstract class Builder<T extends Builder<T>> extends KafkaEventReporter.Builder<T> {
-
     private Optional<KafkaAvroSchemaRegistry> registry = Optional.absent();
+    private Optional<String> schemaId = Optional.absent();
 
     private Builder(MetricContext context) {
       super(context);
@@ -104,6 +106,11 @@ public class KafkaAvroEventReporter extends KafkaEventReporter {
       return self();
     }
 
+    public T withSchemaId(String schemaId) {
+      this.schemaId = Optional.of(schemaId);
+      return self();
+    }
+
     /**
      * Builds and returns {@link KafkaAvroEventReporter}.
      *
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java
index 5cb3ce8..6a4396e 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java
@@ -28,6 +28,7 @@ import com.typesafe.config.Config;
 import org.apache.gobblin.metrics.MetricReport;
 import org.apache.gobblin.metrics.reporter.util.AvroBinarySerializer;
 import org.apache.gobblin.metrics.reporter.util.AvroSerializer;
+import org.apache.gobblin.metrics.reporter.util.KafkaReporterUtils;
 import org.apache.gobblin.metrics.reporter.util.SchemaRegistryVersionWriter;
 import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
 
@@ -42,10 +43,11 @@ public class KafkaAvroReporter extends KafkaReporter {
   protected KafkaAvroReporter(Builder<?> builder, Config config) throws IOException {
     super(builder, config);
     if (builder.registry.isPresent()) {
-      Schema schema =
-          new Schema.Parser().parse(getClass().getClassLoader().getResourceAsStream("MetricReport.avsc"));
-      this.serializer.setSchemaVersionWriter(new SchemaRegistryVersionWriter(builder.registry.get(), builder.topic,
-          Optional.of(schema)));
+      Schema schema = KafkaReporterUtils.getMetricReportSchema();
+      SchemaRegistryVersionWriter schemaVersionWriter =
+          builder.schemaId.isPresent() ? new SchemaRegistryVersionWriter(builder.registry.get(), builder.topic, schema,
+              builder.schemaId.get()) : new SchemaRegistryVersionWriter(builder.registry.get(), builder.topic, schema);
+      this.serializer.setSchemaVersionWriter(schemaVersionWriter);
     }
   }
 
@@ -79,14 +81,19 @@ public class KafkaAvroReporter extends KafkaReporter {
    * Builder for {@link KafkaAvroReporter}. Defaults to no filter, reporting rates in seconds and times in milliseconds.
    */
   public static abstract class Builder<T extends Builder<T>> extends KafkaReporter.Builder<T> {
-
     private Optional<KafkaAvroSchemaRegistry> registry = Optional.absent();
+    private Optional<String> schemaId = Optional.absent();
 
     public T withSchemaRegistry(KafkaAvroSchemaRegistry registry) {
       this.registry = Optional.of(registry);
       return self();
     }
 
+    public T withSchemaId(String schemaId) {
+      this.schemaId = Optional.of(schemaId);
+      return self();
+    }
+
     /**
      * Builds and returns {@link KafkaAvroReporter}.
      *
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
index 556c1d1..0bd3683 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
@@ -41,7 +41,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.kafka.schemareg.HttpClientFactory;
-import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil;
+import org.apache.gobblin.metrics.reporter.util.KafkaReporterUtils;
 import org.apache.gobblin.util.AvroUtils;
 
 
@@ -78,7 +78,7 @@ public class KafkaAvroSchemaRegistry extends KafkaSchemaRegistry<String, Schema>
         String.format("Property %s not provided.", KAFKA_SCHEMA_REGISTRY_URL));
 
     this.url = props.getProperty(KAFKA_SCHEMA_REGISTRY_URL);
-    this.namespaceOverride = KafkaAvroReporterUtil.extractOverrideNamespace(props);
+    this.namespaceOverride = KafkaReporterUtils.extractOverrideNamespace(props);
 
     int objPoolSize =
         Integer.parseInt(props.getProperty(ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS,
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
index 9f38379..5c12360 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
@@ -25,45 +25,43 @@ import com.codahale.metrics.ScheduledReporter;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.CustomCodahaleReporterFactory;
 import org.apache.gobblin.metrics.KafkaReportingFormats;
 import org.apache.gobblin.metrics.RootMetricContext;
-
-import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.metrics.reporter.util.KafkaReporterUtils;
 
 
 @Slf4j
 public class KafkaReporterFactory implements CustomCodahaleReporterFactory {
   @Override
-  public ScheduledReporter newScheduledReporter(MetricRegistry registry, Properties properties)
-      throws IOException {
+  public ScheduledReporter newScheduledReporter(MetricRegistry registry, Properties properties) {
     if (!Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_ENABLED_KEY,
         ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_ENABLED))) {
       return null;
     }
     log.info("Reporting metrics & events to Kafka");
 
-    Optional<String> defaultTopic =
-        Optional.fromNullable(properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC));
-    Optional<String> metricsTopic =
-        Optional.fromNullable(properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC_METRICS));
-    Optional<String> eventsTopic =
-        Optional.fromNullable(properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC_EVENTS));
+    boolean metricsEnabled = KafkaReporterUtils.isMetricsEnabled(properties);
 
-    boolean metricsEnabled = metricsTopic.or(defaultTopic).isPresent();
-    if (metricsEnabled) {
+    if (KafkaReporterUtils.isMetricsEnabled(properties)) {
       log.info("Metrics enabled ---  Reporting metrics to Kafka");
     }
-    boolean eventsEnabled = eventsTopic.or(defaultTopic).isPresent();
-    if (eventsEnabled) {
+
+    boolean eventsEnabled = KafkaReporterUtils.isEventsEnabled(properties);
+    if (KafkaReporterUtils.isEventsEnabled(properties)) {
       log.info("Events enabled --- Reporting events to Kafka");
     }
 
+    Optional<String> eventsTopic = KafkaReporterUtils.getEventsTopic(properties);
+    Optional<String> defaultTopic = KafkaReporterUtils.getDefaultTopic(properties);
+
     try {
       Preconditions.checkArgument(properties.containsKey(ConfigurationKeys.METRICS_KAFKA_BROKERS),
           "Kafka metrics brokers missing.");
-      Preconditions.checkArgument(metricsTopic.or(eventsTopic).or(defaultTopic).isPresent(), "Kafka topic missing.");
+      Preconditions.checkArgument(KafkaReporterUtils.getMetricsTopic(properties).or(eventsTopic).or(defaultTopic).isPresent(), "Kafka topic missing.");
     } catch (IllegalArgumentException exception) {
       log.error("Not reporting metrics to Kafka due to missing Kafka configuration(s).", exception);
       return null;
@@ -84,6 +82,7 @@ public class KafkaReporterFactory implements CustomCodahaleReporterFactory {
       formatEnum = KafkaReportingFormats.JSON;
     }
 
+    Optional<String> metricsTopic = KafkaReporterUtils.getMetricsTopic(properties);
     if (metricsEnabled) {
       try {
         formatEnum.buildMetricsReporter(brokers, metricsTopic.or(defaultTopic).get(), properties);
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/SchemaRegistryException.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/SchemaRegistryException.java
index 6cd5346..d06076a 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/SchemaRegistryException.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/SchemaRegistryException.java
@@ -17,7 +17,10 @@
 
 package org.apache.gobblin.metrics.kafka;
 
-public class SchemaRegistryException extends Exception {
+import java.io.IOException;
+
+
+public class SchemaRegistryException extends IOException {
 
   private static final long serialVersionUID = 1L;
 
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/util/KafkaAvroReporterUtil.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/util/KafkaReporterUtils.java
similarity index 55%
rename from gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/util/KafkaAvroReporterUtil.java
rename to gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/util/KafkaReporterUtils.java
index 1d82921..5d44adb 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/util/KafkaAvroReporterUtil.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/util/KafkaReporterUtils.java
@@ -16,19 +16,25 @@
  */
 package org.apache.gobblin.metrics.reporter.util;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistryConfigurationKeys;
+import org.apache.avro.Schema;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistryConfigurationKeys;
+
 
-public class KafkaAvroReporterUtil {
+public class KafkaReporterUtils {
+  public static final String METRIC_REPORT_AVRO_SCHEMA_FILE = "MetricReport.avsc";
+  public static final String GOBBLIN_TRACKING_EVENT_AVRO_SCHEMA_FILE = "GobblinTrackingEvent.avsc";
 
   private static final Splitter SPLIT_BY_COMMA = Splitter.on(",").omitEmptyStrings().trimResults();
   private static final Splitter SPLIT_BY_COLON = Splitter.on(":").omitEmptyStrings().trimResults();
@@ -72,4 +78,49 @@ public class KafkaAvroReporterUtil {
 
     return Optional.<Map<String, String>>absent();
   }
+
+  public static boolean isMetricsEnabled(Properties properties) {
+    Optional<String> defaultTopic = getDefaultTopic(properties);
+    Optional<String> metricsTopic = getMetricsTopic(properties);
+
+    return metricsTopic.or(defaultTopic).isPresent();
+  }
+
+  public static boolean isEventsEnabled(Properties properties) {
+    Optional<String> defaultTopic = getDefaultTopic(properties);
+    Optional<String> eventsTopic = getEventsTopic(properties);
+
+    return eventsTopic.or(defaultTopic).isPresent();
+  }
+
+  public static Optional<String> getDefaultTopic(Properties properties) {
+    return Optional.fromNullable(properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC));
+  }
+
+  public static Optional<String> getMetricsTopic(Properties properties) {
+    return Optional.fromNullable(properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC_METRICS));
+  }
+
+  public static Optional<String> getEventsTopic(Properties properties) {
+    return Optional.fromNullable(properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC_EVENTS));
+  }
+
+  public static boolean isKafkaReportingEnabled(Properties properties) {
+    return Boolean.parseBoolean(
+        properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_ENABLED_KEY, ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_ENABLED));
+  }
+
+  public static boolean isKafkaAvroSchemaRegistryEnabled(Properties properties) {
+    return Boolean.parseBoolean(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY, ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY));
+  }
+
+  public static Schema getMetricReportSchema() throws IOException {
+    return new Schema.Parser()
+        .parse(KafkaReporterUtils.class.getClassLoader().getResourceAsStream(METRIC_REPORT_AVRO_SCHEMA_FILE));
+  }
+
+  public static Schema getGobblinTrackingEventSchema() throws IOException {
+    return new Schema.Parser()
+        .parse(KafkaReporterUtils.class.getClassLoader().getResourceAsStream(GOBBLIN_TRACKING_EVENT_AVRO_SCHEMA_FILE));
+  }
 }
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/util/SchemaRegistryVersionWriter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/util/SchemaRegistryVersionWriter.java
index bffd612..9c52f10 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/util/SchemaRegistryVersionWriter.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/util/SchemaRegistryVersionWriter.java
@@ -26,11 +26,14 @@ import org.apache.avro.Schema;
 import org.apache.commons.codec.DecoderException;
 import org.apache.commons.codec.binary.Hex;
 
-import com.google.common.base.Optional;
+import com.google.common.base.Strings;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
 
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
 import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
 import org.apache.gobblin.util.ConfigUtils;
@@ -41,50 +44,46 @@ import org.apache.gobblin.util.ConfigUtils;
  * {@link org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry} to get Schema version identifier and write it to
  * {@link java.io.DataOutputStream}.
  */
+@Slf4j
 public class SchemaRegistryVersionWriter implements SchemaVersionWriter<Schema> {
 
   private final KafkaAvroSchemaRegistry registry;
   private Map<Schema, String> registrySchemaIds;
-  private final Optional<String> overrideName;
-  private final Optional<Schema> schema;
-  private final Optional<String> schemaId;
+  private final String overrideName;
+  private final Schema schema;
+  private final String schemaId;
   private final int schemaIdLengthBytes;
 
   public SchemaRegistryVersionWriter(Config config)
-      throws IOException {
-    this(new KafkaAvroSchemaRegistry(ConfigUtils.configToProperties(config)), Optional.<String>absent(),
-        Optional.<Schema>absent());
+      throws SchemaRegistryException {
+    this(new KafkaAvroSchemaRegistry(ConfigUtils.configToProperties(config)), null, null, null);
   }
 
-  public SchemaRegistryVersionWriter(KafkaAvroSchemaRegistry registry, String overrideName)
-      throws IOException {
-    this(registry, overrideName, Optional.<Schema>absent());
+  public SchemaRegistryVersionWriter(KafkaAvroSchemaRegistry registry, @Nullable String overrideName) throws SchemaRegistryException {
+    this(registry, overrideName, null);
   }
 
-  public SchemaRegistryVersionWriter(KafkaAvroSchemaRegistry registry, String overrideName,
-      Optional<Schema> singleSchema)
-      throws IOException {
-    this(registry, Optional.of(overrideName), singleSchema);
+  public SchemaRegistryVersionWriter(KafkaAvroSchemaRegistry registry, @Nullable String overrideName, @Nullable Schema singleSchema)
+      throws SchemaRegistryException {
+    this(registry, overrideName, singleSchema, null);
   }
 
-  public SchemaRegistryVersionWriter(KafkaAvroSchemaRegistry registry, Optional<String> overrideName,
-      Optional<Schema> singleSchema)
-      throws IOException {
+  public SchemaRegistryVersionWriter(KafkaAvroSchemaRegistry registry, @Nullable String overrideName, @Nullable Schema singleSchema, @Nullable String schemaId)
+      throws SchemaRegistryException {
     this.registry = registry;
     this.registrySchemaIds = Maps.newConcurrentMap();
     this.overrideName = overrideName;
     this.schema = singleSchema;
     this.schemaIdLengthBytes = registry.getSchemaIdLengthByte();
-    if (this.schema.isPresent()) {
-      try {
-        this.schemaId = this.overrideName.isPresent() ? Optional
-            .of(this.registry.register(this.schema.get(), this.overrideName.get()))
-            : Optional.of(this.registry.register(this.schema.get()));
-      } catch (SchemaRegistryException e) {
-        throw Throwables.propagate(e);
-      }
+    if ((this.schema != null) && (schemaId == null)) {
+      this.schemaId =
+          (!Strings.isNullOrEmpty(this.overrideName)) ? this.registry.register(this.schema, this.overrideName)
+              : this.registry.register(this.schema);
     } else {
-      this.schemaId = Optional.absent();
+      if (schemaId != null) {
+        log.info("Skipping registering schema with schema registry. Using schema with id: {}", schemaId);
+      }
+      this.schemaId = schemaId;
     }
   }
 
@@ -92,7 +91,7 @@ public class SchemaRegistryVersionWriter implements SchemaVersionWriter<Schema>
   public void writeSchemaVersioningInformation(Schema schema, DataOutputStream outputStream)
       throws IOException {
 
-    String schemaId = this.schemaId.isPresent() ? this.schemaId.get() : this.getIdForSchema(schema);
+    String schemaId = this.schemaId != null ? this.schemaId : this.getIdForSchema(schema);
 
     outputStream.writeByte(KafkaAvroSchemaRegistry.MAGIC_BYTE);
     try {
@@ -105,7 +104,7 @@ public class SchemaRegistryVersionWriter implements SchemaVersionWriter<Schema>
   private String getIdForSchema(Schema schema) {
     if (!this.registrySchemaIds.containsKey(schema)) {
       try {
-        String schemaId = this.overrideName.isPresent() ? this.registry.register(schema, this.overrideName.get())
+        String schemaId = !Strings.isNullOrEmpty(this.overrideName) ? this.registry.register(schema, this.overrideName)
             : this.registry.register(schema);
         this.registrySchemaIds.put(schema, schemaId);
       } catch (SchemaRegistryException e) {
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventKeyValueReporterTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventKeyValueReporterTest.java
index d4e5939..995c41b 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventKeyValueReporterTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventKeyValueReporterTest.java
@@ -17,29 +17,41 @@
 
 package org.apache.gobblin.metrics.reporter;
 
-import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.tuple.Pair;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
-import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.GobblinTrackingEvent;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.kafka.KafkaAvroEventKeyValueReporter;
+import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
 import org.apache.gobblin.metrics.kafka.KafkaEventReporter;
 import org.apache.gobblin.metrics.kafka.Pusher;
 import org.apache.gobblin.metrics.reporter.util.EventUtils;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
 
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Properties;
 
-import com.google.common.collect.Lists;
+public class KafkaAvroEventKeyValueReporterTest extends KafkaAvroEventReporterTest {
+  private static final int SCHEMA_ID_LENGTH_BYTES = 20;
 
+  private String schemaId;
 
-public class KafkaAvroEventKeyValueReporterTest extends KafkaAvroEventReporterTest {
+  @BeforeClass
+  public void setUp() throws IOException {
+    Schema schema =
+        new Schema.Parser().parse(getClass().getClassLoader().getResourceAsStream("GobblinTrackingEvent.avsc"));
+    this.schemaId = DigestUtils.sha1Hex(schema.toString().getBytes());
+  }
 
   @Override
   public KafkaEventReporter.Builder<? extends KafkaEventReporter.Builder> getBuilder(MetricContext context,
@@ -48,19 +60,15 @@ public class KafkaAvroEventKeyValueReporterTest extends KafkaAvroEventReporterTe
     return builder.withKafkaPusher(pusher).withKeys(Lists.newArrayList("k1", "k2", "k3"));
   }
 
-  private Pair<String, GobblinTrackingEvent> nextKVEvent(Iterator<Pair<String, byte[]>> it) throws IOException {
+  private Pair<String, GobblinTrackingEvent> nextKVEvent(Iterator<Pair<String, byte[]>> it, boolean isSchemaIdEnabled) throws IOException {
     Assert.assertTrue(it.hasNext());
     Pair<String, byte[]> event = it.next();
-    return Pair.of(event.getKey(), EventUtils.deserializeReportFromAvroSerialization(new GobblinTrackingEvent(), event.getValue()));
+    return isSchemaIdEnabled ? Pair.of(event.getKey(), EventUtils
+        .deserializeEventFromAvroSerialization(new GobblinTrackingEvent(), event.getValue(), schemaId)) : Pair.of(event.getKey(),
+        EventUtils.deserializeEventFromAvroSerialization(new GobblinTrackingEvent(), event.getValue()));
   }
 
-  @Test
-  public void testKafkaEventReporter() throws IOException {
-    MetricContext context = MetricContext.builder("context").build();
-
-    MockKafkaKeyValuePusher pusher = new MockKafkaKeyValuePusher();
-    KafkaEventReporter kafkaReporter = getBuilder(context, pusher).build("localhost:0000", "topic");
-
+  private GobblinTrackingEvent getEvent(boolean isMessageKeyed) {
     String namespace = "gobblin.metrics.test";
     String eventName = "testEvent";
 
@@ -70,27 +78,58 @@ public class KafkaAvroEventKeyValueReporterTest extends KafkaAvroEventReporterTe
     Map<String, String> metadata = Maps.newHashMap();
     metadata.put("m1", "v1");
     metadata.put("m2", null);
+    if (isMessageKeyed) {
+      metadata.put("k1", "v1");
+      metadata.put("k2", "v2");
+      metadata.put("k3", "v3");
+    }
+
     event.setMetadata(metadata);
-    context.submitEvent(event);
+    return event;
+  }
+
+  @Test
+  public void testKafkaEventReporter() throws IOException {
+    MetricContext context = MetricContext.builder("context").build();
+
+    MockKafkaKeyValuePusher pusher = new MockKafkaKeyValuePusher();
+    KafkaEventReporter kafkaReporter = getBuilder(context, pusher).build("localhost:0000", "topic");
+
+    context.submitEvent(getEvent(false));
 
     kafkaReporter.report();
 
-    Pair<String, GobblinTrackingEvent> retrievedEvent = nextKVEvent(pusher.messageIterator());
+    Pair<String, GobblinTrackingEvent> retrievedEvent = nextKVEvent(pusher.messageIterator(), false);
     Assert.assertNull(retrievedEvent.getKey());
 
-    event = new GobblinTrackingEvent();
-    event.setName(eventName);
-    event.setNamespace(namespace);
-    metadata = Maps.newHashMap();
-    metadata.put("k1", "v1");
-    metadata.put("k2", "v2");
-    metadata.put("k3", "v3");
-    event.setMetadata(metadata);
-    context.submitEvent(event);
+    context.submitEvent(getEvent(true));
+
+    kafkaReporter.report();
+
+    retrievedEvent = nextKVEvent(pusher.messageIterator(), false);
+    Assert.assertEquals(retrievedEvent.getKey(), "v1v2v3");
+  }
+
+  @Test
+  public void testKafkaEventReporterWithSchemaRegistry() throws IOException {
+    MetricContext context = MetricContext.builder("context").build();
+    MockKafkaKeyValuePusher pusher = new MockKafkaKeyValuePusher();
+
+    Schema schema =
+        new Schema.Parser().parse(getClass().getClassLoader().getResourceAsStream("GobblinTrackingEvent.avsc"));
+    String schemaId = DigestUtils.sha1Hex(schema.toString().getBytes());
+
+    KafkaAvroEventKeyValueReporter.Builder<?> builder = KafkaAvroEventKeyValueReporter.Factory.forContext(context);
+    KafkaAvroEventKeyValueReporter kafkaReporter =
+        builder.withKafkaPusher(pusher).withKeys(Lists.newArrayList("k1", "k2", "k3"))
+            .withSchemaRegistry(Mockito.mock(KafkaAvroSchemaRegistry.class)).withSchemaId(schemaId)
+            .build("localhost:0000", "topic");
+
+    context.submitEvent(getEvent(true));
 
     kafkaReporter.report();
 
-    retrievedEvent = nextKVEvent(pusher.messageIterator());
+    Pair<String, GobblinTrackingEvent> retrievedEvent = nextKVEvent(pusher.messageIterator(), true);
     Assert.assertEquals(retrievedEvent.getKey(), "v1v2v3");
   }
 
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterTest.java
index e240a53..bbb9034 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterTest.java
@@ -45,6 +45,6 @@ public class KafkaAvroEventReporterTest extends KafkaEventReporterTest {
   protected GobblinTrackingEvent nextEvent(Iterator<byte[]> it)
       throws IOException {
     Assert.assertTrue(it.hasNext());
-    return EventUtils.deserializeReportFromAvroSerialization(new GobblinTrackingEvent(), it.next());
+    return EventUtils.deserializeEventFromAvroSerialization(new GobblinTrackingEvent(), it.next());
   }
 }
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroReporterWithSchemaRegistryTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroReporterWithSchemaRegistryTest.java
new file mode 100644
index 0000000..4524a63
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroReporterWithSchemaRegistryTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.avro.Schema;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.metrics.MetricReport;
+import org.apache.gobblin.metrics.kafka.KafkaAvroReporter;
+import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
+import org.apache.gobblin.metrics.kafka.KafkaReporter;
+import org.apache.gobblin.metrics.kafka.Pusher;
+import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
+
+
+/**
+ * Test for KafkaAvroReporter that is configured with a {@link KafkaAvroSchemaRegistry}.
+ * Extends KafkaAvroReporterTest and just redefines the builder and the metrics deserializer
+ *
+ * @author Sudarshan Vasudevan
+ */
+@Test(groups = {"gobblin.metrics"})
+public class KafkaAvroReporterWithSchemaRegistryTest extends KafkaAvroReporterTest {
+  private static final int SCHEMA_ID_LENGTH_BYTES = 20;
+
+  private String schemaId;
+  private KafkaAvroSchemaRegistry schemaRegistry;
+
+  public KafkaAvroReporterWithSchemaRegistryTest(String topic)
+      throws IOException, InterruptedException {
+    super();
+    this.schemaId = getSchemaId();
+    this.schemaRegistry = getMockSchemaRegistry();
+  }
+
+  public KafkaAvroReporterWithSchemaRegistryTest()
+      throws IOException, InterruptedException {
+    this("KafkaAvroReporterTestWithSchemaRegistry");
+    this.schemaId = getSchemaId();
+  }
+
+  private KafkaAvroSchemaRegistry getMockSchemaRegistry() throws IOException {
+    KafkaAvroSchemaRegistry registry  = Mockito.mock(KafkaAvroSchemaRegistry.class);
+    Mockito.when(registry.getSchemaIdLengthByte()).thenAnswer(new Answer<Integer>() {
+      @Override
+      public Integer answer(InvocationOnMock invocation) {
+        return KafkaAvroReporterWithSchemaRegistryTest.SCHEMA_ID_LENGTH_BYTES;
+      }
+    });
+    Mockito.when(registry.getSchemaByKey(Mockito.anyString())).thenAnswer(new Answer<Schema>() {
+      @Override
+      public Schema answer(InvocationOnMock invocation) {
+        return MetricReport.SCHEMA$;
+      }
+    });
+    return registry;
+  }
+
+  private String getSchemaId() throws IOException {
+    Schema schema =
+        new Schema.Parser().parse(getClass().getClassLoader().getResourceAsStream("MetricReport.avsc"));
+    return DigestUtils.sha1Hex(schema.toString().getBytes());
+  }
+
+  @Override
+  public KafkaReporter.Builder<? extends KafkaReporter.Builder> getBuilder(Pusher pusher) {
+    return KafkaAvroReporter.BuilderFactory.newBuilder().withKafkaPusher(pusher).withSchemaRegistry(this.schemaRegistry).withSchemaId(schemaId);
+  }
+
+  @Override
+  public KafkaReporter.Builder<? extends KafkaReporter.Builder> getBuilderFromContext(Pusher pusher) {
+    return KafkaAvroReporter.BuilderFactory.newBuilder().withKafkaPusher(pusher).withSchemaRegistry(this.schemaRegistry).withSchemaId(schemaId);
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  protected MetricReport nextReport(Iterator<byte[]> it)
+      throws IOException {
+    Assert.assertTrue(it.hasNext());
+    return MetricReportUtils.deserializeReportFromAvroSerialization(new MetricReport(), it.next(), this.schemaId);
+  }
+}
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporterTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporterTest.java
index d2bf63f..9c23972 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporterTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporterTest.java
@@ -32,7 +32,7 @@ import com.google.common.collect.Maps;
 import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistryConfigurationKeys;
 import org.apache.gobblin.metrics.GobblinTrackingEvent;
 import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil;
+import org.apache.gobblin.metrics.reporter.util.KafkaReporterUtils;
 import org.apache.gobblin.util.ConfigUtils;
 
 
@@ -65,7 +65,7 @@ public class KeyValueEventObjectReporterTest extends KeyValueEventObjectReporter
    */
   public static KeyValueEventObjectReporterTest.Builder getBuilder(MetricContext context, Properties props) {
     KeyValueEventObjectReporterTest.Builder builder = new KeyValueEventObjectReporterTest.Builder(context);
-    builder.namespaceOverride(KafkaAvroReporterUtil.extractOverrideNamespace(props))
+    builder.namespaceOverride(KafkaReporterUtils.extractOverrideNamespace(props))
         .withConfig(ConfigUtils.propertiesToConfig(props));
     return builder;
   }
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueMetricObjectReporterTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueMetricObjectReporterTest.java
index f674b0e..64dfbee 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueMetricObjectReporterTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueMetricObjectReporterTest.java
@@ -30,7 +30,7 @@ import com.typesafe.config.Config;
 
 import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistryConfigurationKeys;
 import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil;
+import org.apache.gobblin.metrics.reporter.util.KafkaReporterUtils;
 import org.apache.gobblin.util.ConfigUtils;
 
 
@@ -62,7 +62,7 @@ public class KeyValueMetricObjectReporterTest extends KeyValueMetricObjectReport
    */
   public static KeyValueMetricObjectReporterTest.Builder getBuilder(Properties props) {
     KeyValueMetricObjectReporterTest.Builder builder = new KeyValueMetricObjectReporterTest.Builder();
-    builder.namespaceOverride(KafkaAvroReporterUtil.extractOverrideNamespace(props));
+    builder.namespaceOverride(KafkaReporterUtils.extractOverrideNamespace(props));
     return builder;
   }
 
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
index 31232c4..b670a7a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -30,7 +30,6 @@ import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificDatumReader;
 
 import com.codahale.metrics.Meter;
-import com.google.common.base.Optional;
 import com.typesafe.config.Config;
 
 import lombok.Getter;
@@ -69,7 +68,7 @@ public class KafkaAvroJobStatusMonitor extends KafkaJobStatusMonitor {
     if (ConfigUtils.getBoolean(config, ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY, false)) {
       KafkaAvroSchemaRegistry schemaRegistry = (KafkaAvroSchemaRegistry) new KafkaAvroSchemaRegistryFactory().
           create(ConfigUtils.configToProperties(config));
-      this.schemaVersionWriter = new SchemaRegistryVersionWriter(schemaRegistry, topic, Optional.of(GobblinTrackingEvent.SCHEMA$));
+      this.schemaVersionWriter = new SchemaRegistryVersionWriter(schemaRegistry, topic, GobblinTrackingEvent.SCHEMA$);
     } else {
       this.schemaVersionWriter = new FixedSchemaVersionWriter();
     }
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 82842f2..5fa64e1 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -35,6 +35,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.avro.Schema;
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.reflect.ConstructorUtils;
 import org.apache.commons.mail.EmailException;
 import org.apache.hadoop.conf.Configuration;
@@ -75,6 +77,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
@@ -91,6 +94,8 @@ import com.google.common.util.concurrent.Service;
 import com.google.common.util.concurrent.ServiceManager;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigRenderOptions;
+import com.typesafe.config.ConfigValueFactory;
 
 import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
 import org.apache.gobblin.cluster.GobblinClusterManager;
@@ -99,6 +104,9 @@ import org.apache.gobblin.cluster.GobblinHelixConstants;
 import org.apache.gobblin.cluster.GobblinHelixMessagingService;
 import org.apache.gobblin.cluster.HelixUtils;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
+import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
+import org.apache.gobblin.metrics.reporter.util.KafkaReporterUtils;
 import org.apache.gobblin.rest.JobExecutionInfoServer;
 import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
 import org.apache.gobblin.util.ClassAliasResolver;
@@ -149,6 +157,7 @@ import static org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_
  * @author Yinan Li
  */
 public class GobblinYarnAppLauncher {
+  public static final String GOBBLIN_YARN_CONFIG_OUTPUT_PATH = "gobblin.yarn.configOutputPath";
 
   private static final Logger LOGGER = LoggerFactory.getLogger(GobblinYarnAppLauncher.class);
 
@@ -287,6 +296,12 @@ public class GobblinYarnAppLauncher {
 
     this.messagingService = new GobblinHelixMessagingService(this.helixManager);
 
+    try {
+      config = addDynamicConfig(config);
+      outputConfigToFile(config);
+    } catch (SchemaRegistryException e) {
+      throw new IOException(e);
+    }
   }
 
   /**
@@ -919,6 +934,79 @@ public class GobblinYarnAppLauncher {
     }
   }
 
+  private static Config addDynamicConfig(Config config) throws IOException {
+    Properties properties = ConfigUtils.configToProperties(config);
+    if (KafkaReporterUtils.isKafkaReportingEnabled(properties) && KafkaReporterUtils.isKafkaAvroSchemaRegistryEnabled(properties)) {
+      KafkaAvroSchemaRegistry registry = new KafkaAvroSchemaRegistry(properties);
+      return addMetricReportingDynamicConfig(config, registry);
+    } else {
+      return config;
+    }
+  }
+
+  /**
+   * Write the config to the file specified with the config key {@value GOBBLIN_YARN_CONFIG_OUTPUT_PATH} if it
+   * is configured.
+   * @param config the config to output
+   * @throws IOException
+   */
+  @VisibleForTesting
+  static void outputConfigToFile(Config config)
+      throws IOException {
+    // If a file path is specified then write the Azkaban config to that path in HOCON format.
+    // This can be used to generate an application.conf file to pass to the yarn app master and containers.
+    if (config.hasPath(GOBBLIN_YARN_CONFIG_OUTPUT_PATH)) {
+      File configFile = new File(config.getString(GOBBLIN_YARN_CONFIG_OUTPUT_PATH));
+      File parentDir = configFile.getParentFile();
+
+      if (parentDir != null && !parentDir.exists()) {
+        if (!parentDir.mkdirs()) {
+          throw new IOException("Error creating directories for " + parentDir);
+        }
+      }
+
+      ConfigRenderOptions configRenderOptions = ConfigRenderOptions.defaults();
+      configRenderOptions = configRenderOptions.setComments(false);
+      configRenderOptions = configRenderOptions.setOriginComments(false);
+      configRenderOptions = configRenderOptions.setFormatted(true);
+      configRenderOptions = configRenderOptions.setJson(false);
+
+      String renderedConfig = config.root().render(configRenderOptions);
+
+      FileUtils.writeStringToFile(configFile, renderedConfig, Charsets.UTF_8);
+    }
+  }
+
+  /**
+   * A method that adds dynamic config related to Kafka-based metric reporting. In particular, if Kafka based metric
+   * reporting is enabled and {@link KafkaAvroSchemaRegistry} is configured, this method registers the metric reporting
+   * related schemas and adds the returned schema ids to the config to be used by metric reporters in {@link org.apache.gobblin.yarn.GobblinApplicationMaster}
+   * and the {@link org.apache.gobblin.cluster.GobblinTaskRunner}s. The advantage of doing this is that the TaskRunners
+   * do not have to initiate a connection with the schema registry server and reduces the chances of metric reporter
+   * instantiation failures in the {@link org.apache.gobblin.cluster.GobblinTaskRunner}s.
+   * @param config
+   */
+  @VisibleForTesting
+  static Config addMetricReportingDynamicConfig(Config config, KafkaAvroSchemaRegistry registry) throws IOException {
+    Properties properties = ConfigUtils.configToProperties(config);
+    if (KafkaReporterUtils.isEventsEnabled(properties)) {
+      Schema schema = KafkaReporterUtils.getGobblinTrackingEventSchema();
+      String schemaId = registry.register(schema, KafkaReporterUtils.getEventsTopic(properties).get());
+      LOGGER.info("Adding schemaId {} for GobblinTrackingEvent to the config", schemaId);
+      config = config.withValue(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKA_AVRO_SCHEMA_ID,
+          ConfigValueFactory.fromAnyRef(schemaId));
+    }
+
+    if (KafkaReporterUtils.isMetricsEnabled(properties)) {
+      Schema schema = KafkaReporterUtils.getMetricReportSchema();
+      String schemaId = registry.register(schema, KafkaReporterUtils.getMetricsTopic(properties).get());
+      LOGGER.info("Adding schemaId {} for MetricReport to the config", schemaId);
+      config = config.withValue(ConfigurationKeys.METRICS_REPORTING_METRICS_KAFKA_AVRO_SCHEMA_ID,
+          ConfigValueFactory.fromAnyRef(schemaId));
+    }
+    return config;
+  }
+
   public static void main(String[] args) throws Exception {
     final GobblinYarnAppLauncher gobblinYarnAppLauncher =
         new GobblinYarnAppLauncher(ConfigFactory.load(), new YarnConfiguration());
@@ -942,4 +1030,4 @@ public class GobblinYarnAppLauncher {
 
     gobblinYarnAppLauncher.launch();
   }
-}
+}
\ No newline at end of file
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
index c1c7142..46042c4 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
@@ -25,12 +25,15 @@ import java.io.PrintWriter;
 import java.lang.reflect.Field;
 import java.net.URL;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.avro.Schema;
+import org.apache.commons.io.FileUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.test.TestingServer;
 import org.apache.hadoop.fs.FileSystem;
@@ -46,6 +49,8 @@ import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.model.Message;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -54,6 +59,7 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
 import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.eventbus.EventBus;
@@ -72,6 +78,8 @@ import org.apache.gobblin.cluster.TestHelper;
 import org.apache.gobblin.cluster.TestShutdownMessageHandlerFactory;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.DynamicConfigGenerator;
+import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
+import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
 import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
 import org.apache.gobblin.testing.AssertWithBackoff;
 
@@ -426,6 +434,45 @@ public class GobblinYarnAppLauncherTest implements HelixMessageTestBase {
     Mockito.verify(mockMultiManager, times(1)).cleanUpJobs();
   }
 
+  @Test
+  public void testOutputConfig() throws IOException {
+    File tmpTestDir = com.google.common.io.Files.createTempDir();
+
+    try {
+      Path outputPath = Paths.get(tmpTestDir.toString(), "application.conf");
+      Config config = ConfigFactory.empty()
+          .withValue(ConfigurationKeys.FS_URI_KEY, ConfigValueFactory.fromAnyRef("file:///"))
+          .withValue(GobblinYarnAppLauncher.GOBBLIN_YARN_CONFIG_OUTPUT_PATH,
+              ConfigValueFactory.fromAnyRef(outputPath.toString()));
+
+      GobblinYarnAppLauncher.outputConfigToFile(config);
+
+      String configString = com.google.common.io.Files.toString(outputPath.toFile(), Charsets.UTF_8);
+      Assert.assertTrue(configString.contains("fs"));
+    } finally {
+      FileUtils.deleteDirectory(tmpTestDir);
+    }
+  }
+
+  @Test
+  public void testAddMetricReportingDynamicConfig()
+      throws IOException {
+    KafkaAvroSchemaRegistry schemaRegistry = Mockito.mock(KafkaAvroSchemaRegistry.class);
+    Mockito.when(schemaRegistry.register(Mockito.any(Schema.class), Mockito.anyString())).thenAnswer(new Answer<String>() {
+      @Override
+      public String answer(InvocationOnMock invocation) {
+        return "testId";
+      }
+    });
+    Config config = ConfigFactory.empty().withValue(ConfigurationKeys.METRICS_KAFKA_TOPIC_EVENTS, ConfigValueFactory.fromAnyRef("topic"))
+        .withValue(ConfigurationKeys.METRICS_REPORTING_KAFKA_ENABLED_KEY, ConfigValueFactory.fromAnyRef(true))
+        .withValue(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY, ConfigValueFactory.fromAnyRef(true))
+        .withValue(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_URL, ConfigValueFactory.fromAnyRef("http://testSchemaReg:0000"));
+    config = GobblinYarnAppLauncher.addMetricReportingDynamicConfig(config, schemaRegistry);
+    Assert.assertEquals(config.getString(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKA_AVRO_SCHEMA_ID), "testId");
+    Assert.assertFalse(config.hasPath(ConfigurationKeys.METRICS_REPORTING_METRICS_KAFKA_AVRO_SCHEMA_ID));
+  }
+
   /**
    * An application master for accessing protected fields in {@link GobblinApplicationMaster}
    * for testing.
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
index 160ad39..63e2cd5 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
@@ -50,6 +50,11 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -64,6 +69,7 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
 
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
 import org.apache.gobblin.testing.AssertWithBackoff;
 
 
@@ -284,7 +290,25 @@ public class YarnServiceTest {
    static class TestYarnService extends YarnService {
     public TestYarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration,
         FileSystem fs, EventBus eventBus) throws Exception {
-      super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus, null);
+      super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus, getMockHelixManager(config));
+    }
+
+    private static HelixManager getMockHelixManager(Config config) {
+      HelixManager helixManager = Mockito.mock(HelixManager.class);
+      HelixAdmin helixAdmin = Mockito.mock(HelixAdmin.class);
+      HelixDataAccessor helixDataAccessor = Mockito.mock(HelixDataAccessor.class);
+      PropertyKey propertyKey = Mockito.mock(PropertyKey.class);
+      PropertyKey.Builder propertyKeyBuilder = Mockito.mock(PropertyKey.Builder.class);
+
+      Mockito.when(helixManager.getInstanceName()).thenReturn("helixInstance1");
+      Mockito.when(helixManager.getClusterName()).thenReturn(config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY));
+      Mockito.doNothing().when(helixAdmin).enableInstance(Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean());
+      Mockito.when(helixManager.getHelixDataAccessor()).thenReturn(helixDataAccessor);
+      Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(propertyKeyBuilder);
+      Mockito.when(propertyKeyBuilder.liveInstance(Mockito.anyString())).thenReturn(propertyKey);
+      Mockito.when(helixDataAccessor.getProperty(propertyKey)).thenReturn(null);
+
+      return helixManager;
     }
 
     protected ContainerLaunchContext newContainerLaunchContext(Container container, String helixInstanceName)