You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2017/08/24 09:06:13 UTC

incubator-gobblin git commit: [GOBBLIN-221] Add Json to Avro converter and bytes to Json converter

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master af8462581 -> 3f04c60d2


[GOBBLIN-221] Add Json to Avro converter and bytes to Json converter

Add JsonRecordAvroSchemaToAvroConverter and
BytesToJsonConverter

Address comments

Closes #2072 from jack-moseley/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/3f04c60d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/3f04c60d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/3f04c60d

Branch: refs/heads/master
Commit: 3f04c60d29dedebd1487ef394249a9e9e702102a
Parents: af84625
Author: Jack Moseley <jm...@jmoseley-mn1.linkedin.biz>
Authored: Thu Aug 24 02:06:05 2017 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Thu Aug 24 02:06:05 2017 -0700

----------------------------------------------------------------------
 ...nElementConversionWithAvroSchemaFactory.java | 166 +++++++++++++++++++
 .../JsonRecordAvroSchemaToAvroConverter.java    | 117 +++++++++++++
 .../converter/json/BytesToJsonConverter.java    |  51 ++++++
 ...JsonRecordAvroSchemaToAvroConverterTest.java |  79 +++++++++
 .../json/BytesToJsonConverterTest.java          |  43 +++++
 .../resources/converter/jsonToAvroRecord.json   |  13 ++
 .../resources/converter/jsonToAvroSchema.avsc   |  46 +++++
 7 files changed, 515 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3f04c60d/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java
