You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2017/08/09 20:03:22 UTC
incubator-gobblin git commit: [GOBBLIN-195] Ability to switch Avro
schema namespace before registering with Kafka Avro Schema registry
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 42677dc8c -> ce60d2c7c
[GOBBLIN-195] Ability to switch Avro schema namespace before registering with Kafka Avro Schema registry
Closes #2049 from
abti/avro_schema_namespace_switch
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/ce60d2c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/ce60d2c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/ce60d2c7
Branch: refs/heads/master
Commit: ce60d2c7c4e3df1ac85b49993d4a3ccfafc40d2c
Parents: 42677dc
Author: Abhishek Tiwari <ab...@gmail.com>
Authored: Wed Aug 9 13:03:09 2017 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Wed Aug 9 13:03:09 2017 -0700
----------------------------------------------------------------------
.../KafkaSchemaRegistryConfigurationKeys.java | 1 +
.../kafka/schemareg/LiKafkaSchemaRegistry.java | 11 ++
.../metrics/kafka/KafkaAvroSchemaRegistry.java | 11 ++
.../reporter/util/KafkaAvroReporterUtil.java | 75 +++++++++++++
.../java/org/apache/gobblin/util/AvroUtils.java | 109 +++++++++++++++++++
.../org/apache/gobblin/util/AvroUtilsTest.java | 23 ++++
6 files changed, 230 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ce60d2c7/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/KafkaSchemaRegistryConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/KafkaSchemaRegistryConfigurationKeys.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/KafkaSchemaRegistryConfigurationKeys.java
index d6f023b..9158663 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/KafkaSchemaRegistryConfigurationKeys.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/KafkaSchemaRegistryConfigurationKeys.java
@@ -24,4 +24,5 @@ public class KafkaSchemaRegistryConfigurationKeys {
public final static String KAFKA_SCHEMA_REGISTRY_CLASS = "kafka.schemaRegistry.class";
public final static String KAFKA_SCHEMA_REGISTRY_URL = "kafka.schemaRegistry.url";
public final static String KAFKA_SCHEMA_REGISTRY_CACHE = "kafka.schemaRegistry.cache";
+ public final static String KAFKA_SCHEMA_REGISTRY_OVERRIDE_NAMESPACE = "kafka.schemaRegistry.overrideNamespace";
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ce60d2c7/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/LiKafkaSchemaRegistry.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/LiKafkaSchemaRegistry.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/LiKafkaSchemaRegistry.java
index 2ea7075..e6afbae 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/LiKafkaSchemaRegistry.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/LiKafkaSchemaRegistry.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.kafka.schemareg;
import java.io.IOException;
+import java.util.Map;
import java.util.Properties;
import org.apache.avro.Schema;
@@ -29,9 +30,11 @@ import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -53,6 +56,7 @@ public class LiKafkaSchemaRegistry implements KafkaSchemaRegistry<MD5Digest, Sch
private final GenericObjectPool<HttpClient> httpClientPool;
private final String url;
+ private final Optional<Map<String, String>> namespaceOverride;
/**
* @param props properties should contain property "kafka.schema.registry.url", and optionally
@@ -64,6 +68,7 @@ public class LiKafkaSchemaRegistry implements KafkaSchemaRegistry<MD5Digest, Sch
String.format("Property %s not provided.", KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_URL));
this.url = props.getProperty(KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_URL);
+ this.namespaceOverride = KafkaAvroReporterUtil.extractOverrideNamespace(props);
int objPoolSize =
Integer.parseInt(props.getProperty(ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS,
@@ -171,6 +176,12 @@ public class LiKafkaSchemaRegistry implements KafkaSchemaRegistry<MD5Digest, Sch
* @throws SchemaRegistryException if registration failed
*/
public synchronized MD5Digest register(Schema schema) throws SchemaRegistryException {
+
+ // Change namespace if override specified
+ if (this.namespaceOverride.isPresent()) {
+ schema = AvroUtils.switchNamespace(schema, this.namespaceOverride.get());
+ }
+
LOG.info("Registering schema " + schema.toString());
PostMethod post = new PostMethod(url);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ce60d2c7/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
index d391ef3..4c155fb 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.metrics.kafka;
import java.io.IOException;
+import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
@@ -30,9 +31,11 @@ import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -59,6 +62,7 @@ public class KafkaAvroSchemaRegistry extends KafkaSchemaRegistry<String, Schema>
private final GenericObjectPool<HttpClient> httpClientPool;
private final String url;
+ private final Optional<Map<String, String>> namespaceOverride;
/**
* @param properties properties should contain property "kafka.schema.registry.url", and optionally
@@ -71,6 +75,7 @@ public class KafkaAvroSchemaRegistry extends KafkaSchemaRegistry<String, Schema>
String.format("Property %s not provided.", KAFKA_SCHEMA_REGISTRY_URL));
this.url = props.getProperty(KAFKA_SCHEMA_REGISTRY_URL);
+ this.namespaceOverride = KafkaAvroReporterUtil.extractOverrideNamespace(props);
int objPoolSize =
Integer.parseInt(props.getProperty(ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS,
@@ -186,6 +191,12 @@ public class KafkaAvroSchemaRegistry extends KafkaSchemaRegistry<String, Schema>
*/
@Override
public synchronized String register(Schema schema) throws SchemaRegistryException {
+
+ // Change namespace if override specified
+ if (this.namespaceOverride.isPresent()) {
+ schema = AvroUtils.switchNamespace(schema, this.namespaceOverride.get());
+ }
+
LOG.info("Registering schema " + schema.toString());
PostMethod post = new PostMethod(url);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ce60d2c7/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/util/KafkaAvroReporterUtil.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/util/KafkaAvroReporterUtil.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/util/KafkaAvroReporterUtil.java
new file mode 100644
index 0000000..1d82921
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/util/KafkaAvroReporterUtil.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.metrics.reporter.util;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistryConfigurationKeys;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+
+public class KafkaAvroReporterUtil {
+
+ private static final Splitter SPLIT_BY_COMMA = Splitter.on(",").omitEmptyStrings().trimResults();
+ private static final Splitter SPLIT_BY_COLON = Splitter.on(":").omitEmptyStrings().trimResults();
+
+ /***
+ * Extracts the map of namespace overrides (original namespace to replacement namespace) from the given properties.
+ *
+ * Example config:
+ * kafka.schemaRegistry.overrideNamespace = namespace1:replacement1,namespace2:replacement2
+ *
+ * For the above example, this method will create a Map with values:
+ * {
+ * "namespace1" : "replacement1",
+ * "namespace2" : "replacement2"
+ * }
+ * @param properties Properties that may contain the override-namespace key.
+ * @return Map of namespace overrides, or absent if the key is not set or its value yields no entries.
+ * @throws RuntimeException if an entry is not of the form originalNamespace:replacementNamespace.
+ */
+ public static Optional<Map<String, String>> extractOverrideNamespace(Properties properties) {
+ if (properties.containsKey(KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_OVERRIDE_NAMESPACE)) {
+
+ Map<String, String> namespaceOverridesMap = Maps.newHashMap();
+ List<String> namespaceOverrides = Lists.newArrayList(SPLIT_BY_COMMA.split(properties
+ .getProperty(KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_OVERRIDE_NAMESPACE)));
+
+ for (String namespaceOverride : namespaceOverrides) {
+ List<String> override = Lists.newArrayList(SPLIT_BY_COLON.split(namespaceOverride));
+ if (override.size() != 2) {
+ throw new RuntimeException("Namespace override should be of the format originalNamespace:replacementNamespace,"
+ + " found: " + namespaceOverride);
+ }
+ namespaceOverridesMap.put(override.get(0), override.get(1));
+ }
+
+ // Only return a present value when at least one override was parsed; otherwise fall through to absent
+ if (namespaceOverridesMap.size() != 0) {
+ return Optional.of(namespaceOverridesMap);
+ }
+ }
+
+ return Optional.<Map<String, String>>absent();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ce60d2c7/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
index 694a40b..d09a6d9 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
@@ -49,6 +49,7 @@ import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.mapred.FsInput;
import org.apache.avro.util.Utf8;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -56,6 +57,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.codehaus.jackson.JsonNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -612,6 +614,113 @@ public class AvroUtils {
}
/**
+ * Recursively copies the input {@link org.apache.avro.Schema}, replacing the namespace of named types found in the override map.
+ * @param schema {@link org.apache.avro.Schema} to copy.
+ * @param namespaceOverride map from original namespace to replacement namespace; namespaces without an entry are kept.
+ * @return A {@link org.apache.avro.Schema} that is a copy of schema with the matching namespaces replaced.
+ */
+ public static Schema switchNamespace(Schema schema, Map<String, String> namespaceOverride) {
+ Schema newSchema;
+ String newNamespace = StringUtils.EMPTY;
+
+ // Process all Schema Types (primitives are simply cloned; records, maps, arrays and unions recurse into nested schemas)
+ // NOTE(review): there is no guard for self-referencing records, so recursion would not terminate on them; confirm inputs
+ switch (schema.getType()) {
+ case ENUM:
+ newNamespace = namespaceOverride.containsKey(schema.getNamespace()) ? namespaceOverride.get(schema.getNamespace())
+ : schema.getNamespace();
+ newSchema =
+ Schema.createEnum(schema.getName(), schema.getDoc(), newNamespace, schema.getEnumSymbols());
+ break;
+ case FIXED:
+ newNamespace = namespaceOverride.containsKey(schema.getNamespace()) ? namespaceOverride.get(schema.getNamespace())
+ : schema.getNamespace();
+ newSchema =
+ Schema.createFixed(schema.getName(), schema.getDoc(), newNamespace, schema.getFixedSize());
+ break;
+ case MAP:
+ newSchema = Schema.createMap(switchNamespace(schema.getValueType(), namespaceOverride));
+ break;
+ case RECORD:
+ newNamespace = namespaceOverride.containsKey(schema.getNamespace()) ? namespaceOverride.get(schema.getNamespace())
+ : schema.getNamespace();
+ List<Schema.Field> newFields = new ArrayList<>();
+ if (schema.getFields().size() > 0) {
+ for (Schema.Field oldField : schema.getFields()) {
+ Field newField = new Field(oldField.name(), switchNamespace(oldField.schema(), namespaceOverride), oldField.doc(),
+ oldField.defaultValue(), oldField.order());
+ newFields.add(newField);
+ }
+ }
+ newSchema = Schema.createRecord(schema.getName(), schema.getDoc(), newNamespace,
+ schema.isError());
+ newSchema.setFields(newFields);
+ break;
+ case UNION:
+ List<Schema> newUnionMembers = new ArrayList<>();
+ if (null != schema.getTypes() && schema.getTypes().size() > 0) {
+ for (Schema oldUnionMember : schema.getTypes()) {
+ newUnionMembers.add(switchNamespace(oldUnionMember, namespaceOverride));
+ }
+ }
+ newSchema = Schema.createUnion(newUnionMembers);
+ break;
+ case ARRAY:
+ newSchema = Schema.createArray(switchNamespace(schema.getElementType(), namespaceOverride));
+ break;
+ case BOOLEAN:
+ case BYTES:
+ case DOUBLE:
+ case FLOAT:
+ case INT:
+ case LONG:
+ case NULL:
+ case STRING:
+ newSchema = Schema.create(schema.getType());
+ break;
+ default:
+ String exceptionMessage = String.format("Schema namespace replacement failed for \"%s\" ", schema);
+ LOG.error(exceptionMessage);
+
+ throw new AvroRuntimeException(exceptionMessage);
+ }
+
+ // Copy schema-level properties; NOTE(review): aliases and field-level properties are not carried over - confirm intended
+ copyProperties(schema, newSchema);
+
+ return newSchema;
+ }
+
+ /***
+ * Copy the JSON properties of one Avro Schema (as returned by getJsonProps()) onto another Avro Schema
+ * @param oldSchema Old Avro Schema to copy properties from; must not be null
+ * @param newSchema New Avro Schema to copy properties to; must not be null
+ */
+ private static void copyProperties(Schema oldSchema, Schema newSchema) {
+ Preconditions.checkNotNull(oldSchema);
+ Preconditions.checkNotNull(newSchema);
+
+ Map<String, JsonNode> props = oldSchema.getJsonProps();
+ copyProperties(props, newSchema);
+ }
+
+ /***
+ * Add every entry of a property map to an Avro Schema via addProp
+ * @param props Properties to copy to Avro Schema; may be null, in which case nothing is copied
+ * @param schema Avro Schema to copy properties to; must not be null
+ */
+ private static void copyProperties(Map<String, JsonNode> props, Schema schema) {
+ Preconditions.checkNotNull(schema);
+
+ // A null property map is treated as "nothing to copy" rather than as an error
+ if (null != props) {
+ for (Map.Entry<String, JsonNode> prop : props.entrySet()) {
+ schema.addProp(prop.getKey(), prop.getValue());
+ }
+ }
+ }
+
+ /**
* Serialize a generic record as a relative {@link Path}. Useful for converting {@link GenericRecord} type keys
* into file system locations. For example {field1=v1, field2=v2} returns field1=v1/field2=v2 if includeFieldNames
* is true, or v1/v2 if it is false. Illegal HDFS tokens such as ':' and '\\' will be replaced with '_'.
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ce60d2c7/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
index cadc507..e89b8b9 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
@@ -21,6 +21,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
@@ -37,6 +39,8 @@ import org.apache.hadoop.fs.Path;
import org.testng.Assert;
import org.testng.annotations.Test;
+import com.google.common.collect.Maps;
+
public class AvroUtilsTest {
private static final String AVRO_DIR = "gobblin-utility/src/test/resources/avroDirParent/";
@@ -196,6 +200,25 @@ public class AvroUtilsTest {
}
+ @Test
+ public void testSwitchNamespace() {
+ String originalNamespace = "originalNamespace";
+ String originalName = "originalName";
+ String newNamespace = "newNamespace";
+ Schema schema = SchemaBuilder.builder(originalNamespace).record(originalName).fields().
+ requiredDouble("double").optionalFloat("float").endRecord();
+ // Override map: original namespace -> replacement namespace
+ Map<String, String> map = Maps.newHashMap();
+ map.put(originalNamespace, newNamespace);
+ Schema newSchema = AvroUtils.switchNamespace(schema, map);
+ // Only the namespace should change; the record name and every field must match the original schema
+ Assert.assertEquals(newSchema.getNamespace(), newNamespace);
+ Assert.assertEquals(newSchema.getName(), originalName);
+ for(Schema.Field field : newSchema.getFields()) {
+ Assert.assertEquals(field, schema.getField(field.name()));
+ }
+ }
+
@Test public void testSerializeAsPath() throws Exception {
Schema schema =