You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2015/03/02 01:20:47 UTC
[7/8] samza git commit: SAMZA-582;
add an avro to calcite schema converter
SAMZA-582; add an avro to calcite schema converter
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2239ba5f
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2239ba5f
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2239ba5f
Branch: refs/heads/samza-sql
Commit: 2239ba5f56b283336ee8655c0d8b59f6cde74458
Parents: 91c0542
Author: Milinda Lakmal Pathirage <mi...@gmail.com>
Authored: Sun Mar 1 16:11:01 2015 -0800
Committer: Chris Riccomini <cr...@apache.org>
Committed: Sun Mar 1 16:11:01 2015 -0800
----------------------------------------------------------------------
.../samza/sql/metadata/AvroSchemaConverter.java | 132 +++++++++++++++++++
.../test/metadata/TestAvroSchemaConverter.java | 57 ++++++++
2 files changed, 189 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/2239ba5f/samza-sql/src/main/java/org/apache/samza/sql/metadata/AvroSchemaConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/metadata/AvroSchemaConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/metadata/AvroSchemaConverter.java
new file mode 100644
index 0000000..7cc43c0
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/metadata/AvroSchemaConverter.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.metadata;
+
+import org.apache.avro.Schema;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Converts an Avro schema to Calcite RelDataType.
+ * <p/>
+ * <p>Inspired by parquet-mr.</p>
+ */
+public class AvroSchemaConverter {
+
+ private final RelDataTypeFactory relDataTypeFactory;
+ private final Schema rootSchema;
+
+ public AvroSchemaConverter(RelDataTypeFactory relDataTypeFactory, Schema schema) {
+ this.relDataTypeFactory = relDataTypeFactory;
+ this.rootSchema = schema;
+ }
+
+ public RelDataType convert() {
+ // At the top level only records are supported
+ if (rootSchema.getType() != Schema.Type.RECORD) {
+ throw new RuntimeException(
+ String.format("Type: %s is unsupported at this level; Only Record type of supported at top level!",
+ rootSchema.getType()));
+ }
+
+ return convertRecord(rootSchema, true);
+ }
+
+ private RelDataType convertRecord(Schema recordSchema, boolean isRoot) {
+ RelDataTypeFactory.FieldInfoBuilder builder = relDataTypeFactory.builder();
+
+ for (Schema.Field field : recordSchema.getFields()) {
+ Schema fieldSchema = field.schema();
+ if (fieldSchema.getType() == Schema.Type.NULL) {
+ continue;
+ }
+
+ convertField(builder, field.name(), fieldSchema);
+ }
+
+ RelDataType record = builder.build();
+ if(isRoot) {
+ // Record at root level is treated differently.
+ return record;
+ }
+
+ return relDataTypeFactory.createStructType(record.getFieldList());
+ }
+
+ private void convertField(RelDataTypeFactory.FieldInfoBuilder builder,
+ String fieldName,
+ Schema fieldSchema) {
+ builder.add(fieldName, convertFieldType(fieldSchema));
+ }
+
+ private RelDataType convertFieldType(Schema elementType) {
+ Schema.Type type = elementType.getType();
+ if (type == Schema.Type.STRING) {
+ return relDataTypeFactory.createSqlType(SqlTypeName.VARCHAR);
+ } else if (type == Schema.Type.INT) {
+ return relDataTypeFactory.createSqlType(SqlTypeName.INTEGER);
+ } else if (type == Schema.Type.BOOLEAN) {
+ return relDataTypeFactory.createSqlType(SqlTypeName.BOOLEAN);
+ } else if (type == Schema.Type.BYTES) {
+ return relDataTypeFactory.createSqlType(SqlTypeName.BINARY);
+ } else if (type == Schema.Type.LONG) {
+ return relDataTypeFactory.createSqlType(SqlTypeName.BIGINT);
+ } else if (type == Schema.Type.DOUBLE) {
+ return relDataTypeFactory.createSqlType(SqlTypeName.DOUBLE);
+ } else if (type == Schema.Type.FLOAT) {
+ return relDataTypeFactory.createSqlType(SqlTypeName.FLOAT);
+ } else if (type == Schema.Type.ARRAY) {
+ return relDataTypeFactory.createArrayType(convertFieldType(elementType), -1);
+ } else if (type == Schema.Type.RECORD) {
+ return convertRecord(elementType, false);
+ } else if (type == Schema.Type.MAP) {
+ return relDataTypeFactory.createMapType(relDataTypeFactory.createSqlType(SqlTypeName.VARCHAR),
+ convertFieldType(elementType.getValueType()));
+ } else if (type == Schema.Type.FIXED) {
+ return relDataTypeFactory.createSqlType(SqlTypeName.VARBINARY, elementType.getFixedSize());
+ } else if (type == Schema.Type.UNION) {
+ List<Schema> types = elementType.getTypes();
+ List<Schema> nonNullTypes = new ArrayList<Schema>();
+ boolean foundNull = false;
+
+ for(Schema s : types) {
+ if(s.getType() == Schema.Type.NULL){
+ foundNull = true;
+ } else {
+ nonNullTypes.add(s);
+ }
+ }
+
+ if(nonNullTypes.size() > 1){
+ throw new RuntimeException("Multiple non null types in a union is not supported.");
+ } else {
+ return relDataTypeFactory.createTypeWithNullability(convertFieldType(nonNullTypes.get(0)), foundNull);
+ }
+ } else if(type == Schema.Type.ENUM) {
+ // TODO: May be there is a better way to handle enums
+ relDataTypeFactory.createSqlType(SqlTypeName.VARCHAR);
+ }
+
+ return relDataTypeFactory.createSqlType(SqlTypeName.ANY);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/2239ba5f/samza-sql/src/test/java/org/apache/samza/sql/test/metadata/TestAvroSchemaConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/test/metadata/TestAvroSchemaConverter.java b/samza-sql/src/test/java/org/apache/samza/sql/test/metadata/TestAvroSchemaConverter.java
new file mode 100644
index 0000000..b4ac5f5
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/test/metadata/TestAvroSchemaConverter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.test.metadata;
+
+import org.apache.avro.Schema;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.samza.sql.metadata.AvroSchemaConverter;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Verifies the Calcite row type {@link AvroSchemaConverter} derives from a flat Avro record. */
+public class TestAvroSchemaConverter {
+  public static final String SIMPLE_RECORD_SCHEMA = "{\"namespace\": \"example.avro\",\n"
+      + " \"type\": \"record\",\n \"name\": \"User\",\n"
+      + " \"fields\": [\n"
+      + " {\"name\": \"name\", \"type\": \"string\"},\n"
+      + " {\"name\": \"favorite_number\", \"type\": [\"int\", \"null\"]},\n"
+      + " {\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]}\n"
+      + " ]\n}";
+  public static final Schema simpleRecord = new Schema.Parser().parse(SIMPLE_RECORD_SCHEMA);
+  @Test
+  public void testSimpleAvroRecord() {
+    RelDataType rowType = new AvroSchemaConverter(
+        new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT), simpleRecord).convert();
+    Assert.assertEquals(SqlTypeName.VARCHAR, typeOf(rowType, "name").getSqlTypeName());
+    Assert.assertEquals(SqlTypeName.INTEGER, typeOf(rowType, "favorite_number").getSqlTypeName());
+    Assert.assertTrue(typeOf(rowType, "favorite_number").isNullable());
+    Assert.assertEquals(SqlTypeName.VARCHAR, typeOf(rowType, "favorite_color").getSqlTypeName());
+    Assert.assertTrue(typeOf(rowType, "favorite_color").isNullable());
+  }
+
+  /** Returns the type of the named field; the lookup is case-insensitive. */
+  private static RelDataType typeOf(RelDataType rowType, String fieldName) {
+    return rowType.getField(fieldName, false, false).getType();
+  }
+}