You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2015/03/02 01:20:47 UTC

[7/8] samza git commit: SAMZA-582; add an avro to calcite schema converter

SAMZA-582; add an avro to calcite schema converter


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2239ba5f
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2239ba5f
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2239ba5f

Branch: refs/heads/samza-sql
Commit: 2239ba5f56b283336ee8655c0d8b59f6cde74458
Parents: 91c0542
Author: Milinda Lakmal Pathirage <mi...@gmail.com>
Authored: Sun Mar 1 16:11:01 2015 -0800
Committer: Chris Riccomini <cr...@apache.org>
Committed: Sun Mar 1 16:11:01 2015 -0800

----------------------------------------------------------------------
 .../samza/sql/metadata/AvroSchemaConverter.java | 132 +++++++++++++++++++
 .../test/metadata/TestAvroSchemaConverter.java  |  57 ++++++++
 2 files changed, 189 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/2239ba5f/samza-sql/src/main/java/org/apache/samza/sql/metadata/AvroSchemaConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/metadata/AvroSchemaConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/metadata/AvroSchemaConverter.java
new file mode 100644
index 0000000..7cc43c0
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/metadata/AvroSchemaConverter.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.metadata;
+
+import org.apache.avro.Schema;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Converts an Avro schema to Calcite RelDataType.
+ * <p/>
+ * <p>Inspired by parquet-mr.</p>
+ */
+public class AvroSchemaConverter {
+
+  private final RelDataTypeFactory relDataTypeFactory;
+  private final Schema rootSchema;
+
+  public AvroSchemaConverter(RelDataTypeFactory relDataTypeFactory, Schema schema) {
+    this.relDataTypeFactory = relDataTypeFactory;
+    this.rootSchema = schema;
+  }
+
+  public RelDataType convert() {
+    // At the top level only records are supported
+    if (rootSchema.getType() != Schema.Type.RECORD) {
+      throw new RuntimeException(
+          String.format("Type: %s is unsupported at this level; Only Record type of supported at top level!",
+              rootSchema.getType()));
+    }
+
+    return convertRecord(rootSchema, true);
+  }
+
+  private RelDataType convertRecord(Schema recordSchema, boolean isRoot) {
+    RelDataTypeFactory.FieldInfoBuilder builder = relDataTypeFactory.builder();
+
+    for (Schema.Field field : recordSchema.getFields()) {
+      Schema fieldSchema = field.schema();
+      if (fieldSchema.getType() == Schema.Type.NULL) {
+        continue;
+      }
+
+      convertField(builder, field.name(), fieldSchema);
+    }
+
+    RelDataType record = builder.build();
+    if(isRoot) {
+      // Record at root level is treated differently.
+      return record;
+    }
+
+    return relDataTypeFactory.createStructType(record.getFieldList());
+  }
+
+  private void convertField(RelDataTypeFactory.FieldInfoBuilder builder,
+                            String fieldName,
+                            Schema fieldSchema) {
+    builder.add(fieldName, convertFieldType(fieldSchema));
+  }
+
+  private RelDataType convertFieldType(Schema elementType) {
+    Schema.Type type = elementType.getType();
+    if (type == Schema.Type.STRING) {
+      return relDataTypeFactory.createSqlType(SqlTypeName.VARCHAR);
+    } else if (type == Schema.Type.INT) {
+      return relDataTypeFactory.createSqlType(SqlTypeName.INTEGER);
+    } else if (type == Schema.Type.BOOLEAN) {
+      return relDataTypeFactory.createSqlType(SqlTypeName.BOOLEAN);
+    } else if (type == Schema.Type.BYTES) {
+      return relDataTypeFactory.createSqlType(SqlTypeName.BINARY);
+    } else if (type == Schema.Type.LONG) {
+      return relDataTypeFactory.createSqlType(SqlTypeName.BIGINT);
+    } else if (type == Schema.Type.DOUBLE) {
+      return relDataTypeFactory.createSqlType(SqlTypeName.DOUBLE);
+    } else if (type == Schema.Type.FLOAT) {
+      return relDataTypeFactory.createSqlType(SqlTypeName.FLOAT);
+    } else if (type == Schema.Type.ARRAY) {
+      return relDataTypeFactory.createArrayType(convertFieldType(elementType), -1);
+    } else if (type == Schema.Type.RECORD) {
+      return convertRecord(elementType, false);
+    } else if (type == Schema.Type.MAP) {
+      return relDataTypeFactory.createMapType(relDataTypeFactory.createSqlType(SqlTypeName.VARCHAR),
+          convertFieldType(elementType.getValueType()));
+    } else if (type == Schema.Type.FIXED) {
+      return relDataTypeFactory.createSqlType(SqlTypeName.VARBINARY, elementType.getFixedSize());
+    } else if (type == Schema.Type.UNION) {
+      List<Schema> types = elementType.getTypes();
+      List<Schema> nonNullTypes = new ArrayList<Schema>();
+      boolean foundNull = false;
+
+      for(Schema s : types) {
+        if(s.getType() == Schema.Type.NULL){
+          foundNull = true;
+        } else {
+          nonNullTypes.add(s);
+        }
+      }
+
+      if(nonNullTypes.size() > 1){
+        throw new RuntimeException("Multiple non null types in a union is not supported.");
+      } else {
+        return relDataTypeFactory.createTypeWithNullability(convertFieldType(nonNullTypes.get(0)), foundNull);
+      }
+    } else if(type == Schema.Type.ENUM) {
+      // TODO: May be there is a better way to handle enums
+      relDataTypeFactory.createSqlType(SqlTypeName.VARCHAR);
+    }
+
+    return relDataTypeFactory.createSqlType(SqlTypeName.ANY);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2239ba5f/samza-sql/src/test/java/org/apache/samza/sql/test/metadata/TestAvroSchemaConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/test/metadata/TestAvroSchemaConverter.java b/samza-sql/src/test/java/org/apache/samza/sql/test/metadata/TestAvroSchemaConverter.java
new file mode 100644
index 0000000..b4ac5f5
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/test/metadata/TestAvroSchemaConverter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.test.metadata;
+
+import org.apache.avro.Schema;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.samza.sql.metadata.AvroSchemaConverter;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for {@link AvroSchemaConverter} against a flat Avro record.
+ */
+public class TestAvroSchemaConverter {
+  /** A record with one required string field and two fields declared as a union with null. */
+  public static final String SIMPLE_RECORD_SCHEMA = "{\"namespace\": \"example.avro\",\n" +
+      " \"type\": \"record\",\n" +
+      " \"name\": \"User\",\n" +
+      " \"fields\": [\n" +
+      "     {\"name\": \"name\", \"type\": \"string\"},\n" +
+      "     {\"name\": \"favorite_number\",  \"type\": [\"int\", \"null\"]},\n" +
+      "     {\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]}\n" +
+      " ]\n" +
+      "}";
+
+  /** {@link #SIMPLE_RECORD_SCHEMA} parsed once for all tests. */
+  public static final Schema simpleRecord = new Schema.Parser().parse(SIMPLE_RECORD_SCHEMA);
+
+  /** Returns the type of the named field of {@code rowType}, looked up with getField(name, false, false). */
+  private static RelDataType typeOf(RelDataType rowType, String fieldName) {
+    RelDataTypeField field = rowType.getField(fieldName, false, false);
+    return field.getType();
+  }
+
+  @Test
+  public void testSimpleAvroRecord() {
+    RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
+    RelDataType rowType = new AvroSchemaConverter(typeFactory, simpleRecord).convert();
+
+    RelDataType nameType = typeOf(rowType, "name");
+    Assert.assertEquals(SqlTypeName.VARCHAR, nameType.getSqlTypeName());
+
+    // A union of int and null becomes a nullable INTEGER.
+    RelDataType numberType = typeOf(rowType, "favorite_number");
+    Assert.assertEquals(SqlTypeName.INTEGER, numberType.getSqlTypeName());
+    Assert.assertTrue(numberType.isNullable());
+
+    // A union of string and null becomes a nullable VARCHAR.
+    RelDataType colorType = typeOf(rowType, "favorite_color");
+    Assert.assertEquals(SqlTypeName.VARCHAR, colorType.getSqlTypeName());
+    Assert.assertTrue(colorType.isNullable());
+  }
+}