You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/04/17 21:36:43 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1116] Avoid registering schema with schema registry during Me…
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 4bedefc [GOBBLIN-1116] Avoid registering schema with schema registry during Me…
4bedefc is described below
commit 4bedefc13b6ba36a4f0169bbae3aae18b1ceaa39
Author: sv2000 <su...@gmail.com>
AuthorDate: Fri Apr 17 14:36:36 2020 -0700
[GOBBLIN-1116] Avoid registering schema with schema registry during Me…
Closes #2956 from sv2000/metricsReporterFailure
---
.../gobblin/configuration/ConfigurationKeys.java | 7 +-
.../gobblin/metrics/reporter/util/EventUtils.java | 34 ++++---
.../metrics/reporter/util/MetricReportUtils.java | 59 ++++++++++--
.../azkaban/AzkabanGobblinYarnAppLauncher.java | 52 ++--------
.../azkaban/AzkabanGobblinYarnAppLauncherTest.java | 61 ------------
...fkaAvroEventReporterWithSchemaRegistryTest.java | 78 +++++++++------
.../gobblin/converter/EnvelopeSchemaConverter.java | 27 +++---
.../kafka/schemareg/LiKafkaSchemaRegistry.java | 6 +-
.../gobblin/metrics/KafkaReportingFormats.java | 19 +++-
.../kafka/KafkaAvroEventKeyValueReporter.java | 32 ++++---
.../metrics/kafka/KafkaAvroEventReporter.java | 17 +++-
.../gobblin/metrics/kafka/KafkaAvroReporter.java | 17 +++-
.../metrics/kafka/KafkaAvroSchemaRegistry.java | 4 +-
.../metrics/kafka/KafkaReporterFactory.java | 29 +++---
.../metrics/kafka/SchemaRegistryException.java | 5 +-
...roReporterUtil.java => KafkaReporterUtils.java} | 55 ++++++++++-
.../reporter/util/SchemaRegistryVersionWriter.java | 55 ++++++-----
.../KafkaAvroEventKeyValueReporterTest.java | 103 +++++++++++++-------
.../reporter/KafkaAvroEventReporterTest.java | 2 +-
.../KafkaAvroReporterWithSchemaRegistryTest.java | 105 +++++++++++++++++++++
.../reporter/KeyValueEventObjectReporterTest.java | 4 +-
.../reporter/KeyValueMetricObjectReporterTest.java | 4 +-
.../monitoring/KafkaAvroJobStatusMonitor.java | 3 +-
.../gobblin/yarn/GobblinYarnAppLauncher.java | 90 +++++++++++++++++-
.../gobblin/yarn/GobblinYarnAppLauncherTest.java | 47 +++++++++
.../org/apache/gobblin/yarn/YarnServiceTest.java | 26 ++++-
26 files changed, 651 insertions(+), 290 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 53bbe3c..5cb2613 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -716,11 +716,16 @@ public class ConfigurationKeys {
public static final String DEFAULT_METRICS_REPORTING_KAFKA_FORMAT = "json";
public static final String METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY =
METRICS_CONFIGURATIONS_PREFIX + "reporting.kafka.avro.use.schema.registry";
+ public static final String METRICS_REPORTING_EVENTS_KAFKA_AVRO_SCHEMA_ID =
+ METRICS_CONFIGURATIONS_PREFIX + "reporting.events.kafka.avro.schemaId";
+ public static final String METRICS_REPORTING_METRICS_KAFKA_AVRO_SCHEMA_ID =
+ METRICS_CONFIGURATIONS_PREFIX + "reporting.metrics.kafka.avro.schemaId";
+
public static final String DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY = Boolean.toString(false);
public static final String METRICS_KAFKA_BROKERS = METRICS_CONFIGURATIONS_PREFIX + "reporting.kafka.brokers";
// Topic used for both event and metric reporting.
// Can be overriden by METRICS_KAFKA_TOPIC_METRICS and METRICS_KAFKA_TOPIC_EVENTS.
- public static final String METRICS_KAFKA_TOPIC = METRICS_CONFIGURATIONS_PREFIX + "reporting.kafka.topic";
+ public static final String METRICS_KAFKA_TOPIC = METRICS_CONFIGURATIONS_PREFIX + "reporting.kafka.topic.common";
// Topic used only for metric reporting.
public static final String METRICS_KAFKA_TOPIC_METRICS =
METRICS_CONFIGURATIONS_PREFIX + "reporting.kafka.topic.metrics";
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/EventUtils.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/EventUtils.java
index af2869b..4bb48cb 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/EventUtils.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/EventUtils.java
@@ -28,6 +28,8 @@ import org.apache.avro.specific.SpecificDatumReader;
import com.google.common.base.Optional;
import com.google.common.io.Closer;
+import javax.annotation.Nullable;
+
import org.apache.gobblin.metrics.GobblinTrackingEvent;
@@ -75,13 +77,26 @@ public class EventUtils {
}
/**
- * Parses a {@link org.apache.gobblin.metrics.MetricReport} from a byte array Avro serialization.
- * @param reuse MetricReport to reuse.
+ * Parses a {@link org.apache.gobblin.metrics.GobblinTrackingEvent} from a byte array Avro serialization.
+ * @param reuse GobblinTrackingEvent to reuse.
* @param bytes Input bytes.
- * @return MetricReport.
+ * @return GobblinTrackingEvent.
* @throws java.io.IOException
*/
- public synchronized static GobblinTrackingEvent deserializeReportFromAvroSerialization(GobblinTrackingEvent reuse, byte[] bytes)
+ public synchronized static GobblinTrackingEvent deserializeEventFromAvroSerialization(GobblinTrackingEvent reuse, byte[] bytes)
+ throws IOException {
+ return deserializeEventFromAvroSerialization(reuse, bytes, null);
+ }
+
+ /**
+ * Parses a {@link org.apache.gobblin.metrics.GobblinTrackingEvent} from a byte array Avro serialization.
+ * @param reuse GobblinTrackingEvent to reuse.
+ * @param bytes Input bytes.
+ * @param schemaId Expected schemaId.
+ * @return GobblinTrackingEvent.
+ * @throws java.io.IOException
+ */
+ public synchronized static GobblinTrackingEvent deserializeEventFromAvroSerialization(GobblinTrackingEvent reuse, byte[] bytes, @Nullable String schemaId)
throws IOException {
if (!reader.isPresent()) {
reader = Optional.of(new SpecificDatumReader<>(GobblinTrackingEvent.class));
@@ -92,12 +107,10 @@ public class EventUtils {
try {
DataInputStream inputStream = closer.register(new DataInputStream(new ByteArrayInputStream(bytes)));
- // Check version byte
- int versionNumber = inputStream.readInt();
- if (versionNumber != SCHEMA_VERSION) {
- throw new IOException(String
- .format("MetricReport schema version not recognized. Found version %d, expected %d.", versionNumber,
- SCHEMA_VERSION));
+ if (schemaId != null) {
+ MetricReportUtils.readAndVerifySchemaId(inputStream, schemaId);
+ } else {
+ MetricReportUtils.readAndVerifySchemaVersion(inputStream);
}
// Decode the rest
@@ -110,4 +123,3 @@ public class EventUtils {
}
}
}
-
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/MetricReportUtils.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/MetricReportUtils.java
index 2d49606..b51dcc6 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/MetricReportUtils.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/MetricReportUtils.java
@@ -24,10 +24,13 @@ import java.io.IOException;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.commons.codec.binary.Hex;
import com.google.common.base.Optional;
import com.google.common.io.Closer;
+import javax.annotation.Nullable;
+
import org.apache.gobblin.metrics.MetricReport;
@@ -64,8 +67,8 @@ public class MetricReportUtils {
// Check version byte
int versionNumber = inputStream.readInt();
if (versionNumber != SCHEMA_VERSION) {
- throw new IOException(
- String.format("MetricReport schema version not recognized. Found version %d, expected %d.", versionNumber,
+ throw new IOException(String
+ .format("MetricReport schema version not recognized. Found version %d, expected %d.", versionNumber,
SCHEMA_VERSION));
}
@@ -88,6 +91,20 @@ public class MetricReportUtils {
*/
public synchronized static MetricReport deserializeReportFromAvroSerialization(MetricReport reuse, byte[] bytes)
throws IOException {
+ return deserializeReportFromAvroSerialization(reuse, bytes, null);
+ }
+
+ /**
+ * Parses a {@link org.apache.gobblin.metrics.MetricReport} from a byte array Avro serialization.
+ * @param reuse MetricReport to reuse.
+ * @param bytes Input bytes.
+ * @param schemaId Expected schemaId.
+ * @return MetricReport.
+ * @throws java.io.IOException
+ */
+ public synchronized static MetricReport deserializeReportFromAvroSerialization(MetricReport reuse, byte[] bytes,
+ @Nullable String schemaId)
+ throws IOException {
if (!READER.isPresent()) {
READER = Optional.of(new SpecificDatumReader<>(MetricReport.class));
}
@@ -96,15 +113,11 @@ public class MetricReportUtils {
try {
DataInputStream inputStream = closer.register(new DataInputStream(new ByteArrayInputStream(bytes)));
-
- // Check version byte
- int versionNumber = inputStream.readInt();
- if (versionNumber != SCHEMA_VERSION) {
- throw new IOException(
- String.format("MetricReport schema version not recognized. Found version %d, expected %d.", versionNumber,
- SCHEMA_VERSION));
+ if (schemaId != null) {
+ readAndVerifySchemaId(inputStream, schemaId);
+ } else {
+ readAndVerifySchemaVersion(inputStream);
}
-
// Decode the rest
Decoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
return READER.get().read(reuse, decoder);
@@ -114,4 +127,30 @@ public class MetricReportUtils {
closer.close();
}
}
+
+ public static void readAndVerifySchemaId(DataInputStream inputStream, String schemaId)
+ throws IOException {
+ //Read the magic byte
+ inputStream.readByte();
+ int schemaIdLengthBytes = schemaId.length() / 2;
+ byte[] readId = new byte[schemaIdLengthBytes];
+ int numBytesRead = inputStream.read(readId, 0, schemaIdLengthBytes);
+ String readSchemaId = Hex.encodeHexString(readId);
+ if (numBytesRead != schemaIdLengthBytes || !schemaId.equals(readSchemaId)) {
+ throw new IOException(String
+ .format("Schema version not recognized. Found version %s, expected %s.", readSchemaId,
+ schemaId));
+ }
+ }
+
+ public static void readAndVerifySchemaVersion(DataInputStream inputStream)
+ throws IOException {
+ // Check version byte
+ int versionNumber = inputStream.readInt();
+ if (versionNumber != SCHEMA_VERSION) {
+ throw new IOException(String
+ .format("Schema version not recognized. Found version %d, expected %d.", versionNumber,
+ SCHEMA_VERSION));
+ }
+ }
}
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
index 4747e89..97aa15a 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
@@ -17,26 +17,22 @@
package org.apache.gobblin.azkaban;
-import java.io.File;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.TimeoutException;
-import org.apache.commons.io.FileUtils;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.yarn.GobblinYarnAppLauncher;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.log4j.Logger;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Charsets;
import com.typesafe.config.Config;
-import com.typesafe.config.ConfigRenderOptions;
import azkaban.jobExecutor.AbstractJob;
import lombok.Getter;
+import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.yarn.GobblinYarnAppLauncher;
+
/**
* A utility class for launching a Gobblin application on Yarn through Azkaban.
@@ -54,9 +50,6 @@ import lombok.Getter;
* @author Yinan Li
*/
public class AzkabanGobblinYarnAppLauncher extends AbstractJob {
- // if this is set then the Azkaban config will be written to the specified file path
- public static final String AZKABAN_CONFIG_OUTPUT_PATH = "gobblin.yarn.akabanConfigOutputPath";
-
private static final Logger LOGGER = Logger.getLogger(AzkabanJobLauncher.class);
private final GobblinYarnAppLauncher gobblinYarnAppLauncher;
@@ -64,12 +57,11 @@ public class AzkabanGobblinYarnAppLauncher extends AbstractJob {
@Getter
private final YarnConfiguration yarnConfiguration;
- public AzkabanGobblinYarnAppLauncher(String jobId, Properties gobblinProps) throws IOException {
+ public AzkabanGobblinYarnAppLauncher(String jobId, Properties gobblinProps)
+ throws IOException, SchemaRegistryException {
super(jobId, LOGGER);
Config gobblinConfig = ConfigUtils.propertiesToConfig(gobblinProps);
- outputConfigToFile(gobblinConfig);
-
yarnConfiguration = initYarnConf(gobblinProps);
this.gobblinYarnAppLauncher = new GobblinYarnAppLauncher(gobblinConfig, this.yarnConfiguration);
@@ -110,36 +102,4 @@ public class AzkabanGobblinYarnAppLauncher extends AbstractJob {
super.cancel();
}
}
-
- /**
- * Write the config to the file specified with the config key {@value AZKABAN_CONFIG_OUTPUT_PATH} if it
- * is configured.
- * @param config the config to output
- * @throws IOException
- */
- @VisibleForTesting
- static void outputConfigToFile(Config config) throws IOException {
- // If a file path is specified then write the Azkaban config to that path in HOCON format.
- // This can be used to generate an application.conf file to pass to the yarn app master and containers.
- if (config.hasPath(AZKABAN_CONFIG_OUTPUT_PATH)) {
- File configFile = new File(config.getString(AZKABAN_CONFIG_OUTPUT_PATH));
- File parentDir = configFile.getParentFile();
-
- if (parentDir != null && !parentDir.exists()) {
- if (!parentDir.mkdirs()) {
- throw new IOException("Error creating directories for " + parentDir);
- }
- }
-
- ConfigRenderOptions configRenderOptions = ConfigRenderOptions.defaults();
- configRenderOptions = configRenderOptions.setComments(false);
- configRenderOptions = configRenderOptions.setOriginComments(false);
- configRenderOptions = configRenderOptions.setFormatted(true);
- configRenderOptions = configRenderOptions.setJson(false);
-
- String renderedConfig = config.root().render(configRenderOptions);
-
- FileUtils.writeStringToFile(configFile, renderedConfig, Charsets.UTF_8);
- }
- }
}
diff --git a/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncherTest.java b/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncherTest.java
deleted file mode 100644
index ccb49c0..0000000
--- a/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncherTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.azkaban;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-
-import org.apache.commons.io.FileUtils;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigValueFactory;
-
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-
-@Slf4j
-public class AzkabanGobblinYarnAppLauncherTest {
-
- @Test
- public void testOutputConfig() throws IOException {
- File tmpTestDir = Files.createTempDir();
-
- try {
- Path outputPath = Paths.get(tmpTestDir.toString(), "application.conf");
- Config config = ConfigFactory.empty()
- .withValue(ConfigurationKeys.FS_URI_KEY, ConfigValueFactory.fromAnyRef("file:///"))
- .withValue(AzkabanGobblinYarnAppLauncher.AZKABAN_CONFIG_OUTPUT_PATH,
- ConfigValueFactory.fromAnyRef(outputPath.toString()));
-
- AzkabanGobblinYarnAppLauncher.outputConfigToFile(config);
-
- String configString = Files.toString(outputPath.toFile(), Charsets.UTF_8);
- Assert.assertTrue(configString.contains("fs"));
- } finally {
- FileUtils.deleteDirectory(tmpTestDir);
- }
- }
-}
diff --git a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterWithSchemaRegistryTest.java b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterWithSchemaRegistryTest.java
index 9d418da..e65e4bf 100644
--- a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterWithSchemaRegistryTest.java
+++ b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterWithSchemaRegistryTest.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.metrics.reporter;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
+import java.io.IOException;
import java.util.Map;
import org.apache.avro.Schema;
@@ -37,7 +38,6 @@ import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.kafka.KafkaAvroEventReporter;
import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
import org.apache.gobblin.metrics.kafka.KafkaEventReporter;
-import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
public class KafkaAvroEventReporterWithSchemaRegistryTest {
@@ -46,28 +46,51 @@ public class KafkaAvroEventReporterWithSchemaRegistryTest {
@Test
public void test() throws Exception {
+ testHelper(false);
+ }
+
+ @Test
+ public void testWithSchemaId() throws IOException {
+ testHelper(true);
+ }
+
+ private String register(Schema schema) {
+ String id = DigestUtils.sha1Hex(schema.toString().getBytes());
+ this.schemas.put(id, schema);
+ return id;
+ }
+ private void testHelper(boolean isSchemaIdEnabled) throws IOException {
MetricContext context = MetricContext.builder("context").build();
MockKafkaPusher pusher = new MockKafkaPusher();
KafkaAvroSchemaRegistry registry = Mockito.mock(KafkaAvroSchemaRegistry.class);
- Mockito.when(registry.register(Mockito.any(Schema.class))).thenAnswer(new Answer<String>() {
- @Override
- public String answer(InvocationOnMock invocation)
- throws Throwable {
- return register((Schema) invocation.getArguments()[0]);
- }
- });
- Mockito.when(registry.register(Mockito.any(Schema.class), Mockito.anyString())).thenAnswer(new Answer<String>() {
- @Override
- public String answer(InvocationOnMock invocation)
- throws Throwable {
- return register((Schema) invocation.getArguments()[0]);
- }
- });
- KafkaEventReporter kafkaReporter =
- KafkaAvroEventReporter.forContext(context).withKafkaPusher(pusher)
- .withSchemaRegistry(registry).build("localhost:0000", "topic");
+ KafkaAvroEventReporter.Builder builder = KafkaAvroEventReporter.forContext(context).withKafkaPusher(pusher).withSchemaRegistry(registry);
+
+ Schema schema =
+ new Schema.Parser().parse(getClass().getClassLoader().getResourceAsStream("GobblinTrackingEvent.avsc"));
+ String schemaId = DigestUtils.sha1Hex(schema.toString().getBytes());
+
+ if (!isSchemaIdEnabled) {
+ Mockito.when(registry.register(Mockito.any(Schema.class))).thenAnswer(new Answer<String>() {
+ @Override
+ public String answer(InvocationOnMock invocation)
+ throws Throwable {
+ return register((Schema) invocation.getArguments()[0]);
+ }
+ });
+ Mockito.when(registry.register(Mockito.any(Schema.class), Mockito.anyString())).thenAnswer(new Answer<String>() {
+ @Override
+ public String answer(InvocationOnMock invocation)
+ throws Throwable {
+ return register((Schema) invocation.getArguments()[0]);
+ }
+ });
+ } else {
+ builder.withSchemaId(schemaId);
+ }
+
+ KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic");
GobblinTrackingEvent event = new GobblinTrackingEvent(0l, "namespace", "name", Maps.<String, String>newHashMap());
@@ -93,19 +116,12 @@ public class KafkaAvroEventReporterWithSchemaRegistryTest {
byte[] readId = new byte[20];
Assert.assertEquals(is.read(readId), 20);
String readStringId = Hex.encodeHexString(readId);
- Assert.assertTrue(this.schemas.containsKey(readStringId));
-
- Schema schema = this.schemas.get(readStringId);
- Assert.assertFalse(schema.toString().contains("avro.java.string"));
-
+ if (!isSchemaIdEnabled) {
+ Assert.assertTrue(this.schemas.containsKey(readStringId));
+ Schema readSchema = this.schemas.get(readStringId);
+ Assert.assertFalse(readSchema.toString().contains("avro.java.string"));
+ }
+ Assert.assertEquals(readStringId, schemaId);
is.close();
}
-
- private String register(Schema schema)
- throws SchemaRegistryException {
- String id = DigestUtils.sha1Hex(schema.toString().getBytes());
- this.schemas.put(id, schema);
- return id;
- }
-
}
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopeSchemaConverter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopeSchemaConverter.java
index 124fe4c..2caceb6 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopeSchemaConverter.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopeSchemaConverter.java
@@ -17,28 +17,31 @@
package org.apache.gobblin.converter;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+
import com.google.common.base.Optional;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
+
+import javax.xml.bind.DatatypeConverter;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.converter.filter.AvroProjectionConverter;
import org.apache.gobblin.converter.filter.AvroSchemaFieldRemover;
import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistryFactory;
-import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
import org.apache.gobblin.util.AvroUtils;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.ExecutionException;
-import javax.xml.bind.DatatypeConverter;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.Decoder;
-import org.apache.avro.io.DecoderFactory;
/**
* A converter for extracting schema/records from an envelope schema.
@@ -123,7 +126,7 @@ public class EnvelopeSchemaConverter extends Converter<Schema, String, GenericRe
payloadSchema = this.fieldRemover.get().removeFields(payloadSchema);
}
return new SingleRecordIterable<>(AvroUtils.convertRecordSchema(outputRecord, payloadSchema));
- } catch (IOException | SchemaRegistryException | ExecutionException e) {
+ } catch (IOException | ExecutionException e) {
throw new DataConversionException(e);
}
}
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/LiKafkaSchemaRegistry.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/LiKafkaSchemaRegistry.java
index d046747..0e00f04 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/LiKafkaSchemaRegistry.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/LiKafkaSchemaRegistry.java
@@ -30,8 +30,6 @@ import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
-import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil;
-import org.apache.gobblin.util.PropertiesUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,7 +38,9 @@ import com.google.common.base.Preconditions;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.kafka.serialize.MD5Digest;
+import org.apache.gobblin.metrics.reporter.util.KafkaReporterUtils;
import org.apache.gobblin.util.AvroUtils;
+import org.apache.gobblin.util.PropertiesUtils;
/**
@@ -70,7 +70,7 @@ public class LiKafkaSchemaRegistry implements KafkaSchemaRegistry<MD5Digest, Sch
String.format("Property %s not provided.", KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_URL));
this.url = props.getProperty(KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_URL);
- this.namespaceOverride = KafkaAvroReporterUtil.extractOverrideNamespace(props);
+ this.namespaceOverride = KafkaReporterUtils.extractOverrideNamespace(props);
this.switchTopicNames = PropertiesUtils.getPropAsBoolean(props, KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_SWITCH_NAME,
KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_SWITCH_NAME_DEFAULT);
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
index eb409ff..094bb83 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
@@ -23,6 +23,7 @@ import java.util.Properties;
import com.codahale.metrics.ScheduledReporter;
import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
import com.typesafe.config.Config;
import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -35,7 +36,7 @@ import org.apache.gobblin.metrics.reporter.KeyValueEventObjectReporter;
import org.apache.gobblin.metrics.reporter.KeyValueMetricObjectReporter;
import org.apache.gobblin.metrics.kafka.KafkaReporter;
import org.apache.gobblin.metrics.kafka.PusherUtils;
-import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil;
+import org.apache.gobblin.metrics.reporter.util.KafkaReporterUtils;
import org.apache.gobblin.util.ConfigUtils;
@@ -53,6 +54,10 @@ public enum KafkaReportingFormats {
if (Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
+ String schemaId = properties.getProperty(ConfigurationKeys.METRICS_REPORTING_METRICS_KAFKA_AVRO_SCHEMA_ID);
+ if (!Strings.isNullOrEmpty(schemaId)) {
+ builder.withSchemaId(schemaId);
+ }
}
builder.build(brokers, topic, properties);
}
@@ -66,6 +71,10 @@ public enum KafkaReportingFormats {
if (Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
+ String schemaId = properties.getProperty(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKA_AVRO_SCHEMA_ID);
+ if (!Strings.isNullOrEmpty(schemaId)) {
+ builder.withSchemaId(schemaId);
+ }
}
String pusherClassName = properties.containsKey(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS) ? properties
.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS) : properties
@@ -104,6 +113,10 @@ public enum KafkaReportingFormats {
if (Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
+ String schemaId = properties.getProperty(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKA_AVRO_SCHEMA_ID);
+ if (!Strings.isNullOrEmpty(schemaId)) {
+ builder.withSchemaId(schemaId);
+ }
}
String pusherClassName = properties.containsKey(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS) ? properties
.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS) : properties
@@ -155,7 +168,7 @@ public enum KafkaReportingFormats {
throws IOException {
KeyValueMetricObjectReporter.Builder builder = new KeyValueMetricObjectReporter.Builder();
- builder.namespaceOverride(KafkaAvroReporterUtil.extractOverrideNamespace(properties));
+ builder.namespaceOverride(KafkaReporterUtils.extractOverrideNamespace(properties));
Config allConfig = ConfigUtils.propertiesToConfig(properties);
Config config = ConfigUtils.getConfigOrEmpty(allConfig, ConfigurationKeys.METRICS_REPORTING_CONFIGURATIONS_PREFIX)
.withFallback(allConfig);
@@ -173,7 +186,7 @@ public enum KafkaReportingFormats {
ConfigUtils.getConfigOrEmpty(allConfig, ConfigurationKeys.METRICS_REPORTING_EVENTS_CONFIGURATIONS_PREFIX)
.withFallback(allConfig);
builder.withConfig(config);
- builder.namespaceOverride(KafkaAvroReporterUtil.extractOverrideNamespace(properties));
+ builder.namespaceOverride(KafkaReporterUtils.extractOverrideNamespace(properties));
return builder.build(brokers, topic);
}
};
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventKeyValueReporter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventKeyValueReporter.java
index bf6a07e..2d447c2 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventKeyValueReporter.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventKeyValueReporter.java
@@ -17,23 +17,22 @@
package org.apache.gobblin.metrics.kafka;
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+
+import com.google.common.base.Optional;
+
import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.reporter.util.AvroBinarySerializer;
import org.apache.gobblin.metrics.reporter.util.AvroSerializer;
+import org.apache.gobblin.metrics.reporter.util.KafkaReporterUtils;
import org.apache.gobblin.metrics.reporter.util.SchemaRegistryVersionWriter;
import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
-import java.io.IOException;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.avro.Schema;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-
/**
* Implement of {@link KafkaEventKeyValueReporter} for avro records.
@@ -44,10 +43,11 @@ public class KafkaAvroEventKeyValueReporter extends KafkaEventKeyValueReporter {
protected KafkaAvroEventKeyValueReporter(Builder<?> builder) throws IOException {
super(builder);
if(builder.registry.isPresent()) {
- Schema schema =
- new Schema.Parser().parse(getClass().getClassLoader().getResourceAsStream("GobblinTrackingEvent.avsc"));
- this.serializer.setSchemaVersionWriter(new SchemaRegistryVersionWriter(builder.registry.get(), builder.topic,
- Optional.of(schema)));
+ Schema schema = KafkaReporterUtils.getGobblinTrackingEventSchema();
+ SchemaRegistryVersionWriter schemaVersionWriter =
+ builder.schemaId.isPresent() ? new SchemaRegistryVersionWriter(builder.registry.get(), builder.topic, schema,
+ builder.schemaId.get()) : new SchemaRegistryVersionWriter(builder.registry.get(), builder.topic, schema);
+ this.serializer.setSchemaVersionWriter(schemaVersionWriter);
}
}
@@ -86,6 +86,7 @@ public class KafkaAvroEventKeyValueReporter extends KafkaEventKeyValueReporter {
*/
public static abstract class Builder<T extends Builder<T>> extends KafkaEventKeyValueReporter.Builder<T> {
private Optional<KafkaAvroSchemaRegistry> registry = Optional.absent();
+ private Optional<String> schemaId = Optional.absent();
private Builder(MetricContext context) {
super(context);
@@ -96,6 +97,11 @@ public class KafkaAvroEventKeyValueReporter extends KafkaEventKeyValueReporter {
return self();
}
+ public T withSchemaId(String schemaId) {
+ this.schemaId = Optional.of(schemaId);
+ return self();
+ }
+
/**
* Builds and returns {@link KafkaAvroEventReporter}.
*
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventReporter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventReporter.java
index ecb5d7d..dd1297b 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventReporter.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventReporter.java
@@ -27,6 +27,7 @@ import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.reporter.util.AvroBinarySerializer;
import org.apache.gobblin.metrics.reporter.util.AvroSerializer;
+import org.apache.gobblin.metrics.reporter.util.KafkaReporterUtils;
import org.apache.gobblin.metrics.reporter.util.SchemaRegistryVersionWriter;
import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
@@ -39,10 +40,11 @@ public class KafkaAvroEventReporter extends KafkaEventReporter {
protected KafkaAvroEventReporter(Builder<?> builder) throws IOException {
super(builder);
if(builder.registry.isPresent()) {
- Schema schema =
- new Schema.Parser().parse(getClass().getClassLoader().getResourceAsStream("GobblinTrackingEvent.avsc"));
- this.serializer.setSchemaVersionWriter(new SchemaRegistryVersionWriter(builder.registry.get(), builder.topic,
- Optional.of(schema)));
+ Schema schema = KafkaReporterUtils.getGobblinTrackingEventSchema();
+ SchemaRegistryVersionWriter schemaVersionWriter =
+ builder.schemaId.isPresent() ? new SchemaRegistryVersionWriter(builder.registry.get(), builder.topic, schema,
+ builder.schemaId.get()) : new SchemaRegistryVersionWriter(builder.registry.get(), builder.topic, schema);
+ this.serializer.setSchemaVersionWriter(schemaVersionWriter);
}
}
@@ -92,8 +94,8 @@ public class KafkaAvroEventReporter extends KafkaEventReporter {
* Defaults to no filter, reporting rates in seconds and times in milliseconds.
*/
public static abstract class Builder<T extends Builder<T>> extends KafkaEventReporter.Builder<T> {
-
private Optional<KafkaAvroSchemaRegistry> registry = Optional.absent();
+ private Optional<String> schemaId = Optional.absent();
private Builder(MetricContext context) {
super(context);
@@ -104,6 +106,11 @@ public class KafkaAvroEventReporter extends KafkaEventReporter {
return self();
}
+ public T withSchemaId(String schemaId) {
+ this.schemaId = Optional.of(schemaId);
+ return self();
+ }
+
/**
* Builds and returns {@link KafkaAvroEventReporter}.
*
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java
index 5cb3ce8..6a4396e 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java
@@ -28,6 +28,7 @@ import com.typesafe.config.Config;
import org.apache.gobblin.metrics.MetricReport;
import org.apache.gobblin.metrics.reporter.util.AvroBinarySerializer;
import org.apache.gobblin.metrics.reporter.util.AvroSerializer;
+import org.apache.gobblin.metrics.reporter.util.KafkaReporterUtils;
import org.apache.gobblin.metrics.reporter.util.SchemaRegistryVersionWriter;
import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
@@ -42,10 +43,11 @@ public class KafkaAvroReporter extends KafkaReporter {
protected KafkaAvroReporter(Builder<?> builder, Config config) throws IOException {
super(builder, config);
if (builder.registry.isPresent()) {
- Schema schema =
- new Schema.Parser().parse(getClass().getClassLoader().getResourceAsStream("MetricReport.avsc"));
- this.serializer.setSchemaVersionWriter(new SchemaRegistryVersionWriter(builder.registry.get(), builder.topic,
- Optional.of(schema)));
+ Schema schema = KafkaReporterUtils.getMetricReportSchema();
+ SchemaRegistryVersionWriter schemaVersionWriter =
+ builder.schemaId.isPresent() ? new SchemaRegistryVersionWriter(builder.registry.get(), builder.topic, schema,
+ builder.schemaId.get()) : new SchemaRegistryVersionWriter(builder.registry.get(), builder.topic, schema);
+ this.serializer.setSchemaVersionWriter(schemaVersionWriter);
}
}
@@ -79,14 +81,19 @@ public class KafkaAvroReporter extends KafkaReporter {
* Builder for {@link KafkaAvroReporter}. Defaults to no filter, reporting rates in seconds and times in milliseconds.
*/
public static abstract class Builder<T extends Builder<T>> extends KafkaReporter.Builder<T> {
-
private Optional<KafkaAvroSchemaRegistry> registry = Optional.absent();
+ private Optional<String> schemaId = Optional.absent();
public T withSchemaRegistry(KafkaAvroSchemaRegistry registry) {
this.registry = Optional.of(registry);
return self();
}
+ public T withSchemaId(String schemaId) {
+ this.schemaId = Optional.of(schemaId);
+ return self();
+ }
+
/**
* Builds and returns {@link KafkaAvroReporter}.
*
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
index 556c1d1..0bd3683 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
@@ -41,7 +41,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.kafka.schemareg.HttpClientFactory;
-import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil;
+import org.apache.gobblin.metrics.reporter.util.KafkaReporterUtils;
import org.apache.gobblin.util.AvroUtils;
@@ -78,7 +78,7 @@ public class KafkaAvroSchemaRegistry extends KafkaSchemaRegistry<String, Schema>
String.format("Property %s not provided.", KAFKA_SCHEMA_REGISTRY_URL));
this.url = props.getProperty(KAFKA_SCHEMA_REGISTRY_URL);
- this.namespaceOverride = KafkaAvroReporterUtil.extractOverrideNamespace(props);
+ this.namespaceOverride = KafkaReporterUtils.extractOverrideNamespace(props);
int objPoolSize =
Integer.parseInt(props.getProperty(ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS,
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
index 9f38379..5c12360 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
@@ -25,45 +25,43 @@ import com.codahale.metrics.ScheduledReporter;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.CustomCodahaleReporterFactory;
import org.apache.gobblin.metrics.KafkaReportingFormats;
import org.apache.gobblin.metrics.RootMetricContext;
-
-import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.metrics.reporter.util.KafkaReporterUtils;
@Slf4j
public class KafkaReporterFactory implements CustomCodahaleReporterFactory {
@Override
- public ScheduledReporter newScheduledReporter(MetricRegistry registry, Properties properties)
- throws IOException {
+ public ScheduledReporter newScheduledReporter(MetricRegistry registry, Properties properties) {
if (!Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_ENABLED_KEY,
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_ENABLED))) {
return null;
}
log.info("Reporting metrics & events to Kafka");
- Optional<String> defaultTopic =
- Optional.fromNullable(properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC));
- Optional<String> metricsTopic =
- Optional.fromNullable(properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC_METRICS));
- Optional<String> eventsTopic =
- Optional.fromNullable(properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC_EVENTS));
+ boolean metricsEnabled = KafkaReporterUtils.isMetricsEnabled(properties);
- boolean metricsEnabled = metricsTopic.or(defaultTopic).isPresent();
- if (metricsEnabled) {
+ if (KafkaReporterUtils.isMetricsEnabled(properties)) {
log.info("Metrics enabled --- Reporting metrics to Kafka");
}
- boolean eventsEnabled = eventsTopic.or(defaultTopic).isPresent();
- if (eventsEnabled) {
+
+ boolean eventsEnabled = KafkaReporterUtils.isEventsEnabled(properties);
+ if (KafkaReporterUtils.isEventsEnabled(properties)) {
log.info("Events enabled --- Reporting events to Kafka");
}
+ Optional<String> eventsTopic = KafkaReporterUtils.getEventsTopic(properties);
+ Optional<String> defaultTopic = KafkaReporterUtils.getDefaultTopic(properties);
+
try {
Preconditions.checkArgument(properties.containsKey(ConfigurationKeys.METRICS_KAFKA_BROKERS),
"Kafka metrics brokers missing.");
- Preconditions.checkArgument(metricsTopic.or(eventsTopic).or(defaultTopic).isPresent(), "Kafka topic missing.");
+ Preconditions.checkArgument(KafkaReporterUtils.getMetricsTopic(properties).or(eventsTopic).or(defaultTopic).isPresent(), "Kafka topic missing.");
} catch (IllegalArgumentException exception) {
log.error("Not reporting metrics to Kafka due to missing Kafka configuration(s).", exception);
return null;
@@ -84,6 +82,7 @@ public class KafkaReporterFactory implements CustomCodahaleReporterFactory {
formatEnum = KafkaReportingFormats.JSON;
}
+ Optional<String> metricsTopic = KafkaReporterUtils.getMetricsTopic(properties);
if (metricsEnabled) {
try {
formatEnum.buildMetricsReporter(brokers, metricsTopic.or(defaultTopic).get(), properties);
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/SchemaRegistryException.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/SchemaRegistryException.java
index 6cd5346..d06076a 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/SchemaRegistryException.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/SchemaRegistryException.java
@@ -17,7 +17,10 @@
package org.apache.gobblin.metrics.kafka;
-public class SchemaRegistryException extends Exception {
+import java.io.IOException;
+
+
+public class SchemaRegistryException extends IOException {
private static final long serialVersionUID = 1L;
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/util/KafkaAvroReporterUtil.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/util/KafkaReporterUtils.java
similarity index 55%
rename from gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/util/KafkaAvroReporterUtil.java
rename to gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/util/KafkaReporterUtils.java
index 1d82921..5d44adb 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/util/KafkaAvroReporterUtil.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/util/KafkaReporterUtils.java
@@ -16,19 +16,25 @@
*/
package org.apache.gobblin.metrics.reporter.util;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistryConfigurationKeys;
+import org.apache.avro.Schema;
import com.google.common.base.Optional;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistryConfigurationKeys;
+
-public class KafkaAvroReporterUtil {
+public class KafkaReporterUtils {
+ public static final String METRIC_REPORT_AVRO_SCHEMA_FILE = "MetricReport.avsc";
+ public static final String GOBBLIN_TRACKING_EVENT_AVRO_SCHEMA_FILE = "GobblinTrackingEvent.avsc";
private static final Splitter SPLIT_BY_COMMA = Splitter.on(",").omitEmptyStrings().trimResults();
private static final Splitter SPLIT_BY_COLON = Splitter.on(":").omitEmptyStrings().trimResults();
@@ -72,4 +78,49 @@ public class KafkaAvroReporterUtil {
return Optional.<Map<String, String>>absent();
}
+
+ public static boolean isMetricsEnabled(Properties properties) {
+ Optional<String> defaultTopic = getDefaultTopic(properties);
+ Optional<String> metricsTopic = getMetricsTopic(properties);
+
+ return metricsTopic.or(defaultTopic).isPresent();
+ }
+
+ public static boolean isEventsEnabled(Properties properties) {
+ Optional<String> defaultTopic = getDefaultTopic(properties);
+ Optional<String> eventsTopic = getEventsTopic(properties);
+
+ return eventsTopic.or(defaultTopic).isPresent();
+ }
+
+ public static Optional<String> getDefaultTopic(Properties properties) {
+ return Optional.fromNullable(properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC));
+ }
+
+ public static Optional<String> getMetricsTopic(Properties properties) {
+ return Optional.fromNullable(properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC_METRICS));
+ }
+
+ public static Optional<String> getEventsTopic(Properties properties) {
+ return Optional.fromNullable(properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC_EVENTS));
+ }
+
+ public static boolean isKafkaReportingEnabled(Properties properties) {
+ return Boolean.parseBoolean(
+ properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_ENABLED_KEY, ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_ENABLED));
+ }
+
+ public static boolean isKafkaAvroSchemaRegistryEnabled(Properties properties) {
+ return Boolean.parseBoolean(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY, ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY));
+ }
+
+ public static Schema getMetricReportSchema() throws IOException {
+ return new Schema.Parser()
+ .parse(KafkaReporterUtils.class.getClassLoader().getResourceAsStream(METRIC_REPORT_AVRO_SCHEMA_FILE));
+ }
+
+ public static Schema getGobblinTrackingEventSchema() throws IOException {
+ return new Schema.Parser()
+ .parse(KafkaReporterUtils.class.getClassLoader().getResourceAsStream(GOBBLIN_TRACKING_EVENT_AVRO_SCHEMA_FILE));
+ }
}
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/util/SchemaRegistryVersionWriter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/util/SchemaRegistryVersionWriter.java
index bffd612..9c52f10 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/util/SchemaRegistryVersionWriter.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/util/SchemaRegistryVersionWriter.java
@@ -26,11 +26,14 @@ import org.apache.avro.Schema;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
-import com.google.common.base.Optional;
+import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
import org.apache.gobblin.util.ConfigUtils;
@@ -41,50 +44,46 @@ import org.apache.gobblin.util.ConfigUtils;
* {@link org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry} to get Schema version identifier and write it to
* {@link java.io.DataOutputStream}.
*/
+@Slf4j
public class SchemaRegistryVersionWriter implements SchemaVersionWriter<Schema> {
private final KafkaAvroSchemaRegistry registry;
private Map<Schema, String> registrySchemaIds;
- private final Optional<String> overrideName;
- private final Optional<Schema> schema;
- private final Optional<String> schemaId;
+ private final String overrideName;
+ private final Schema schema;
+ private final String schemaId;
private final int schemaIdLengthBytes;
public SchemaRegistryVersionWriter(Config config)
- throws IOException {
- this(new KafkaAvroSchemaRegistry(ConfigUtils.configToProperties(config)), Optional.<String>absent(),
- Optional.<Schema>absent());
+ throws SchemaRegistryException {
+ this(new KafkaAvroSchemaRegistry(ConfigUtils.configToProperties(config)), null, null, null);
}
- public SchemaRegistryVersionWriter(KafkaAvroSchemaRegistry registry, String overrideName)
- throws IOException {
- this(registry, overrideName, Optional.<Schema>absent());
+ public SchemaRegistryVersionWriter(KafkaAvroSchemaRegistry registry, @Nullable String overrideName) throws SchemaRegistryException {
+ this(registry, overrideName, null);
}
- public SchemaRegistryVersionWriter(KafkaAvroSchemaRegistry registry, String overrideName,
- Optional<Schema> singleSchema)
- throws IOException {
- this(registry, Optional.of(overrideName), singleSchema);
+ public SchemaRegistryVersionWriter(KafkaAvroSchemaRegistry registry, @Nullable String overrideName, @Nullable Schema singleSchema)
+ throws SchemaRegistryException {
+ this(registry, overrideName, singleSchema, null);
}
- public SchemaRegistryVersionWriter(KafkaAvroSchemaRegistry registry, Optional<String> overrideName,
- Optional<Schema> singleSchema)
- throws IOException {
+ public SchemaRegistryVersionWriter(KafkaAvroSchemaRegistry registry, @Nullable String overrideName, @Nullable Schema singleSchema, @Nullable String schemaId)
+ throws SchemaRegistryException {
this.registry = registry;
this.registrySchemaIds = Maps.newConcurrentMap();
this.overrideName = overrideName;
this.schema = singleSchema;
this.schemaIdLengthBytes = registry.getSchemaIdLengthByte();
- if (this.schema.isPresent()) {
- try {
- this.schemaId = this.overrideName.isPresent() ? Optional
- .of(this.registry.register(this.schema.get(), this.overrideName.get()))
- : Optional.of(this.registry.register(this.schema.get()));
- } catch (SchemaRegistryException e) {
- throw Throwables.propagate(e);
- }
+ if ((this.schema != null) && (schemaId == null)) {
+ this.schemaId =
+ (!Strings.isNullOrEmpty(this.overrideName)) ? this.registry.register(this.schema, this.overrideName)
+ : this.registry.register(this.schema);
} else {
- this.schemaId = Optional.absent();
+ if (schemaId != null) {
+ log.info("Skipping registering schema with schema registry. Using schema with id: {}", schemaId);
+ }
+ this.schemaId = schemaId;
}
}
@@ -92,7 +91,7 @@ public class SchemaRegistryVersionWriter implements SchemaVersionWriter<Schema>
public void writeSchemaVersioningInformation(Schema schema, DataOutputStream outputStream)
throws IOException {
- String schemaId = this.schemaId.isPresent() ? this.schemaId.get() : this.getIdForSchema(schema);
+ String schemaId = this.schemaId != null ? this.schemaId : this.getIdForSchema(schema);
outputStream.writeByte(KafkaAvroSchemaRegistry.MAGIC_BYTE);
try {
@@ -105,7 +104,7 @@ public class SchemaRegistryVersionWriter implements SchemaVersionWriter<Schema>
private String getIdForSchema(Schema schema) {
if (!this.registrySchemaIds.containsKey(schema)) {
try {
- String schemaId = this.overrideName.isPresent() ? this.registry.register(schema, this.overrideName.get())
+ String schemaId = !Strings.isNullOrEmpty(this.overrideName) ? this.registry.register(schema, this.overrideName)
: this.registry.register(schema);
this.registrySchemaIds.put(schema, schemaId);
} catch (SchemaRegistryException e) {
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventKeyValueReporterTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventKeyValueReporterTest.java
index d4e5939..995c41b 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventKeyValueReporterTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventKeyValueReporterTest.java
@@ -17,29 +17,41 @@
package org.apache.gobblin.metrics.reporter;
-import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.tuple.Pair;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
-import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.kafka.KafkaAvroEventKeyValueReporter;
+import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
import org.apache.gobblin.metrics.kafka.KafkaEventReporter;
import org.apache.gobblin.metrics.kafka.Pusher;
import org.apache.gobblin.metrics.reporter.util.EventUtils;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Properties;
-import com.google.common.collect.Lists;
+public class KafkaAvroEventKeyValueReporterTest extends KafkaAvroEventReporterTest {
+ private static final int SCHEMA_ID_LENGTH_BYTES = 20;
+ private String schemaId;
-public class KafkaAvroEventKeyValueReporterTest extends KafkaAvroEventReporterTest {
+ @BeforeClass
+ public void setUp() throws IOException {
+ Schema schema =
+ new Schema.Parser().parse(getClass().getClassLoader().getResourceAsStream("GobblinTrackingEvent.avsc"));
+ this.schemaId = DigestUtils.sha1Hex(schema.toString().getBytes());
+ }
@Override
public KafkaEventReporter.Builder<? extends KafkaEventReporter.Builder> getBuilder(MetricContext context,
@@ -48,19 +60,15 @@ public class KafkaAvroEventKeyValueReporterTest extends KafkaAvroEventReporterTe
return builder.withKafkaPusher(pusher).withKeys(Lists.newArrayList("k1", "k2", "k3"));
}
- private Pair<String, GobblinTrackingEvent> nextKVEvent(Iterator<Pair<String, byte[]>> it) throws IOException {
+ private Pair<String, GobblinTrackingEvent> nextKVEvent(Iterator<Pair<String, byte[]>> it, boolean isSchemaIdEnabled) throws IOException {
Assert.assertTrue(it.hasNext());
Pair<String, byte[]> event = it.next();
- return Pair.of(event.getKey(), EventUtils.deserializeReportFromAvroSerialization(new GobblinTrackingEvent(), event.getValue()));
+ return isSchemaIdEnabled ? Pair.of(event.getKey(), EventUtils
+ .deserializeEventFromAvroSerialization(new GobblinTrackingEvent(), event.getValue(), schemaId)) : Pair.of(event.getKey(),
+ EventUtils.deserializeEventFromAvroSerialization(new GobblinTrackingEvent(), event.getValue()));
}
- @Test
- public void testKafkaEventReporter() throws IOException {
- MetricContext context = MetricContext.builder("context").build();
-
- MockKafkaKeyValuePusher pusher = new MockKafkaKeyValuePusher();
- KafkaEventReporter kafkaReporter = getBuilder(context, pusher).build("localhost:0000", "topic");
-
+ private GobblinTrackingEvent getEvent(boolean isMessageKeyed) {
String namespace = "gobblin.metrics.test";
String eventName = "testEvent";
@@ -70,27 +78,58 @@ public class KafkaAvroEventKeyValueReporterTest extends KafkaAvroEventReporterTe
Map<String, String> metadata = Maps.newHashMap();
metadata.put("m1", "v1");
metadata.put("m2", null);
+ if (isMessageKeyed) {
+ metadata.put("k1", "v1");
+ metadata.put("k2", "v2");
+ metadata.put("k3", "v3");
+ }
+
event.setMetadata(metadata);
- context.submitEvent(event);
+ return event;
+ }
+
+ @Test
+ public void testKafkaEventReporter() throws IOException {
+ MetricContext context = MetricContext.builder("context").build();
+
+ MockKafkaKeyValuePusher pusher = new MockKafkaKeyValuePusher();
+ KafkaEventReporter kafkaReporter = getBuilder(context, pusher).build("localhost:0000", "topic");
+
+ context.submitEvent(getEvent(false));
kafkaReporter.report();
- Pair<String, GobblinTrackingEvent> retrievedEvent = nextKVEvent(pusher.messageIterator());
+ Pair<String, GobblinTrackingEvent> retrievedEvent = nextKVEvent(pusher.messageIterator(), false);
Assert.assertNull(retrievedEvent.getKey());
- event = new GobblinTrackingEvent();
- event.setName(eventName);
- event.setNamespace(namespace);
- metadata = Maps.newHashMap();
- metadata.put("k1", "v1");
- metadata.put("k2", "v2");
- metadata.put("k3", "v3");
- event.setMetadata(metadata);
- context.submitEvent(event);
+ context.submitEvent(getEvent(true));
+
+ kafkaReporter.report();
+
+ retrievedEvent = nextKVEvent(pusher.messageIterator(), false);
+ Assert.assertEquals(retrievedEvent.getKey(), "v1v2v3");
+ }
+
+ @Test
+ public void testKafkaEventReporterWithSchemaRegistry() throws IOException {
+ MetricContext context = MetricContext.builder("context").build();
+ MockKafkaKeyValuePusher pusher = new MockKafkaKeyValuePusher();
+
+ Schema schema =
+ new Schema.Parser().parse(getClass().getClassLoader().getResourceAsStream("GobblinTrackingEvent.avsc"));
+ String schemaId = DigestUtils.sha1Hex(schema.toString().getBytes());
+
+ KafkaAvroEventKeyValueReporter.Builder<?> builder = KafkaAvroEventKeyValueReporter.Factory.forContext(context);
+ KafkaAvroEventKeyValueReporter kafkaReporter =
+ builder.withKafkaPusher(pusher).withKeys(Lists.newArrayList("k1", "k2", "k3"))
+ .withSchemaRegistry(Mockito.mock(KafkaAvroSchemaRegistry.class)).withSchemaId(schemaId)
+ .build("localhost:0000", "topic");
+
+ context.submitEvent(getEvent(true));
kafkaReporter.report();
- retrievedEvent = nextKVEvent(pusher.messageIterator());
+ Pair<String, GobblinTrackingEvent> retrievedEvent = nextKVEvent(pusher.messageIterator(), true);
Assert.assertEquals(retrievedEvent.getKey(), "v1v2v3");
}
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterTest.java
index e240a53..bbb9034 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterTest.java
@@ -45,6 +45,6 @@ public class KafkaAvroEventReporterTest extends KafkaEventReporterTest {
protected GobblinTrackingEvent nextEvent(Iterator<byte[]> it)
throws IOException {
Assert.assertTrue(it.hasNext());
- return EventUtils.deserializeReportFromAvroSerialization(new GobblinTrackingEvent(), it.next());
+ return EventUtils.deserializeEventFromAvroSerialization(new GobblinTrackingEvent(), it.next());
}
}
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroReporterWithSchemaRegistryTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroReporterWithSchemaRegistryTest.java
new file mode 100644
index 0000000..4524a63
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroReporterWithSchemaRegistryTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.avro.Schema;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.metrics.MetricReport;
+import org.apache.gobblin.metrics.kafka.KafkaAvroReporter;
+import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
+import org.apache.gobblin.metrics.kafka.KafkaReporter;
+import org.apache.gobblin.metrics.kafka.Pusher;
+import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
+
+
+/**
+ * Test for KafkaAvroReporter that is configured with a {@link KafkaAvroSchemaRegistry}.
+ * Extends KafkaAvroReporterTest and just redefines the builder and the metrics deserializer
+ *
+ * @author Sudarshan Vasudevan
+ */
+@Test(groups = {"gobblin.metrics"})
+public class KafkaAvroReporterWithSchemaRegistryTest extends KafkaAvroReporterTest {
+ private static final int SCHEMA_ID_LENGTH_BYTES = 20;
+
+ private String schemaId;
+ private KafkaAvroSchemaRegistry schemaRegistry;
+
+ public KafkaAvroReporterWithSchemaRegistryTest(String topic)
+ throws IOException, InterruptedException {
+ super();
+ this.schemaId = getSchemaId();
+ this.schemaRegistry = getMockSchemaRegistry();
+ }
+
+ public KafkaAvroReporterWithSchemaRegistryTest()
+ throws IOException, InterruptedException {
+ this("KafkaAvroReporterTestWithSchemaRegistry");
+ this.schemaId = getSchemaId();
+ }
+
+ private KafkaAvroSchemaRegistry getMockSchemaRegistry() throws IOException {
+ KafkaAvroSchemaRegistry registry = Mockito.mock(KafkaAvroSchemaRegistry.class);
+ Mockito.when(registry.getSchemaIdLengthByte()).thenAnswer(new Answer<Integer>() {
+ @Override
+ public Integer answer(InvocationOnMock invocation) {
+ return KafkaAvroReporterWithSchemaRegistryTest.SCHEMA_ID_LENGTH_BYTES;
+ }
+ });
+ Mockito.when(registry.getSchemaByKey(Mockito.anyString())).thenAnswer(new Answer<Schema>() {
+ @Override
+ public Schema answer(InvocationOnMock invocation) {
+ return MetricReport.SCHEMA$;
+ }
+ });
+ return registry;
+ }
+
+ private String getSchemaId() throws IOException {
+ Schema schema =
+ new Schema.Parser().parse(getClass().getClassLoader().getResourceAsStream("MetricReport.avsc"));
+ return DigestUtils.sha1Hex(schema.toString().getBytes());
+ }
+
+ @Override
+ public KafkaReporter.Builder<? extends KafkaReporter.Builder> getBuilder(Pusher pusher) {
+ return KafkaAvroReporter.BuilderFactory.newBuilder().withKafkaPusher(pusher).withSchemaRegistry(this.schemaRegistry).withSchemaId(schemaId);
+ }
+
+ @Override
+ public KafkaReporter.Builder<? extends KafkaReporter.Builder> getBuilderFromContext(Pusher pusher) {
+ return KafkaAvroReporter.BuilderFactory.newBuilder().withKafkaPusher(pusher).withSchemaRegistry(this.schemaRegistry).withSchemaId(schemaId);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected MetricReport nextReport(Iterator<byte[]> it)
+ throws IOException {
+ Assert.assertTrue(it.hasNext());
+ return MetricReportUtils.deserializeReportFromAvroSerialization(new MetricReport(), it.next(), this.schemaId);
+ }
+}
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporterTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporterTest.java
index d2bf63f..9c23972 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporterTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporterTest.java
@@ -32,7 +32,7 @@ import com.google.common.collect.Maps;
import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistryConfigurationKeys;
import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil;
+import org.apache.gobblin.metrics.reporter.util.KafkaReporterUtils;
import org.apache.gobblin.util.ConfigUtils;
@@ -65,7 +65,7 @@ public class KeyValueEventObjectReporterTest extends KeyValueEventObjectReporter
*/
public static KeyValueEventObjectReporterTest.Builder getBuilder(MetricContext context, Properties props) {
KeyValueEventObjectReporterTest.Builder builder = new KeyValueEventObjectReporterTest.Builder(context);
- builder.namespaceOverride(KafkaAvroReporterUtil.extractOverrideNamespace(props))
+ builder.namespaceOverride(KafkaReporterUtils.extractOverrideNamespace(props))
.withConfig(ConfigUtils.propertiesToConfig(props));
return builder;
}
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueMetricObjectReporterTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueMetricObjectReporterTest.java
index f674b0e..64dfbee 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueMetricObjectReporterTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueMetricObjectReporterTest.java
@@ -30,7 +30,7 @@ import com.typesafe.config.Config;
import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistryConfigurationKeys;
import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil;
+import org.apache.gobblin.metrics.reporter.util.KafkaReporterUtils;
import org.apache.gobblin.util.ConfigUtils;
@@ -62,7 +62,7 @@ public class KeyValueMetricObjectReporterTest extends KeyValueMetricObjectReport
*/
public static KeyValueMetricObjectReporterTest.Builder getBuilder(Properties props) {
KeyValueMetricObjectReporterTest.Builder builder = new KeyValueMetricObjectReporterTest.Builder();
- builder.namespaceOverride(KafkaAvroReporterUtil.extractOverrideNamespace(props));
+ builder.namespaceOverride(KafkaReporterUtils.extractOverrideNamespace(props));
return builder;
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
index 31232c4..b670a7a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -30,7 +30,6 @@ import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import com.codahale.metrics.Meter;
-import com.google.common.base.Optional;
import com.typesafe.config.Config;
import lombok.Getter;
@@ -69,7 +68,7 @@ public class KafkaAvroJobStatusMonitor extends KafkaJobStatusMonitor {
if (ConfigUtils.getBoolean(config, ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY, false)) {
KafkaAvroSchemaRegistry schemaRegistry = (KafkaAvroSchemaRegistry) new KafkaAvroSchemaRegistryFactory().
create(ConfigUtils.configToProperties(config));
- this.schemaVersionWriter = new SchemaRegistryVersionWriter(schemaRegistry, topic, Optional.of(GobblinTrackingEvent.SCHEMA$));
+ this.schemaVersionWriter = new SchemaRegistryVersionWriter(schemaRegistry, topic, GobblinTrackingEvent.SCHEMA$);
} else {
this.schemaVersionWriter = new FixedSchemaVersionWriter();
}
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 82842f2..5fa64e1 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -35,6 +35,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.avro.Schema;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.apache.commons.mail.EmailException;
import org.apache.hadoop.conf.Configuration;
@@ -75,6 +77,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
@@ -91,6 +94,8 @@ import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.ServiceManager;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigRenderOptions;
+import com.typesafe.config.ConfigValueFactory;
import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
import org.apache.gobblin.cluster.GobblinClusterManager;
@@ -99,6 +104,9 @@ import org.apache.gobblin.cluster.GobblinHelixConstants;
import org.apache.gobblin.cluster.GobblinHelixMessagingService;
import org.apache.gobblin.cluster.HelixUtils;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
+import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
+import org.apache.gobblin.metrics.reporter.util.KafkaReporterUtils;
import org.apache.gobblin.rest.JobExecutionInfoServer;
import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
import org.apache.gobblin.util.ClassAliasResolver;
@@ -149,6 +157,7 @@ import static org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_
* @author Yinan Li
*/
public class GobblinYarnAppLauncher {
+ public static final String GOBBLIN_YARN_CONFIG_OUTPUT_PATH = "gobblin.yarn.configOutputPath";
private static final Logger LOGGER = LoggerFactory.getLogger(GobblinYarnAppLauncher.class);
@@ -287,6 +296,12 @@ public class GobblinYarnAppLauncher {
this.messagingService = new GobblinHelixMessagingService(this.helixManager);
+ try {
+ config = addDynamicConfig(config);
+ outputConfigToFile(config);
+ } catch (SchemaRegistryException e) {
+ throw new IOException(e);
+ }
}
/**
@@ -919,6 +934,79 @@ public class GobblinYarnAppLauncher {
}
}
+ private static Config addDynamicConfig(Config config) throws IOException {
+ Properties properties = ConfigUtils.configToProperties(config);
+ if (KafkaReporterUtils.isKafkaReportingEnabled(properties) && KafkaReporterUtils.isKafkaAvroSchemaRegistryEnabled(properties)) {
+ KafkaAvroSchemaRegistry registry = new KafkaAvroSchemaRegistry(properties);
+ return addMetricReportingDynamicConfig(config, registry);
+ } else {
+ return config;
+ }
+ }
+
+ /**
+ * Write the config to the file specified with the config key {@value #GOBBLIN_YARN_CONFIG_OUTPUT_PATH} if it
+ * is configured.
+ * @param config the config to output
+ * @throws IOException
+ */
+ @VisibleForTesting
+ static void outputConfigToFile(Config config)
+ throws IOException {
+ // If a file path is specified then write the resolved config to that path in HOCON format.
+ // This can be used to generate an application.conf file to pass to the yarn app master and containers.
+ if (config.hasPath(GOBBLIN_YARN_CONFIG_OUTPUT_PATH)) {
+ File configFile = new File(config.getString(GOBBLIN_YARN_CONFIG_OUTPUT_PATH));
+ File parentDir = configFile.getParentFile();
+
+ if (parentDir != null && !parentDir.exists()) {
+ if (!parentDir.mkdirs()) {
+ throw new IOException("Error creating directories for " + parentDir);
+ }
+ }
+
+ ConfigRenderOptions configRenderOptions = ConfigRenderOptions.defaults();
+ configRenderOptions = configRenderOptions.setComments(false);
+ configRenderOptions = configRenderOptions.setOriginComments(false);
+ configRenderOptions = configRenderOptions.setFormatted(true);
+ configRenderOptions = configRenderOptions.setJson(false);
+
+ String renderedConfig = config.root().render(configRenderOptions);
+
+ FileUtils.writeStringToFile(configFile, renderedConfig, Charsets.UTF_8);
+ }
+ }
+
+ /**
+ * A method that adds dynamic config related to Kafka-based metric reporting. In particular, if Kafka based metric
+ * reporting is enabled and {@link KafkaAvroSchemaRegistry} is configured, this method registers the metric reporting
+ * related schemas and adds the returned schema ids to the config to be used by metric reporters in {@link org.apache.gobblin.yarn.GobblinApplicationMaster}
+ * and the {@link org.apache.gobblin.cluster.GobblinTaskRunner}s. The advantage of doing this is that the TaskRunners
+ * do not have to initiate a connection with the schema registry server, which reduces the chances of metric reporter
+ * instantiation failures in the {@link org.apache.gobblin.cluster.GobblinTaskRunner}s.
+ * @param config the input application {@link Config}
+ * @param registry the {@link KafkaAvroSchemaRegistry} used to register the metric reporting schemas
+ * @return the config augmented with the registered schema ids
+ */
+ @VisibleForTesting
+ static Config addMetricReportingDynamicConfig(Config config, KafkaAvroSchemaRegistry registry) throws IOException {
+ Properties properties = ConfigUtils.configToProperties(config);
+ if (KafkaReporterUtils.isEventsEnabled(properties)) {
+ Schema schema = KafkaReporterUtils.getGobblinTrackingEventSchema();
+ String schemaId = registry.register(schema, KafkaReporterUtils.getEventsTopic(properties).get());
+ LOGGER.info("Adding schemaId {} for GobblinTrackingEvent to the config", schemaId);
+ config = config.withValue(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKA_AVRO_SCHEMA_ID,
+ ConfigValueFactory.fromAnyRef(schemaId));
+ }
+
+ if (KafkaReporterUtils.isMetricsEnabled(properties)) {
+ Schema schema = KafkaReporterUtils.getMetricReportSchema();
+ String schemaId = registry.register(schema, KafkaReporterUtils.getMetricsTopic(properties).get());
+ LOGGER.info("Adding schemaId {} for MetricReport to the config", schemaId);
+ config = config.withValue(ConfigurationKeys.METRICS_REPORTING_METRICS_KAFKA_AVRO_SCHEMA_ID,
+ ConfigValueFactory.fromAnyRef(schemaId));
+ }
+ return config;
+ }
+
public static void main(String[] args) throws Exception {
final GobblinYarnAppLauncher gobblinYarnAppLauncher =
new GobblinYarnAppLauncher(ConfigFactory.load(), new YarnConfiguration());
@@ -942,4 +1030,4 @@ public class GobblinYarnAppLauncher {
gobblinYarnAppLauncher.launch();
}
-}
+}
\ No newline at end of file
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
index c1c7142..46042c4 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
@@ -25,12 +25,15 @@ import java.io.PrintWriter;
import java.lang.reflect.Field;
import java.net.URL;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
+import org.apache.avro.Schema;
+import org.apache.commons.io.FileUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.fs.FileSystem;
@@ -46,6 +49,8 @@ import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.model.Message;
import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -54,6 +59,7 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import com.google.common.eventbus.EventBus;
@@ -72,6 +78,8 @@ import org.apache.gobblin.cluster.TestHelper;
import org.apache.gobblin.cluster.TestShutdownMessageHandlerFactory;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.DynamicConfigGenerator;
+import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
+import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
import org.apache.gobblin.testing.AssertWithBackoff;
@@ -426,6 +434,45 @@ public class GobblinYarnAppLauncherTest implements HelixMessageTestBase {
Mockito.verify(mockMultiManager, times(1)).cleanUpJobs();
}
+ @Test
+ public void testOutputConfig() throws IOException {
+ File tmpTestDir = com.google.common.io.Files.createTempDir();
+
+ try {
+ Path outputPath = Paths.get(tmpTestDir.toString(), "application.conf");
+ Config config = ConfigFactory.empty()
+ .withValue(ConfigurationKeys.FS_URI_KEY, ConfigValueFactory.fromAnyRef("file:///"))
+ .withValue(GobblinYarnAppLauncher.GOBBLIN_YARN_CONFIG_OUTPUT_PATH,
+ ConfigValueFactory.fromAnyRef(outputPath.toString()));
+
+ GobblinYarnAppLauncher.outputConfigToFile(config);
+
+ String configString = com.google.common.io.Files.toString(outputPath.toFile(), Charsets.UTF_8);
+ Assert.assertTrue(configString.contains("fs"));
+ } finally {
+ FileUtils.deleteDirectory(tmpTestDir);
+ }
+ }
+
+ @Test
+ public void testAddMetricReportingDynamicConfig()
+ throws IOException {
+ KafkaAvroSchemaRegistry schemaRegistry = Mockito.mock(KafkaAvroSchemaRegistry.class);
+ Mockito.when(schemaRegistry.register(Mockito.any(Schema.class), Mockito.anyString())).thenAnswer(new Answer<String>() {
+ @Override
+ public String answer(InvocationOnMock invocation) {
+ return "testId";
+ }
+ });
+ Config config = ConfigFactory.empty().withValue(ConfigurationKeys.METRICS_KAFKA_TOPIC_EVENTS, ConfigValueFactory.fromAnyRef("topic"))
+ .withValue(ConfigurationKeys.METRICS_REPORTING_KAFKA_ENABLED_KEY, ConfigValueFactory.fromAnyRef(true))
+ .withValue(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY, ConfigValueFactory.fromAnyRef(true))
+ .withValue(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_URL, ConfigValueFactory.fromAnyRef("http://testSchemaReg:0000"));
+ config = GobblinYarnAppLauncher.addMetricReportingDynamicConfig(config, schemaRegistry);
+ Assert.assertEquals(config.getString(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKA_AVRO_SCHEMA_ID), "testId");
+ Assert.assertFalse(config.hasPath(ConfigurationKeys.METRICS_REPORTING_METRICS_KAFKA_AVRO_SCHEMA_ID));
+ }
+
/**
* An application master for accessing protected fields in {@link GobblinApplicationMaster}
* for testing.
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
index 160ad39..63e2cd5 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
@@ -50,6 +50,11 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -64,6 +69,7 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
import org.apache.gobblin.testing.AssertWithBackoff;
@@ -284,7 +290,25 @@ public class YarnServiceTest {
static class TestYarnService extends YarnService {
public TestYarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration,
FileSystem fs, EventBus eventBus) throws Exception {
- super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus, null);
+ super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus, getMockHelixManager(config));
+ }
+
+ private static HelixManager getMockHelixManager(Config config) {
+ HelixManager helixManager = Mockito.mock(HelixManager.class);
+ HelixAdmin helixAdmin = Mockito.mock(HelixAdmin.class);
+ HelixDataAccessor helixDataAccessor = Mockito.mock(HelixDataAccessor.class);
+ PropertyKey propertyKey = Mockito.mock(PropertyKey.class);
+ PropertyKey.Builder propertyKeyBuilder = Mockito.mock(PropertyKey.Builder.class);
+
+ Mockito.when(helixManager.getInstanceName()).thenReturn("helixInstance1");
+ Mockito.when(helixManager.getClusterName()).thenReturn(config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY));
+ Mockito.doNothing().when(helixAdmin).enableInstance(Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean());
+ Mockito.when(helixManager.getHelixDataAccessor()).thenReturn(helixDataAccessor);
+ Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(propertyKeyBuilder);
+ Mockito.when(propertyKeyBuilder.liveInstance(Mockito.anyString())).thenReturn(propertyKey);
+ Mockito.when(helixDataAccessor.getProperty(propertyKey)).thenReturn(null);
+
+ return helixManager;
}
protected ContainerLaunchContext newContainerLaunchContext(Container container, String helixInstanceName)