You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2017/10/27 19:00:50 UTC
[1/2] incubator-gobblin git commit: [GOBBLIN-298] Add metric and
event reporters that emit using a KafkaProducer
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 90d8495ae -> ee770f5c5
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java
new file mode 100644
index 0000000..1c935b4
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.MetricReport;
+import org.apache.gobblin.metrics.reporter.MetricReportReporter;
+import org.apache.gobblin.metrics.reporter.util.AvroJsonSerializer;
+import org.apache.gobblin.metrics.reporter.util.AvroSerializer;
+import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
+import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * Kafka reporter for metrics.
+ *
+ * <p>Each {@link MetricReport} is serialized with the {@link AvroSerializer} returned by
+ * {@link #createSerializer(SchemaVersionWriter)} and handed to a {@link Pusher}. The pusher is either the one
+ * supplied through {@link Builder#withKafkaPusher(Pusher)} or one created by {@link PusherUtils#getPusher}.
+ *
+ * @author ibuenros
+ */
+@Slf4j
+public class KafkaReporter extends MetricReportReporter {
+  /** Configuration key naming the {@link SchemaVersionWriter} implementation (class name or alias). */
+  public static final String SCHEMA_VERSION_WRITER_TYPE = "metrics.kafka.schemaVersionWriterType";
+  private static final String METRICS_KAFKA_PREFIX = "metrics.kafka";
+
+  protected final AvroSerializer<MetricReport> serializer;
+  protected final Pusher kafkaPusher;
+
+  /**
+   * Creates the reporter, resolving the schema version writer, the serializer and the pusher.
+   *
+   * @param builder builder carrying brokers, topic and an optional pre-built {@link Pusher}
+   * @param config reporter configuration
+   * @throws IOException if the configured {@link SchemaVersionWriter} cannot be instantiated or the
+   *     serializer cannot be created
+   */
+  protected KafkaReporter(Builder<?> builder, Config config) throws IOException {
+    super(builder, config);
+
+    SchemaVersionWriter versionWriter;
+    if (config.hasPath(SCHEMA_VERSION_WRITER_TYPE)) {
+      try {
+        ClassAliasResolver<SchemaVersionWriter> resolver = new ClassAliasResolver<>(SchemaVersionWriter.class);
+        Class<? extends SchemaVersionWriter> klazz = resolver.resolveClass(config.getString(SCHEMA_VERSION_WRITER_TYPE));
+        // Going through the Constructor wraps anything the constructor throws in an
+        // InvocationTargetException, so such failures surface as the declared IOException below.
+        versionWriter = klazz.getDeclaredConstructor().newInstance();
+      } catch (ReflectiveOperationException roe) {
+        throw new IOException("Could not instantiate version writer.", roe);
+      }
+    } else {
+      versionWriter = new FixedSchemaVersionWriter();
+    }
+
+    log.info("Schema version writer: {}", versionWriter.getClass().getName());
+    this.serializer = this.closer.register(createSerializer(versionWriter));
+
+    if (builder.kafkaPusher.isPresent()) {
+      // A pusher handed in by the caller stays owned by the caller; it is not registered for closing here.
+      this.kafkaPusher = builder.kafkaPusher.get();
+    } else {
+      Config kafkaConfig = ConfigUtils.getConfigOrEmpty(config, PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX);
+      String pusherClassName = ConfigUtils.getString(config, PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY,
+          PusherUtils.DEFAULT_KAFKA_PUSHER_CLASS_NAME);
+
+      // The pusher created here is Closeable and owned by this reporter, so it is registered with the
+      // same closer as the serializer instead of being left unreleased.
+      this.kafkaPusher = this.closer.register(
+          PusherUtils.getPusher(pusherClassName, builder.brokers, builder.topic, Optional.of(kafkaConfig)));
+    }
+  }
+
+  /**
+   * Creates the serializer used for outgoing reports. This implementation produces Avro JSON; subclasses may
+   * override it to change the wire format. Note that it is invoked from the constructor.
+   *
+   * @param schemaVersionWriter writer passed to the serializer
+   * @return serializer for {@link MetricReport}s
+   * @throws IOException if the serializer cannot be created
+   */
+  protected AvroSerializer<MetricReport> createSerializer(SchemaVersionWriter schemaVersionWriter) throws IOException {
+    return new AvroJsonSerializer<>(MetricReport.SCHEMA$, schemaVersionWriter);
+  }
+
+  /**
+   * A static factory class for obtaining new {@link Builder}s
+   *
+   * @see Builder
+   */
+  public static class BuilderFactory {
+
+    public static BuilderImpl newBuilder() {
+      return new BuilderImpl();
+    }
+  }
+
+  /** Concrete {@link Builder} whose {@link #self()} returns itself. */
+  public static class BuilderImpl extends Builder<BuilderImpl> {
+
+    @Override
+    protected BuilderImpl self() {
+      return this;
+    }
+  }
+
+  /**
+   * Builder for {@link KafkaReporter}. Defaults to no filter, reporting rates in seconds and times in milliseconds.
+   */
+  public static abstract class Builder<T extends MetricReportReporter.Builder<T>>
+      extends MetricReportReporter.Builder<T> {
+
+    protected String brokers;
+    protected String topic;
+    protected Optional<Pusher> kafkaPusher;
+
+    protected Builder() {
+      super();
+      this.name = "KafkaReporter";
+      this.kafkaPusher = Optional.absent();
+    }
+
+    /**
+     * Set {@link Pusher} to use. When set, the reporter uses it instead of creating its own.
+     */
+    public T withKafkaPusher(Pusher pusher) {
+      this.kafkaPusher = Optional.of(pusher);
+      return self();
+    }
+
+    /**
+     * Builds and returns {@link KafkaReporter}.
+     *
+     * @param brokers string of Kafka brokers
+     * @param topic topic to send metrics to
+     * @param props properties converted to the reporter {@link Config} via
+     *     {@link ConfigUtils#propertiesToConfig} with {@link ConfigurationKeys#METRICS_CONFIGURATIONS_PREFIX}
+     * @return KafkaReporter
+     * @throws IOException if the reporter cannot be constructed
+     */
+    public KafkaReporter build(String brokers, String topic, Properties props) throws IOException {
+      this.brokers = brokers;
+      this.topic = topic;
+
+      return new KafkaReporter(this, ConfigUtils.propertiesToConfig(props, Optional.of(ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX)));
+    }
+  }
+
+  /**
+   * Serializes {@code report} and pushes it as a single message through the pusher.
+   */
+  @Override
+  protected void emitReport(MetricReport report) {
+    this.kafkaPusher.pushMessages(Lists.newArrayList(this.serializer.serializeRecord(report)));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
new file mode 100644
index 0000000..9faac33
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.ScheduledReporter;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.CustomCodahaleReporterFactory;
+import org.apache.gobblin.metrics.KafkaReportingFormats;
+import org.apache.gobblin.metrics.RootMetricContext;
+import org.apache.gobblin.util.ConfigUtils;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+@Slf4j
+public class KafkaReporterFactory implements CustomCodahaleReporterFactory {
+ @Override
+ public ScheduledReporter newScheduledReporter(MetricRegistry registry, Properties properties)
+ throws IOException {
+ if (!Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_ENABLED_KEY,
+ ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_ENABLED))) {
+ return null;
+ }
+ log.info("Reporting metrics to Kafka");
+
+ Optional<String> defaultTopic = Optional.fromNullable(properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC));
+ Optional<String> metricsTopic = Optional.fromNullable(
+ properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC_METRICS));
+ Optional<String> eventsTopic = Optional.fromNullable(
+ properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC_EVENTS));
+
+ boolean metricsEnabled = metricsTopic.or(defaultTopic).isPresent();
+ if (metricsEnabled) log.info("Reporting metrics to Kafka");
+ boolean eventsEnabled = eventsTopic.or(defaultTopic).isPresent();
+ if (eventsEnabled) log.info("Reporting events to Kafka");
+
+ try {
+ Preconditions.checkArgument(properties.containsKey(ConfigurationKeys.METRICS_KAFKA_BROKERS),
+ "Kafka metrics brokers missing.");
+ Preconditions.checkArgument(metricsTopic.or(eventsTopic).or(defaultTopic).isPresent(), "Kafka topic missing.");
+ } catch (IllegalArgumentException exception) {
+ log.error("Not reporting metrics to Kafka due to missing Kafka configuration(s).", exception);
+ return null;
+ }
+
+ String brokers = properties.getProperty(ConfigurationKeys.METRICS_KAFKA_BROKERS);
+
+ String reportingFormat = properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_FORMAT,
+ ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_FORMAT);
+
+ KafkaReportingFormats formatEnum;
+ try {
+ formatEnum = KafkaReportingFormats.valueOf(reportingFormat.toUpperCase());
+ } catch (IllegalArgumentException exception) {
+ log.warn("Kafka metrics reporting format " + reportingFormat +
+ " not recognized. Will report in json format.", exception);
+ formatEnum = KafkaReportingFormats.JSON;
+ }
+
+ if (metricsEnabled) {
+ try {
+ formatEnum.metricReporterBuilder(properties)
+ .build(brokers, metricsTopic.or(defaultTopic).get(), properties);
+ } catch (IOException exception) {
+ log.error("Failed to create Kafka metrics reporter. Will not report metrics to Kafka.", exception);
+ }
+ }
+
+ if (eventsEnabled) {
+ try {
+ KafkaEventReporter.Builder<?> builder = formatEnum.eventReporterBuilder(RootMetricContext.get(),
+ properties);
+
+ Config kafkaConfig = ConfigUtils.getConfigOrEmpty(ConfigUtils.propertiesToConfig(properties),
+ PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX);
+ builder.withConfig(kafkaConfig);
+
+ builder.withPusherClassName(properties.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY,
+ PusherUtils.DEFAULT_KAFKA_PUSHER_CLASS_NAME));
+
+ return builder.build(brokers, eventsTopic.or(defaultTopic).get());
+ } catch (IOException exception) {
+ log.error("Failed to create Kafka events reporter. Will not report events to Kafka.", exception);
+ }
+ }
+
+ log.info("Will start reporting metrics to Kafka");
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java
new file mode 100644
index 0000000..5abd503
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import java.io.Closeable;
+import java.util.List;
+
+
+/**
+ * Establish a connection to a Kafka cluster and push byte messages to a specified topic.
+ *
+ * <p>The interface extends {@link Closeable}, so implementations are released through {@code close()}.
+ */
+public interface Pusher extends Closeable {
+ /**
+ * Push all byte array messages to the Kafka topic.
+ *
+ * @param messages List of byte array messages to push to Kafka.
+ */
+ void pushMessages(List<byte[]> messages);
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherUtils.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherUtils.java
new file mode 100644
index 0000000..a76c750
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherUtils.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.metrics.kafka;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+public class PusherUtils {
+ public static final String METRICS_REPORTING_KAFKA_CONFIG_PREFIX = "metrics.reporting.kafka.config";
+ public static final String KAFKA_PUSHER_CLASS_NAME_KEY = "metrics.reporting.kafkaPusherClass";
+ public static final String DEFAULT_KAFKA_PUSHER_CLASS_NAME = "org.apache.gobblin.metrics.kafka.KafkaPusher";
+
+ /**
+ * Create a {@link Pusher}
+ * @param pusherClassName the {@link Pusher} class to instantiate
+ * @param brokers brokers to connect to
+ * @param topic the topic to write to
+ * @param config additional configuration for configuring the {@link Pusher}
+ * @return a {@link Pusher}
+ */
+ public static Pusher getPusher(String pusherClassName, String brokers, String topic, Optional<Config> config) {
+ try {
+ Class<?> pusherClass = Class.forName(pusherClassName);
+
+ return (Pusher) GobblinConstructorUtils.invokeLongestConstructor(pusherClass,
+ brokers, topic, config);
+ } catch (ReflectiveOperationException e) {
+ throw new RuntimeException("Could not instantiate kafka pusher", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterTest.java
new file mode 100644
index 0000000..e240a53
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.kafka.KafkaAvroEventReporter;
+import org.apache.gobblin.metrics.kafka.KafkaEventReporter;
+import org.apache.gobblin.metrics.kafka.Pusher;
+import org.apache.gobblin.metrics.reporter.util.EventUtils;
+
+
+@Test(groups = {"gobblin.metrics"})
+public class KafkaAvroEventReporterTest extends KafkaEventReporterTest {
+
+ @Override
+ public KafkaEventReporter.Builder<? extends KafkaEventReporter.Builder> getBuilder(MetricContext context,
+ Pusher pusher) {
+ return KafkaAvroEventReporter.forContext(context).withKafkaPusher(pusher);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected GobblinTrackingEvent nextEvent(Iterator<byte[]> it)
+ throws IOException {
+ Assert.assertTrue(it.hasNext());
+ return EventUtils.deserializeReportFromAvroSerialization(new GobblinTrackingEvent(), it.next());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroReporterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroReporterTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroReporterTest.java
new file mode 100644
index 0000000..e7be31d
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroReporterTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.metrics.MetricReport;
+import org.apache.gobblin.metrics.kafka.KafkaAvroReporter;
+import org.apache.gobblin.metrics.kafka.KafkaReporter;
+import org.apache.gobblin.metrics.kafka.Pusher;
+import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
+
+/**
+ * Test for KafkaAvroReporter
+ * Extends KafkaReporterTest and just redefines the builder and the metrics deserializer
+ *
+ * @author ibuenros
+ */
+@Test(groups = {"gobblin.metrics"})
+public class KafkaAvroReporterTest extends KafkaReporterTest {
+
+ public KafkaAvroReporterTest(String topic)
+ throws IOException, InterruptedException {
+ super();
+ }
+
+ public KafkaAvroReporterTest() throws IOException, InterruptedException {
+ this("KafkaAvroReporterTest");
+ }
+
+ @Override
+ public KafkaReporter.Builder<? extends KafkaReporter.Builder> getBuilder(Pusher pusher) {
+ return KafkaAvroReporter.BuilderFactory.newBuilder().withKafkaPusher(pusher);
+ }
+
+ @Override
+ public KafkaReporter.Builder<? extends KafkaReporter.Builder> getBuilderFromContext(Pusher pusher) {
+ return KafkaAvroReporter.BuilderFactory.newBuilder().withKafkaPusher(pusher);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected MetricReport nextReport(Iterator<byte[]> it)
+ throws IOException {
+ Assert.assertTrue(it.hasNext());
+ return MetricReportUtils.deserializeReportFromAvroSerialization(new MetricReport(), it.next());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaEventReporterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaEventReporterTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaEventReporterTest.java
new file mode 100644
index 0000000..add42f4
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaEventReporterTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Maps;
+
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.kafka.KafkaEventReporter;
+import org.apache.gobblin.metrics.kafka.Pusher;
+import org.apache.gobblin.metrics.reporter.util.EventUtils;
+
+
+/**
+ * Tests for {@link KafkaEventReporter} that capture pushed messages with a {@code MockKafkaPusher}.
+ */
+@Test(groups = {"gobblin.metrics"})
+public class KafkaEventReporterTest {
+
+  /**
+   * Get builder for KafkaEventReporter (override if testing an extension of KafkaEventReporter)
+   * @param context metric context whose events are reported
+   * @param pusher pusher that receives the serialized events
+   * @return KafkaEventReporter builder
+   */
+  public KafkaEventReporter.Builder<? extends KafkaEventReporter.Builder> getBuilder(MetricContext context,
+      Pusher pusher) {
+    return KafkaEventReporter.Factory.forContext(context).withKafkaPusher(pusher);
+  }
+
+
+  @Test
+  public void testKafkaEventReporter() throws IOException {
+    final String namespace = "gobblin.metrics.test";
+    final String eventName = "testEvent";
+
+    MetricContext context = MetricContext.builder("context").build();
+
+    MockKafkaPusher pusher = new MockKafkaPusher();
+    KafkaEventReporter kafkaReporter = getBuilder(context, pusher).build("localhost:0000", "topic");
+
+    Map<String, String> metadata = Maps.newHashMap();
+    metadata.put("m1", "v1");
+    metadata.put("m2", null);
+    context.submitEvent(newEvent(namespace, eventName, metadata));
+
+    pauseBriefly();
+    kafkaReporter.report();
+    pauseBriefly();
+
+    GobblinTrackingEvent retrievedEvent = nextEvent(pusher.messageIterator());
+    Assert.assertEquals(retrievedEvent.getNamespace(), namespace);
+    Assert.assertEquals(retrievedEvent.getName(), eventName);
+    Assert.assertEquals(retrievedEvent.getMetadata().size(), 4);
+
+  }
+
+  @Test
+  public void testTagInjection() throws IOException {
+
+    final String tag1 = "tag1";
+    final String value1 = "value1";
+    final String metadataValue1 = "metadata1";
+    final String tag2 = "tag2";
+    final String value2 = "value2";
+    final String namespace = "gobblin.metrics.test";
+    final String eventName = "testEvent";
+
+    MetricContext context = MetricContext.builder("context").addTag(new Tag<String>(tag1, value1)).
+        addTag(new Tag<String>(tag2, value2)).build();
+
+    MockKafkaPusher pusher = new MockKafkaPusher();
+    KafkaEventReporter kafkaReporter = getBuilder(context, pusher).build("localhost:0000", "topic");
+
+    // The event supplies its own value for tag1; the assertions below expect that value to be kept.
+    Map<String, String> metadata = Maps.newHashMap();
+    metadata.put(tag1, metadataValue1);
+    context.submitEvent(newEvent(namespace, eventName, metadata));
+
+    pauseBriefly();
+    kafkaReporter.report();
+    pauseBriefly();
+
+    GobblinTrackingEvent retrievedEvent = nextEvent(pusher.messageIterator());
+    Assert.assertEquals(retrievedEvent.getNamespace(), namespace);
+    Assert.assertEquals(retrievedEvent.getName(), eventName);
+    Assert.assertEquals(retrievedEvent.getMetadata().size(), 4);
+    Assert.assertEquals(retrievedEvent.getMetadata().get(tag1), metadataValue1);
+    Assert.assertEquals(retrievedEvent.getMetadata().get(tag2), value2);
+  }
+
+  /**
+   * Extract the next event from the captured message iterator.
+   * Fails the test if no further message is available.
+   * @param it iterator over the pushed messages
+   * @return next event in the stream, deserialized from JSON
+   * @throws IOException
+   */
+  protected GobblinTrackingEvent nextEvent(Iterator<byte[]> it) throws IOException {
+    Assert.assertTrue(it.hasNext());
+    GobblinTrackingEvent reuse = new GobblinTrackingEvent();
+    byte[] payload = it.next();
+    return EventUtils.deserializeReportFromJson(reuse, payload);
+  }
+
+  /** Builds an event with the given name, namespace and metadata. */
+  private static GobblinTrackingEvent newEvent(String namespace, String eventName, Map<String, String> metadata) {
+    GobblinTrackingEvent event = new GobblinTrackingEvent();
+    event.setName(eventName);
+    event.setNamespace(namespace);
+    event.setMetadata(metadata);
+    return event;
+  }
+
+  /** Sleeps for 100 ms; an interrupt restores the thread's interrupt flag and ends the wait. */
+  private static void pauseBriefly() {
+    try {
+      Thread.sleep(100);
+    } catch(InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaReporterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaReporterTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaReporterTest.java
new file mode 100644
index 0000000..f653dd9
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaReporterTest.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.google.common.collect.Lists;
+
+import org.apache.gobblin.metrics.Measurements;
+import org.apache.gobblin.metrics.Metric;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.MetricReport;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.kafka.KafkaReporter;
+import org.apache.gobblin.metrics.kafka.Pusher;
+import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
+
+/**
+ * Tests for {@link KafkaReporter}, using a {@link MockKafkaPusher} to capture the emitted messages
+ * instead of talking to a Kafka broker.
+ *
+ * <p>{@link #getBuilder(Pusher)}, {@link #getBuilderFromContext(Pusher)} and
+ * {@link #nextReport(Iterator)} are overridable hooks for tests of reporter extensions.
+ */
+@Test(groups = { "gobblin.metrics" })
+public class KafkaReporterTest {
+
+  /** How long to wait after triggering a report before reading it back, in milliseconds. */
+  private static final long REPORT_WAIT_MILLIS = 1000;
+
+  public KafkaReporterTest() throws IOException, InterruptedException {}
+
+  /**
+   * Get builder for KafkaReporter (override if testing an extension of KafkaReporter)
+   * @param pusher {@link Pusher} the built reporter should emit through
+   * @return KafkaReporter builder
+   */
+  public KafkaReporter.Builder<? extends KafkaReporter.Builder> getBuilder(Pusher pusher) {
+    return KafkaReporter.BuilderFactory.newBuilder().withKafkaPusher(pusher);
+  }
+
+  /**
+   * Get builder for the context-based test (override if testing an extension of KafkaReporter).
+   * In this class it builds exactly the same builder as {@link #getBuilder(Pusher)}.
+   * @param pusher {@link Pusher} the built reporter should emit through
+   * @return KafkaReporter builder
+   */
+  public KafkaReporter.Builder<? extends KafkaReporter.Builder> getBuilderFromContext(Pusher pusher) {
+    return KafkaReporter.BuilderFactory.newBuilder().withKafkaPusher(pusher);
+  }
+
+  /**
+   * Reports a counter, a meter and a histogram twice: the first report is checked for exact
+   * COUNT values, the second for the complete set of measurement names.
+   */
+  @Test
+  public void testKafkaReporter() throws IOException {
+    MetricContext metricContext =
+        MetricContext.builder(this.getClass().getCanonicalName() + ".testKafkaReporter").build();
+    Counter counter = metricContext.counter("com.linkedin.example.counter");
+    Meter meter = metricContext.meter("com.linkedin.example.meter");
+    Histogram histogram = metricContext.histogram("com.linkedin.example.histogram");
+
+    MockKafkaPusher pusher = new MockKafkaPusher();
+    KafkaReporter kafkaReporter = getBuilder(pusher).build("localhost:0000", "topic", new Properties());
+
+    // Close the reporter even when an assertion fails, so it is never leaked.
+    try {
+      counter.inc();
+      meter.mark(2);
+      histogram.update(1);
+      histogram.update(1);
+      histogram.update(2);
+
+      kafkaReporter.report(metricContext);
+      pause(REPORT_WAIT_MILLIS);
+
+      Map<String, Double> expected = new HashMap<>();
+      expected.put("com.linkedin.example.counter." + Measurements.COUNT, 1.0);
+      expected.put("com.linkedin.example.meter." + Measurements.COUNT, 2.0);
+      expected.put("com.linkedin.example.histogram." + Measurements.COUNT, 3.0);
+
+      MetricReport nextReport = nextReport(pusher.messageIterator());
+      expectMetricsWithValues(nextReport, expected);
+
+      kafkaReporter.report(metricContext);
+      pause(REPORT_WAIT_MILLIS);
+
+      Set<String> expectedSet = new HashSet<>();
+      expectedSet.add("com.linkedin.example.counter." + Measurements.COUNT);
+      expectedSet.add("com.linkedin.example.meter." + Measurements.COUNT);
+      expectedSet.add("com.linkedin.example.meter." + Measurements.MEAN_RATE);
+      expectedSet.add("com.linkedin.example.meter." + Measurements.RATE_1MIN);
+      expectedSet.add("com.linkedin.example.meter." + Measurements.RATE_5MIN);
+      expectedSet.add("com.linkedin.example.meter." + Measurements.RATE_15MIN);
+      expectedSet.add("com.linkedin.example.histogram." + Measurements.MEAN);
+      expectedSet.add("com.linkedin.example.histogram." + Measurements.MIN);
+      expectedSet.add("com.linkedin.example.histogram." + Measurements.MAX);
+      expectedSet.add("com.linkedin.example.histogram." + Measurements.MEDIAN);
+      expectedSet.add("com.linkedin.example.histogram." + Measurements.PERCENTILE_75TH);
+      expectedSet.add("com.linkedin.example.histogram." + Measurements.PERCENTILE_95TH);
+      expectedSet.add("com.linkedin.example.histogram." + Measurements.PERCENTILE_99TH);
+      expectedSet.add("com.linkedin.example.histogram." + Measurements.PERCENTILE_999TH);
+      expectedSet.add("com.linkedin.example.histogram." + Measurements.COUNT);
+
+      nextReport = nextReport(pusher.messageIterator());
+      expectMetrics(nextReport, expectedSet, true);
+    } finally {
+      kafkaReporter.close();
+    }
+  }
+
+  /**
+   * Verifies that tags configured on the reporter builder show up in the emitted report.
+   */
+  @Test
+  public void kafkaReporterTagsTest() throws IOException {
+    MetricContext metricContext =
+        MetricContext.builder(this.getClass().getCanonicalName() + ".kafkaReporterTagsTest").build();
+    Counter counter = metricContext.counter("com.linkedin.example.counter");
+
+    Tag<?> tag1 = new Tag<>("tag1", "value1");
+    Tag<?> tag2 = new Tag<>("tag2", 2);
+
+    MockKafkaPusher pusher = new MockKafkaPusher();
+    KafkaReporter kafkaReporter =
+        getBuilder(pusher).withTags(Lists.newArrayList(tag1, tag2)).build("localhost:0000", "topic", new Properties());
+
+    try {
+      counter.inc();
+
+      kafkaReporter.report(metricContext);
+      pause(REPORT_WAIT_MILLIS);
+
+      MetricReport metricReport = nextReport(pusher.messageIterator());
+
+      // NOTE(review): 4 = the two reporter tags plus, presumably, two tags added by the
+      // metric context itself -- confirm against MetricContext.
+      Assert.assertEquals(metricReport.getTags().size(), 4);
+      Assert.assertTrue(metricReport.getTags().containsKey(tag1.getKey()));
+      Assert.assertEquals(metricReport.getTags().get(tag1.getKey()), tag1.getValue().toString());
+      Assert.assertTrue(metricReport.getTags().containsKey(tag2.getKey()));
+      Assert.assertEquals(metricReport.getTags().get(tag2.getKey()), tag2.getValue().toString());
+    } finally {
+      kafkaReporter.close();
+    }
+  }
+
+  /**
+   * Verifies that a tag attached to the {@link MetricContext} shows up in the emitted report.
+   */
+  @Test
+  public void kafkaReporterContextTest() throws IOException {
+    Tag<?> tag1 = new Tag<>("tag1", "value1");
+    MetricContext context = MetricContext.builder("context").addTag(tag1).build();
+    Counter counter = context.counter("com.linkedin.example.counter");
+
+    MockKafkaPusher pusher = new MockKafkaPusher();
+    KafkaReporter kafkaReporter = getBuilderFromContext(pusher).build("localhost:0000", "topic", new Properties());
+
+    try {
+      counter.inc();
+
+      kafkaReporter.report(context);
+      pause(REPORT_WAIT_MILLIS);
+
+      MetricReport metricReport = nextReport(pusher.messageIterator());
+
+      // NOTE(review): 3 = the context tag plus, presumably, two tags added by the
+      // metric context itself -- confirm against MetricContext.
+      Assert.assertEquals(metricReport.getTags().size(), 3);
+      Assert.assertTrue(metricReport.getTags().containsKey(tag1.getKey()));
+      Assert.assertEquals(metricReport.getTags().get(tag1.getKey()), tag1.getValue().toString());
+    } finally {
+      kafkaReporter.close();
+    }
+  }
+
+  /**
+   * Sleeps for the given time; if interrupted, restores the interrupt flag and returns early.
+   * @param millis time to sleep in milliseconds
+   */
+  private static void pause(long millis) {
+    try {
+      Thread.sleep(millis);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Expect a list of metrics with specific values.
+   * Fail if not all metrics are received, or some metric has the wrong value.
+   * The supplied map is not modified.
+   * @param report MetricReport.
+   * @param expected map of expected metric names and their values
+   * @throws IOException
+   */
+  private void expectMetricsWithValues(MetricReport report, Map<String, Double> expected) throws IOException {
+    // Work on a copy so the caller's map is left intact.
+    Map<String, Double> remaining = new HashMap<>(expected);
+
+    for (Metric metric : report.getMetrics()) {
+      String name = metric.getName();
+      if (remaining.containsKey(name)) {
+        Double expectedValue = remaining.remove(name);
+        // TestNG argument order is (actual, expected).
+        Assert.assertEquals(metric.getValue(), expectedValue, "Unexpected value for metric " + name);
+      }
+    }
+
+    Assert.assertTrue(remaining.isEmpty(), "Expected metrics missing from report: " + remaining.keySet());
+  }
+
+  /**
+   * Expect a set of metric names. Will fail if not all of these metrics are received.
+   * The supplied set is not modified.
+   * @param report MetricReport
+   * @param expected set of expected metric names
+   * @param strict if set to true, will fail if receiving any metric that is not expected
+   *               (metrics whose name contains the notifications timer name are always tolerated)
+   * @throws IOException
+   */
+  private void expectMetrics(MetricReport report, Set<String> expected, boolean strict) throws IOException {
+    // Work on a copy so the caller's set is left intact.
+    Set<String> remaining = new HashSet<>(expected);
+
+    for (Metric metric : report.getMetrics()) {
+      String name = metric.getName();
+      boolean wasExpected = remaining.remove(name);
+      if (!wasExpected && strict && !name.contains(MetricContext.GOBBLIN_METRICS_NOTIFICATIONS_TIMER_NAME)) {
+        Assert.fail("Metric present in report not expected: " + metric.toString());
+      }
+    }
+
+    Assert.assertTrue(remaining.isEmpty(), "Expected metrics missing from report: " + remaining);
+  }
+
+  /**
+   * Extract the next metric report from the message iterator.
+   * Fails the test if the iterator has no further message.
+   * @param it iterator over the serialized messages
+   * @return next metric report in the stream
+   * @throws IOException
+   */
+  protected MetricReport nextReport(Iterator<byte[]> it) throws IOException {
+    Assert.assertTrue(it.hasNext(), "No report available from the pusher");
+    return MetricReportUtils.deserializeReportFromJson(new MetricReport(), it.next());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/MockKafkaPusher.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/MockKafkaPusher.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/MockKafkaPusher.java
new file mode 100644
index 0000000..71decbb
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/MockKafkaPusher.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+
+import com.google.common.collect.Queues;
+
+import org.apache.gobblin.metrics.kafka.Pusher;
+
+
+/**
+ * Mock instance of {@link org.apache.gobblin.metrics.kafka.Pusher} used for testing.
+ *
+ * <p>Pushed messages are buffered in memory in push order. {@link #messageIterator()} hands each
+ * buffered message out once, so consecutive reads observe consecutive messages.
+ */
+public class MockKafkaPusher implements Pusher {
+
+  // Buffer of pushed messages that have not been read back yet.
+  Queue<byte[]> messages = Queues.newLinkedBlockingQueue();
+
+  public MockKafkaPusher() {
+  }
+
+  /**
+   * Appends the given messages to the in-memory buffer instead of sending them anywhere.
+   * @param messages serialized messages to buffer
+   */
+  @Override
+  public void pushMessages(List<byte[]> messages) {
+    this.messages.addAll(messages);
+  }
+
+  /** Nothing to release; the mock holds no external resources. */
+  @Override
+  public void close()
+      throws IOException {
+  }
+
+  /**
+   * Returns a consuming iterator over the buffered messages.
+   *
+   * <p>Each call to {@code next()} removes the returned message from the buffer. A plain
+   * {@code Queue.iterator()} would restart at the head on every call to this method, which made
+   * callers that request a new iterator per read see the first message over and over.
+   *
+   * @return iterator that drains the buffered messages in push order
+   */
+  public Iterator<byte[]> messageIterator() {
+    final Queue<byte[]> pending = this.messages;
+    return new Iterator<byte[]>() {
+      @Override
+      public boolean hasNext() {
+        return !pending.isEmpty();
+      }
+
+      @Override
+      public byte[] next() {
+        // Queue.remove() throws NoSuchElementException when empty, as Iterator.next() requires.
+        return pending.remove();
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException("next() already removes the message");
+      }
+    };
+  }
+
+}
[2/2] incubator-gobblin git commit: [GOBBLIN-298] Add metric and
event reporters that emit using a KafkaProducer
Posted by hu...@apache.org.
[GOBBLIN-298] Add metric and event reporters that emit using a KafkaProducer
Closes #2153 from htran1/metrics09
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/ee770f5c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/ee770f5c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/ee770f5c
Branch: refs/heads/master
Commit: ee770f5c5aeec469d7d93c016ce0a25200932eb2
Parents: 90d8495
Author: Hung Tran <hu...@linkedin.com>
Authored: Fri Oct 27 12:00:41 2017 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Fri Oct 27 12:00:41 2017 -0700
----------------------------------------------------------------------
.../gobblin/metrics/KafkaReportingFormats.java | 83 -------
.../metrics/kafka/KafkaAvroEventReporter.java | 122 ----------
.../metrics/kafka/KafkaAvroReporter.java | 105 --------
.../metrics/kafka/KafkaEventReporter.java | 151 ------------
.../metrics/kafka/KafkaProducerPusher.java | 91 +++++++
.../gobblin/metrics/kafka/KafkaPusher.java | 3 +-
.../gobblin/metrics/kafka/KafkaReporter.java | 147 -----------
.../metrics/kafka/KafkaReporterFactory.java | 104 --------
.../reporter/KafkaAvroEventReporterTest.java | 50 ----
.../metrics/reporter/KafkaAvroReporterTest.java | 68 ------
.../reporter/KafkaEventReporterTest.java | 150 ------------
.../metrics/reporter/KafkaReporterTest.java | 242 -------------------
.../metrics/kafka/KafkaProducerPusher.java | 91 +++++++
.../reporter/KafkaProducerPusherTest.java | 91 +++++++
.../gobblin/metrics/KafkaReportingFormats.java | 83 +++++++
.../metrics/kafka/KafkaAvroEventReporter.java | 122 ++++++++++
.../metrics/kafka/KafkaAvroReporter.java | 105 ++++++++
.../metrics/kafka/KafkaAvroSchemaRegistry.java | 2 +-
.../metrics/kafka/KafkaEventReporter.java | 170 +++++++++++++
.../gobblin/metrics/kafka/KafkaReporter.java | 150 ++++++++++++
.../metrics/kafka/KafkaReporterFactory.java | 113 +++++++++
.../apache/gobblin/metrics/kafka/Pusher.java | 33 +++
.../gobblin/metrics/kafka/PusherUtils.java | 47 ++++
.../reporter/KafkaAvroEventReporterTest.java | 50 ++++
.../metrics/reporter/KafkaAvroReporterTest.java | 67 +++++
.../reporter/KafkaEventReporterTest.java | 150 ++++++++++++
.../metrics/reporter/KafkaReporterTest.java | 240 ++++++++++++++++++
.../metrics/reporter/MockKafkaPusher.java | 55 +++++
28 files changed, 1660 insertions(+), 1225 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
deleted file mode 100644
index 5f47121..0000000
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.metrics;
-
-import java.util.Properties;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.metrics.kafka.KafkaAvroEventReporter;
-import org.apache.gobblin.metrics.kafka.KafkaAvroReporter;
-import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
-import org.apache.gobblin.metrics.kafka.KafkaEventReporter;
-import org.apache.gobblin.metrics.kafka.KafkaReporter;
-
-
-/**
- * Kafka reporting formats enumeration.
- */
-public enum KafkaReportingFormats {
-
- AVRO,
- JSON;
-
- /**
- * Get a {@link org.apache.gobblin.metrics.kafka.KafkaReporter.Builder} for this reporting format.
- *
- * @param properties {@link java.util.Properties} containing information to build reporters.
- * @return {@link org.apache.gobblin.metrics.kafka.KafkaReporter.Builder}.
- */
- public KafkaReporter.Builder<?> metricReporterBuilder(Properties properties) {
- switch (this) {
- case AVRO:
- KafkaAvroReporter.Builder<?> builder = KafkaAvroReporter.BuilderFactory.newBuilder();
- if (Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
- ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
- builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
- }
- return builder;
- case JSON:
- return KafkaReporter.BuilderFactory.newBuilder();
- default:
- // This should never happen.
- throw new IllegalArgumentException("KafkaReportingFormat not recognized.");
- }
- }
-
- /**
- * Get a {@link org.apache.gobblin.metrics.kafka.KafkaEventReporter.Builder} for this reporting format.
- * @param context {@link org.apache.gobblin.metrics.MetricContext} that should be reported.
- * @param properties {@link java.util.Properties} containing information to build reporters.
- * @return {@link org.apache.gobblin.metrics.kafka.KafkaEventReporter.Builder}.
- */
- public KafkaEventReporter.Builder<?> eventReporterBuilder(MetricContext context, Properties properties) {
- switch (this) {
- case AVRO:
- KafkaAvroEventReporter.Builder<?> builder = KafkaAvroEventReporter.Factory.forContext(context);
- if (Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
- ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
- builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
- }
- return builder;
- case JSON:
- return KafkaEventReporter.Factory.forContext(context);
- default:
- // This should never happen.
- throw new IllegalArgumentException("KafkaReportingFormat not recognized.");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventReporter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventReporter.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventReporter.java
deleted file mode 100644
index 5d35c87..0000000
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventReporter.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.metrics.kafka;
-
-import java.io.IOException;
-
-import org.apache.avro.Schema;
-
-import com.google.common.base.Optional;
-
-import org.apache.gobblin.metrics.GobblinTrackingEvent;
-import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.reporter.util.AvroBinarySerializer;
-import org.apache.gobblin.metrics.reporter.util.AvroSerializer;
-import org.apache.gobblin.metrics.reporter.util.SchemaRegistryVersionWriter;
-import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
-
-
-/**
- * {@link org.apache.gobblin.metrics.reporter.EventReporter} that emits events to Kafka as serialized Avro records.
- */
-public class KafkaAvroEventReporter extends KafkaEventReporter {
-
- protected KafkaAvroEventReporter(Builder<?> builder) throws IOException {
- super(builder);
- if(builder.registry.isPresent()) {
- Schema schema =
- new Schema.Parser().parse(getClass().getClassLoader().getResourceAsStream("GobblinTrackingEvent.avsc"));
- this.serializer.setSchemaVersionWriter(new SchemaRegistryVersionWriter(builder.registry.get(), builder.topic,
- Optional.of(schema)));
- }
- }
-
- @Override
- protected AvroSerializer<GobblinTrackingEvent> createSerializer(SchemaVersionWriter schemaVersionWriter)
- throws IOException {
- return new AvroBinarySerializer<GobblinTrackingEvent>(GobblinTrackingEvent.SCHEMA$, schemaVersionWriter);
- }
-
- /**
- * Returns a new {@link KafkaAvroEventReporter.Builder} for {@link KafkaAvroEventReporter}.
- *
- * @param context the {@link org.apache.gobblin.metrics.MetricContext} to report
- * @return KafkaAvroReporter builder
- * @deprecated this method is bugged. Use {@link KafkaAvroEventReporter.Factory#forContext} instead.
- */
- @Deprecated
- public static Builder<? extends Builder<?>> forContext(MetricContext context) {
- return new BuilderImpl(context);
- }
-
- private static class BuilderImpl extends Builder<BuilderImpl> {
- private BuilderImpl(MetricContext context) {
- super(context);
- }
-
- @Override
- protected BuilderImpl self() {
- return this;
- }
- }
-
- public static abstract class Factory {
- /**
- * Returns a new {@link KafkaAvroEventReporter.Builder} for {@link KafkaAvroEventReporter}.
- *
- * @param context the {@link org.apache.gobblin.metrics.MetricContext} to report
- * @return KafkaAvroReporter builder
- */
- public static KafkaAvroEventReporter.BuilderImpl forContext(MetricContext context) {
- return new BuilderImpl(context);
- }
- }
-
- /**
- * Builder for {@link KafkaAvroEventReporter}.
- * Defaults to no filter, reporting rates in seconds and times in milliseconds.
- */
- public static abstract class Builder<T extends Builder<T>> extends KafkaEventReporter.Builder<T> {
-
- private Optional<KafkaAvroSchemaRegistry> registry = Optional.absent();
-
- private Builder(MetricContext context) {
- super(context);
- }
-
- public T withSchemaRegistry(KafkaAvroSchemaRegistry registry) {
- this.registry = Optional.of(registry);
- return self();
- }
-
- /**
- * Builds and returns {@link KafkaAvroEventReporter}.
- *
- * @param brokers string of Kafka brokers
- * @param topic topic to send metrics to
- * @return KafkaAvroReporter
- */
- public KafkaAvroEventReporter build(String brokers, String topic) throws IOException {
- this.brokers = brokers;
- this.topic = topic;
- return new KafkaAvroEventReporter(this);
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java
deleted file mode 100644
index 35d558e..0000000
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.metrics.kafka;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.metrics.MetricReport;
-import org.apache.gobblin.metrics.reporter.util.AvroBinarySerializer;
-import org.apache.gobblin.metrics.reporter.util.AvroSerializer;
-import org.apache.gobblin.metrics.reporter.util.SchemaRegistryVersionWriter;
-import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
-import org.apache.gobblin.util.ConfigUtils;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import org.apache.avro.Schema;
-
-import com.google.common.base.Optional;
-import com.typesafe.config.Config;
-
-
-/**
- * Kafka reporter for codahale metrics writing metrics in Avro format.
- *
- * @author ibuenros
- */
-public class KafkaAvroReporter extends KafkaReporter {
-
- protected KafkaAvroReporter(Builder<?> builder, Config config) throws IOException {
- super(builder, config);
- if (builder.registry.isPresent()) {
- Schema schema =
- new Schema.Parser().parse(getClass().getClassLoader().getResourceAsStream("MetricReport.avsc"));
- this.serializer.setSchemaVersionWriter(new SchemaRegistryVersionWriter(builder.registry.get(), builder.topic,
- Optional.of(schema)));
- }
- }
-
- @Override
- protected AvroSerializer<MetricReport> createSerializer(SchemaVersionWriter schemaVersionWriter)
- throws IOException {
- return new AvroBinarySerializer<>(MetricReport.SCHEMA$, schemaVersionWriter);
- }
-
- /**
- * A static factory class for obtaining new {@link org.apache.gobblin.metrics.kafka.KafkaAvroReporter.Builder}s
- *
- * @see org.apache.gobblin.metrics.kafka.KafkaAvroReporter.Builder
- */
- public static class BuilderFactory {
-
- public static BuilderImpl newBuilder() {
- return new BuilderImpl();
- }
- }
-
- public static class BuilderImpl extends Builder<BuilderImpl> {
-
- @Override
- protected BuilderImpl self() {
- return this;
- }
- }
-
- /**
- * Builder for {@link KafkaAvroReporter}. Defaults to no filter, reporting rates in seconds and times in milliseconds.
- */
- public static abstract class Builder<T extends Builder<T>> extends KafkaReporter.Builder<T> {
-
- private Optional<KafkaAvroSchemaRegistry> registry = Optional.absent();
-
- public T withSchemaRegistry(KafkaAvroSchemaRegistry registry) {
- this.registry = Optional.of(registry);
- return self();
- }
-
- /**
- * Builds and returns {@link KafkaAvroReporter}.
- *
- * @param brokers string of Kafka brokers
- * @param topic topic to send metrics to
- * @return KafkaAvroReporter
- */
- public KafkaAvroReporter build(String brokers, String topic, Properties props) throws IOException {
- this.brokers = brokers;
- this.topic = topic;
- return new KafkaAvroReporter(this, ConfigUtils.propertiesToConfig(props, Optional.of(ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX)));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java
deleted file mode 100644
index c23294a..0000000
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.metrics.kafka;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Queue;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-
-import org.apache.gobblin.metrics.GobblinTrackingEvent;
-import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.reporter.EventReporter;
-import org.apache.gobblin.metrics.reporter.util.AvroJsonSerializer;
-import org.apache.gobblin.metrics.reporter.util.AvroSerializer;
-import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
-import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
-
-
-/**
- * Reports {@link org.apache.gobblin.metrics.GobblinTrackingEvent} to a Kafka topic serialized as JSON.
- */
-public class KafkaEventReporter extends EventReporter {
-
- protected final AvroSerializer<GobblinTrackingEvent> serializer;
- private final KafkaPusher kafkaPusher;
-
- public KafkaEventReporter(Builder<?> builder) throws IOException {
- super(builder);
-
- this.serializer = this.closer.register(
- createSerializer(new FixedSchemaVersionWriter()));
-
- if(builder.kafkaPusher.isPresent()) {
- this.kafkaPusher = builder.kafkaPusher.get();
- } else {
- this.kafkaPusher = this.closer.register(new KafkaPusher(builder.brokers, builder.topic));
- }
-
- }
-
- @Override
- public void reportEventQueue(Queue<GobblinTrackingEvent> queue) {
- GobblinTrackingEvent nextEvent;
- List<byte[]> events = Lists.newArrayList();
-
- while(null != (nextEvent = queue.poll())) {
- events.add(this.serializer.serializeRecord(nextEvent));
- }
-
- if (!events.isEmpty()) {
- this.kafkaPusher.pushMessages(events);
- }
-
- }
-
- protected AvroSerializer<GobblinTrackingEvent> createSerializer(SchemaVersionWriter schemaVersionWriter) throws IOException {
- return new AvroJsonSerializer<GobblinTrackingEvent>(GobblinTrackingEvent.SCHEMA$, schemaVersionWriter);
- }
-
- /**
- * Returns a new {@link KafkaEventReporter.Builder} for {@link KafkaEventReporter}.
- * Will automatically add all Context tags to the reporter.
- *
- * @param context the {@link org.apache.gobblin.metrics.MetricContext} to report
- * @return KafkaReporter builder
- * @deprecated this method is bugged. Use {@link KafkaEventReporter.Factory#forContext} instead.
- */
- @Deprecated
- public static Builder<? extends Builder> forContext(MetricContext context) {
- return new BuilderImpl(context);
- }
-
- public static class BuilderImpl extends Builder<BuilderImpl> {
- private BuilderImpl(MetricContext context) {
- super(context);
- }
-
- @Override
- protected BuilderImpl self() {
- return this;
- }
- }
-
- public static class Factory {
- /**
- * Returns a new {@link KafkaEventReporter.Builder} for {@link KafkaEventReporter}.
- * Will automatically add all Context tags to the reporter.
- *
- * @param context the {@link org.apache.gobblin.metrics.MetricContext} to report
- * @return KafkaReporter builder
- */
- public static BuilderImpl forContext(MetricContext context) {
- return new BuilderImpl(context);
- }
- }
-
- /**
- * Builder for {@link KafkaEventReporter}.
- * Defaults to no filter, reporting rates in seconds and times in milliseconds.
- */
- public static abstract class Builder<T extends EventReporter.Builder<T>>
- extends EventReporter.Builder<T> {
- protected String brokers;
- protected String topic;
- protected Optional<KafkaPusher> kafkaPusher;
-
- protected Builder(MetricContext context) {
- super(context);
- this.kafkaPusher = Optional.absent();
- }
-
- /**
- * Set {@link org.apache.gobblin.metrics.kafka.KafkaPusher} to use.
- */
- public T withKafkaPusher(KafkaPusher pusher) {
- this.kafkaPusher = Optional.of(pusher);
- return self();
- }
-
- /**
- * Builds and returns {@link KafkaEventReporter}.
- *
- * @param brokers string of Kafka brokers
- * @param topic topic to send metrics to
- * @return KafkaReporter
- */
- public KafkaEventReporter build(String brokers, String topic) throws IOException {
- this.brokers = brokers;
- this.topic = topic;
- return new KafkaEventReporter(this);
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
new file mode 100644
index 0000000..ff75a92
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import com.google.common.base.Optional;
+import com.google.common.io.Closer;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Establishes a connection to a Kafka cluster and pushes byte messages to a specified topic.
+ */
+public class KafkaProducerPusher implements Pusher {
+
+ private final String topic;
+ private final KafkaProducer<String, byte[]> producer;
+ private final Closer closer;
+
+ public KafkaProducerPusher(String brokers, String topic, Optional<Config> kafkaConfig) {
+ this.closer = Closer.create();
+
+ this.topic = topic;
+
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+ props.put(ProducerConfig.ACKS_CONFIG, "1");
+
+ // Add the Kafka-scoped config last: any of the defaults set above that it also specifies are overridden by it.
+ if (kafkaConfig.isPresent()) {
+ props.putAll(ConfigUtils.configToProperties(kafkaConfig.get()));
+ }
+
+ this.producer = createProducer(props);
+ }
+
+ public KafkaProducerPusher(String brokers, String topic) {
+ this(brokers, topic, Optional.absent());
+ }
+
+ /**
+ * Push all byte array messages to the Kafka topic.
+ * @param messages List of byte array messages to push to Kafka.
+ */
+ public void pushMessages(List<byte[]> messages) {
+ for (byte[] message: messages) {
+ this.producer.send(new ProducerRecord<String, byte[]>(topic, message));
+ }
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ this.closer.close();
+ }
+
+ /**
+ * Create the Kafka producer.
+ */
+ protected KafkaProducer<String, byte[]> createProducer(Properties props) {
+ return this.closer.register(new KafkaProducer<String, byte[]>(props));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaPusher.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaPusher.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaPusher.java
index 29162ac..1c977ff 100644
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaPusher.java
+++ b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaPusher.java
@@ -17,7 +17,6 @@
package org.apache.gobblin.metrics.kafka;
-import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Properties;
@@ -34,7 +33,7 @@ import kafka.producer.ProducerConfig;
/**
* Establishes a connection to a Kafka cluster and pushed byte messages to a specified topic.
*/
-public class KafkaPusher implements Closeable {
+public class KafkaPusher implements Pusher {
private final String topic;
private final ProducerCloseable<String, byte[]> producer;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java
deleted file mode 100644
index 2aa0e97..0000000
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.metrics.kafka;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.metrics.MetricReport;
-import org.apache.gobblin.metrics.reporter.MetricReportReporter;
-import org.apache.gobblin.metrics.reporter.util.AvroJsonSerializer;
-import org.apache.gobblin.metrics.reporter.util.AvroSerializer;
-import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
-import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
-import org.apache.gobblin.util.ClassAliasResolver;
-import org.apache.gobblin.util.ConfigUtils;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.typesafe.config.Config;
-
-import lombok.extern.slf4j.Slf4j;
-
-
-/**
- * Kafka reporter for metrics.
- *
- * @author ibuenros
- */
-@Slf4j
-public class KafkaReporter extends MetricReportReporter {
-
- public static final String SCHEMA_VERSION_WRITER_TYPE = "metrics.kafka.schemaVersionWriterType";
-
- protected final AvroSerializer<MetricReport> serializer;
- protected final KafkaPusher kafkaPusher;
-
-
- protected KafkaReporter(Builder<?> builder, Config config) throws IOException {
- super(builder, config);
-
- SchemaVersionWriter versionWriter;
- if (config.hasPath(SCHEMA_VERSION_WRITER_TYPE)) {
- try {
- ClassAliasResolver<SchemaVersionWriter> resolver = new ClassAliasResolver<>(SchemaVersionWriter.class);
- Class<? extends SchemaVersionWriter> klazz = resolver.resolveClass(config.getString(SCHEMA_VERSION_WRITER_TYPE));
- versionWriter = klazz.newInstance();
- } catch (ReflectiveOperationException roe) {
- throw new IOException("Could not instantiate version writer.", roe);
- }
- } else {
- versionWriter = new FixedSchemaVersionWriter();
- }
-
- log.info("Schema version writer: " + versionWriter.getClass().getName());
- this.serializer = this.closer.register(createSerializer(versionWriter));
-
- if (builder.kafkaPusher.isPresent()) {
- this.kafkaPusher = builder.kafkaPusher.get();
- } else {
- this.kafkaPusher = this.closer.register(new KafkaPusher(builder.brokers, builder.topic));
- }
- }
-
- protected AvroSerializer<MetricReport> createSerializer(SchemaVersionWriter schemaVersionWriter) throws IOException {
- return new AvroJsonSerializer<>(MetricReport.SCHEMA$, schemaVersionWriter);
- }
-
- /**
- * A static factory class for obtaining new {@link org.apache.gobblin.metrics.kafka.KafkaReporter.Builder}s
- *
- * @see org.apache.gobblin.metrics.kafka.KafkaReporter.Builder
- */
- public static class BuilderFactory {
-
- public static BuilderImpl newBuilder() {
- return new BuilderImpl();
- }
- }
-
- public static class BuilderImpl extends Builder<BuilderImpl> {
-
- @Override
- protected BuilderImpl self() {
- return this;
- }
- }
-
- /**
- * Builder for {@link KafkaReporter}. Defaults to no filter, reporting rates in seconds and times in milliseconds.
- */
- public static abstract class Builder<T extends MetricReportReporter.Builder<T>>
- extends MetricReportReporter.Builder<T> {
-
- protected String brokers;
- protected String topic;
- protected Optional<KafkaPusher> kafkaPusher;
-
- protected Builder() {
- super();
- this.name = "KafkaReporter";
- this.kafkaPusher = Optional.absent();
- }
-
- /**
- * Set {@link org.apache.gobblin.metrics.kafka.KafkaPusher} to use.
- */
- public T withKafkaPusher(KafkaPusher pusher) {
- this.kafkaPusher = Optional.of(pusher);
- return self();
- }
-
- /**
- * Builds and returns {@link KafkaReporter}.
- *
- * @param brokers string of Kafka brokers
- * @param topic topic to send metrics to
- * @return KafkaReporter
- */
- public KafkaReporter build(String brokers, String topic, Properties props) throws IOException {
- this.brokers = brokers;
- this.topic = topic;
-
- return new KafkaReporter(this, ConfigUtils.propertiesToConfig(props, Optional.of(ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX)));
- }
- }
-
- @Override
- protected void emitReport(MetricReport report) {
- this.kafkaPusher.pushMessages(Lists.newArrayList(this.serializer.serializeRecord(report)));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
deleted file mode 100644
index 328a47b..0000000
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.metrics.kafka;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.ScheduledReporter;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.metrics.CustomCodahaleReporterFactory;
-import org.apache.gobblin.metrics.KafkaReportingFormats;
-import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.RootMetricContext;
-
-
-@Slf4j
-public class KafkaReporterFactory implements CustomCodahaleReporterFactory {
- @Override
- public ScheduledReporter newScheduledReporter(MetricRegistry registry, Properties properties)
- throws IOException {
- if (!Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_ENABLED_KEY,
- ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_ENABLED))) {
- return null;
- }
- log.info("Reporting metrics to Kafka");
-
- Optional<String> defaultTopic = Optional.fromNullable(properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC));
- Optional<String> metricsTopic = Optional.fromNullable(
- properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC_METRICS));
- Optional<String> eventsTopic = Optional.fromNullable(
- properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC_EVENTS));
-
- boolean metricsEnabled = metricsTopic.or(defaultTopic).isPresent();
- if (metricsEnabled) log.info("Reporting metrics to Kafka");
- boolean eventsEnabled = eventsTopic.or(defaultTopic).isPresent();
- if (eventsEnabled) log.info("Reporting events to Kafka");
-
- try {
- Preconditions.checkArgument(properties.containsKey(ConfigurationKeys.METRICS_KAFKA_BROKERS),
- "Kafka metrics brokers missing.");
- Preconditions.checkArgument(metricsTopic.or(eventsTopic).or(defaultTopic).isPresent(), "Kafka topic missing.");
- } catch (IllegalArgumentException exception) {
- log.error("Not reporting metrics to Kafka due to missing Kafka configuration(s).", exception);
- return null;
- }
-
- String brokers = properties.getProperty(ConfigurationKeys.METRICS_KAFKA_BROKERS);
-
- String reportingFormat = properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_FORMAT,
- ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_FORMAT);
-
- KafkaReportingFormats formatEnum;
- try {
- formatEnum = KafkaReportingFormats.valueOf(reportingFormat.toUpperCase());
- } catch (IllegalArgumentException exception) {
- log.warn("Kafka metrics reporting format " + reportingFormat +
- " not recognized. Will report in json format.", exception);
- formatEnum = KafkaReportingFormats.JSON;
- }
-
- if (metricsEnabled) {
- try {
- formatEnum.metricReporterBuilder(properties)
- .build(brokers, metricsTopic.or(defaultTopic).get(), properties);
- } catch (IOException exception) {
- log.error("Failed to create Kafka metrics reporter. Will not report metrics to Kafka.", exception);
- }
- }
-
- if (eventsEnabled) {
- try {
- KafkaEventReporter.Builder<?> builder = formatEnum.eventReporterBuilder(RootMetricContext.get(),
- properties);
- return builder.build(brokers, eventsTopic.or(defaultTopic).get());
- } catch (IOException exception) {
- log.error("Failed to create Kafka events reporter. Will not report events to Kafka.", exception);
- }
- }
-
- log.info("Will start reporting metrics to Kafka");
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterTest.java b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterTest.java
deleted file mode 100644
index 066dce4..0000000
--- a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.metrics.reporter;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import org.apache.gobblin.metrics.GobblinTrackingEvent;
-import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.reporter.util.EventUtils;
-import org.apache.gobblin.metrics.kafka.KafkaAvroEventReporter;
-import org.apache.gobblin.metrics.kafka.KafkaEventReporter;
-import org.apache.gobblin.metrics.kafka.KafkaPusher;
-
-
-@Test(groups = {"gobblin.metrics"})
-public class KafkaAvroEventReporterTest extends KafkaEventReporterTest {
-
- @Override
- public KafkaEventReporter.Builder<? extends KafkaEventReporter.Builder> getBuilder(MetricContext context,
- KafkaPusher pusher) {
- return KafkaAvroEventReporter.forContext(context).withKafkaPusher(pusher);
- }
-
- @Override
- @SuppressWarnings("unchecked")
- protected GobblinTrackingEvent nextEvent(Iterator<byte[]> it)
- throws IOException {
- Assert.assertTrue(it.hasNext());
- return EventUtils.deserializeReportFromAvroSerialization(new GobblinTrackingEvent(), it.next());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroReporterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroReporterTest.java b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroReporterTest.java
deleted file mode 100644
index bbf2646..0000000
--- a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroReporterTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.metrics.reporter;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import org.apache.gobblin.metrics.MetricReport;
-import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
-import org.apache.gobblin.metrics.kafka.KafkaAvroReporter;
-import org.apache.gobblin.metrics.kafka.KafkaPusher;
-import org.apache.gobblin.metrics.kafka.KafkaReporter;
-
-
-/**
- * Test for KafkaAvroReporter
- * Extends KafkaReporterTest and just redefines the builder and the metrics deserializer
- *
- * @author ibuenros
- */
-@Test(groups = {"gobblin.metrics"})
-public class KafkaAvroReporterTest extends KafkaReporterTest {
-
- public KafkaAvroReporterTest(String topic)
- throws IOException, InterruptedException {
- super();
- }
-
- public KafkaAvroReporterTest() throws IOException, InterruptedException {
- this("KafkaAvroReporterTest");
- }
-
- @Override
- public KafkaReporter.Builder<? extends KafkaReporter.Builder> getBuilder(KafkaPusher pusher) {
- return KafkaAvroReporter.BuilderFactory.newBuilder().withKafkaPusher(pusher);
- }
-
- @Override
- public KafkaReporter.Builder<? extends KafkaReporter.Builder> getBuilderFromContext(KafkaPusher pusher) {
- return KafkaAvroReporter.BuilderFactory.newBuilder().withKafkaPusher(pusher);
- }
-
- @Override
- @SuppressWarnings("unchecked")
- protected MetricReport nextReport(Iterator<byte[]> it)
- throws IOException {
- Assert.assertTrue(it.hasNext());
- return MetricReportUtils.deserializeReportFromAvroSerialization(new MetricReport(), it.next());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaEventReporterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaEventReporterTest.java b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaEventReporterTest.java
deleted file mode 100644
index 177f8d3..0000000
--- a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaEventReporterTest.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.metrics.reporter;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.google.common.collect.Maps;
-
-import org.apache.gobblin.metrics.GobblinTrackingEvent;
-import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.Tag;
-import org.apache.gobblin.metrics.reporter.util.EventUtils;
-import org.apache.gobblin.metrics.kafka.KafkaEventReporter;
-import org.apache.gobblin.metrics.kafka.KafkaPusher;
-
-
-@Test(groups = {"gobblin.metrics"})
-public class KafkaEventReporterTest {
-
- /**
- * Get builder for KafkaReporter (override if testing an extension of KafkaReporter)
- * @param context metricregistry
- * @return KafkaReporter builder
- */
- public KafkaEventReporter.Builder<? extends KafkaEventReporter.Builder> getBuilder(MetricContext context,
- KafkaPusher pusher) {
- return KafkaEventReporter.Factory.forContext(context).withKafkaPusher(pusher);
- }
-
-
- @Test
- public void testKafkaEventReporter() throws IOException {
- MetricContext context = MetricContext.builder("context").build();
-
- MockKafkaPusher pusher = new MockKafkaPusher();
- KafkaEventReporter kafkaReporter = getBuilder(context, pusher).build("localhost:0000", "topic");
-
- String namespace = "gobblin.metrics.test";
- String eventName = "testEvent";
-
- GobblinTrackingEvent event = new GobblinTrackingEvent();
- event.setName(eventName);
- event.setNamespace(namespace);
- Map<String, String> metadata = Maps.newHashMap();
- metadata.put("m1", "v1");
- metadata.put("m2", null);
- event.setMetadata(metadata);
- context.submitEvent(event);
-
- try {
- Thread.sleep(100);
- } catch(InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
-
- kafkaReporter.report();
-
- try {
- Thread.sleep(100);
- } catch(InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
-
- GobblinTrackingEvent retrievedEvent = nextEvent(pusher.messageIterator());
- Assert.assertEquals(retrievedEvent.getNamespace(), namespace);
- Assert.assertEquals(retrievedEvent.getName(), eventName);
- Assert.assertEquals(retrievedEvent.getMetadata().size(), 4);
-
- }
-
- @Test
- public void testTagInjection() throws IOException {
-
- String tag1 = "tag1";
- String value1 = "value1";
- String metadataValue1 = "metadata1";
- String tag2 = "tag2";
- String value2 = "value2";
-
- MetricContext context = MetricContext.builder("context").addTag(new Tag<String>(tag1, value1)).
- addTag(new Tag<String>(tag2, value2)).build();
-
- MockKafkaPusher pusher = new MockKafkaPusher();
- KafkaEventReporter kafkaReporter = getBuilder(context, pusher).build("localhost:0000", "topic");
-
- String namespace = "gobblin.metrics.test";
- String eventName = "testEvent";
-
- GobblinTrackingEvent event = new GobblinTrackingEvent();
- event.setName(eventName);
- event.setNamespace(namespace);
- Map<String, String> metadata = Maps.newHashMap();
- metadata.put(tag1, metadataValue1);
- event.setMetadata(metadata);
- context.submitEvent(event);
-
- try {
- Thread.sleep(100);
- } catch(InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
-
- kafkaReporter.report();
-
- try {
- Thread.sleep(100);
- } catch(InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
-
- GobblinTrackingEvent retrievedEvent = nextEvent(pusher.messageIterator());
- Assert.assertEquals(retrievedEvent.getNamespace(), namespace);
- Assert.assertEquals(retrievedEvent.getName(), eventName);
- Assert.assertEquals(retrievedEvent.getMetadata().size(), 4);
- Assert.assertEquals(retrievedEvent.getMetadata().get(tag1), metadataValue1);
- Assert.assertEquals(retrievedEvent.getMetadata().get(tag2), value2);
- }
-
- /**
- * Extract the next metric from the Kafka iterator
- * Assumes existence of the metric has already been checked.
- * @param it Kafka ConsumerIterator
- * @return next metric in the stream
- * @throws java.io.IOException
- */
- protected GobblinTrackingEvent nextEvent(Iterator<byte[]> it) throws IOException {
- Assert.assertTrue(it.hasNext());
- return EventUtils.deserializeReportFromJson(new GobblinTrackingEvent(), it.next());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaReporterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaReporterTest.java b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaReporterTest.java
deleted file mode 100644
index c431cb0..0000000
--- a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaReporterTest.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.metrics.reporter;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.Meter;
-
-import com.google.common.collect.Lists;
-
-import org.apache.gobblin.metrics.Measurements;
-import org.apache.gobblin.metrics.Metric;
-import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.MetricReport;
-import org.apache.gobblin.metrics.Tag;
-import org.apache.gobblin.metrics.kafka.KafkaPusher;
-import org.apache.gobblin.metrics.kafka.KafkaReporter;
-import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
-
-
-@Test(groups = { "gobblin.metrics" })
-public class KafkaReporterTest {
-
- public KafkaReporterTest() throws IOException, InterruptedException {}
-
- /**
- * Get builder for KafkaReporter (override if testing an extension of KafkaReporter)
- * @return KafkaReporter builder
- */
- public KafkaReporter.Builder<? extends KafkaReporter.Builder> getBuilder(KafkaPusher pusher) {
- return KafkaReporter.BuilderFactory.newBuilder().withKafkaPusher(pusher);
- }
-
- public KafkaReporter.Builder<? extends KafkaReporter.Builder> getBuilderFromContext(KafkaPusher pusher) {
- return KafkaReporter.BuilderFactory.newBuilder().withKafkaPusher(pusher);
- }
-
- @Test
- public void testKafkaReporter() throws IOException {
- MetricContext metricContext =
- MetricContext.builder(this.getClass().getCanonicalName() + ".testKafkaReporter").build();
- Counter counter = metricContext.counter("com.linkedin.example.counter");
- Meter meter = metricContext.meter("com.linkedin.example.meter");
- Histogram histogram = metricContext.histogram("com.linkedin.example.histogram");
-
- MockKafkaPusher pusher = new MockKafkaPusher();
- KafkaReporter kafkaReporter = getBuilder(pusher).build("localhost:0000", "topic", new Properties());
-
- counter.inc();
- meter.mark(2);
- histogram.update(1);
- histogram.update(1);
- histogram.update(2);
-
- kafkaReporter.report(metricContext);
-
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
-
- Map<String, Double> expected = new HashMap<>();
- expected.put("com.linkedin.example.counter." + Measurements.COUNT, 1.0);
- expected.put("com.linkedin.example.meter." + Measurements.COUNT, 2.0);
- expected.put("com.linkedin.example.histogram." + Measurements.COUNT, 3.0);
-
- MetricReport nextReport = nextReport(pusher.messageIterator());
-
- expectMetricsWithValues(nextReport, expected);
-
- kafkaReporter.report(metricContext);
-
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
-
- Set<String> expectedSet = new HashSet<>();
- expectedSet.add("com.linkedin.example.counter." + Measurements.COUNT);
- expectedSet.add("com.linkedin.example.meter." + Measurements.COUNT);
- expectedSet.add("com.linkedin.example.meter." + Measurements.MEAN_RATE);
- expectedSet.add("com.linkedin.example.meter." + Measurements.RATE_1MIN);
- expectedSet.add("com.linkedin.example.meter." + Measurements.RATE_5MIN);
- expectedSet.add("com.linkedin.example.meter." + Measurements.RATE_15MIN);
- expectedSet.add("com.linkedin.example.histogram." + Measurements.MEAN);
- expectedSet.add("com.linkedin.example.histogram." + Measurements.MIN);
- expectedSet.add("com.linkedin.example.histogram." + Measurements.MAX);
- expectedSet.add("com.linkedin.example.histogram." + Measurements.MEDIAN);
- expectedSet.add("com.linkedin.example.histogram." + Measurements.PERCENTILE_75TH);
- expectedSet.add("com.linkedin.example.histogram." + Measurements.PERCENTILE_95TH);
- expectedSet.add("com.linkedin.example.histogram." + Measurements.PERCENTILE_99TH);
- expectedSet.add("com.linkedin.example.histogram." + Measurements.PERCENTILE_999TH);
- expectedSet.add("com.linkedin.example.histogram." + Measurements.COUNT);
-
- nextReport = nextReport(pusher.messageIterator());
- expectMetrics(nextReport, expectedSet, true);
-
- kafkaReporter.close();
-
- }
-
- @Test
- public void kafkaReporterTagsTest() throws IOException {
- MetricContext metricContext =
- MetricContext.builder(this.getClass().getCanonicalName() + ".kafkaReporterTagsTest").build();
- Counter counter = metricContext.counter("com.linkedin.example.counter");
-
- Tag<?> tag1 = new Tag<>("tag1", "value1");
- Tag<?> tag2 = new Tag<>("tag2", 2);
-
- MockKafkaPusher pusher = new MockKafkaPusher();
- KafkaReporter kafkaReporter =
- getBuilder(pusher).withTags(Lists.newArrayList(tag1, tag2)).build("localhost:0000", "topic", new Properties());
-
- counter.inc();
-
- kafkaReporter.report(metricContext);
-
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
-
- MetricReport metricReport = nextReport(pusher.messageIterator());
-
- Assert.assertEquals(4, metricReport.getTags().size());
- Assert.assertTrue(metricReport.getTags().containsKey(tag1.getKey()));
- Assert.assertEquals(metricReport.getTags().get(tag1.getKey()), tag1.getValue().toString());
- Assert.assertTrue(metricReport.getTags().containsKey(tag2.getKey()));
- Assert.assertEquals(metricReport.getTags().get(tag2.getKey()), tag2.getValue().toString());
- }
-
- @Test
- public void kafkaReporterContextTest() throws IOException {
- Tag<?> tag1 = new Tag<>("tag1", "value1");
- MetricContext context = MetricContext.builder("context").addTag(tag1).build();
- Counter counter = context.counter("com.linkedin.example.counter");
-
- MockKafkaPusher pusher = new MockKafkaPusher();
- KafkaReporter kafkaReporter = getBuilderFromContext(pusher).build("localhost:0000", "topic", new Properties());
-
- counter.inc();
-
- kafkaReporter.report(context);
-
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
-
- MetricReport metricReport = nextReport(pusher.messageIterator());
-
- Assert.assertEquals(3, metricReport.getTags().size());
- Assert.assertTrue(metricReport.getTags().containsKey(tag1.getKey()));
- Assert.assertEquals(metricReport.getTags().get(tag1.getKey()), tag1.getValue().toString());
-
- }
-
- /**
- * Expect a list of metrics with specific values.
- * Fail if not all metrics are received, or some metric has the wrong value.
- * @param report MetricReport.
- * @param expected map of expected metric names and their values
- * @throws IOException
- */
- private void expectMetricsWithValues(MetricReport report, Map<String, Double> expected) throws IOException {
- List<Metric> metricIterator = report.getMetrics();
-
- for (Metric metric : metricIterator) {
- if (expected.containsKey(metric.getName())) {
- Assert.assertEquals(expected.get(metric.getName()), metric.getValue());
- expected.remove(metric.getName());
- }
- }
-
- Assert.assertTrue(expected.isEmpty());
-
- }
-
- /**
- * Expect a set of metric names. Will fail if not all of these metrics are received.
- * @param report MetricReport
- * @param expected set of expected metric names
- * @param strict if set to true, will fail if receiving any metric that is not expected
- * @throws IOException
- */
- private void expectMetrics(MetricReport report, Set<String> expected, boolean strict) throws IOException {
- List<Metric> metricIterator = report.getMetrics();
- for (Metric metric : metricIterator) {
- //System.out.println(String.format("expectedSet.add(\"%s\")", metric.name));
- if (expected.contains(metric.getName())) {
- expected.remove(metric.getName());
- } else if (strict && !metric.getName().contains(MetricContext.GOBBLIN_METRICS_NOTIFICATIONS_TIMER_NAME)) {
- Assert.assertTrue(false, "Metric present in report not expected: " + metric.toString());
- }
- }
- Assert.assertTrue(expected.isEmpty());
- }
-
- /**
- * Extract the next metric from the Kafka iterator
- * Assumes existence of the metric has already been checked.
- * @param it Kafka ConsumerIterator
- * @return next metric in the stream
- * @throws IOException
- */
- protected MetricReport nextReport(Iterator<byte[]> it) throws IOException {
- Assert.assertTrue(it.hasNext());
- return MetricReportUtils.deserializeReportFromJson(new MetricReport(), it.next());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
new file mode 100644
index 0000000..3d2de9b
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import com.google.common.base.Optional;
+import com.google.common.io.Closer;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Establish a connection to a Kafka cluster and push byte messages to a specified topic.
+ */
+public class KafkaProducerPusher implements Pusher {
+
+ private final String topic;
+ private final KafkaProducer<String, byte[]> producer;
+ private final Closer closer;
+
+ public KafkaProducerPusher(String brokers, String topic, Optional<Config> kafkaConfig) {
+ this.closer = Closer.create();
+
+ this.topic = topic;
+
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+ props.put(ProducerConfig.ACKS_CONFIG, "1");
+
+ // Add the Kafka-scoped config last, so any of the defaults set above that it also specifies are overridden.
+ if (kafkaConfig.isPresent()) {
+ props.putAll(ConfigUtils.configToProperties(kafkaConfig.get()));
+ }
+
+ this.producer = createProducer(props);
+ }
+
+ public KafkaProducerPusher(String brokers, String topic) {
+ this(brokers, topic, Optional.absent());
+ }
+
+ /**
+ * Push all byte array messages to the Kafka topic.
+ * @param messages List of byte array messages to push to Kafka.
+ */
+ public void pushMessages(List<byte[]> messages) {
+ for (byte[] message: messages) {
+ this.producer.send(new ProducerRecord<String, byte[]>(topic, message));
+ }
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ this.closer.close();
+ }
+
+ /**
+ * Create the Kafka producer.
+ */
+ protected KafkaProducer<String, byte[]> createProducer(Properties props) {
+ return this.closer.register(new KafkaProducer<String, byte[]>(props));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaProducerPusherTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaProducerPusherTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaProducerPusherTest.java
new file mode 100644
index 0000000..723f8b7
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaProducerPusherTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+import java.io.IOException;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.kafka.KafkaTestBase;
+import org.apache.gobblin.metrics.kafka.KafkaProducerPusher;
+import org.apache.gobblin.metrics.kafka.Pusher;
+
+import kafka.consumer.ConsumerIterator;
+
+
+/**
+ * Test {@link org.apache.gobblin.metrics.kafka.KafkaProducerPusher}.
+ */
+public class KafkaProducerPusherTest {
+ public static final String TOPIC = KafkaProducerPusherTest.class.getSimpleName();
+
+ private KafkaTestBase kafkaTestHelper;
+
+ @BeforeClass
+ public void setup() throws Exception {
+ kafkaTestHelper = new KafkaTestBase();
+ kafkaTestHelper.startServers();
+
+ kafkaTestHelper.provisionTopic(TOPIC);
+ }
+
+ @Test
+ public void test() throws IOException {
+ // Test that the scoped config overrides the generic config
+ Pusher pusher = new KafkaProducerPusher("localhost:dummy", TOPIC, Optional.of(ConfigFactory.parseMap(ImmutableMap.of(
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + this.kafkaTestHelper.getKafkaServerPort()))));
+
+ String msg1 = "msg1";
+ String msg2 = "msg2";
+
+ pusher.pushMessages(Lists.newArrayList(msg1.getBytes(), msg2.getBytes()));
+
+ try {
+ Thread.sleep(1000);
+ } catch(InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+
+ ConsumerIterator<byte[], byte[]> iterator = this.kafkaTestHelper.getIteratorForTopic(TOPIC);
+
+ assert(iterator.hasNext());
+ Assert.assertEquals(new String(iterator.next().message()), msg1);
+ assert(iterator.hasNext());
+ Assert.assertEquals(new String(iterator.next().message()), msg2);
+
+ pusher.close();
+ }
+
+ @AfterClass
+ public void after() {
+ try {
+ this.kafkaTestHelper.close();
+ } catch(Exception e) {
+ System.err.println("Failed to close Kafka server.");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
new file mode 100644
index 0000000..a0af52a
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics;
+
+import java.util.Properties;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.kafka.KafkaAvroEventReporter;
+import org.apache.gobblin.metrics.kafka.KafkaAvroReporter;
+import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
+import org.apache.gobblin.metrics.kafka.KafkaEventReporter;
+import org.apache.gobblin.metrics.kafka.KafkaReporter;
+
+
+/**
+ * Kafka reporting formats enumeration.
+ */
+public enum KafkaReportingFormats {
+
+ AVRO,
+ JSON;
+
+ /**
+ * Get a {@link org.apache.gobblin.metrics.kafka.KafkaReporter.Builder} for this reporting format.
+ *
+ * @param properties {@link Properties} containing information to build reporters.
+ * @return {@link org.apache.gobblin.metrics.kafka.KafkaReporter.Builder}.
+ */
+ public KafkaReporter.Builder<?> metricReporterBuilder(Properties properties) {
+ switch (this) {
+ case AVRO:
+ KafkaAvroReporter.Builder<?> builder = KafkaAvroReporter.BuilderFactory.newBuilder();
+ if (Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
+ ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
+ builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
+ }
+ return builder;
+ case JSON:
+ return KafkaReporter.BuilderFactory.newBuilder();
+ default:
+ // This should never happen.
+ throw new IllegalArgumentException("KafkaReportingFormat not recognized.");
+ }
+ }
+
+ /**
+ * Get a {@link org.apache.gobblin.metrics.kafka.KafkaEventReporter.Builder} for this reporting format.
+ * @param context {@link MetricContext} that should be reported.
+ * @param properties {@link Properties} containing information to build reporters.
+ * @return {@link org.apache.gobblin.metrics.kafka.KafkaEventReporter.Builder}.
+ */
+ public KafkaEventReporter.Builder<?> eventReporterBuilder(MetricContext context, Properties properties) {
+ switch (this) {
+ case AVRO:
+ KafkaAvroEventReporter.Builder<?> builder = KafkaAvroEventReporter.Factory.forContext(context);
+ if (Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
+ ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
+ builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
+ }
+ return builder;
+ case JSON:
+ return KafkaEventReporter.Factory.forContext(context);
+ default:
+ // This should never happen.
+ throw new IllegalArgumentException("KafkaReportingFormat not recognized.");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventReporter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventReporter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventReporter.java
new file mode 100644
index 0000000..ecb5d7d
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventReporter.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+
+import com.google.common.base.Optional;
+
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.reporter.util.AvroBinarySerializer;
+import org.apache.gobblin.metrics.reporter.util.AvroSerializer;
+import org.apache.gobblin.metrics.reporter.util.SchemaRegistryVersionWriter;
+import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
+
+
+/**
+ * {@link org.apache.gobblin.metrics.reporter.EventReporter} that emits events to Kafka as serialized Avro records.
+ */
+public class KafkaAvroEventReporter extends KafkaEventReporter {
+
+ protected KafkaAvroEventReporter(Builder<?> builder) throws IOException {
+ super(builder);
+ if(builder.registry.isPresent()) {
+ Schema schema =
+ new Schema.Parser().parse(getClass().getClassLoader().getResourceAsStream("GobblinTrackingEvent.avsc"));
+ this.serializer.setSchemaVersionWriter(new SchemaRegistryVersionWriter(builder.registry.get(), builder.topic,
+ Optional.of(schema)));
+ }
+ }
+
+ @Override
+ protected AvroSerializer<GobblinTrackingEvent> createSerializer(SchemaVersionWriter schemaVersionWriter)
+ throws IOException {
+ return new AvroBinarySerializer<GobblinTrackingEvent>(GobblinTrackingEvent.SCHEMA$, schemaVersionWriter);
+ }
+
+ /**
+ * Returns a new {@link Builder} for {@link KafkaAvroEventReporter}.
+ *
+ * @param context the {@link MetricContext} to report
+ * @return KafkaAvroEventReporter builder
+ * @deprecated this method is bugged. Use {@link Factory#forContext} instead.
+ */
+ @Deprecated
+ public static Builder<? extends Builder<?>> forContext(MetricContext context) {
+ return new BuilderImpl(context);
+ }
+
+ private static class BuilderImpl extends Builder<BuilderImpl> {
+ private BuilderImpl(MetricContext context) {
+ super(context);
+ }
+
+ @Override
+ protected BuilderImpl self() {
+ return this;
+ }
+ }
+
+ public static abstract class Factory {
+ /**
+ * Returns a new {@link Builder} for {@link KafkaAvroEventReporter}.
+ *
+ * @param context the {@link MetricContext} to report
+ * @return KafkaAvroEventReporter builder
+ */
+ public static BuilderImpl forContext(MetricContext context) {
+ return new BuilderImpl(context);
+ }
+ }
+
+ /**
+ * Builder for {@link KafkaAvroEventReporter}.
+ * Defaults to no filter, reporting rates in seconds and times in milliseconds.
+ */
+ public static abstract class Builder<T extends Builder<T>> extends KafkaEventReporter.Builder<T> {
+
+ private Optional<KafkaAvroSchemaRegistry> registry = Optional.absent();
+
+ private Builder(MetricContext context) {
+ super(context);
+ }
+
+ public T withSchemaRegistry(KafkaAvroSchemaRegistry registry) {
+ this.registry = Optional.of(registry);
+ return self();
+ }
+
+ /**
+ * Builds and returns {@link KafkaAvroEventReporter}.
+ *
+ * @param brokers string of Kafka brokers
+ * @param topic topic to send events to
+ * @return KafkaAvroEventReporter
+ */
+ public KafkaAvroEventReporter build(String brokers, String topic) throws IOException {
+ this.brokers = brokers;
+ this.topic = topic;
+ return new KafkaAvroEventReporter(this);
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java
new file mode 100644
index 0000000..4b6399b
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.avro.Schema;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.MetricReport;
+import org.apache.gobblin.metrics.reporter.util.AvroBinarySerializer;
+import org.apache.gobblin.metrics.reporter.util.AvroSerializer;
+import org.apache.gobblin.metrics.reporter.util.SchemaRegistryVersionWriter;
+import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Kafka reporter for codahale metrics writing metrics in Avro format.
+ *
+ * @author ibuenros
+ */
+public class KafkaAvroReporter extends KafkaReporter {
+
+ protected KafkaAvroReporter(Builder<?> builder, Config config) throws IOException {
+ super(builder, config);
+ if (builder.registry.isPresent()) {
+ Schema schema =
+ new Schema.Parser().parse(getClass().getClassLoader().getResourceAsStream("MetricReport.avsc"));
+ this.serializer.setSchemaVersionWriter(new SchemaRegistryVersionWriter(builder.registry.get(), builder.topic,
+ Optional.of(schema)));
+ }
+ }
+
+ @Override
+ protected AvroSerializer<MetricReport> createSerializer(SchemaVersionWriter schemaVersionWriter)
+ throws IOException {
+ return new AvroBinarySerializer<>(MetricReport.SCHEMA$, schemaVersionWriter);
+ }
+
+ /**
+ * A static factory class for obtaining new {@link Builder}s
+ *
+ * @see Builder
+ */
+ public static class BuilderFactory {
+
+ public static BuilderImpl newBuilder() {
+ return new BuilderImpl();
+ }
+ }
+
+ public static class BuilderImpl extends Builder<BuilderImpl> {
+
+ @Override
+ protected BuilderImpl self() {
+ return this;
+ }
+ }
+
+ /**
+ * Builder for {@link KafkaAvroReporter}. Defaults to no filter, reporting rates in seconds and times in milliseconds.
+ */
+ public static abstract class Builder<T extends Builder<T>> extends KafkaReporter.Builder<T> {
+
+ private Optional<KafkaAvroSchemaRegistry> registry = Optional.absent();
+
+ public T withSchemaRegistry(KafkaAvroSchemaRegistry registry) {
+ this.registry = Optional.of(registry);
+ return self();
+ }
+
+ /**
+ * Builds and returns {@link KafkaAvroReporter}.
+ *
+ * @param brokers string of Kafka brokers
+ * @param topic topic to send metrics to
+ * @return KafkaAvroReporter
+ */
+ public KafkaAvroReporter build(String brokers, String topic, Properties props) throws IOException {
+ this.brokers = brokers;
+ this.topic = topic;
+ return new KafkaAvroReporter(this, ConfigUtils.propertiesToConfig(props, Optional.of(ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX)));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
index 4c155fb..6162636 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
@@ -31,7 +31,6 @@ import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
-import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,6 +39,7 @@ import com.google.common.base.Preconditions;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.kafka.schemareg.HttpClientFactory;
+import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil;
import org.apache.gobblin.util.AvroUtils;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java
new file mode 100644
index 0000000..b15e96e
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Queue;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.reporter.EventReporter;
+import org.apache.gobblin.metrics.reporter.util.AvroJsonSerializer;
+import org.apache.gobblin.metrics.reporter.util.AvroSerializer;
+import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
+import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
+
+
+/**
+ * Reports {@link GobblinTrackingEvent} to a Kafka topic serialized as JSON.
+ */
+public class KafkaEventReporter extends EventReporter {
+
+ protected final AvroSerializer<GobblinTrackingEvent> serializer;
+ private final Pusher kafkaPusher;
+
+ public KafkaEventReporter(Builder<?> builder) throws IOException {
+ super(builder);
+
+ this.serializer = this.closer.register(
+ createSerializer(new FixedSchemaVersionWriter()));
+
+ if(builder.kafkaPusher.isPresent()) {
+ this.kafkaPusher = builder.kafkaPusher.get();
+ } else {
+ String pusherClassName = builder.pusherClassName.or(PusherUtils.DEFAULT_KAFKA_PUSHER_CLASS_NAME);
+ this.kafkaPusher = PusherUtils.getPusher(pusherClassName, builder.brokers, builder.topic, builder.config);
+ }
+ }
+
+ @Override
+ public void reportEventQueue(Queue<GobblinTrackingEvent> queue) {
+ GobblinTrackingEvent nextEvent;
+ List<byte[]> events = Lists.newArrayList();
+
+ while(null != (nextEvent = queue.poll())) {
+ events.add(this.serializer.serializeRecord(nextEvent));
+ }
+
+ if (!events.isEmpty()) {
+ this.kafkaPusher.pushMessages(events);
+ }
+
+ }
+
+ protected AvroSerializer<GobblinTrackingEvent> createSerializer(SchemaVersionWriter schemaVersionWriter) throws IOException {
+ return new AvroJsonSerializer<GobblinTrackingEvent>(GobblinTrackingEvent.SCHEMA$, schemaVersionWriter);
+ }
+
+ /**
+ * Returns a new {@link Builder} for {@link KafkaEventReporter}.
+ * Will automatically add all Context tags to the reporter.
+ *
+ * @param context the {@link MetricContext} to report
+ * @return KafkaEventReporter builder
+ * @deprecated this method is bugged. Use {@link Factory#forContext} instead.
+ */
+ @Deprecated
+ public static Builder<? extends Builder> forContext(MetricContext context) {
+ return new BuilderImpl(context);
+ }
+
+ public static class BuilderImpl extends Builder<BuilderImpl> {
+ private BuilderImpl(MetricContext context) {
+ super(context);
+ }
+
+ @Override
+ protected BuilderImpl self() {
+ return this;
+ }
+ }
+
+ public static class Factory {
+ /**
+ * Returns a new {@link Builder} for {@link KafkaEventReporter}.
+ * Will automatically add all Context tags to the reporter.
+ *
+ * @param context the {@link MetricContext} to report
+ * @return KafkaEventReporter builder
+ */
+ public static BuilderImpl forContext(MetricContext context) {
+ return new BuilderImpl(context);
+ }
+ }
+
+ /**
+ * Builder for {@link KafkaEventReporter}.
+ * Defaults to no filter, reporting rates in seconds and times in milliseconds.
+ */
+ public static abstract class Builder<T extends EventReporter.Builder<T>>
+ extends EventReporter.Builder<T> {
+ protected String brokers;
+ protected String topic;
+ protected Optional<Pusher> kafkaPusher;
+ protected Optional<Config> config = Optional.absent();
+ protected Optional<String> pusherClassName = Optional.absent();
+
+ protected Builder(MetricContext context) {
+ super(context);
+ this.kafkaPusher = Optional.absent();
+ }
+
+ /**
+ * Set {@link Pusher} to use.
+ */
+ public T withKafkaPusher(Pusher pusher) {
+ this.kafkaPusher = Optional.of(pusher);
+ return self();
+ }
+
+ /**
+ * Set additional configuration.
+ */
+ public T withConfig(Config config) {
+ this.config = Optional.of(config);
+ return self();
+ }
+
+ /**
+ * Set a {@link Pusher} class name
+ */
+ public T withPusherClassName(String pusherClassName) {
+ this.pusherClassName = Optional.of(pusherClassName);
+ return self();
+ }
+
+ /**
+ * Builds and returns {@link KafkaEventReporter}.
+ *
+ * @param brokers string of Kafka brokers
+ * @param topic topic to send events to
+ * @return KafkaEventReporter
+ */
+ public KafkaEventReporter build(String brokers, String topic) throws IOException {
+ this.brokers = brokers;
+ this.topic = topic;
+ return new KafkaEventReporter(this);
+ }
+
+ }
+}