You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2017/10/27 19:00:50 UTC

[1/2] incubator-gobblin git commit: [GOBBLIN-298] Add metric and event reporters that emit using a KafkaProducer

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 90d8495ae -> ee770f5c5


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java
new file mode 100644
index 0000000..1c935b4
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.MetricReport;
+import org.apache.gobblin.metrics.reporter.MetricReportReporter;
+import org.apache.gobblin.metrics.reporter.util.AvroJsonSerializer;
+import org.apache.gobblin.metrics.reporter.util.AvroSerializer;
+import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
+import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * Kafka reporter for metrics.
+ *
+ * @author ibuenros
+ */
+@Slf4j
+public class KafkaReporter extends MetricReportReporter {
+  public static final String SCHEMA_VERSION_WRITER_TYPE = "metrics.kafka.schemaVersionWriterType";
+  private static final String METRICS_KAFKA_PREFIX = "metrics.kafka";
+
+  protected final AvroSerializer<MetricReport> serializer;
+  protected final Pusher kafkaPusher;
+
+  protected KafkaReporter(Builder<?> builder, Config config) throws IOException {
+    super(builder, config);
+
+    SchemaVersionWriter versionWriter;
+    if (config.hasPath(SCHEMA_VERSION_WRITER_TYPE)) {
+      try {
+        ClassAliasResolver<SchemaVersionWriter> resolver = new ClassAliasResolver<>(SchemaVersionWriter.class);
+        Class<? extends SchemaVersionWriter> klazz = resolver.resolveClass(config.getString(SCHEMA_VERSION_WRITER_TYPE));
+        versionWriter = klazz.newInstance();
+      } catch (ReflectiveOperationException roe) {
+        throw new IOException("Could not instantiate version writer.", roe);
+      }
+    } else {
+      versionWriter = new FixedSchemaVersionWriter();
+    }
+
+    log.info("Schema version writer: " + versionWriter.getClass().getName());
+    this.serializer = this.closer.register(createSerializer(versionWriter));
+
+    if (builder.kafkaPusher.isPresent()) {
+      this.kafkaPusher = builder.kafkaPusher.get();
+    } else {
+      Config kafkaConfig = ConfigUtils.getConfigOrEmpty(config, PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX);
+      String pusherClassName = ConfigUtils.getString(config, PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY,
+          PusherUtils.DEFAULT_KAFKA_PUSHER_CLASS_NAME);
+
+      this.kafkaPusher = PusherUtils.getPusher(pusherClassName, builder.brokers, builder.topic, Optional.of(kafkaConfig));
+    }
+  }
+
+  protected AvroSerializer<MetricReport> createSerializer(SchemaVersionWriter schemaVersionWriter) throws IOException {
+    return new AvroJsonSerializer<>(MetricReport.SCHEMA$, schemaVersionWriter);
+  }
+
+  /**
+   * A static factory class for obtaining new {@link Builder}s
+   *
+   * @see Builder
+   */
+  public static class BuilderFactory {
+
+    public static BuilderImpl newBuilder() {
+      return new BuilderImpl();
+    }
+  }
+
+  public static class BuilderImpl extends Builder<BuilderImpl> {
+
+    @Override
+    protected BuilderImpl self() {
+      return this;
+    }
+  }
+
+  /**
+   * Builder for {@link KafkaReporter}. Defaults to no filter, reporting rates in seconds and times in milliseconds.
+   */
+  public static abstract class Builder<T extends MetricReportReporter.Builder<T>>
+      extends MetricReportReporter.Builder<T> {
+
+    protected String brokers;
+    protected String topic;
+    protected Optional<Pusher> kafkaPusher;
+
+    protected Builder() {
+      super();
+      this.name = "KafkaReporter";
+      this.kafkaPusher = Optional.absent();
+    }
+
+    /**
+     * Set {@link Pusher} to use.
+     */
+    public T withKafkaPusher(Pusher pusher) {
+      this.kafkaPusher = Optional.of(pusher);
+      return self();
+    }
+
+    /**
+     * Builds and returns {@link KafkaReporter}.
+     *
+     * @param brokers string of Kafka brokers
+     * @param topic topic to send metrics to
+     * @return KafkaReporter
+     */
+    public KafkaReporter build(String brokers, String topic, Properties props) throws IOException {
+      this.brokers = brokers;
+      this.topic = topic;
+
+      return new KafkaReporter(this, ConfigUtils.propertiesToConfig(props, Optional.of(ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX)));
+    }
+  }
+
+  @Override
+  protected void emitReport(MetricReport report) {
+    this.kafkaPusher.pushMessages(Lists.newArrayList(this.serializer.serializeRecord(report)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
new file mode 100644
index 0000000..9faac33
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.ScheduledReporter;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.CustomCodahaleReporterFactory;
+import org.apache.gobblin.metrics.KafkaReportingFormats;
+import org.apache.gobblin.metrics.RootMetricContext;
+import org.apache.gobblin.util.ConfigUtils;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+@Slf4j
+public class KafkaReporterFactory implements CustomCodahaleReporterFactory {
+  @Override
+  public ScheduledReporter newScheduledReporter(MetricRegistry registry, Properties properties)
+      throws IOException {
+    if (!Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_ENABLED_KEY,
+        ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_ENABLED))) {
+      return null;
+    }
+    log.info("Reporting metrics to Kafka");
+
+    Optional<String> defaultTopic = Optional.fromNullable(properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC));
+    Optional<String> metricsTopic = Optional.fromNullable(
+        properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC_METRICS));
+    Optional<String> eventsTopic = Optional.fromNullable(
+        properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC_EVENTS));
+
+    boolean metricsEnabled = metricsTopic.or(defaultTopic).isPresent();
+    if (metricsEnabled) log.info("Reporting metrics to Kafka");
+    boolean eventsEnabled = eventsTopic.or(defaultTopic).isPresent();
+    if (eventsEnabled) log.info("Reporting events to Kafka");
+
+    try {
+      Preconditions.checkArgument(properties.containsKey(ConfigurationKeys.METRICS_KAFKA_BROKERS),
+          "Kafka metrics brokers missing.");
+      Preconditions.checkArgument(metricsTopic.or(eventsTopic).or(defaultTopic).isPresent(), "Kafka topic missing.");
+    } catch (IllegalArgumentException exception) {
+      log.error("Not reporting metrics to Kafka due to missing Kafka configuration(s).", exception);
+      return null;
+    }
+
+    String brokers = properties.getProperty(ConfigurationKeys.METRICS_KAFKA_BROKERS);
+
+    String reportingFormat = properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_FORMAT,
+        ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_FORMAT);
+
+    KafkaReportingFormats formatEnum;
+    try {
+      formatEnum = KafkaReportingFormats.valueOf(reportingFormat.toUpperCase());
+    } catch (IllegalArgumentException exception) {
+      log.warn("Kafka metrics reporting format " + reportingFormat +
+          " not recognized. Will report in json format.", exception);
+      formatEnum = KafkaReportingFormats.JSON;
+    }
+
+    if (metricsEnabled) {
+      try {
+        formatEnum.metricReporterBuilder(properties)
+            .build(brokers, metricsTopic.or(defaultTopic).get(), properties);
+      } catch (IOException exception) {
+        log.error("Failed to create Kafka metrics reporter. Will not report metrics to Kafka.", exception);
+      }
+    }
+
+    if (eventsEnabled) {
+      try {
+        KafkaEventReporter.Builder<?> builder = formatEnum.eventReporterBuilder(RootMetricContext.get(),
+            properties);
+
+        Config kafkaConfig = ConfigUtils.getConfigOrEmpty(ConfigUtils.propertiesToConfig(properties),
+            PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX);
+        builder.withConfig(kafkaConfig);
+
+        builder.withPusherClassName(properties.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY,
+            PusherUtils.DEFAULT_KAFKA_PUSHER_CLASS_NAME));
+
+        return builder.build(brokers, eventsTopic.or(defaultTopic).get());
+      } catch (IOException exception) {
+        log.error("Failed to create Kafka events reporter. Will not report events to Kafka.", exception);
+      }
+    }
+
+    log.info("Will start reporting metrics to Kafka");
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java
new file mode 100644
index 0000000..5abd503
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import java.io.Closeable;
+import java.util.List;
+
+
+/**
+ * Establish a connection to a Kafka cluster and push byte messages to a specified topic.
+ */
+public interface Pusher extends Closeable {
+  /**
+   * Push all byte array messages to the Kafka topic.
+   * @param messages List of byte array messages to push to Kakfa.
+   */
+  void pushMessages(List<byte[]> messages);
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherUtils.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherUtils.java
new file mode 100644
index 0000000..a76c750
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherUtils.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.metrics.kafka;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+public class PusherUtils {
+  public static final String METRICS_REPORTING_KAFKA_CONFIG_PREFIX = "metrics.reporting.kafka.config";
+  public static final String KAFKA_PUSHER_CLASS_NAME_KEY = "metrics.reporting.kafkaPusherClass";
+  public static final String DEFAULT_KAFKA_PUSHER_CLASS_NAME = "org.apache.gobblin.metrics.kafka.KafkaPusher";
+
+  /**
+   * Create a {@link Pusher}
+   * @param pusherClassName the {@link Pusher} class to instantiate
+   * @param brokers brokers to connect to
+   * @param topic the topic to write to
+   * @param config additional configuration for configuring the {@link Pusher}
+   * @return a {@link Pusher}
+   */
+  public static Pusher getPusher(String pusherClassName, String brokers, String topic, Optional<Config> config) {
+    try {
+      Class<?> pusherClass = Class.forName(pusherClassName);
+
+     return (Pusher) GobblinConstructorUtils.invokeLongestConstructor(pusherClass,
+          brokers, topic, config);
+    } catch (ReflectiveOperationException e) {
+      throw new RuntimeException("Could not instantiate kafka pusher", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterTest.java
new file mode 100644
index 0000000..e240a53
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.kafka.KafkaAvroEventReporter;
+import org.apache.gobblin.metrics.kafka.KafkaEventReporter;
+import org.apache.gobblin.metrics.kafka.Pusher;
+import org.apache.gobblin.metrics.reporter.util.EventUtils;
+
+
+@Test(groups = {"gobblin.metrics"})
+public class KafkaAvroEventReporterTest extends KafkaEventReporterTest {
+
+  @Override
+  public KafkaEventReporter.Builder<? extends KafkaEventReporter.Builder> getBuilder(MetricContext context,
+      Pusher pusher) {
+    return KafkaAvroEventReporter.forContext(context).withKafkaPusher(pusher);
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  protected GobblinTrackingEvent nextEvent(Iterator<byte[]> it)
+      throws IOException {
+    Assert.assertTrue(it.hasNext());
+    return EventUtils.deserializeReportFromAvroSerialization(new GobblinTrackingEvent(), it.next());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroReporterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroReporterTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroReporterTest.java
new file mode 100644
index 0000000..e7be31d
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroReporterTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.metrics.MetricReport;
+import org.apache.gobblin.metrics.kafka.KafkaAvroReporter;
+import org.apache.gobblin.metrics.kafka.KafkaReporter;
+import org.apache.gobblin.metrics.kafka.Pusher;
+import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
+
+/**
+ * Test for KafkaAvroReporter
+ * Extends KafkaReporterTest and just redefines the builder and the metrics deserializer
+ *
+ * @author ibuenros
+ */
+@Test(groups = {"gobblin.metrics"})
+public class KafkaAvroReporterTest extends KafkaReporterTest {
+
+  public KafkaAvroReporterTest(String topic)
+      throws IOException, InterruptedException {
+    super();
+  }
+
+  public KafkaAvroReporterTest() throws IOException, InterruptedException {
+    this("KafkaAvroReporterTest");
+  }
+
+  @Override
+  public KafkaReporter.Builder<? extends KafkaReporter.Builder> getBuilder(Pusher pusher) {
+    return KafkaAvroReporter.BuilderFactory.newBuilder().withKafkaPusher(pusher);
+  }
+
+  @Override
+  public KafkaReporter.Builder<? extends KafkaReporter.Builder> getBuilderFromContext(Pusher pusher) {
+    return KafkaAvroReporter.BuilderFactory.newBuilder().withKafkaPusher(pusher);
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  protected MetricReport nextReport(Iterator<byte[]> it)
+      throws IOException {
+    Assert.assertTrue(it.hasNext());
+    return MetricReportUtils.deserializeReportFromAvroSerialization(new MetricReport(), it.next());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaEventReporterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaEventReporterTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaEventReporterTest.java
new file mode 100644
index 0000000..add42f4
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaEventReporterTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Maps;
+
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.kafka.KafkaEventReporter;
+import org.apache.gobblin.metrics.kafka.Pusher;
+import org.apache.gobblin.metrics.reporter.util.EventUtils;
+
+
+@Test(groups = {"gobblin.metrics"})
+public class KafkaEventReporterTest {
+
+  /**
+   * Get builder for KafkaReporter (override if testing an extension of KafkaReporter)
+   * @param context metricregistry
+   * @return KafkaReporter builder
+   */
+  public KafkaEventReporter.Builder<? extends KafkaEventReporter.Builder> getBuilder(MetricContext context,
+      Pusher pusher) {
+    return KafkaEventReporter.Factory.forContext(context).withKafkaPusher(pusher);
+  }
+
+
+  @Test
+  public void testKafkaEventReporter() throws IOException {
+    MetricContext context = MetricContext.builder("context").build();
+
+    MockKafkaPusher pusher = new MockKafkaPusher();
+    KafkaEventReporter kafkaReporter = getBuilder(context, pusher).build("localhost:0000", "topic");
+
+    String namespace = "gobblin.metrics.test";
+    String eventName = "testEvent";
+
+    GobblinTrackingEvent event = new GobblinTrackingEvent();
+    event.setName(eventName);
+    event.setNamespace(namespace);
+    Map<String, String> metadata = Maps.newHashMap();
+    metadata.put("m1", "v1");
+    metadata.put("m2", null);
+    event.setMetadata(metadata);
+    context.submitEvent(event);
+
+    try {
+      Thread.sleep(100);
+    } catch(InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    kafkaReporter.report();
+
+    try {
+      Thread.sleep(100);
+    } catch(InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    GobblinTrackingEvent retrievedEvent = nextEvent(pusher.messageIterator());
+    Assert.assertEquals(retrievedEvent.getNamespace(), namespace);
+    Assert.assertEquals(retrievedEvent.getName(), eventName);
+    Assert.assertEquals(retrievedEvent.getMetadata().size(), 4);
+
+  }
+
+  @Test
+  public void testTagInjection() throws IOException {
+
+    String tag1 = "tag1";
+    String value1 = "value1";
+    String metadataValue1 = "metadata1";
+    String tag2 = "tag2";
+    String value2 = "value2";
+
+    MetricContext context = MetricContext.builder("context").addTag(new Tag<String>(tag1, value1)).
+        addTag(new Tag<String>(tag2, value2)).build();
+
+    MockKafkaPusher pusher = new MockKafkaPusher();
+    KafkaEventReporter kafkaReporter = getBuilder(context, pusher).build("localhost:0000", "topic");
+
+    String namespace = "gobblin.metrics.test";
+    String eventName = "testEvent";
+
+    GobblinTrackingEvent event = new GobblinTrackingEvent();
+    event.setName(eventName);
+    event.setNamespace(namespace);
+    Map<String, String> metadata = Maps.newHashMap();
+    metadata.put(tag1, metadataValue1);
+    event.setMetadata(metadata);
+    context.submitEvent(event);
+
+    try {
+      Thread.sleep(100);
+    } catch(InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    kafkaReporter.report();
+
+    try {
+      Thread.sleep(100);
+    } catch(InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    GobblinTrackingEvent retrievedEvent = nextEvent(pusher.messageIterator());
+    Assert.assertEquals(retrievedEvent.getNamespace(), namespace);
+    Assert.assertEquals(retrievedEvent.getName(), eventName);
+    Assert.assertEquals(retrievedEvent.getMetadata().size(), 4);
+    Assert.assertEquals(retrievedEvent.getMetadata().get(tag1), metadataValue1);
+    Assert.assertEquals(retrievedEvent.getMetadata().get(tag2), value2);
+  }
+
+  /**
+   * Extract the next metric from the Kafka iterator
+   * Assumes existence of the metric has already been checked.
+   * @param it Kafka ConsumerIterator
+   * @return next metric in the stream
+   * @throws IOException
+   */
+  protected GobblinTrackingEvent nextEvent(Iterator<byte[]> it) throws IOException {
+    Assert.assertTrue(it.hasNext());
+    return EventUtils.deserializeReportFromJson(new GobblinTrackingEvent(), it.next());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaReporterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaReporterTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaReporterTest.java
new file mode 100644
index 0000000..f653dd9
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaReporterTest.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.google.common.collect.Lists;
+
+import org.apache.gobblin.metrics.Measurements;
+import org.apache.gobblin.metrics.Metric;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.MetricReport;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.kafka.KafkaReporter;
+import org.apache.gobblin.metrics.kafka.Pusher;
+import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
+
+@Test(groups = { "gobblin.metrics" })
+public class KafkaReporterTest {
+
+  public KafkaReporterTest() throws IOException, InterruptedException {}
+
+  /**
+   * Get builder for KafkaReporter (override if testing an extension of KafkaReporter)
+   * @return KafkaReporter builder
+   */
+  public KafkaReporter.Builder<? extends KafkaReporter.Builder> getBuilder(Pusher pusher) {
+    return KafkaReporter.BuilderFactory.newBuilder().withKafkaPusher(pusher);
+  }
+
+  public KafkaReporter.Builder<? extends KafkaReporter.Builder> getBuilderFromContext(Pusher pusher) {
+    return KafkaReporter.BuilderFactory.newBuilder().withKafkaPusher(pusher);
+  }
+
+  @Test
+  public void testKafkaReporter() throws IOException {
+    MetricContext metricContext =
+        MetricContext.builder(this.getClass().getCanonicalName() + ".testKafkaReporter").build();
+    Counter counter = metricContext.counter("com.linkedin.example.counter");
+    Meter meter = metricContext.meter("com.linkedin.example.meter");
+    Histogram histogram = metricContext.histogram("com.linkedin.example.histogram");
+
+    MockKafkaPusher pusher = new MockKafkaPusher();
+    KafkaReporter kafkaReporter = getBuilder(pusher).build("localhost:0000", "topic", new Properties());
+
+    counter.inc();
+    meter.mark(2);
+    histogram.update(1);
+    histogram.update(1);
+    histogram.update(2);
+
+    kafkaReporter.report(metricContext);
+
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    Map<String, Double> expected = new HashMap<>();
+    expected.put("com.linkedin.example.counter." + Measurements.COUNT, 1.0);
+    expected.put("com.linkedin.example.meter." + Measurements.COUNT, 2.0);
+    expected.put("com.linkedin.example.histogram." + Measurements.COUNT, 3.0);
+
+    MetricReport nextReport = nextReport(pusher.messageIterator());
+
+    expectMetricsWithValues(nextReport, expected);
+
+    kafkaReporter.report(metricContext);
+
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    Set<String> expectedSet = new HashSet<>();
+    expectedSet.add("com.linkedin.example.counter." + Measurements.COUNT);
+    expectedSet.add("com.linkedin.example.meter." + Measurements.COUNT);
+    expectedSet.add("com.linkedin.example.meter." + Measurements.MEAN_RATE);
+    expectedSet.add("com.linkedin.example.meter." + Measurements.RATE_1MIN);
+    expectedSet.add("com.linkedin.example.meter." + Measurements.RATE_5MIN);
+    expectedSet.add("com.linkedin.example.meter." + Measurements.RATE_15MIN);
+    expectedSet.add("com.linkedin.example.histogram." + Measurements.MEAN);
+    expectedSet.add("com.linkedin.example.histogram." + Measurements.MIN);
+    expectedSet.add("com.linkedin.example.histogram." + Measurements.MAX);
+    expectedSet.add("com.linkedin.example.histogram." + Measurements.MEDIAN);
+    expectedSet.add("com.linkedin.example.histogram." + Measurements.PERCENTILE_75TH);
+    expectedSet.add("com.linkedin.example.histogram." + Measurements.PERCENTILE_95TH);
+    expectedSet.add("com.linkedin.example.histogram." + Measurements.PERCENTILE_99TH);
+    expectedSet.add("com.linkedin.example.histogram." + Measurements.PERCENTILE_999TH);
+    expectedSet.add("com.linkedin.example.histogram." + Measurements.COUNT);
+
+    nextReport = nextReport(pusher.messageIterator());
+    expectMetrics(nextReport, expectedSet, true);
+
+    kafkaReporter.close();
+
+  }
+
+  @Test
+  public void kafkaReporterTagsTest() throws IOException {
+    MetricContext metricContext =
+        MetricContext.builder(this.getClass().getCanonicalName() + ".kafkaReporterTagsTest").build();
+    Counter counter = metricContext.counter("com.linkedin.example.counter");
+
+    Tag<?> tag1 = new Tag<>("tag1", "value1");
+    Tag<?> tag2 = new Tag<>("tag2", 2);
+
+    MockKafkaPusher pusher = new MockKafkaPusher();
+    KafkaReporter kafkaReporter =
+        getBuilder(pusher).withTags(Lists.newArrayList(tag1, tag2)).build("localhost:0000", "topic", new Properties());
+
+    counter.inc();
+
+    kafkaReporter.report(metricContext);
+
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    MetricReport metricReport = nextReport(pusher.messageIterator());
+
+    Assert.assertEquals(4, metricReport.getTags().size());
+    Assert.assertTrue(metricReport.getTags().containsKey(tag1.getKey()));
+    Assert.assertEquals(metricReport.getTags().get(tag1.getKey()), tag1.getValue().toString());
+    Assert.assertTrue(metricReport.getTags().containsKey(tag2.getKey()));
+    Assert.assertEquals(metricReport.getTags().get(tag2.getKey()), tag2.getValue().toString());
+  }
+
+  @Test
+  public void kafkaReporterContextTest() throws IOException {
+    Tag<?> tag1 = new Tag<>("tag1", "value1");
+    MetricContext context = MetricContext.builder("context").addTag(tag1).build();
+    Counter counter = context.counter("com.linkedin.example.counter");
+
+    MockKafkaPusher pusher = new MockKafkaPusher();
+    KafkaReporter kafkaReporter = getBuilderFromContext(pusher).build("localhost:0000", "topic", new Properties());
+
+    counter.inc();
+
+    kafkaReporter.report(context);
+
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    MetricReport metricReport = nextReport(pusher.messageIterator());
+
+    Assert.assertEquals(3, metricReport.getTags().size());
+    Assert.assertTrue(metricReport.getTags().containsKey(tag1.getKey()));
+    Assert.assertEquals(metricReport.getTags().get(tag1.getKey()), tag1.getValue().toString());
+
+  }
+
+  /**
+   * Expect a list of metrics with specific values.
+   * Fail if not all metrics are received, or some metric has the wrong value.
+   * @param report MetricReport.
+   * @param expected map of expected metric names and their values
+   * @throws IOException
+   */
+  private void expectMetricsWithValues(MetricReport report, Map<String, Double> expected) throws IOException {
+    List<Metric> metricIterator = report.getMetrics();
+
+    for (Metric metric : metricIterator) {
+      if (expected.containsKey(metric.getName())) {
+        Assert.assertEquals(expected.get(metric.getName()), metric.getValue());
+        expected.remove(metric.getName());
+      }
+    }
+
+    Assert.assertTrue(expected.isEmpty());
+
+  }
+
+  /**
+   * Expect a set of metric names. Will fail if not all of these metrics are received.
+   * @param report MetricReport
+   * @param expected set of expected metric names
+   * @param strict if set to true, will fail if receiving any metric that is not expected
+   * @throws IOException
+   */
+  private void expectMetrics(MetricReport report, Set<String> expected, boolean strict) throws IOException {
+    List<Metric> metricIterator = report.getMetrics();
+    for (Metric metric : metricIterator) {
+      //System.out.println(String.format("expectedSet.add(\"%s\")", metric.name));
+      if (expected.contains(metric.getName())) {
+        expected.remove(metric.getName());
+      } else if (strict && !metric.getName().contains(MetricContext.GOBBLIN_METRICS_NOTIFICATIONS_TIMER_NAME)) {
+        Assert.assertTrue(false, "Metric present in report not expected: " + metric.toString());
+      }
+    }
+    Assert.assertTrue(expected.isEmpty());
+  }
+
+  /**
+   * Extract the next metric from the Kafka iterator
+   * Assumes existence of the metric has already been checked.
+   * @param it Kafka ConsumerIterator
+   * @return next metric in the stream
+   * @throws IOException
+   */
+  protected MetricReport nextReport(Iterator<byte[]> it) throws IOException {
+    Assert.assertTrue(it.hasNext());
+    return MetricReportUtils.deserializeReportFromJson(new MetricReport(), it.next());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/MockKafkaPusher.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/MockKafkaPusher.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/MockKafkaPusher.java
new file mode 100644
index 0000000..71decbb
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/MockKafkaPusher.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+
+import com.google.common.collect.Queues;
+
+import org.apache.gobblin.metrics.kafka.Pusher;
+
+
+/**
+ * Mock instance of {@link org.apache.gobblin.metrics.kafka.Pusher} used for testing.
+ */
+public class MockKafkaPusher implements Pusher {
+
+  Queue<byte[]> messages = Queues.newLinkedBlockingQueue();
+
+  public MockKafkaPusher() {
+  }
+
+  @Override
+  public void pushMessages(List<byte[]> messages) {
+    this.messages.addAll(messages);
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+  }
+
+  public Iterator<byte[]> messageIterator() {
+    return this.messages.iterator();
+  }
+
+}


[2/2] incubator-gobblin git commit: [GOBBLIN-298] Add metric and event reporters that emit using a KafkaProducer

Posted by hu...@apache.org.
[GOBBLIN-298] Add metric and event reporters that emit using a KafkaProducer

Closes #2153 from htran1/metrics09


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/ee770f5c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/ee770f5c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/ee770f5c

Branch: refs/heads/master
Commit: ee770f5c5aeec469d7d93c016ce0a25200932eb2
Parents: 90d8495
Author: Hung Tran <hu...@linkedin.com>
Authored: Fri Oct 27 12:00:41 2017 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Fri Oct 27 12:00:41 2017 -0700

----------------------------------------------------------------------
 .../gobblin/metrics/KafkaReportingFormats.java  |  83 -------
 .../metrics/kafka/KafkaAvroEventReporter.java   | 122 ----------
 .../metrics/kafka/KafkaAvroReporter.java        | 105 --------
 .../metrics/kafka/KafkaEventReporter.java       | 151 ------------
 .../metrics/kafka/KafkaProducerPusher.java      |  91 +++++++
 .../gobblin/metrics/kafka/KafkaPusher.java      |   3 +-
 .../gobblin/metrics/kafka/KafkaReporter.java    | 147 -----------
 .../metrics/kafka/KafkaReporterFactory.java     | 104 --------
 .../reporter/KafkaAvroEventReporterTest.java    |  50 ----
 .../metrics/reporter/KafkaAvroReporterTest.java |  68 ------
 .../reporter/KafkaEventReporterTest.java        | 150 ------------
 .../metrics/reporter/KafkaReporterTest.java     | 242 -------------------
 .../metrics/kafka/KafkaProducerPusher.java      |  91 +++++++
 .../reporter/KafkaProducerPusherTest.java       |  91 +++++++
 .../gobblin/metrics/KafkaReportingFormats.java  |  83 +++++++
 .../metrics/kafka/KafkaAvroEventReporter.java   | 122 ++++++++++
 .../metrics/kafka/KafkaAvroReporter.java        | 105 ++++++++
 .../metrics/kafka/KafkaAvroSchemaRegistry.java  |   2 +-
 .../metrics/kafka/KafkaEventReporter.java       | 170 +++++++++++++
 .../gobblin/metrics/kafka/KafkaReporter.java    | 150 ++++++++++++
 .../metrics/kafka/KafkaReporterFactory.java     | 113 +++++++++
 .../apache/gobblin/metrics/kafka/Pusher.java    |  33 +++
 .../gobblin/metrics/kafka/PusherUtils.java      |  47 ++++
 .../reporter/KafkaAvroEventReporterTest.java    |  50 ++++
 .../metrics/reporter/KafkaAvroReporterTest.java |  67 +++++
 .../reporter/KafkaEventReporterTest.java        | 150 ++++++++++++
 .../metrics/reporter/KafkaReporterTest.java     | 240 ++++++++++++++++++
 .../metrics/reporter/MockKafkaPusher.java       |  55 +++++
 28 files changed, 1660 insertions(+), 1225 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
deleted file mode 100644
index 5f47121..0000000
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.metrics;
-
-import java.util.Properties;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.metrics.kafka.KafkaAvroEventReporter;
-import org.apache.gobblin.metrics.kafka.KafkaAvroReporter;
-import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
-import org.apache.gobblin.metrics.kafka.KafkaEventReporter;
-import org.apache.gobblin.metrics.kafka.KafkaReporter;
-
-
-/**
- * Kafka reporting formats enumeration.
- */
-public enum KafkaReportingFormats {
-
-  AVRO,
-  JSON;
-
-  /**
-   * Get a {@link org.apache.gobblin.metrics.kafka.KafkaReporter.Builder} for this reporting format.
-   *
-   * @param properties {@link java.util.Properties} containing information to build reporters.
-   * @return {@link org.apache.gobblin.metrics.kafka.KafkaReporter.Builder}.
-   */
-  public KafkaReporter.Builder<?> metricReporterBuilder(Properties properties) {
-    switch (this) {
-      case AVRO:
-        KafkaAvroReporter.Builder<?> builder = KafkaAvroReporter.BuilderFactory.newBuilder();
-        if (Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
-            ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
-          builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
-        }
-        return builder;
-      case JSON:
-        return KafkaReporter.BuilderFactory.newBuilder();
-      default:
-        // This should never happen.
-        throw new IllegalArgumentException("KafkaReportingFormat not recognized.");
-    }
-  }
-
-  /**
-   * Get a {@link org.apache.gobblin.metrics.kafka.KafkaEventReporter.Builder} for this reporting format.
-   * @param context {@link org.apache.gobblin.metrics.MetricContext} that should be reported.
-   * @param properties {@link java.util.Properties} containing information to build reporters.
-   * @return {@link org.apache.gobblin.metrics.kafka.KafkaEventReporter.Builder}.
-   */
-  public KafkaEventReporter.Builder<?> eventReporterBuilder(MetricContext context, Properties properties) {
-    switch (this) {
-      case AVRO:
-        KafkaAvroEventReporter.Builder<?> builder = KafkaAvroEventReporter.Factory.forContext(context);
-        if (Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
-            ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
-          builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
-        }
-        return builder;
-      case JSON:
-        return KafkaEventReporter.Factory.forContext(context);
-      default:
-        // This should never happen.
-        throw new IllegalArgumentException("KafkaReportingFormat not recognized.");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventReporter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventReporter.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventReporter.java
deleted file mode 100644
index 5d35c87..0000000
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventReporter.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.metrics.kafka;
-
-import java.io.IOException;
-
-import org.apache.avro.Schema;
-
-import com.google.common.base.Optional;
-
-import org.apache.gobblin.metrics.GobblinTrackingEvent;
-import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.reporter.util.AvroBinarySerializer;
-import org.apache.gobblin.metrics.reporter.util.AvroSerializer;
-import org.apache.gobblin.metrics.reporter.util.SchemaRegistryVersionWriter;
-import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
-
-
-/**
- * {@link org.apache.gobblin.metrics.reporter.EventReporter} that emits events to Kafka as serialized Avro records.
- */
-public class KafkaAvroEventReporter extends KafkaEventReporter {
-
-  protected KafkaAvroEventReporter(Builder<?> builder) throws IOException {
-    super(builder);
-    if(builder.registry.isPresent()) {
-      Schema schema =
-          new Schema.Parser().parse(getClass().getClassLoader().getResourceAsStream("GobblinTrackingEvent.avsc"));
-      this.serializer.setSchemaVersionWriter(new SchemaRegistryVersionWriter(builder.registry.get(), builder.topic,
-          Optional.of(schema)));
-    }
-  }
-
-  @Override
-  protected AvroSerializer<GobblinTrackingEvent> createSerializer(SchemaVersionWriter schemaVersionWriter)
-      throws IOException {
-    return new AvroBinarySerializer<GobblinTrackingEvent>(GobblinTrackingEvent.SCHEMA$, schemaVersionWriter);
-  }
-
-  /**
-   * Returns a new {@link KafkaAvroEventReporter.Builder} for {@link KafkaAvroEventReporter}.
-   *
-   * @param context the {@link org.apache.gobblin.metrics.MetricContext} to report
-   * @return KafkaAvroReporter builder
-   * @deprecated this method is bugged. Use {@link KafkaAvroEventReporter.Factory#forContext} instead.
-   */
-  @Deprecated
-  public static Builder<? extends Builder<?>> forContext(MetricContext context) {
-    return new BuilderImpl(context);
-  }
-
-  private static class BuilderImpl extends Builder<BuilderImpl> {
-    private BuilderImpl(MetricContext context) {
-      super(context);
-    }
-
-    @Override
-    protected BuilderImpl self() {
-      return this;
-    }
-  }
-
-  public static abstract class Factory {
-    /**
-     * Returns a new {@link KafkaAvroEventReporter.Builder} for {@link KafkaAvroEventReporter}.
-     *
-     * @param context the {@link org.apache.gobblin.metrics.MetricContext} to report
-     * @return KafkaAvroReporter builder
-     */
-    public static KafkaAvroEventReporter.BuilderImpl forContext(MetricContext context) {
-      return new BuilderImpl(context);
-    }
-  }
-
-  /**
-   * Builder for {@link KafkaAvroEventReporter}.
-   * Defaults to no filter, reporting rates in seconds and times in milliseconds.
-   */
-  public static abstract class Builder<T extends Builder<T>> extends KafkaEventReporter.Builder<T> {
-
-    private Optional<KafkaAvroSchemaRegistry> registry = Optional.absent();
-
-    private Builder(MetricContext context) {
-      super(context);
-    }
-
-    public T withSchemaRegistry(KafkaAvroSchemaRegistry registry) {
-      this.registry = Optional.of(registry);
-      return self();
-    }
-
-    /**
-     * Builds and returns {@link KafkaAvroEventReporter}.
-     *
-     * @param brokers string of Kafka brokers
-     * @param topic topic to send metrics to
-     * @return KafkaAvroReporter
-     */
-    public KafkaAvroEventReporter build(String brokers, String topic) throws IOException {
-      this.brokers = brokers;
-      this.topic = topic;
-      return new KafkaAvroEventReporter(this);
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java
deleted file mode 100644
index 35d558e..0000000
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.metrics.kafka;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.metrics.MetricReport;
-import org.apache.gobblin.metrics.reporter.util.AvroBinarySerializer;
-import org.apache.gobblin.metrics.reporter.util.AvroSerializer;
-import org.apache.gobblin.metrics.reporter.util.SchemaRegistryVersionWriter;
-import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
-import org.apache.gobblin.util.ConfigUtils;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import org.apache.avro.Schema;
-
-import com.google.common.base.Optional;
-import com.typesafe.config.Config;
-
-
-/**
- * Kafka reporter for codahale metrics writing metrics in Avro format.
- *
- * @author ibuenros
- */
-public class KafkaAvroReporter extends KafkaReporter {
-
-  protected KafkaAvroReporter(Builder<?> builder, Config config) throws IOException {
-    super(builder, config);
-    if (builder.registry.isPresent()) {
-      Schema schema =
-          new Schema.Parser().parse(getClass().getClassLoader().getResourceAsStream("MetricReport.avsc"));
-      this.serializer.setSchemaVersionWriter(new SchemaRegistryVersionWriter(builder.registry.get(), builder.topic,
-          Optional.of(schema)));
-    }
-  }
-
-  @Override
-  protected AvroSerializer<MetricReport> createSerializer(SchemaVersionWriter schemaVersionWriter)
-      throws IOException {
-    return new AvroBinarySerializer<>(MetricReport.SCHEMA$, schemaVersionWriter);
-  }
-
-  /**
-   * A static factory class for obtaining new {@link org.apache.gobblin.metrics.kafka.KafkaAvroReporter.Builder}s
-   *
-   * @see org.apache.gobblin.metrics.kafka.KafkaAvroReporter.Builder
-   */
-  public static class BuilderFactory {
-
-    public static BuilderImpl newBuilder() {
-      return new BuilderImpl();
-    }
-  }
-
-  public static class BuilderImpl extends Builder<BuilderImpl> {
-
-    @Override
-    protected BuilderImpl self() {
-      return this;
-    }
-  }
-
-  /**
-   * Builder for {@link KafkaAvroReporter}. Defaults to no filter, reporting rates in seconds and times in milliseconds.
-   */
-  public static abstract class Builder<T extends Builder<T>> extends KafkaReporter.Builder<T> {
-
-    private Optional<KafkaAvroSchemaRegistry> registry = Optional.absent();
-
-    public T withSchemaRegistry(KafkaAvroSchemaRegistry registry) {
-      this.registry = Optional.of(registry);
-      return self();
-    }
-
-    /**
-     * Builds and returns {@link KafkaAvroReporter}.
-     *
-     * @param brokers string of Kafka brokers
-     * @param topic topic to send metrics to
-     * @return KafkaAvroReporter
-     */
-    public KafkaAvroReporter build(String brokers, String topic, Properties props) throws IOException {
-      this.brokers = brokers;
-      this.topic = topic;
-      return new KafkaAvroReporter(this, ConfigUtils.propertiesToConfig(props, Optional.of(ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX)));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java
deleted file mode 100644
index c23294a..0000000
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.metrics.kafka;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Queue;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-
-import org.apache.gobblin.metrics.GobblinTrackingEvent;
-import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.reporter.EventReporter;
-import org.apache.gobblin.metrics.reporter.util.AvroJsonSerializer;
-import org.apache.gobblin.metrics.reporter.util.AvroSerializer;
-import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
-import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
-
-
-/**
- * Reports {@link org.apache.gobblin.metrics.GobblinTrackingEvent} to a Kafka topic serialized as JSON.
- */
-public class KafkaEventReporter extends EventReporter {
-
-  protected final AvroSerializer<GobblinTrackingEvent> serializer;
-  private final KafkaPusher kafkaPusher;
-
-  public KafkaEventReporter(Builder<?> builder) throws IOException {
-    super(builder);
-
-    this.serializer = this.closer.register(
-        createSerializer(new FixedSchemaVersionWriter()));
-
-    if(builder.kafkaPusher.isPresent()) {
-      this.kafkaPusher = builder.kafkaPusher.get();
-    } else {
-      this.kafkaPusher = this.closer.register(new KafkaPusher(builder.brokers, builder.topic));
-    }
-
-  }
-
-  @Override
-  public void reportEventQueue(Queue<GobblinTrackingEvent> queue) {
-    GobblinTrackingEvent nextEvent;
-    List<byte[]> events = Lists.newArrayList();
-
-    while(null != (nextEvent = queue.poll())) {
-      events.add(this.serializer.serializeRecord(nextEvent));
-    }
-
-    if (!events.isEmpty()) {
-      this.kafkaPusher.pushMessages(events);
-    }
-
-  }
-
-  protected AvroSerializer<GobblinTrackingEvent> createSerializer(SchemaVersionWriter schemaVersionWriter) throws IOException {
-    return new AvroJsonSerializer<GobblinTrackingEvent>(GobblinTrackingEvent.SCHEMA$, schemaVersionWriter);
-  }
-
-  /**
-   * Returns a new {@link KafkaEventReporter.Builder} for {@link KafkaEventReporter}.
-   * Will automatically add all Context tags to the reporter.
-   *
-   * @param context the {@link org.apache.gobblin.metrics.MetricContext} to report
-   * @return KafkaReporter builder
-   * @deprecated this method is bugged. Use {@link KafkaEventReporter.Factory#forContext} instead.
-   */
-  @Deprecated
-  public static Builder<? extends Builder> forContext(MetricContext context) {
-    return new BuilderImpl(context);
-  }
-
-  public static class BuilderImpl extends Builder<BuilderImpl> {
-    private BuilderImpl(MetricContext context) {
-      super(context);
-    }
-
-    @Override
-    protected BuilderImpl self() {
-      return this;
-    }
-  }
-
-  public static class Factory {
-    /**
-     * Returns a new {@link KafkaEventReporter.Builder} for {@link KafkaEventReporter}.
-     * Will automatically add all Context tags to the reporter.
-     *
-     * @param context the {@link org.apache.gobblin.metrics.MetricContext} to report
-     * @return KafkaReporter builder
-     */
-    public static BuilderImpl forContext(MetricContext context) {
-      return new BuilderImpl(context);
-    }
-  }
-
-  /**
-   * Builder for {@link KafkaEventReporter}.
-   * Defaults to no filter, reporting rates in seconds and times in milliseconds.
-   */
-  public static abstract class Builder<T extends EventReporter.Builder<T>>
-      extends EventReporter.Builder<T> {
-    protected String brokers;
-    protected String topic;
-    protected Optional<KafkaPusher> kafkaPusher;
-
-    protected Builder(MetricContext context) {
-      super(context);
-      this.kafkaPusher = Optional.absent();
-    }
-
-    /**
-     * Set {@link org.apache.gobblin.metrics.kafka.KafkaPusher} to use.
-     */
-    public T withKafkaPusher(KafkaPusher pusher) {
-      this.kafkaPusher = Optional.of(pusher);
-      return self();
-    }
-
-    /**
-     * Builds and returns {@link KafkaEventReporter}.
-     *
-     * @param brokers string of Kafka brokers
-     * @param topic topic to send metrics to
-     * @return KafkaReporter
-     */
-    public KafkaEventReporter build(String brokers, String topic) throws IOException {
-      this.brokers = brokers;
-      this.topic = topic;
-      return new KafkaEventReporter(this);
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
new file mode 100644
index 0000000..ff75a92
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import com.google.common.base.Optional;
+import com.google.common.io.Closer;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Establishes a connection to a Kafka cluster and push byte messages to a specified topic.
+ */
+public class KafkaProducerPusher implements Pusher {
+
+  private final String topic;
+  private final KafkaProducer<String, byte[]> producer;
+  private final Closer closer;
+
+  public KafkaProducerPusher(String brokers, String topic, Optional<Config> kafkaConfig) {
+    this.closer = Closer.create();
+
+    this.topic = topic;
+
+    Properties props = new Properties();
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+    props.put(ProducerConfig.ACKS_CONFIG, "1");
+
+    // add the kafka scoped config. if any of the above are specified then they are overridden
+    if (kafkaConfig.isPresent()) {
+      props.putAll(ConfigUtils.configToProperties(kafkaConfig.get()));
+    }
+
+    this.producer = createProducer(props);
+  }
+
+  public KafkaProducerPusher(String brokers, String topic) {
+    this(brokers, topic, Optional.absent());
+  }
+
+  /**
+   * Push all byte array messages to the Kafka topic.
+   * @param messages List of byte array messages to push to Kakfa.
+   */
+  public void pushMessages(List<byte[]> messages) {
+    for (byte[] message: messages) {
+      this.producer.send(new ProducerRecord<String, byte[]>(topic, message));
+    }
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    this.closer.close();
+  }
+
+  /**
+   * Create the Kafka producer.
+   */
+  protected KafkaProducer<String, byte[]> createProducer(Properties props) {
+    return this.closer.register(new KafkaProducer<String, byte[]>(props));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaPusher.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaPusher.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaPusher.java
index 29162ac..1c977ff 100644
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaPusher.java
+++ b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaPusher.java
@@ -17,7 +17,6 @@
 
 package org.apache.gobblin.metrics.kafka;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 import java.util.Properties;
@@ -34,7 +33,7 @@ import kafka.producer.ProducerConfig;
 /**
  * Establishes a connection to a Kafka cluster and pushed byte messages to a specified topic.
  */
-public class KafkaPusher implements Closeable {
+public class KafkaPusher implements Pusher {
 
   private final String topic;
   private final ProducerCloseable<String, byte[]> producer;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java
deleted file mode 100644
index 2aa0e97..0000000
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.metrics.kafka;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.metrics.MetricReport;
-import org.apache.gobblin.metrics.reporter.MetricReportReporter;
-import org.apache.gobblin.metrics.reporter.util.AvroJsonSerializer;
-import org.apache.gobblin.metrics.reporter.util.AvroSerializer;
-import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
-import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
-import org.apache.gobblin.util.ClassAliasResolver;
-import org.apache.gobblin.util.ConfigUtils;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.typesafe.config.Config;
-
-import lombok.extern.slf4j.Slf4j;
-
-
-/**
- * Kafka reporter for metrics.
- *
- * @author ibuenros
- */
-@Slf4j
-public class KafkaReporter extends MetricReportReporter {
-
-  public static final String SCHEMA_VERSION_WRITER_TYPE = "metrics.kafka.schemaVersionWriterType";
-
-  protected final AvroSerializer<MetricReport> serializer;
-  protected final KafkaPusher kafkaPusher;
-
-
-  protected KafkaReporter(Builder<?> builder, Config config) throws IOException {
-    super(builder, config);
-
-    SchemaVersionWriter versionWriter;
-    if (config.hasPath(SCHEMA_VERSION_WRITER_TYPE)) {
-      try {
-        ClassAliasResolver<SchemaVersionWriter> resolver = new ClassAliasResolver<>(SchemaVersionWriter.class);
-        Class<? extends SchemaVersionWriter> klazz = resolver.resolveClass(config.getString(SCHEMA_VERSION_WRITER_TYPE));
-        versionWriter = klazz.newInstance();
-      } catch (ReflectiveOperationException roe) {
-        throw new IOException("Could not instantiate version writer.", roe);
-      }
-    } else {
-      versionWriter = new FixedSchemaVersionWriter();
-    }
-
-    log.info("Schema version writer: " + versionWriter.getClass().getName());
-    this.serializer = this.closer.register(createSerializer(versionWriter));
-
-    if (builder.kafkaPusher.isPresent()) {
-      this.kafkaPusher = builder.kafkaPusher.get();
-    } else {
-      this.kafkaPusher = this.closer.register(new KafkaPusher(builder.brokers, builder.topic));
-    }
-  }
-
-  protected AvroSerializer<MetricReport> createSerializer(SchemaVersionWriter schemaVersionWriter) throws IOException {
-    return new AvroJsonSerializer<>(MetricReport.SCHEMA$, schemaVersionWriter);
-  }
-
-  /**
-   * A static factory class for obtaining new {@link org.apache.gobblin.metrics.kafka.KafkaReporter.Builder}s
-   *
-   * @see org.apache.gobblin.metrics.kafka.KafkaReporter.Builder
-   */
-  public static class BuilderFactory {
-
-    public static BuilderImpl newBuilder() {
-      return new BuilderImpl();
-    }
-  }
-
-  public static class BuilderImpl extends Builder<BuilderImpl> {
-
-    @Override
-    protected BuilderImpl self() {
-      return this;
-    }
-  }
-
-  /**
-   * Builder for {@link KafkaReporter}. Defaults to no filter, reporting rates in seconds and times in milliseconds.
-   */
-  public static abstract class Builder<T extends MetricReportReporter.Builder<T>>
-      extends MetricReportReporter.Builder<T> {
-
-    protected String brokers;
-    protected String topic;
-    protected Optional<KafkaPusher> kafkaPusher;
-
-    protected Builder() {
-      super();
-      this.name = "KafkaReporter";
-      this.kafkaPusher = Optional.absent();
-    }
-
-    /**
-     * Set {@link org.apache.gobblin.metrics.kafka.KafkaPusher} to use.
-     */
-    public T withKafkaPusher(KafkaPusher pusher) {
-      this.kafkaPusher = Optional.of(pusher);
-      return self();
-    }
-
-    /**
-     * Builds and returns {@link KafkaReporter}.
-     *
-     * @param brokers string of Kafka brokers
-     * @param topic topic to send metrics to
-     * @return KafkaReporter
-     */
-    public KafkaReporter build(String brokers, String topic, Properties props) throws IOException {
-      this.brokers = brokers;
-      this.topic = topic;
-
-      return new KafkaReporter(this, ConfigUtils.propertiesToConfig(props, Optional.of(ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX)));
-    }
-  }
-
-  @Override
-  protected void emitReport(MetricReport report) {
-    this.kafkaPusher.pushMessages(Lists.newArrayList(this.serializer.serializeRecord(report)));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
deleted file mode 100644
index 328a47b..0000000
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.metrics.kafka;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.ScheduledReporter;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.metrics.CustomCodahaleReporterFactory;
-import org.apache.gobblin.metrics.KafkaReportingFormats;
-import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.RootMetricContext;
-
-
-@Slf4j
-public class KafkaReporterFactory implements CustomCodahaleReporterFactory {
-  @Override
-  public ScheduledReporter newScheduledReporter(MetricRegistry registry, Properties properties)
-      throws IOException {
-    if (!Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_ENABLED_KEY,
-        ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_ENABLED))) {
-      return null;
-    }
-    log.info("Reporting metrics to Kafka");
-
-    Optional<String> defaultTopic = Optional.fromNullable(properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC));
-    Optional<String> metricsTopic = Optional.fromNullable(
-        properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC_METRICS));
-    Optional<String> eventsTopic = Optional.fromNullable(
-        properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC_EVENTS));
-
-    boolean metricsEnabled = metricsTopic.or(defaultTopic).isPresent();
-    if (metricsEnabled) log.info("Reporting metrics to Kafka");
-    boolean eventsEnabled = eventsTopic.or(defaultTopic).isPresent();
-    if (eventsEnabled) log.info("Reporting events to Kafka");
-
-    try {
-      Preconditions.checkArgument(properties.containsKey(ConfigurationKeys.METRICS_KAFKA_BROKERS),
-          "Kafka metrics brokers missing.");
-      Preconditions.checkArgument(metricsTopic.or(eventsTopic).or(defaultTopic).isPresent(), "Kafka topic missing.");
-    } catch (IllegalArgumentException exception) {
-      log.error("Not reporting metrics to Kafka due to missing Kafka configuration(s).", exception);
-      return null;
-    }
-
-    String brokers = properties.getProperty(ConfigurationKeys.METRICS_KAFKA_BROKERS);
-
-    String reportingFormat = properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_FORMAT,
-        ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_FORMAT);
-
-    KafkaReportingFormats formatEnum;
-    try {
-      formatEnum = KafkaReportingFormats.valueOf(reportingFormat.toUpperCase());
-    } catch (IllegalArgumentException exception) {
-      log.warn("Kafka metrics reporting format " + reportingFormat +
-          " not recognized. Will report in json format.", exception);
-      formatEnum = KafkaReportingFormats.JSON;
-    }
-
-    if (metricsEnabled) {
-      try {
-        formatEnum.metricReporterBuilder(properties)
-            .build(brokers, metricsTopic.or(defaultTopic).get(), properties);
-      } catch (IOException exception) {
-        log.error("Failed to create Kafka metrics reporter. Will not report metrics to Kafka.", exception);
-      }
-    }
-
-    if (eventsEnabled) {
-      try {
-        KafkaEventReporter.Builder<?> builder = formatEnum.eventReporterBuilder(RootMetricContext.get(),
-            properties);
-        return builder.build(brokers, eventsTopic.or(defaultTopic).get());
-      } catch (IOException exception) {
-        log.error("Failed to create Kafka events reporter. Will not report events to Kafka.", exception);
-      }
-    }
-
-    log.info("Will start reporting metrics to Kafka");
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterTest.java b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterTest.java
deleted file mode 100644
index 066dce4..0000000
--- a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.metrics.reporter;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import org.apache.gobblin.metrics.GobblinTrackingEvent;
-import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.reporter.util.EventUtils;
-import org.apache.gobblin.metrics.kafka.KafkaAvroEventReporter;
-import org.apache.gobblin.metrics.kafka.KafkaEventReporter;
-import org.apache.gobblin.metrics.kafka.KafkaPusher;
-
-
-@Test(groups = {"gobblin.metrics"})
-public class KafkaAvroEventReporterTest extends KafkaEventReporterTest {
-
-  @Override
-  public KafkaEventReporter.Builder<? extends KafkaEventReporter.Builder> getBuilder(MetricContext context,
-      KafkaPusher pusher) {
-    return KafkaAvroEventReporter.forContext(context).withKafkaPusher(pusher);
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  protected GobblinTrackingEvent nextEvent(Iterator<byte[]> it)
-      throws IOException {
-    Assert.assertTrue(it.hasNext());
-    return EventUtils.deserializeReportFromAvroSerialization(new GobblinTrackingEvent(), it.next());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroReporterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroReporterTest.java b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroReporterTest.java
deleted file mode 100644
index bbf2646..0000000
--- a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroReporterTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.metrics.reporter;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import org.apache.gobblin.metrics.MetricReport;
-import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
-import org.apache.gobblin.metrics.kafka.KafkaAvroReporter;
-import org.apache.gobblin.metrics.kafka.KafkaPusher;
-import org.apache.gobblin.metrics.kafka.KafkaReporter;
-
-
-/**
- * Test for KafkaAvroReporter
- * Extends KafkaReporterTest and just redefines the builder and the metrics deserializer
- *
- * @author ibuenros
- */
-@Test(groups = {"gobblin.metrics"})
-public class KafkaAvroReporterTest extends KafkaReporterTest {
-
-  public KafkaAvroReporterTest(String topic)
-      throws IOException, InterruptedException {
-    super();
-  }
-
-  public KafkaAvroReporterTest() throws IOException, InterruptedException {
-    this("KafkaAvroReporterTest");
-  }
-
-  @Override
-  public KafkaReporter.Builder<? extends KafkaReporter.Builder> getBuilder(KafkaPusher pusher) {
-    return KafkaAvroReporter.BuilderFactory.newBuilder().withKafkaPusher(pusher);
-  }
-
-  @Override
-  public KafkaReporter.Builder<? extends KafkaReporter.Builder> getBuilderFromContext(KafkaPusher pusher) {
-    return KafkaAvroReporter.BuilderFactory.newBuilder().withKafkaPusher(pusher);
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  protected MetricReport nextReport(Iterator<byte[]> it)
-      throws IOException {
-    Assert.assertTrue(it.hasNext());
-    return MetricReportUtils.deserializeReportFromAvroSerialization(new MetricReport(), it.next());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaEventReporterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaEventReporterTest.java b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaEventReporterTest.java
deleted file mode 100644
index 177f8d3..0000000
--- a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaEventReporterTest.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.metrics.reporter;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.google.common.collect.Maps;
-
-import org.apache.gobblin.metrics.GobblinTrackingEvent;
-import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.Tag;
-import org.apache.gobblin.metrics.reporter.util.EventUtils;
-import org.apache.gobblin.metrics.kafka.KafkaEventReporter;
-import org.apache.gobblin.metrics.kafka.KafkaPusher;
-
-
-@Test(groups = {"gobblin.metrics"})
-public class KafkaEventReporterTest {
-
-  /**
-   * Get builder for KafkaReporter (override if testing an extension of KafkaReporter)
-   * @param context metricregistry
-   * @return KafkaReporter builder
-   */
-  public KafkaEventReporter.Builder<? extends KafkaEventReporter.Builder> getBuilder(MetricContext context,
-      KafkaPusher pusher) {
-    return KafkaEventReporter.Factory.forContext(context).withKafkaPusher(pusher);
-  }
-
-
-  @Test
-  public void testKafkaEventReporter() throws IOException {
-    MetricContext context = MetricContext.builder("context").build();
-
-    MockKafkaPusher pusher = new MockKafkaPusher();
-    KafkaEventReporter kafkaReporter = getBuilder(context, pusher).build("localhost:0000", "topic");
-
-    String namespace = "gobblin.metrics.test";
-    String eventName = "testEvent";
-
-    GobblinTrackingEvent event = new GobblinTrackingEvent();
-    event.setName(eventName);
-    event.setNamespace(namespace);
-    Map<String, String> metadata = Maps.newHashMap();
-    metadata.put("m1", "v1");
-    metadata.put("m2", null);
-    event.setMetadata(metadata);
-    context.submitEvent(event);
-
-    try {
-      Thread.sleep(100);
-    } catch(InterruptedException ex) {
-      Thread.currentThread().interrupt();
-    }
-
-    kafkaReporter.report();
-
-    try {
-      Thread.sleep(100);
-    } catch(InterruptedException ex) {
-      Thread.currentThread().interrupt();
-    }
-
-    GobblinTrackingEvent retrievedEvent = nextEvent(pusher.messageIterator());
-    Assert.assertEquals(retrievedEvent.getNamespace(), namespace);
-    Assert.assertEquals(retrievedEvent.getName(), eventName);
-    Assert.assertEquals(retrievedEvent.getMetadata().size(), 4);
-
-  }
-
-  @Test
-  public void testTagInjection() throws IOException {
-
-    String tag1 = "tag1";
-    String value1 = "value1";
-    String metadataValue1 = "metadata1";
-    String tag2 = "tag2";
-    String value2 = "value2";
-
-    MetricContext context = MetricContext.builder("context").addTag(new Tag<String>(tag1, value1)).
-        addTag(new Tag<String>(tag2, value2)).build();
-
-    MockKafkaPusher pusher = new MockKafkaPusher();
-    KafkaEventReporter kafkaReporter = getBuilder(context, pusher).build("localhost:0000", "topic");
-
-    String namespace = "gobblin.metrics.test";
-    String eventName = "testEvent";
-
-    GobblinTrackingEvent event = new GobblinTrackingEvent();
-    event.setName(eventName);
-    event.setNamespace(namespace);
-    Map<String, String> metadata = Maps.newHashMap();
-    metadata.put(tag1, metadataValue1);
-    event.setMetadata(metadata);
-    context.submitEvent(event);
-
-    try {
-      Thread.sleep(100);
-    } catch(InterruptedException ex) {
-      Thread.currentThread().interrupt();
-    }
-
-    kafkaReporter.report();
-
-    try {
-      Thread.sleep(100);
-    } catch(InterruptedException ex) {
-      Thread.currentThread().interrupt();
-    }
-
-    GobblinTrackingEvent retrievedEvent = nextEvent(pusher.messageIterator());
-    Assert.assertEquals(retrievedEvent.getNamespace(), namespace);
-    Assert.assertEquals(retrievedEvent.getName(), eventName);
-    Assert.assertEquals(retrievedEvent.getMetadata().size(), 4);
-    Assert.assertEquals(retrievedEvent.getMetadata().get(tag1), metadataValue1);
-    Assert.assertEquals(retrievedEvent.getMetadata().get(tag2), value2);
-  }
-
-  /**
-   * Extract the next metric from the Kafka iterator
-   * Assumes existence of the metric has already been checked.
-   * @param it Kafka ConsumerIterator
-   * @return next metric in the stream
-   * @throws java.io.IOException
-   */
-  protected GobblinTrackingEvent nextEvent(Iterator<byte[]> it) throws IOException {
-    Assert.assertTrue(it.hasNext());
-    return EventUtils.deserializeReportFromJson(new GobblinTrackingEvent(), it.next());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaReporterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaReporterTest.java b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaReporterTest.java
deleted file mode 100644
index c431cb0..0000000
--- a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaReporterTest.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.metrics.reporter;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.Meter;
-
-import com.google.common.collect.Lists;
-
-import org.apache.gobblin.metrics.Measurements;
-import org.apache.gobblin.metrics.Metric;
-import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.MetricReport;
-import org.apache.gobblin.metrics.Tag;
-import org.apache.gobblin.metrics.kafka.KafkaPusher;
-import org.apache.gobblin.metrics.kafka.KafkaReporter;
-import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
-
-
-@Test(groups = { "gobblin.metrics" })
-public class KafkaReporterTest {
-
-  public KafkaReporterTest() throws IOException, InterruptedException {}
-
-  /**
-   * Get builder for KafkaReporter (override if testing an extension of KafkaReporter)
-   * @return KafkaReporter builder
-   */
-  public KafkaReporter.Builder<? extends KafkaReporter.Builder> getBuilder(KafkaPusher pusher) {
-    return KafkaReporter.BuilderFactory.newBuilder().withKafkaPusher(pusher);
-  }
-
-  public KafkaReporter.Builder<? extends KafkaReporter.Builder> getBuilderFromContext(KafkaPusher pusher) {
-    return KafkaReporter.BuilderFactory.newBuilder().withKafkaPusher(pusher);
-  }
-
-  @Test
-  public void testKafkaReporter() throws IOException {
-    MetricContext metricContext =
-        MetricContext.builder(this.getClass().getCanonicalName() + ".testKafkaReporter").build();
-    Counter counter = metricContext.counter("com.linkedin.example.counter");
-    Meter meter = metricContext.meter("com.linkedin.example.meter");
-    Histogram histogram = metricContext.histogram("com.linkedin.example.histogram");
-
-    MockKafkaPusher pusher = new MockKafkaPusher();
-    KafkaReporter kafkaReporter = getBuilder(pusher).build("localhost:0000", "topic", new Properties());
-
-    counter.inc();
-    meter.mark(2);
-    histogram.update(1);
-    histogram.update(1);
-    histogram.update(2);
-
-    kafkaReporter.report(metricContext);
-
-    try {
-      Thread.sleep(1000);
-    } catch (InterruptedException ex) {
-      Thread.currentThread().interrupt();
-    }
-
-    Map<String, Double> expected = new HashMap<>();
-    expected.put("com.linkedin.example.counter." + Measurements.COUNT, 1.0);
-    expected.put("com.linkedin.example.meter." + Measurements.COUNT, 2.0);
-    expected.put("com.linkedin.example.histogram." + Measurements.COUNT, 3.0);
-
-    MetricReport nextReport = nextReport(pusher.messageIterator());
-
-    expectMetricsWithValues(nextReport, expected);
-
-    kafkaReporter.report(metricContext);
-
-    try {
-      Thread.sleep(1000);
-    } catch (InterruptedException ex) {
-      Thread.currentThread().interrupt();
-    }
-
-    Set<String> expectedSet = new HashSet<>();
-    expectedSet.add("com.linkedin.example.counter." + Measurements.COUNT);
-    expectedSet.add("com.linkedin.example.meter." + Measurements.COUNT);
-    expectedSet.add("com.linkedin.example.meter." + Measurements.MEAN_RATE);
-    expectedSet.add("com.linkedin.example.meter." + Measurements.RATE_1MIN);
-    expectedSet.add("com.linkedin.example.meter." + Measurements.RATE_5MIN);
-    expectedSet.add("com.linkedin.example.meter." + Measurements.RATE_15MIN);
-    expectedSet.add("com.linkedin.example.histogram." + Measurements.MEAN);
-    expectedSet.add("com.linkedin.example.histogram." + Measurements.MIN);
-    expectedSet.add("com.linkedin.example.histogram." + Measurements.MAX);
-    expectedSet.add("com.linkedin.example.histogram." + Measurements.MEDIAN);
-    expectedSet.add("com.linkedin.example.histogram." + Measurements.PERCENTILE_75TH);
-    expectedSet.add("com.linkedin.example.histogram." + Measurements.PERCENTILE_95TH);
-    expectedSet.add("com.linkedin.example.histogram." + Measurements.PERCENTILE_99TH);
-    expectedSet.add("com.linkedin.example.histogram." + Measurements.PERCENTILE_999TH);
-    expectedSet.add("com.linkedin.example.histogram." + Measurements.COUNT);
-
-    nextReport = nextReport(pusher.messageIterator());
-    expectMetrics(nextReport, expectedSet, true);
-
-    kafkaReporter.close();
-
-  }
-
-  @Test
-  public void kafkaReporterTagsTest() throws IOException {
-    MetricContext metricContext =
-        MetricContext.builder(this.getClass().getCanonicalName() + ".kafkaReporterTagsTest").build();
-    Counter counter = metricContext.counter("com.linkedin.example.counter");
-
-    Tag<?> tag1 = new Tag<>("tag1", "value1");
-    Tag<?> tag2 = new Tag<>("tag2", 2);
-
-    MockKafkaPusher pusher = new MockKafkaPusher();
-    KafkaReporter kafkaReporter =
-        getBuilder(pusher).withTags(Lists.newArrayList(tag1, tag2)).build("localhost:0000", "topic", new Properties());
-
-    counter.inc();
-
-    kafkaReporter.report(metricContext);
-
-    try {
-      Thread.sleep(1000);
-    } catch (InterruptedException ex) {
-      Thread.currentThread().interrupt();
-    }
-
-    MetricReport metricReport = nextReport(pusher.messageIterator());
-
-    Assert.assertEquals(4, metricReport.getTags().size());
-    Assert.assertTrue(metricReport.getTags().containsKey(tag1.getKey()));
-    Assert.assertEquals(metricReport.getTags().get(tag1.getKey()), tag1.getValue().toString());
-    Assert.assertTrue(metricReport.getTags().containsKey(tag2.getKey()));
-    Assert.assertEquals(metricReport.getTags().get(tag2.getKey()), tag2.getValue().toString());
-  }
-
-  @Test
-  public void kafkaReporterContextTest() throws IOException {
-    Tag<?> tag1 = new Tag<>("tag1", "value1");
-    MetricContext context = MetricContext.builder("context").addTag(tag1).build();
-    Counter counter = context.counter("com.linkedin.example.counter");
-
-    MockKafkaPusher pusher = new MockKafkaPusher();
-    KafkaReporter kafkaReporter = getBuilderFromContext(pusher).build("localhost:0000", "topic", new Properties());
-
-    counter.inc();
-
-    kafkaReporter.report(context);
-
-    try {
-      Thread.sleep(1000);
-    } catch (InterruptedException ex) {
-      Thread.currentThread().interrupt();
-    }
-
-    MetricReport metricReport = nextReport(pusher.messageIterator());
-
-    Assert.assertEquals(3, metricReport.getTags().size());
-    Assert.assertTrue(metricReport.getTags().containsKey(tag1.getKey()));
-    Assert.assertEquals(metricReport.getTags().get(tag1.getKey()), tag1.getValue().toString());
-
-  }
-
-  /**
-   * Expect a list of metrics with specific values.
-   * Fail if not all metrics are received, or some metric has the wrong value.
-   * @param report MetricReport.
-   * @param expected map of expected metric names and their values
-   * @throws IOException
-   */
-  private void expectMetricsWithValues(MetricReport report, Map<String, Double> expected) throws IOException {
-    List<Metric> metricIterator = report.getMetrics();
-
-    for (Metric metric : metricIterator) {
-      if (expected.containsKey(metric.getName())) {
-        Assert.assertEquals(expected.get(metric.getName()), metric.getValue());
-        expected.remove(metric.getName());
-      }
-    }
-
-    Assert.assertTrue(expected.isEmpty());
-
-  }
-
-  /**
-   * Expect a set of metric names. Will fail if not all of these metrics are received.
-   * @param report MetricReport
-   * @param expected set of expected metric names
-   * @param strict if set to true, will fail if receiving any metric that is not expected
-   * @throws IOException
-   */
-  private void expectMetrics(MetricReport report, Set<String> expected, boolean strict) throws IOException {
-    List<Metric> metricIterator = report.getMetrics();
-    for (Metric metric : metricIterator) {
-      //System.out.println(String.format("expectedSet.add(\"%s\")", metric.name));
-      if (expected.contains(metric.getName())) {
-        expected.remove(metric.getName());
-      } else if (strict && !metric.getName().contains(MetricContext.GOBBLIN_METRICS_NOTIFICATIONS_TIMER_NAME)) {
-        Assert.assertTrue(false, "Metric present in report not expected: " + metric.toString());
-      }
-    }
-    Assert.assertTrue(expected.isEmpty());
-  }
-
-  /**
-   * Extract the next metric from the Kafka iterator
-   * Assumes existence of the metric has already been checked.
-   * @param it Kafka ConsumerIterator
-   * @return next metric in the stream
-   * @throws IOException
-   */
-  protected MetricReport nextReport(Iterator<byte[]> it) throws IOException {
-    Assert.assertTrue(it.hasNext());
-    return MetricReportUtils.deserializeReportFromJson(new MetricReport(), it.next());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
new file mode 100644
index 0000000..3d2de9b
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import com.google.common.base.Optional;
+import com.google.common.io.Closer;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Establish a connection to a Kafka cluster and push byte messages to a specified topic.
+ */
+public class KafkaProducerPusher implements Pusher {
+
+  private final String topic;
+  private final KafkaProducer<String, byte[]> producer;
+  private final Closer closer;
+
+  public KafkaProducerPusher(String brokers, String topic, Optional<Config> kafkaConfig) {
+    this.closer = Closer.create();
+
+    this.topic = topic;
+
+    Properties props = new Properties();
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+    props.put(ProducerConfig.ACKS_CONFIG, "1");
+
+    // add the kafka scoped config. if any of the above are specified then they are overridden
+    if (kafkaConfig.isPresent()) {
+      props.putAll(ConfigUtils.configToProperties(kafkaConfig.get()));
+    }
+
+    this.producer = createProducer(props);
+  }
+
+  public KafkaProducerPusher(String brokers, String topic) {
+    this(brokers, topic, Optional.absent());
+  }
+
+  /**
+   * Push all byte array messages to the Kafka topic.
+   * @param messages List of byte array messages to push to Kakfa.
+   */
+  public void pushMessages(List<byte[]> messages) {
+    for (byte[] message: messages) {
+      this.producer.send(new ProducerRecord<String, byte[]>(topic, message));
+    }
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    this.closer.close();
+  }
+
+  /**
+   * Create the Kafka producer.
+   */
+  protected KafkaProducer<String, byte[]> createProducer(Properties props) {
+    return this.closer.register(new KafkaProducer<String, byte[]>(props));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaProducerPusherTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaProducerPusherTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaProducerPusherTest.java
new file mode 100644
index 0000000..723f8b7
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaProducerPusherTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+import java.io.IOException;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.kafka.KafkaTestBase;
+import org.apache.gobblin.metrics.kafka.KafkaProducerPusher;
+import org.apache.gobblin.metrics.kafka.Pusher;
+
+import kafka.consumer.ConsumerIterator;
+
+
+/**
+ * Test {@link org.apache.gobblin.metrics.kafka.KafkaProducerPusher}.
+ */
+public class KafkaProducerPusherTest {
+  public static final String TOPIC = KafkaProducerPusherTest.class.getSimpleName();
+
+  private KafkaTestBase kafkaTestHelper;
+
+  @BeforeClass
+  public void setup() throws Exception {
+    kafkaTestHelper = new KafkaTestBase();
+    kafkaTestHelper.startServers();
+
+    kafkaTestHelper.provisionTopic(TOPIC);
+  }
+
+  @Test
+  public void test() throws IOException {
+    // Test that the scoped config overrides the generic config
+    Pusher pusher = new KafkaProducerPusher("localhost:dummy", TOPIC, Optional.of(ConfigFactory.parseMap(ImmutableMap.of(
+        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + this.kafkaTestHelper.getKafkaServerPort()))));
+
+    String msg1 = "msg1";
+    String msg2 = "msg2";
+
+    pusher.pushMessages(Lists.newArrayList(msg1.getBytes(), msg2.getBytes()));
+
+    try {
+      Thread.sleep(1000);
+    } catch(InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    ConsumerIterator<byte[], byte[]> iterator = this.kafkaTestHelper.getIteratorForTopic(TOPIC);
+
+    assert(iterator.hasNext());
+    Assert.assertEquals(new String(iterator.next().message()), msg1);
+    assert(iterator.hasNext());
+    Assert.assertEquals(new String(iterator.next().message()), msg2);
+
+    pusher.close();
+  }
+
+  @AfterClass
+  public void after() {
+    try {
+      this.kafkaTestHelper.close();
+    } catch(Exception e) {
+      System.err.println("Failed to close Kafka server.");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
new file mode 100644
index 0000000..a0af52a
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics;
+
+import java.util.Properties;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.kafka.KafkaAvroEventReporter;
+import org.apache.gobblin.metrics.kafka.KafkaAvroReporter;
+import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
+import org.apache.gobblin.metrics.kafka.KafkaEventReporter;
+import org.apache.gobblin.metrics.kafka.KafkaReporter;
+
+
+/**
+ * Kafka reporting formats enumeration.
+ */
+public enum KafkaReportingFormats {
+
+  AVRO,
+  JSON;
+
+  /**
+   * Get a {@link org.apache.gobblin.metrics.kafka.KafkaReporter.Builder} for this reporting format.
+   *
+   * @param properties {@link Properties} containing information to build reporters.
+   * @return {@link org.apache.gobblin.metrics.kafka.KafkaReporter.Builder}.
+   */
+  public KafkaReporter.Builder<?> metricReporterBuilder(Properties properties) {
+    switch (this) {
+      case AVRO:
+        KafkaAvroReporter.Builder<?> builder = KafkaAvroReporter.BuilderFactory.newBuilder();
+        if (Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
+            ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
+          builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
+        }
+        return builder;
+      case JSON:
+        return KafkaReporter.BuilderFactory.newBuilder();
+      default:
+        // This should never happen.
+        throw new IllegalArgumentException("KafkaReportingFormat not recognized.");
+    }
+  }
+
+  /**
+   * Get a {@link org.apache.gobblin.metrics.kafka.KafkaEventReporter.Builder} for this reporting format.
+   * @param context {@link MetricContext} that should be reported.
+   * @param properties {@link Properties} containing information to build reporters.
+   * @return {@link org.apache.gobblin.metrics.kafka.KafkaEventReporter.Builder}.
+   */
+  public KafkaEventReporter.Builder<?> eventReporterBuilder(MetricContext context, Properties properties) {
+    switch (this) {
+      case AVRO:
+        KafkaAvroEventReporter.Builder<?> builder = KafkaAvroEventReporter.Factory.forContext(context);
+        if (Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
+            ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
+          builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
+        }
+        return builder;
+      case JSON:
+        return KafkaEventReporter.Factory.forContext(context);
+      default:
+        // This should never happen.
+        throw new IllegalArgumentException("KafkaReportingFormat not recognized.");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventReporter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventReporter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventReporter.java
new file mode 100644
index 0000000..ecb5d7d
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventReporter.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+
+import com.google.common.base.Optional;
+
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.reporter.util.AvroBinarySerializer;
+import org.apache.gobblin.metrics.reporter.util.AvroSerializer;
+import org.apache.gobblin.metrics.reporter.util.SchemaRegistryVersionWriter;
+import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
+
+
+/**
+ * {@link org.apache.gobblin.metrics.reporter.EventReporter} that emits events to Kafka as serialized Avro records.
+ */
+public class KafkaAvroEventReporter extends KafkaEventReporter {
+
+  protected KafkaAvroEventReporter(Builder<?> builder) throws IOException {
+    super(builder);
+    if(builder.registry.isPresent()) {
+      Schema schema =
+          new Schema.Parser().parse(getClass().getClassLoader().getResourceAsStream("GobblinTrackingEvent.avsc"));
+      this.serializer.setSchemaVersionWriter(new SchemaRegistryVersionWriter(builder.registry.get(), builder.topic,
+          Optional.of(schema)));
+    }
+  }
+
+  @Override
+  protected AvroSerializer<GobblinTrackingEvent> createSerializer(SchemaVersionWriter schemaVersionWriter)
+      throws IOException {
+    return new AvroBinarySerializer<GobblinTrackingEvent>(GobblinTrackingEvent.SCHEMA$, schemaVersionWriter);
+  }
+
+  /**
+   * Returns a new {@link Builder} for {@link KafkaAvroEventReporter}.
+   *
+   * @param context the {@link MetricContext} to report
+   * @return KafkaAvroReporter builder
+   * @deprecated this method is bugged. Use {@link Factory#forContext} instead.
+   */
+  @Deprecated
+  public static Builder<? extends Builder<?>> forContext(MetricContext context) {
+    return new BuilderImpl(context);
+  }
+
+  private static class BuilderImpl extends Builder<BuilderImpl> {
+    private BuilderImpl(MetricContext context) {
+      super(context);
+    }
+
+    @Override
+    protected BuilderImpl self() {
+      return this;
+    }
+  }
+
+  public static abstract class Factory {
+    /**
+     * Returns a new {@link Builder} for {@link KafkaAvroEventReporter}.
+     *
+     * @param context the {@link MetricContext} to report
+     * @return KafkaAvroReporter builder
+     */
+    public static BuilderImpl forContext(MetricContext context) {
+      return new BuilderImpl(context);
+    }
+  }
+
+  /**
+   * Builder for {@link KafkaAvroEventReporter}.
+   * Defaults to no filter, reporting rates in seconds and times in milliseconds.
+   */
+  public static abstract class Builder<T extends Builder<T>> extends KafkaEventReporter.Builder<T> {
+
+    private Optional<KafkaAvroSchemaRegistry> registry = Optional.absent();
+
+    private Builder(MetricContext context) {
+      super(context);
+    }
+
+    public T withSchemaRegistry(KafkaAvroSchemaRegistry registry) {
+      this.registry = Optional.of(registry);
+      return self();
+    }
+
+    /**
+     * Builds and returns {@link KafkaAvroEventReporter}.
+     *
+     * @param brokers string of Kafka brokers
+     * @param topic topic to send metrics to
+     * @return KafkaAvroReporter
+     */
+    public KafkaAvroEventReporter build(String brokers, String topic) throws IOException {
+      this.brokers = brokers;
+      this.topic = topic;
+      return new KafkaAvroEventReporter(this);
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java
new file mode 100644
index 0000000..4b6399b
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.avro.Schema;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.MetricReport;
+import org.apache.gobblin.metrics.reporter.util.AvroBinarySerializer;
+import org.apache.gobblin.metrics.reporter.util.AvroSerializer;
+import org.apache.gobblin.metrics.reporter.util.SchemaRegistryVersionWriter;
+import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Kafka reporter for codahale metrics writing metrics in Avro format.
+ *
+ * @author ibuenros
+ */
+public class KafkaAvroReporter extends KafkaReporter {
+
+  protected KafkaAvroReporter(Builder<?> builder, Config config) throws IOException {
+    super(builder, config);
+    if (builder.registry.isPresent()) {
+      Schema schema =
+          new Schema.Parser().parse(getClass().getClassLoader().getResourceAsStream("MetricReport.avsc"));
+      this.serializer.setSchemaVersionWriter(new SchemaRegistryVersionWriter(builder.registry.get(), builder.topic,
+          Optional.of(schema)));
+    }
+  }
+
+  @Override
+  protected AvroSerializer<MetricReport> createSerializer(SchemaVersionWriter schemaVersionWriter)
+      throws IOException {
+    return new AvroBinarySerializer<>(MetricReport.SCHEMA$, schemaVersionWriter);
+  }
+
+  /**
+   * A static factory class for obtaining new {@link Builder}s
+   *
+   * @see Builder
+   */
+  public static class BuilderFactory {
+
+    public static BuilderImpl newBuilder() {
+      return new BuilderImpl();
+    }
+  }
+
+  public static class BuilderImpl extends Builder<BuilderImpl> {
+
+    @Override
+    protected BuilderImpl self() {
+      return this;
+    }
+  }
+
+  /**
+   * Builder for {@link KafkaAvroReporter}. Defaults to no filter, reporting rates in seconds and times in milliseconds.
+   */
+  public static abstract class Builder<T extends Builder<T>> extends KafkaReporter.Builder<T> {
+
+    private Optional<KafkaAvroSchemaRegistry> registry = Optional.absent();
+
+    public T withSchemaRegistry(KafkaAvroSchemaRegistry registry) {
+      this.registry = Optional.of(registry);
+      return self();
+    }
+
+    /**
+     * Builds and returns {@link KafkaAvroReporter}.
+     *
+     * @param brokers string of Kafka brokers
+     * @param topic topic to send metrics to
+     * @return KafkaAvroReporter
+     */
+    public KafkaAvroReporter build(String brokers, String topic, Properties props) throws IOException {
+      this.brokers = brokers;
+      this.topic = topic;
+      return new KafkaAvroReporter(this, ConfigUtils.propertiesToConfig(props, Optional.of(ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX)));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
index 4c155fb..6162636 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
@@ -31,7 +31,6 @@ import org.apache.commons.httpclient.methods.GetMethod;
 import org.apache.commons.httpclient.methods.PostMethod;
 import org.apache.commons.pool2.impl.GenericObjectPool;
 import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
-import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,6 +39,7 @@ import com.google.common.base.Preconditions;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.kafka.schemareg.HttpClientFactory;
+import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil;
 import org.apache.gobblin.util.AvroUtils;
 
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java
new file mode 100644
index 0000000..b15e96e
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Queue;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.reporter.EventReporter;
+import org.apache.gobblin.metrics.reporter.util.AvroJsonSerializer;
+import org.apache.gobblin.metrics.reporter.util.AvroSerializer;
+import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
+import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
+
+
+/**
+ * Reports {@link GobblinTrackingEvent} to a Kafka topic serialized as JSON.
+ */
+public class KafkaEventReporter extends EventReporter {
+
+  protected final AvroSerializer<GobblinTrackingEvent> serializer;
+  private final Pusher kafkaPusher;
+
+  public KafkaEventReporter(Builder<?> builder) throws IOException {
+    super(builder);
+
+    this.serializer = this.closer.register(
+        createSerializer(new FixedSchemaVersionWriter()));
+
+    if(builder.kafkaPusher.isPresent()) {
+      this.kafkaPusher = builder.kafkaPusher.get();
+    } else {
+        String pusherClassName = builder.pusherClassName.or(PusherUtils.DEFAULT_KAFKA_PUSHER_CLASS_NAME);
+        this.kafkaPusher = PusherUtils.getPusher(pusherClassName, builder.brokers, builder.topic, builder.config);
+    }
+  }
+
+  @Override
+  public void reportEventQueue(Queue<GobblinTrackingEvent> queue) {
+    GobblinTrackingEvent nextEvent;
+    List<byte[]> events = Lists.newArrayList();
+
+    while(null != (nextEvent = queue.poll())) {
+      events.add(this.serializer.serializeRecord(nextEvent));
+    }
+
+    if (!events.isEmpty()) {
+      this.kafkaPusher.pushMessages(events);
+    }
+
+  }
+
+  protected AvroSerializer<GobblinTrackingEvent> createSerializer(SchemaVersionWriter schemaVersionWriter) throws IOException {
+    return new AvroJsonSerializer<GobblinTrackingEvent>(GobblinTrackingEvent.SCHEMA$, schemaVersionWriter);
+  }
+
+  /**
+   * Returns a new {@link Builder} for {@link KafkaEventReporter}.
+   * Will automatically add all Context tags to the reporter.
+   *
+   * @param context the {@link MetricContext} to report
+   * @return KafkaReporter builder
+   * @deprecated this method is bugged. Use {@link Factory#forContext} instead.
+   */
+  @Deprecated
+  public static Builder<? extends Builder> forContext(MetricContext context) {
+    return new BuilderImpl(context);
+  }
+
+  public static class BuilderImpl extends Builder<BuilderImpl> {
+    private BuilderImpl(MetricContext context) {
+      super(context);
+    }
+
+    @Override
+    protected BuilderImpl self() {
+      return this;
+    }
+  }
+
+  public static class Factory {
+    /**
+     * Returns a new {@link Builder} for {@link KafkaEventReporter}.
+     * Will automatically add all Context tags to the reporter.
+     *
+     * @param context the {@link MetricContext} to report
+     * @return KafkaReporter builder
+     */
+    public static BuilderImpl forContext(MetricContext context) {
+      return new BuilderImpl(context);
+    }
+  }
+
+  /**
+   * Builder for {@link KafkaEventReporter}.
+   * Defaults to no filter, reporting rates in seconds and times in milliseconds.
+   */
+  public static abstract class Builder<T extends EventReporter.Builder<T>>
+      extends EventReporter.Builder<T> {
+    protected String brokers;
+    protected String topic;
+    protected Optional<Pusher> kafkaPusher;
+    protected Optional<Config> config = Optional.absent();
+    protected Optional<String> pusherClassName = Optional.absent();
+
+    protected Builder(MetricContext context) {
+      super(context);
+      this.kafkaPusher = Optional.absent();
+    }
+
+    /**
+     * Set {@link Pusher} to use.
+     */
+    public T withKafkaPusher(Pusher pusher) {
+      this.kafkaPusher = Optional.of(pusher);
+      return self();
+    }
+
+    /**
+     * Set additional configuration.
+     */
+    public T withConfig(Config config) {
+      this.config = Optional.of(config);
+      return self();
+    }
+
+    /**
+     * Set a {@link Pusher} class name
+     */
+    public T withPusherClassName(String pusherClassName) {
+      this.pusherClassName = Optional.of(pusherClassName);
+      return self();
+    }
+
+    /**
+     * Builds and returns {@link KafkaEventReporter}.
+     *
+     * @param brokers string of Kafka brokers
+     * @param topic topic to send metrics to
+     * @return KafkaReporter
+     */
+    public KafkaEventReporter build(String brokers, String topic) throws IOException {
+      this.brokers = brokers;
+      this.topic = topic;
+      return new KafkaEventReporter(this);
+    }
+
+  }
+}