new file mode 100644
index 0000000..ca5d88d
--- /dev/null
+++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.converter.avro;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.gobblin.configuration.WorkUnitState;
+
+
+/**
+ * Creates converters from Json types to Avro types, using an Avro {@link Schema} instead of a Json schema to
+ * determine types. Supplies its own {@link ArrayConverter}, {@link MapConverter} and {@link EnumConverter};
+ * every other type is delegated to {@link JsonElementConversionFactory}.
+ */
+public class JsonElementConversionWithAvroSchemaFactory extends JsonElementConversionFactory {
+
+  /**
+   * Use to create a converter for a single field from a schema.
+   *
+   * @param fieldName name of the field the converter is created for
+   * @param fieldType name of the field's type, matched case-insensitively against {@link Type}
+   * @param schemaNode Avro schema of the field; only read for array, map and enum types
+   * @param state work unit state, passed through to the created converter
+   * @param nullable nullability flag, passed through to the created converter
+   * @return a converter for the given field
+   * @throws UnsupportedDateTypeException if {@code fieldType} does not name a {@link Type}
+   */
+  public static JsonElementConverter getConvertor(String fieldName, String fieldType, Schema schemaNode,
+      WorkUnitState state, boolean nullable) throws UnsupportedDateTypeException {
+
+    Type type;
+    try {
+      // Locale.ROOT keeps the lookup stable under default locales with special casing rules
+      // (e.g. Turkish, where "int".toUpperCase() is not "INT").
+      type = Type.valueOf(fieldType.toUpperCase(java.util.Locale.ROOT));
+    } catch (IllegalArgumentException e) {
+      throw new UnsupportedDateTypeException(fieldType + " is unsupported");
+    }
+
+    switch (type) {
+      case ARRAY:
+        return new JsonElementConversionWithAvroSchemaFactory.ArrayConverter(fieldName, nullable, type.toString(), schemaNode, state);
+
+      case MAP:
+        return new JsonElementConversionWithAvroSchemaFactory.MapConverter(fieldName, nullable, type.toString(), schemaNode, state);
+
+      case ENUM:
+        return new JsonElementConversionWithAvroSchemaFactory.EnumConverter(fieldName, nullable, type.toString(), schemaNode);
+
+      default:
+        // Remaining types do not need the Avro schema node, so the Json-schema based factory handles them.
+        return JsonElementConversionFactory.getConvertor(fieldName, fieldType, null, state, nullable);
+    }
+  }
+
+  /**
+   * Converts a {@link JsonArray} into a {@link GenericData.Array}. The element converter is derived from the
+   * element type of the Avro array schema passed to the constructor.
+   */
+  public static class ArrayConverter extends ComplexConverter {
+
+    public ArrayConverter(String fieldName, boolean nullable, String sourceType, Schema schemaNode,
+        WorkUnitState state) throws UnsupportedDateTypeException {
+      super(fieldName, nullable, sourceType);
+      super.setElementConverter(
+          getConvertor(fieldName, schemaNode.getElementType().getType().getName(),
+              schemaNode.getElementType(), state, isNullable()));
+    }
+
+    /**
+     * Converts each element of the given Json array with the element converter.
+     */
+    @Override
+    Object convertField(JsonElement value) {
+      List<Object> list = new ArrayList<>();
+
+      for (JsonElement elem : (JsonArray) value) {
+        list.add(getElementConverter().convertField(elem));
+      }
+
+      return new GenericData.Array<>(schema(), list);
+    }
+
+    @Override
+    public Schema.Type getTargetType() {
+      return Schema.Type.ARRAY;
+    }
+
+    /**
+     * Builds an Avro array schema around the element converter's schema, tagged with {@code source.type=array}.
+     */
+    @Override
+    public Schema schema() {
+      Schema schema = Schema.createArray(getElementConverter().schema());
+      schema.addProp("source.type", "array");
+      return schema;
+    }
+  }
+
+  /**
+   * Converts a {@link JsonObject} into a {@link Map} keyed by the Json member names. The value converter is
+   * derived from the value type of the Avro map schema passed to the constructor.
+   */
+  public static class MapConverter extends ComplexConverter {
+
+    public MapConverter(String fieldName, boolean nullable, String sourceType, Schema schemaNode,
+        WorkUnitState state) throws UnsupportedDateTypeException {
+      super(fieldName, nullable, sourceType);
+      super.setElementConverter(
+          getConvertor(fieldName, schemaNode.getValueType().getType().getName(),
+              schemaNode.getValueType(), state, isNullable()));
+    }
+
+    /**
+     * Converts each member value of the given Json object with the element converter.
+     */
+    @Override
+    Object convertField(JsonElement value) {
+      Map<String, Object> map = new HashMap<>();
+
+      for (Map.Entry<String, JsonElement> entry : ((JsonObject) value).entrySet()) {
+        map.put(entry.getKey(), getElementConverter().convertField(entry.getValue()));
+      }
+
+      return map;
+    }
+
+    @Override
+    public Schema.Type getTargetType() {
+      return Schema.Type.MAP;
+    }
+
+    /**
+     * Builds an Avro map schema around the element converter's schema, tagged with {@code source.type=map}.
+     */
+    @Override
+    public Schema schema() {
+      Schema schema = Schema.createMap(getElementConverter().schema());
+      schema.addProp("source.type", "map");
+      return schema;
+    }
+  }
+
+  /**
+   * Converts a Json string into a {@link GenericData.EnumSymbol} of the enum described by the Avro schema passed
+   * to the constructor.
+   */
+  public static class EnumConverter extends JsonElementConverter {
+    String enumName;
+    List<String> enumSet = new ArrayList<>();
+    Schema schema;
+
+    public EnumConverter(String fieldName, boolean nullable, String sourceType, Schema schemaNode) {
+      super(fieldName, nullable, sourceType);
+
+      this.enumSet.addAll(schemaNode.getEnumSymbols());
+
+      // getName() is the enum's declared name; getType().getName() is always the literal "enum".
+      this.enumName = schemaNode.getName();
+
+      // Build the schema eagerly: convertField() may run without schema() ever having been called, and an
+      // EnumSymbol without a schema cannot be matched against a union branch later on. The namespace is carried
+      // over so the full name agrees with the configured Avro schema.
+      this.schema = Schema.createEnum(this.enumName, "", schemaNode.getNamespace(), this.enumSet);
+      this.schema.addProp("source.type", "enum");
+    }
+
+    /**
+     * Wraps the string form of the given Json value in an enum symbol bound to this converter's schema.
+     */
+    @Override
+    Object convertField(JsonElement value) {
+      return new GenericData.EnumSymbol(this.schema, value.getAsString());
+    }
+
+    @Override
+    public Schema.Type getTargetType() {
+      return Schema.Type.ENUM;
+    }
+
+    /**
+     * Returns the enum schema built in the constructor, tagged with {@code source.type=enum}.
+     */
+    @Override
+    public Schema schema() {
+      return this.schema;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3f04c60d/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java
new file mode 100644
index 0000000..c3edd25
--- /dev/null
+++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.converter.avro;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.converter.Converter;
+import org.apache.gobblin.converter.DataConversionException;
+import org.apache.gobblin.converter.SchemaConversionException;
+import org.apache.gobblin.converter.SingleRecordIterable;
+import org.apache.gobblin.converter.ToAvroConverterBase;
+import com.google.gson.JsonObject;
+
+
+/**
+ * {@link Converter} that takes an Avro schema from config and corresponding {@link JsonObject} records and
+ * converts them to {@link GenericRecord} using the schema
+ */
+public class JsonRecordAvroSchemaToAvroConverter<SI> extends ToAvroConverterBase<SI, JsonObject> {
+
+  /** Config key holding the Avro schema (as a Json string) that output records are built against. */
+  public static final String AVRO_SCHEMA_KEY = "gobblin.converter.avroSchema";
+
+  private Schema schema;
+
+  /**
+   * Parses the Avro schema stored under {@link #AVRO_SCHEMA_KEY}.
+   *
+   * @throws IllegalArgumentException if the work unit does not contain {@link #AVRO_SCHEMA_KEY}
+   */
+  @Override
+  public ToAvroConverterBase<SI, JsonObject> init(WorkUnitState workUnit) {
+    super.init(workUnit);
+    Preconditions.checkArgument(workUnit.contains(AVRO_SCHEMA_KEY), "Missing required property: " + AVRO_SCHEMA_KEY);
+    this.schema = new Schema.Parser().parse(workUnit.getProp(AVRO_SCHEMA_KEY));
+    return this;
+  }
+
+  /**
+   * Ignore input schema and return the Avro schema parsed from config in {@link #init(WorkUnitState)}
+   */
+  @Override
+  public Schema convertSchema(SI inputSchema, WorkUnitState workUnit) throws SchemaConversionException {
+    return this.schema;
+  }
+
+  /**
+   * Take in {@link JsonObject} input records and convert them to {@link GenericRecord} using outputSchema
+   */
+  @Override
+  public Iterable<GenericRecord> convertRecord(Schema outputSchema, JsonObject inputRecord, WorkUnitState workUnit)
+      throws DataConversionException {
+    GenericRecord avroRecord = convertNestedRecord(outputSchema, inputRecord, workUnit);
+    return new SingleRecordIterable<>(avroRecord);
+  }
+
+  /**
+   * Builds a {@link GenericRecord} for outputSchema by converting every schema field from inputRecord. Record
+   * typed fields are converted recursively; all other types go through
+   * {@link JsonElementConversionWithAvroSchemaFactory}.
+   *
+   * @throws DataConversionException if a schema field is absent from inputRecord, a union is not of the form
+   *         [null, type], or a field value cannot be converted
+   */
+  private GenericRecord convertNestedRecord(Schema outputSchema, JsonObject inputRecord, WorkUnitState workUnit)
+      throws DataConversionException {
+    GenericRecord avroRecord = new GenericData.Record(outputSchema);
+    for (Schema.Field field : outputSchema.getFields()) {
+      String fieldName = field.name();
+      if (inputRecord.get(fieldName) == null) {
+        throw new DataConversionException("Field missing from record: " + fieldName);
+      }
+
+      // A union marks the field as nullable; the conversion itself is driven by the non-null branch.
+      Schema fieldSchema = field.schema();
+      boolean nullable = fieldSchema.getType().equals(Schema.Type.UNION);
+      if (nullable) {
+        fieldSchema = unwrapNullableUnion(fieldSchema);
+      }
+      Schema.Type type = fieldSchema.getType();
+
+      if (type.equals(Schema.Type.RECORD)) {
+        if (nullable && inputRecord.get(fieldName).isJsonNull()) {
+          avroRecord.put(fieldName, null);
+        } else {
+          avroRecord.put(fieldName,
+              convertNestedRecord(fieldSchema, inputRecord.get(fieldName).getAsJsonObject(), workUnit));
+        }
+      } else {
+        try {
+          JsonElementConversionWithAvroSchemaFactory.JsonElementConverter converter =
+              JsonElementConversionWithAvroSchemaFactory.getConvertor(fieldName, type.getName(), fieldSchema,
+                  workUnit, nullable);
+          avroRecord.put(fieldName, converter.convert(inputRecord.get(fieldName)));
+        } catch (Exception e) {
+          // Keep the cause so the underlying conversion failure stays visible in the stack trace.
+          throw new DataConversionException("Could not convert field " + fieldName, e);
+        }
+      }
+    }
+    return avroRecord;
+  }
+
+  /**
+   * Returns the non-null branch of a two-branch union that contains exactly one null type.
+   *
+   * @throws DataConversionException if the union does not have exactly two branches or neither branch is null
+   */
+  private static Schema unwrapNullableUnion(Schema unionSchema) throws DataConversionException {
+    List<Schema> types = unionSchema.getTypes();
+    if (types.size() != 2) {
+      throw new DataConversionException("Unions must be size 2, and contain one null");
+    }
+    if (types.get(0).getType().equals(Schema.Type.NULL)) {
+      return types.get(1);
+    }
+    if (types.get(1).getType().equals(Schema.Type.NULL)) {
+      return types.get(0);
+    }
+    throw new DataConversionException("Unions must be size 2, and contain one null");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3f04c60d/gobblin-core/src/main/java/org/apache/gobblin/converter/json/BytesToJsonConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/json/BytesToJsonConverter.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/json/BytesToJsonConverter.java
new file mode 100644
index 0000000..66cb0de
--- /dev/null
+++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/json/BytesToJsonConverter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.converter.json;
+
+import com.google.common.base.Charsets;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.converter.Converter;
+import org.apache.gobblin.converter.DataConversionException;
+import org.apache.gobblin.converter.SingleRecordIterable;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+
+/**
+ * Converter that takes a UTF-8 encoded JSON document as a byte array and parses it into a {@link JsonObject}
+ */
+public class BytesToJsonConverter extends Converter<String, String, byte[], JsonObject> {
+
+  /**
+   * Passes the input schema through unchanged.
+   */
+  @Override
+  public String convertSchema(String inputSchema, WorkUnitState workUnit) {
+    return inputSchema;
+  }
+
+  /**
+   * Decodes inputRecord as UTF-8 and parses it as a single JSON object.
+   *
+   * @throws DataConversionException if inputRecord is null, is not parseable JSON, or its top-level value is not
+   *         a JSON object
+   */
+  @Override
+  public Iterable<JsonObject> convertRecord(String outputSchema, byte[] inputRecord, WorkUnitState workUnit)
+      throws DataConversionException {
+    if (inputRecord == null) {
+      throw new DataConversionException("Input record is null");
+    }
+
+    String jsonString = new String(inputRecord, Charsets.UTF_8);
+    JsonObject outputRecord;
+    try {
+      outputRecord = new JsonParser().parse(jsonString).getAsJsonObject();
+    } catch (com.google.gson.JsonParseException | IllegalStateException e) {
+      // Gson reports malformed input and non-object top-level values with unchecked exceptions; surface them
+      // through the checked exception this method declares.
+      throw new DataConversionException("Input record is not a valid JSON object", e);
+    }
+
+    return new SingleRecordIterable<>(outputRecord);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3f04c60d/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java b/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java
new file mode 100644
index 0000000..7da37b7
--- /dev/null
+++ b/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.converter.avro;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.io.IOUtils;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.source.workunit.Extract.TableType;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+
+/**
+ * Unit test for {@link JsonRecordAvroSchemaToAvroConverter}
+ */
+@Test(groups = {"gobblin.converter"})
+public class JsonRecordAvroSchemaToAvroConverterTest {
+  private JsonObject jsonRecord;
+  private WorkUnitState state;
+
+  /**
+   * Loads the test Avro schema and Json record from the classpath and stores the schema in the work unit state
+   * under {@link JsonRecordAvroSchemaToAvroConverter#AVRO_SCHEMA_KEY}.
+   */
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    String avroSchemaString = IOUtils.toString(this.getClass().getResourceAsStream("/converter/jsonToAvroSchema.avsc"), StandardCharsets.UTF_8);
+
+    this.jsonRecord = new JsonParser().parse(IOUtils.toString(this.getClass().getResourceAsStream(
+        "/converter/jsonToAvroRecord.json"), StandardCharsets.UTF_8)).getAsJsonObject();
+
+    SourceState source = new SourceState();
+    this.state = new WorkUnitState(
+        source.createWorkUnit(source.createExtract(TableType.SNAPSHOT_ONLY, "test_table", "test_namespace")));
+    this.state.setProp(JsonRecordAvroSchemaToAvroConverter.AVRO_SCHEMA_KEY, avroSchemaString);
+  }
+
+  /**
+   * Converts the test record and checks the nullable, long, array, map and nested record fields.
+   */
+  @Test
+  public void testConverter()
+      throws Exception {
+    // Diamond instead of a raw type keeps the instantiation type-checked.
+    JsonRecordAvroSchemaToAvroConverter<String> converter = new JsonRecordAvroSchemaToAvroConverter<>();
+
+    converter.init(this.state);
+
+    Schema avroSchema = converter.convertSchema("dummy", this.state);
+
+    GenericRecord record = converter.convertRecord(avroSchema, this.jsonRecord, this.state).iterator().next();
+
+    Assert.assertNull(record.get("nullableField"));
+    Assert.assertEquals(record.get("longField"), 1234L);
+
+    Assert.assertTrue(record.get("arrayField") instanceof GenericArray);
+
+    Assert.assertTrue(record.get("mapField") instanceof Map);
+
+    GenericRecord nested = (GenericRecord) record.get("nestedRecords");
+    Assert.assertEquals(nested.get("nestedField").toString(), "test");
+    Assert.assertEquals(nested.get("nestedField2").toString(), "test2");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3f04c60d/gobblin-core/src/test/java/org/apache/gobblin/converter/json/BytesToJsonConverterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/converter/json/BytesToJsonConverterTest.java b/gobblin-core/src/test/java/org/apache/gobblin/converter/json/BytesToJsonConverterTest.java
new file mode 100644
index 0000000..0368718
--- /dev/null
+++ b/gobblin-core/src/test/java/org/apache/gobblin/converter/json/BytesToJsonConverterTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.converter.json;
+
+import java.io.IOException;
+import org.apache.commons.io.IOUtils;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.converter.DataConversionException;
+import org.junit.Assert;
+import org.testng.annotations.Test;
+import com.google.gson.JsonObject;
+
+/**
+ * Unit test for {@link BytesToJsonConverter}
+ */
+@Test(groups = {"gobblin.converter"})
+public class BytesToJsonConverterTest {
+  /**
+   * Converts the test Json resource from bytes and checks a top-level and a nested field.
+   */
+  @Test
+  public void testConverter() throws DataConversionException, IOException {
+    BytesToJsonConverter converter = new BytesToJsonConverter();
+    WorkUnitState state = new WorkUnitState();
+
+    byte[] input;
+    try (java.io.InputStream in = this.getClass().getResourceAsStream("/converter/jsonToAvroRecord.json")) {
+      input = IOUtils.toByteArray(in);
+    }
+
+    JsonObject record = converter.convertRecord("dummySchema", input, state).iterator().next();
+
+    // This file imports org.junit.Assert, whose assertEquals takes (expected, actual).
+    Assert.assertEquals(1234L, record.get("longField").getAsLong());
+    Assert.assertEquals("test2", record.get("nestedRecords").getAsJsonObject().get("nestedField2").getAsString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3f04c60d/gobblin-core/src/test/resources/converter/jsonToAvroRecord.json
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/resources/converter/jsonToAvroRecord.json b/gobblin-core/src/test/resources/converter/jsonToAvroRecord.json
new file mode 100644
index 0000000..65bfce6
--- /dev/null
+++ b/gobblin-core/src/test/resources/converter/jsonToAvroRecord.json
@@ -0,0 +1,13 @@
+{"nullableField": null,
+  "longField": 1234,
+  "arrayField": ["arr1", "arr2", "arr3"],
+  "mapField": {
+    "map1":"test1",
+    "map2":"test2",
+    "map3":"test3"
+  },
+  "nestedRecords": {
+    "nestedField": "test",
+    "nestedField2": "test2"
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3f04c60d/gobblin-core/src/test/resources/converter/jsonToAvroSchema.avsc
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/resources/converter/jsonToAvroSchema.avsc b/gobblin-core/src/test/resources/converter/jsonToAvroSchema.avsc
new file mode 100644
index 0000000..22fec73
--- /dev/null
+++ b/gobblin-core/src/test/resources/converter/jsonToAvroSchema.avsc
@@ -0,0 +1,46 @@
+{
+  "name": "TestRecord",
+  "type": "record",
+  "namespace": "org.apache.gobblin.test",
+  "fields": [
+    {
+      "name": "nullableField",
+      "type": ["string", "null"]
+    },
+    {
+      "name": "longField",
+      "type": "long"
+    },
+    {
+      "name": "arrayField",
+      "type": {
+        "type": "array",
+        "items": "string"
+      }
+    },
+    {
+      "name": "mapField",
+      "type": {
+        "type": "map",
+        "values": "string"
+      }
+    },
+    {
+      "name": "nestedRecords",
+      "type": {
+        "type": "record",
+        "name": "nested",
+        "fields": [
+          {
+            "name": "nestedField",
+            "type": "string"
+          },
+          {
+            "name": "nestedField2",
+            "type": "string"
+          }
+        ]
+      }
+    }
+  ]
+}