You are viewing a plain text version of this content. The canonical link for it was a hyperlink labeled "here" that is not preserved in this plain-text copy.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/11/22 23:39:54 UTC
[2/2] samza git commit: SAMZA-1050: Make samza-operator independent of avro version
SAMZA-1050: Make samza-operator independent of avro version
Prep for merging the samza-operator APIs to master: removing the direct dependency on avro.
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Reviewers: Jagadish <ja...@gmail.com>, Prateek Maheshiwari <pm...@linkedin.com>
Closes #22 from nickpan47/SAMZA-1050
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a7de7359
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a7de7359
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a7de7359
Branch: refs/heads/samza-sql
Commit: a7de73594b3b85b3ad4132cc33dff1f5efcd95bf
Parents: 1dac25e
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Authored: Tue Nov 22 15:39:27 2016 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Tue Nov 22 15:39:27 2016 -0800
----------------------------------------------------------------------
build.gradle | 18 +-
.../apache/samza/operators/api/data/Data.java | 57 ----
.../apache/samza/operators/api/data/Schema.java | 58 ----
.../operators/impl/data/avro/AvroData.java | 262 ----------------
.../operators/impl/data/avro/AvroSchema.java | 296 -------------------
.../impl/data/serializers/SqlAvroSerde.java | 108 -------
.../data/serializers/SqlAvroSerdeFactory.java | 40 ---
.../impl/data/serializers/SqlStringSerde.java | 44 ---
.../data/serializers/SqlStringSerdeFactory.java | 33 ---
.../operators/impl/data/string/StringData.java | 101 -------
.../impl/data/string/StringSchema.java | 73 -----
.../impl/data/serializers/SqlAvroSerdeTest.java | 103 -------
.../samza/task/BroadcastOperatorTask.java | 14 +-
.../org/apache/samza/sql/calcite/data/Data.java | 57 ++++
.../apache/samza/sql/calcite/data/Schema.java | 58 ++++
.../samza/sql/calcite/data/avro/AvroData.java | 262 ++++++++++++++++
.../samza/sql/calcite/data/avro/AvroSchema.java | 296 +++++++++++++++++++
.../calcite/data/serializers/SqlAvroSerde.java | 108 +++++++
.../data/serializers/SqlAvroSerdeFactory.java | 40 +++
.../data/serializers/SqlStringSerde.java | 44 +++
.../data/serializers/SqlStringSerdeFactory.java | 33 +++
.../sql/calcite/data/string/StringData.java | 101 +++++++
.../sql/calcite/data/string/StringSchema.java | 73 +++++
.../data/serializers/SqlAvroSerdeTest.java | 103 +++++++
24 files changed, 1194 insertions(+), 1188 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 28c2dcf..1b3c278 100644
--- a/build.gradle
+++ b/build.gradle
@@ -278,7 +278,12 @@ project(":samza-yarn_$scalaVersion") {
// Force scala joint compilation
sourceSets.main.scala.srcDir "src/main/java"
+ sourceSets.test.scala.srcDir "src/test/java"
+
+ // Disable the Javac compiler by forcing joint compilation by scalac. This is equivalent to setting
+ // tasks.compileTestJava.enabled = false
sourceSets.main.java.srcDirs = []
+ sourceSets.test.java.srcDirs = []
dependencies {
compile project(':samza-api')
@@ -304,6 +309,10 @@ project(":samza-yarn_$scalaVersion") {
// Exclude because YARN's 3.4.5 ZK version is incompatbile with Kafka's 3.3.4.
exclude module: 'zookeeper'
}
+ compile("org.apache.hadoop:hadoop-hdfs:$yarnVersion") {
+ exclude module: 'slf4j-log4j12'
+ exclude module: 'servlet-api'
+ }
compile("org.scalatra:scalatra_$scalaVersion:$scalatraVersion") {
exclude module: 'scala-compiler'
exclude module: 'slf4j-api'
@@ -317,6 +326,7 @@ project(":samza-yarn_$scalaVersion") {
testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-all:$mockitoVersion"
testCompile project(":samza-core_$scalaVersion").sourceSets.test.output
+ testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
}
repositories {
@@ -346,7 +356,6 @@ if (JavaVersion.current().isJava8Compatible()) {
compile project(":samza-core_$scalaVersion")
compile "commons-collections:commons-collections:$commonsCollectionVersion"
compile "org.apache.commons:commons-lang3:$commonsLang3Version"
- compile "org.apache.avro:avro:$avroVersion"
compile "org.reactivestreams:reactive-streams:$reactiveStreamVersion"
testCompile project(":samza-api").sourceSets.test.output
@@ -368,6 +377,7 @@ if (JavaVersion.current().isJava8Compatible()) {
dependencies {
compile project(":samza-operator")
+ compile "org.apache.avro:avro:$avroVersion"
compile "org.apache.calcite:calcite-core:$calciteVersion"
testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-all:$mockitoVersion"
@@ -470,7 +480,13 @@ project(":samza-kv-inmemory_$scalaVersion") {
project(":samza-kv-rocksdb_$scalaVersion") {
apply plugin: 'scala'
+ // Force scala joint compilation
+ sourceSets.main.scala.srcDir "src/main/java"
sourceSets.test.scala.srcDir "src/test/java"
+
+ // Disable the Javac compiler by forcing joint compilation by scalac. This is equivalent to setting
+ // tasks.compileTestJava.enabled = false
+ sourceSets.main.java.srcDirs = []
sourceSets.test.java.srcDirs = []
dependencies {
http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-operator/src/main/java/org/apache/samza/operators/api/data/Data.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/data/Data.java b/samza-operator/src/main/java/org/apache/samza/operators/api/data/Data.java
deleted file mode 100644
index 69a3bee..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/api/data/Data.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.api.data;
-
-import java.util.List;
-import java.util.Map;
-
-
-/**
- * A generic data interface that allows to implement data access / deserialization w/ {@link Schema}
- */
-public interface Data {
-
- Schema schema();
-
- Object value();
-
- int intValue();
-
- long longValue();
-
- float floatValue();
-
- double doubleValue();
-
- boolean booleanValue();
-
- String strValue();
-
- byte[] bytesValue();
-
- List<Object> arrayValue();
-
- Map<Object, Object> mapValue();
-
- Data getElement(int index);
-
- Data getFieldData(String fldName);
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-operator/src/main/java/org/apache/samza/operators/api/data/Schema.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/data/Schema.java b/samza-operator/src/main/java/org/apache/samza/operators/api/data/Schema.java
deleted file mode 100644
index dc3f8f4..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/api/data/Schema.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.api.data;
-
-import java.util.Map;
-
-
-/**
- * This defines an interface for generic schema access methods
- */
-public interface Schema {
-
- enum Type {
- INTEGER,
- LONG,
- FLOAT,
- DOUBLE,
- BOOLEAN,
- STRING,
- BYTES,
- STRUCT,
- ARRAY,
- MAP
- };
-
- Type getType();
-
- Schema getElementType();
-
- Schema getValueType();
-
- Map<String, Schema> getFields();
-
- Schema getFieldType(String fldName);
-
- Data read(Object object);
-
- Data transform(Data inputData);
-
- boolean equals(Schema other);
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroData.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroData.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroData.java
deleted file mode 100644
index e4f5d79..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroData.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.impl.data.avro;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.avro.generic.GenericArray;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.samza.operators.api.data.Data;
-import org.apache.samza.operators.api.data.Schema;
-
-
-public class AvroData implements Data {
- protected final Object datum;
- protected final AvroSchema schema;
-
- private AvroData(AvroSchema schema, Object datum) {
- this.datum = datum;
- this.schema = schema;
- }
-
- @Override
- public Schema schema() {
- return this.schema;
- }
-
- @Override
- public Object value() {
- return this.datum;
- }
-
- @Override
- public int intValue() {
- throw new UnsupportedOperationException("Can't get value for an unknown data type.");
- }
-
- @Override
- public long longValue() {
- throw new UnsupportedOperationException("Can't get value for an unknown data type.");
- }
-
- @Override
- public float floatValue() {
- throw new UnsupportedOperationException("Can't get value for an unknown data type.");
- }
-
- @Override
- public double doubleValue() {
- throw new UnsupportedOperationException("Can't get value for an unknown data type.");
- }
-
- @Override
- public boolean booleanValue() {
- throw new UnsupportedOperationException("Can't get value for an unknown data type.");
- }
-
- @Override
- public String strValue() {
- throw new UnsupportedOperationException("Can't get value for an unknown data type.");
- }
-
- @Override
- public byte[] bytesValue() {
- throw new UnsupportedOperationException("Can't get value for an unknown data type.");
- }
-
- @Override
- public List<Object> arrayValue() {
- throw new UnsupportedOperationException("Can't get value for an unknown data type.");
- }
-
- @Override
- public Map<Object, Object> mapValue() {
- throw new UnsupportedOperationException("Can't get value for an unknown data type.");
- }
-
- @Override
- public Data getElement(int index) {
- throw new UnsupportedOperationException("Can't get value for an unknown data type.");
- }
-
- @Override
- public Data getFieldData(String fldName) {
- throw new UnsupportedOperationException("Can't get value for an unknown data type.");
- }
-
- public static AvroData getArray(AvroSchema schema, Object datum) {
- if (schema.getType() != Schema.Type.ARRAY) {
- throw new IllegalArgumentException("Can't create an array object with non-array schema:" + schema.getType());
- }
- return new AvroData(schema, datum) {
- @SuppressWarnings("unchecked")
- private final GenericArray<Object> array = (GenericArray<Object>) this.datum;
-
- @Override
- public List<Object> arrayValue() {
- return this.array;
- }
-
- @Override
- public Data getElement(int index) {
- return this.schema.getElementType().read(array.get(index));
- }
-
- };
- }
-
- public static AvroData getMap(AvroSchema schema, Object datum) {
- if (schema.getType() != Schema.Type.MAP) {
- throw new IllegalArgumentException("Can't create a map object with non-map schema:" + schema.getType());
- }
- return new AvroData(schema, datum) {
- @SuppressWarnings("unchecked")
- private final Map<Object, Object> map = (Map<Object, Object>) datum;
-
- @Override
- public Map<Object, Object> mapValue() {
- return this.map;
- }
-
- @Override
- public Data getFieldData(String fldName) {
- // TODO Auto-generated method stub
- return this.schema.getValueType().read(map.get(fldName));
- }
-
- };
- }
-
- public static AvroData getStruct(AvroSchema schema, Object datum) {
- if (schema.getType() != Schema.Type.STRUCT) {
- throw new IllegalArgumentException("Can't create a struct object with non-struct schema:" + schema.getType());
- }
- return new AvroData(schema, datum) {
- private final GenericRecord record = (GenericRecord) datum;
-
- @Override
- public Data getFieldData(String fldName) {
- // TODO Auto-generated method stub
- return this.schema.getFieldType(fldName).read(record.get(fldName));
- }
-
- };
- }
-
- public static AvroData getInt(AvroSchema schema, Object datum) {
- if (schema.getType() != Schema.Type.INTEGER || !(datum instanceof Integer)) {
- throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
- + datum.getClass().getName());
- }
- return new AvroData(schema, datum) {
- @Override
- public int intValue() {
- return ((Integer) datum).intValue();
- }
-
- };
- }
-
- public static AvroData getLong(AvroSchema schema, Object datum) {
- if (schema.getType() != Schema.Type.LONG || !(datum instanceof Long)) {
- throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
- + datum.getClass().getName());
- }
- return new AvroData(schema, datum) {
- @Override
- public long longValue() {
- return ((Long) datum).longValue();
- }
-
- };
- }
-
- public static AvroData getFloat(AvroSchema schema, Object datum) {
- if (schema.getType() != Schema.Type.FLOAT || !(datum instanceof Float)) {
- throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
- + datum.getClass().getName());
- }
- return new AvroData(schema, datum) {
- @Override
- public float floatValue() {
- return ((Float) datum).floatValue();
- }
-
- };
- }
-
- public static AvroData getDouble(AvroSchema schema, Object datum) {
- if (schema.getType() != Schema.Type.DOUBLE || !(datum instanceof Double)) {
- throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
- + datum.getClass().getName());
- }
- return new AvroData(schema, datum) {
- @Override
- public double doubleValue() {
- return ((Double) datum).doubleValue();
- }
-
- };
- }
-
- public static AvroData getBoolean(AvroSchema schema, Object datum) {
- if (schema.getType() != Schema.Type.BOOLEAN || !(datum instanceof Boolean)) {
- throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
- + datum.getClass().getName());
- }
- return new AvroData(schema, datum) {
- @Override
- public boolean booleanValue() {
- return ((Boolean) datum).booleanValue();
- }
-
- };
- }
-
- public static AvroData getString(AvroSchema schema, Object datum) {
- if (schema.getType() != Schema.Type.STRING || !(datum instanceof CharSequence)) {
- throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
- + datum.getClass().getName());
- }
- return new AvroData(schema, datum) {
- @Override
- public String strValue() {
- return ((CharSequence) datum).toString();
- }
-
- };
- }
-
- public static AvroData getBytes(AvroSchema schema, Object datum) {
- if (schema.getType() != Schema.Type.BYTES || !(datum instanceof ByteBuffer)) {
- throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
- + datum.getClass().getName());
- }
- return new AvroData(schema, datum) {
- @Override
- public byte[] bytesValue() {
- return ((ByteBuffer) datum).array();
- }
-
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroSchema.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroSchema.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroSchema.java
deleted file mode 100644
index c04e4f6..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroSchema.java
+++ /dev/null
@@ -1,296 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.impl.data.avro;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.avro.Schema.Field;
-import org.apache.samza.operators.api.data.Data;
-import org.apache.samza.operators.api.data.Schema;
-
-
-public class AvroSchema implements Schema {
-
- protected final org.apache.avro.Schema avroSchema;
- protected final Schema.Type type;
-
- private final static Map<org.apache.avro.Schema.Type, AvroSchema> primSchemas =
- new HashMap<org.apache.avro.Schema.Type, AvroSchema>();
-
- static {
- primSchemas.put(org.apache.avro.Schema.Type.INT,
- new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT)) {
- @Override
- public Data read(Object datum) {
- return AvroData.getInt(this, datum);
- }
- });
- primSchemas.put(org.apache.avro.Schema.Type.LONG,
- new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG)) {
- @Override
- public Data read(Object datum) {
- return AvroData.getLong(this, datum);
- }
- });
- primSchemas.put(org.apache.avro.Schema.Type.FLOAT,
- new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.FLOAT)) {
- @Override
- public Data read(Object datum) {
- return AvroData.getFloat(this, datum);
- }
- });
- primSchemas.put(org.apache.avro.Schema.Type.DOUBLE,
- new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.DOUBLE)) {
- @Override
- public Data read(Object datum) {
- return AvroData.getDouble(this, datum);
- }
- });
- primSchemas.put(org.apache.avro.Schema.Type.BOOLEAN,
- new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BOOLEAN)) {
- @Override
- public Data read(Object datum) {
- return AvroData.getBoolean(this, datum);
- }
- });
- primSchemas.put(org.apache.avro.Schema.Type.STRING,
- new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING)) {
- @Override
- public Data read(Object datum) {
- return AvroData.getString(this, datum);
- }
- });
- primSchemas.put(org.apache.avro.Schema.Type.BYTES,
- new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES)) {
- @Override
- public Data read(Object datum) {
- return AvroData.getBytes(this, datum);
- }
- });
- };
-
- public static AvroSchema getSchema(final org.apache.avro.Schema schema) {
- Schema.Type type = mapType(schema.getType());
- if (type != Schema.Type.ARRAY && type != Schema.Type.MAP && type != Schema.Type.STRUCT) {
- return primSchemas.get(schema.getType());
- }
- // otherwise, construct the new schema
- // TODO: It would be possible to assign each complex schema an ID and cache it w/o repeated create in-memory schema objects
- switch (type) {
- case ARRAY:
- return new AvroSchema(schema) {
- @Override
- public Data transform(Data input) {
- // This would get all the elements until the length of the current schema's array length
- if (input.schema().getType() != Schema.Type.ARRAY) {
- throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: "
- + input.schema().getType());
- }
- if (!input.schema().getElementType().equals(this.getElementType())) {
- throw new IllegalArgumentException("Element schema mismatch. Can't transfer data. input schema: "
- + input.schema().getElementType().getType());
- }
- // input type matches array type
- return AvroData.getArray(this, input.value());
- }
- };
- case MAP:
- return new AvroSchema(schema) {
- @Override
- public Data transform(Data input) {
- // This would get all the elements until the length of the current schema's array length
- if (input.schema().getType() != Schema.Type.MAP) {
- throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: "
- + input.schema().getType());
- }
- if (!input.schema().getValueType().equals(this.getValueType())) {
- throw new IllegalArgumentException("Element schema mismatch. Can't transfer data. input schema: "
- + input.schema().getValueType().getType());
- }
- // input type matches map type
- return AvroData.getMap(this, input.value());
- }
- };
- case STRUCT:
- return new AvroSchema(schema) {
- @SuppressWarnings("serial")
- private final Map<String, Schema> fldSchemas = new HashMap<String, Schema>() {
- {
- for (Field field : schema.getFields()) {
- put(field.name(), getSchema(field.schema()));
- }
- }
- };
-
- @Override
- public Map<String, Schema> getFields() {
- return this.fldSchemas;
- }
-
- @Override
- public Schema getFieldType(String fldName) {
- return this.fldSchemas.get(fldName);
- }
-
- @Override
- public Data transform(Data input) {
- // This would get all the elements until the length of the current schema's array length
- if (input.schema().getType() != Schema.Type.STRUCT) {
- throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: "
- + input.schema().getType());
- }
- // Note: this particular transform function only implements "projection to a sub-set" concept.
- // More complex function is needed if some other concepts such as "merge from two sets of data", "allow null if does not exist" are needed
- for (String fldName : this.fldSchemas.keySet()) {
- // check each field schema matches input
- Schema fldSchema = this.fldSchemas.get(fldName);
- Schema inputFld = input.schema().getFieldType(fldName);
- if (!fldSchema.equals(inputFld)) {
- throw new IllegalArgumentException("Field schema mismatch. Can't transfer data for field " + fldName
- + ". input field schema:" + inputFld.getType() + ", this field schema: " + fldSchema.getType());
- }
- }
- // input type matches struct type
- return AvroData.getStruct(this, input.value());
- }
-
- };
- default:
- throw new IllegalArgumentException("Un-recognized complext data type:" + type);
- }
- }
-
- private AvroSchema(org.apache.avro.Schema schema) {
- this.avroSchema = schema;
- this.type = mapType(schema.getType());
- }
-
- private static Type mapType(org.apache.avro.Schema.Type type) {
- switch (type) {
- case ARRAY:
- return Schema.Type.ARRAY;
- case RECORD:
- return Schema.Type.STRUCT;
- case MAP:
- return Schema.Type.MAP;
- case INT:
- return Schema.Type.INTEGER;
- case LONG:
- return Schema.Type.LONG;
- case BOOLEAN:
- return Schema.Type.BOOLEAN;
- case FLOAT:
- return Schema.Type.FLOAT;
- case DOUBLE:
- return Schema.Type.DOUBLE;
- case STRING:
- return Schema.Type.STRING;
- case BYTES:
- return Schema.Type.BYTES;
- default:
- throw new IllegalArgumentException("Avro schema: " + type + " is not supported");
- }
- }
-
- @Override
- public Type getType() {
- return this.type;
- }
-
- @Override
- public Schema getElementType() {
- if (this.type != Schema.Type.ARRAY) {
- throw new UnsupportedOperationException("Can't getElmentType with non-array schema: " + this.type);
- }
- return getSchema(this.avroSchema.getElementType());
- }
-
- @Override
- public Schema getValueType() {
- if (this.type != Schema.Type.MAP) {
- throw new UnsupportedOperationException("Can't getValueType with non-map schema: " + this.type);
- }
- return getSchema(this.avroSchema.getValueType());
- }
-
- @Override
- public Map<String, Schema> getFields() {
- throw new UnsupportedOperationException("Can't get field types with unknown schema type:" + this.type);
- }
-
- @Override
- public Schema getFieldType(String fldName) {
- throw new UnsupportedOperationException("Can't getFieldType with non-map/non-struct schema: " + this.type);
- }
-
- @Override
- public Data read(Object object) {
- if (this.avroSchema.getType() == org.apache.avro.Schema.Type.ARRAY) {
- return AvroData.getArray(this, object);
- } else if (this.avroSchema.getType() == org.apache.avro.Schema.Type.MAP) {
- return AvroData.getMap(this, object);
- } else if (this.avroSchema.getType() == org.apache.avro.Schema.Type.RECORD) {
- return AvroData.getStruct(this, object);
- }
- throw new UnsupportedOperationException("Reading unknown complext type:" + this.type + " is not supported");
- }
-
- @Override
- public Data transform(Data inputData) {
- if (inputData.schema().getType() == Schema.Type.ARRAY || inputData.schema().getType() == Schema.Type.MAP
- || inputData.schema().getType() == Schema.Type.STRUCT) {
- throw new IllegalArgumentException("Complex schema should have overriden the default transform() function.");
- }
- if (inputData.schema().getType() != this.type) {
- throw new IllegalArgumentException("Can't transform a mismatched primitive type. this type:" + this.type
- + ", input type:" + inputData.schema().getType());
- }
- return inputData;
- }
-
- @Override
- public boolean equals(Schema other) {
- // TODO Auto-generated method stub
- if (this.type != other.getType()) {
- return false;
- }
- switch (this.type) {
- case ARRAY:
- // check if element types are the same
- return this.getElementType().equals(other.getElementType());
- case MAP:
- // check if value types are the same
- return this.getValueType().equals(other.getValueType());
- case STRUCT:
- // check if the fields schemas in this equals the other
- // NOTE: this equals check is in consistent with the "projection to subset" concept implemented in transform()
- for (String fieldName : this.getFields().keySet()) {
- if (!this.getFieldType(fieldName).equals(other.getFieldType(fieldName))) {
- return false;
- }
- }
- return true;
- default:
- return true;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerde.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerde.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerde.java
deleted file mode 100644
index 2432aca..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerde.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl.data.serializers;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.Encoder;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.samza.SamzaException;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.operators.impl.data.avro.AvroData;
-import org.apache.samza.operators.impl.data.avro.AvroSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-
-public class SqlAvroSerde implements Serde<AvroData> {
- private static Logger log = LoggerFactory.getLogger(SqlAvroSerde.class);
-
- private final Schema avroSchema;
- private final GenericDatumReader<GenericRecord> reader;
- private final GenericDatumWriter<Object> writer;
-
- public SqlAvroSerde(Schema avroSchema) {
- this.avroSchema = avroSchema;
- this.reader = new GenericDatumReader<GenericRecord>(avroSchema);
- this.writer = new GenericDatumWriter<Object>(avroSchema);
- }
-
- @Override
- public AvroData fromBytes(byte[] bytes) {
- GenericRecord data;
-
- try {
- data = reader.read(null, DecoderFactory.defaultFactory().createBinaryDecoder(bytes, null));
- return getAvroData(data, avroSchema);
- } catch (IOException e) {
- String errMsg = "Cannot decode message.";
- log.error(errMsg, e);
- throw new SamzaException(errMsg, e);
- }
- }
-
- @Override
- public byte[] toBytes(AvroData object) {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- Encoder encoder = new BinaryEncoder(out);
-
- try {
- writer.write(object.value(), encoder);
- encoder.flush();
- return out.toByteArray();
- } catch (IOException e) {
- String errMsg = "Cannot perform Avro binary encode.";
- log.error(errMsg, e);
- throw new SamzaException(errMsg, e);
- }
- }
-
- private AvroData getAvroData(GenericRecord data, Schema type){
- AvroSchema schema = AvroSchema.getSchema(type);
- switch (type.getType()){
- case RECORD:
- return AvroData.getStruct(schema, data);
- case ARRAY:
- return AvroData.getArray(schema, data);
- case MAP:
- return AvroData.getMap(schema, data);
- case INT:
- return AvroData.getInt(schema, data);
- case LONG:
- return AvroData.getLong(schema, data);
- case BOOLEAN:
- return AvroData.getBoolean(schema, data);
- case FLOAT:
- return AvroData.getFloat(schema, data);
- case DOUBLE:
- return AvroData.getDouble(schema, data);
- case STRING:
- return AvroData.getString(schema, data);
- case BYTES:
- return AvroData.getBytes(schema, data);
- default:
- throw new IllegalArgumentException("Avro schema: " + type + " is not supported");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeFactory.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeFactory.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeFactory.java
deleted file mode 100644
index edd8859..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeFactory.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl.data.serializers;
-
-import org.apache.avro.Schema;
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.serializers.SerdeFactory;
-import org.apache.samza.operators.impl.data.avro.AvroData;
-
-public class SqlAvroSerdeFactory implements SerdeFactory<AvroData> {
- public static final String PROP_AVRO_SCHEMA = "serializers.%s.schema";
-
- @Override
- public Serde<AvroData> getSerde(String name, Config config) {
- String avroSchemaStr = config.get(String.format(PROP_AVRO_SCHEMA, name));
- if (avroSchemaStr == null || avroSchemaStr.isEmpty()) {
- throw new SamzaException("Cannot find avro schema for SerdeFactory '" + name + "'.");
- }
-
- return new SqlAvroSerde(Schema.parse(avroSchemaStr));
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerde.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerde.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerde.java
deleted file mode 100644
index 1267ab6..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerde.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.impl.data.serializers;
-
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.operators.impl.data.string.StringData;
-
-
-public class SqlStringSerde implements Serde<StringData> {
-
- private final Serde<String> serde;
-
- public SqlStringSerde(String encoding) {
- this.serde = new StringSerde(encoding);
- }
-
- @Override
- public StringData fromBytes(byte[] bytes) {
- return new StringData(serde.fromBytes(bytes));
- }
-
- @Override
- public byte[] toBytes(StringData object) {
- return serde.toBytes(object.strValue());
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerdeFactory.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerdeFactory.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerdeFactory.java
deleted file mode 100644
index 3b6a3e0..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerdeFactory.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.impl.data.serializers;
-
-
-import org.apache.samza.config.Config;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.serializers.SerdeFactory;
-import org.apache.samza.operators.impl.data.string.StringData;
-
-public class SqlStringSerdeFactory implements SerdeFactory<StringData> {
- @Override
- public Serde<StringData> getSerde(String name, Config config) {
- return new SqlStringSerde(config.get("encoding", "UTF-8"));
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringData.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringData.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringData.java
deleted file mode 100644
index 86e9917..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringData.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.impl.data.string;
-
-import org.apache.samza.operators.api.data.Data;
-import org.apache.samza.operators.api.data.Schema;
-
-import java.util.List;
-import java.util.Map;
-
-public class StringData implements Data {
- private final Object datum;
- private final Schema schema;
-
- public StringData(Object datum) {
- this.datum = datum;
- this.schema = new StringSchema();
- }
-
- @Override
- public Schema schema() {
- return this.schema;
- }
-
- @Override
- public Object value() {
- return this.datum;
- }
-
- @Override
- public int intValue() {
- throw new UnsupportedOperationException("Can't get int value for a string type data");
- }
-
- @Override
- public long longValue() {
- throw new UnsupportedOperationException("Can't get long value for a string type data");
- }
-
- @Override
- public float floatValue() {
- throw new UnsupportedOperationException("Can't get float value for a string type data");
- }
-
- @Override
- public double doubleValue() {
- throw new UnsupportedOperationException("Can't get double value for a string type data");
- }
-
- @Override
- public boolean booleanValue() {
- throw new UnsupportedOperationException("Can't get boolean value for a string type data");
- }
-
- @Override
- public String strValue() {
- return String.valueOf(datum);
- }
-
- @Override
- public byte[] bytesValue() {
- throw new UnsupportedOperationException("Can't get bytesValue for a string type data");
- }
-
- @Override
- public List<Object> arrayValue() {
- throw new UnsupportedOperationException("Can't get arrayValue for a string type data");
- }
-
- @Override
- public Map<Object, Object> mapValue() {
- throw new UnsupportedOperationException("Can't get mapValue for a string type data");
- }
-
- @Override
- public Data getElement(int index) {
- throw new UnsupportedOperationException("Can't getElement(index) on a string type data");
- }
-
- @Override
- public Data getFieldData(String fldName) {
- throw new UnsupportedOperationException("Can't getFieldData(fieldName) for a string type data");
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringSchema.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringSchema.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringSchema.java
deleted file mode 100644
index b19dfeb..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringSchema.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.impl.data.string;
-
-import org.apache.samza.operators.api.data.Data;
-import org.apache.samza.operators.api.data.Schema;
-
-import java.util.Map;
-
-public class StringSchema implements Schema {
- private Type type = Type.STRING;
-
- @Override
- public Type getType() {
- return Type.STRING;
- }
-
- @Override
- public Schema getElementType() {
- throw new UnsupportedOperationException("Can't getElmentType with non-array schema: " + this.type);
- }
-
- @Override
- public Schema getValueType() {
- throw new UnsupportedOperationException("Can't getValueType with non-map schema: " + this.type);
- }
-
- @Override
- public Map<String, Schema> getFields() {
- throw new UnsupportedOperationException("Can't get field types with unknown schema type:" + this.type);
- }
-
- @Override
- public Schema getFieldType(String fldName) {
- throw new UnsupportedOperationException("Can't getFieldType with non-map/non-struct schema: " + this.type);
- }
-
- @Override
- public Data read(Object object) {
- return new StringData(object);
- }
-
- @Override
- public Data transform(Data inputData) {
- if (inputData.schema().getType() != this.type) {
- throw new IllegalArgumentException("Can't transform a mismatched primitive type. this type:" + this.type
- + ", input type:" + inputData.schema().getType());
- }
- return inputData;
- }
-
- @Override
- public boolean equals(Schema other) {
- return other.getType() == this.type;
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java
deleted file mode 100644
index 5aa28bb..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl.data.serializers;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.DatumWriter;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.operators.impl.data.avro.AvroData;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-public class SqlAvroSerdeTest {
- public static final String ORDER_SCHEMA = "{\"namespace\": \"org.apache.samza.operators\",\n"+
- " \"type\": \"record\",\n"+
- " \"name\": \"Order\",\n"+
- " \"fields\": [\n"+
- " {\"name\": \"id\", \"type\": \"int\"},\n"+
- " {\"name\": \"product\", \"type\": \"string\"},\n"+
- " {\"name\": \"quantity\", \"type\": \"int\"}\n"+
- " ]\n"+
- "}";
-
- public static Schema orderSchema = Schema.parse(ORDER_SCHEMA);
-
- private static Serde serde = new SqlAvroSerdeFactory().getSerde("sqlAvro", sqlAvroSerdeTestConfig());
-
- @Test
- public void testSqlAvroSerdeDeserialization() throws IOException {
- AvroData decodedDatum = (AvroData)serde.fromBytes(encodeMessage(sampleOrderRecord(), orderSchema));
-
- Assert.assertTrue(decodedDatum.schema().getType() == org.apache.samza.operators.api.data.Schema.Type.STRUCT);
- Assert.assertTrue(decodedDatum.getFieldData("id").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.INTEGER);
- Assert.assertTrue(decodedDatum.getFieldData("quantity").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.INTEGER);
- Assert.assertTrue(decodedDatum.getFieldData("product").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.STRING);
- }
-
- @Test
- public void testSqlAvroSerialization() throws IOException {
- AvroData decodedDatumOriginal = (AvroData)serde.fromBytes(encodeMessage(sampleOrderRecord(), orderSchema));
- @SuppressWarnings("unchecked")
- byte[] encodedDatum = serde.toBytes(decodedDatumOriginal);
-
- AvroData decodedDatum = (AvroData)serde.fromBytes(encodedDatum);
-
- Assert.assertTrue(decodedDatum.schema().getType() == org.apache.samza.operators.api.data.Schema.Type.STRUCT);
- Assert.assertTrue(decodedDatum.getFieldData("id").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.INTEGER);
- Assert.assertTrue(decodedDatum.getFieldData("quantity").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.INTEGER);
- Assert.assertTrue(decodedDatum.getFieldData("product").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.STRING);
- }
-
- private static Config sqlAvroSerdeTestConfig(){
- Map<String, String> config = new HashMap<String, String>();
- config.put("serializers.sqlAvro.schema", ORDER_SCHEMA);
-
- return new MapConfig(config);
- }
-
- private static byte[] encodeMessage(GenericRecord datum, Schema avroSchema) throws IOException {
- DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(avroSchema);
- ByteArrayOutputStream output = new ByteArrayOutputStream();
- BinaryEncoder encoder = new BinaryEncoder(output);
- writer.write(datum, encoder);
- encoder.flush();
-
- return output.toByteArray();
- }
-
- private static GenericRecord sampleOrderRecord(){
- GenericData.Record datum = new GenericData.Record(orderSchema);
- datum.put("id", 1);
- datum.put("product", "paint");
- datum.put("quantity", 3);
-
- return datum;
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java b/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
index 493a688..a6d57da 100644
--- a/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
+++ b/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
@@ -19,11 +19,10 @@
package org.apache.samza.task;
-import org.apache.avro.generic.GenericRecord;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.MessageStreams.SystemMessageStream;
-import org.apache.samza.operators.Windows;
import org.apache.samza.operators.TriggerBuilder;
+import org.apache.samza.operators.Windows;
import org.apache.samza.operators.data.IncomingSystemMessage;
import org.apache.samza.operators.data.Offset;
import org.apache.samza.operators.task.StreamOperatorTask;
@@ -82,16 +81,7 @@ public class BroadcastOperatorTask implements StreamOperatorTask {
}
JsonMessage getInputMessage(IncomingSystemMessage m1) {
- return new JsonMessage(
- m1.getKey().toString(),
- (MessageType) m1.getMessage(),
- m1.getOffset(),
- this.getEventTime((GenericRecord)m1.getMessage()),
- m1.getSystemStreamPartition());
- }
-
- long getEventTime(GenericRecord msg) {
- return (Long) msg.get("event_time");
+ return (JsonMessage) m1.getMessage();
}
boolean myFilter1(JsonMessage m1) {
http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Data.java
----------------------------------------------------------------------
diff --git a/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Data.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Data.java
new file mode 100644
index 0000000..7d6ee79
--- /dev/null
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Data.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.calcite.data;
+
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Generic, schema-aware accessor over a single datum; {@link #schema()} describes its type and the typed getters read it.
+ */
+public interface Data {
+  /** Returns the schema describing this datum. */
+  Schema schema();
+  /** Returns the underlying raw value without conversion (e.g. the Avro datum held by AvroData). */
+  Object value();
+  /** Returns the value as an int; implementations in this patch throw UnsupportedOperationException for non-int data. */
+  int intValue();
+  /** Returns the value as a long; implementations in this patch throw UnsupportedOperationException for non-long data. */
+  long longValue();
+  /** Returns the value as a float; implementations in this patch throw UnsupportedOperationException for non-float data. */
+  float floatValue();
+  /** Returns the value as a double; implementations in this patch throw UnsupportedOperationException for non-double data. */
+  double doubleValue();
+  /** Returns the value as a boolean; implementations in this patch throw UnsupportedOperationException otherwise. */
+  boolean booleanValue();
+  /** Returns the value as a String (StringData uses String.valueOf; Avro STRING data uses CharSequence.toString()). */
+  String strValue();
+  /** Returns the value as a byte array (only BYTES-typed Avro data supports this in this patch). */
+  byte[] bytesValue();
+  /** Returns the value as a List (ARRAY-typed data). */
+  List<Object> arrayValue();
+  /** Returns the value as a Map (MAP-typed data). */
+  Map<Object, Object> mapValue();
+  /** Returns element {@code index} of ARRAY-typed data, wrapped via the schema's element type. */
+  Data getElement(int index);
+  /** Returns field {@code fldName} of STRUCT data, or the value stored under key {@code fldName} of MAP data. */
+  Data getFieldData(String fldName);
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Schema.java
----------------------------------------------------------------------
diff --git a/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Schema.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Schema.java
new file mode 100644
index 0000000..e2a79cf
--- /dev/null
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Schema.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.calcite.data;
+
+import java.util.Map;
+
+
+/**
+ * Generic schema access methods, implemented per data format (e.g. Avro-backed and plain-string schemas).
+ */
+public interface Schema {
+  /** Logical data types a schema can describe. */
+  enum Type {
+    INTEGER,
+    LONG,
+    FLOAT,
+    DOUBLE,
+    BOOLEAN,
+    STRING,
+    BYTES,
+    STRUCT,
+    ARRAY,
+    MAP
+  };
+  /** Returns the logical type of this schema. */
+  Type getType();
+  /** Returns the element schema of an ARRAY schema; non-array implementations throw UnsupportedOperationException. */
+  Schema getElementType();
+  /** Returns the value schema of a MAP schema; non-map implementations throw UnsupportedOperationException. */
+  Schema getValueType();
+  /** Returns the schema of each field keyed by field name (presumably STRUCT schemas only — TODO confirm). */
+  Map<String, Schema> getFields();
+  /** Returns the schema of field {@code fldName} of a STRUCT schema (or the value schema of a MAP schema). */
+  Schema getFieldType(String fldName);
+  /** Wraps a raw datum of this schema's type as {@link Data} (e.g. StringSchema returns a StringData). */
+  Data read(Object object);
+  /** Adapts {@code inputData} to this schema; StringSchema returns it unchanged when types match, else throws. */
+  Data transform(Data inputData);
+  /** Schema equality. NOTE(review): this overloads, not overrides, Object#equals(Object); collections won't use it. */
+  boolean equals(Schema other);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroData.java
----------------------------------------------------------------------
diff --git a/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroData.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroData.java
new file mode 100644
index 0000000..91d26a2
--- /dev/null
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroData.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.calcite.data.avro;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.samza.sql.calcite.data.Data;
+import org.apache.samza.sql.calcite.data.Schema;
+
+
+/**
+ * Avro-backed {@link Data}; each factory overrides only its type's accessors, the rest throw UnsupportedOperationException.
+ */
+public class AvroData implements Data {
+  protected final Object datum;
+  protected final AvroSchema schema;
+
+  private AvroData(AvroSchema schema, Object datum) {
+    this.datum = datum;
+    this.schema = schema;
+  }
+
+  private static String typeName(Object datum) {
+    return datum == null ? "null" : datum.getClass().getName(); // null-safe so a null datum reports IAE, not NPE
+  }
+
+  @Override
+  public Schema schema() {
+    return this.schema;
+  }
+
+  @Override
+  public Object value() {
+    return this.datum;
+  }
+
+  @Override
+  public int intValue() {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  @Override
+  public long longValue() {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  @Override
+  public float floatValue() {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  @Override
+  public double doubleValue() {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  @Override
+  public boolean booleanValue() {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  @Override
+  public String strValue() {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  @Override
+  public byte[] bytesValue() {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  @Override
+  public List<Object> arrayValue() {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  @Override
+  public Map<Object, Object> mapValue() {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  @Override
+  public Data getElement(int index) {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  @Override
+  public Data getFieldData(String fldName) {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  public static AvroData getArray(AvroSchema schema, Object datum) {
+    if (schema.getType() != Schema.Type.ARRAY) {
+      throw new IllegalArgumentException("Can't create an array object with non-array schema:" + schema.getType());
+    }
+    return new AvroData(schema, datum) {
+      @SuppressWarnings("unchecked")
+      private final GenericArray<Object> array = (GenericArray<Object>) this.datum;
+
+      @Override
+      public List<Object> arrayValue() {
+        return this.array;
+      }
+
+      @Override
+      public Data getElement(int index) {
+        return this.schema.getElementType().read(array.get(index));
+      }
+    };
+  }
+
+  public static AvroData getMap(AvroSchema schema, Object datum) {
+    if (schema.getType() != Schema.Type.MAP) {
+      throw new IllegalArgumentException("Can't create a map object with non-map schema:" + schema.getType());
+    }
+    return new AvroData(schema, datum) {
+      @SuppressWarnings("unchecked")
+      private final Map<Object, Object> map = (Map<Object, Object>) datum;
+
+      @Override
+      public Map<Object, Object> mapValue() {
+        return this.map;
+      }
+
+      @Override
+      public Data getFieldData(String fldName) {
+        // Avro's generic reader yields Utf8 map keys by default, and Utf8 never equals a String key.
+        Object key = map.containsKey(fldName) ? fldName : new org.apache.avro.util.Utf8(fldName);
+        return this.schema.getValueType().read(map.get(key));
+      }
+    };
+  }
+
+  public static AvroData getStruct(AvroSchema schema, Object datum) {
+    if (schema.getType() != Schema.Type.STRUCT) {
+      throw new IllegalArgumentException("Can't create a struct object with non-struct schema:" + schema.getType());
+    }
+    return new AvroData(schema, datum) {
+      private final GenericRecord record = (GenericRecord) datum;
+
+      @Override
+      public Data getFieldData(String fldName) {
+        return this.schema.getFieldType(fldName).read(record.get(fldName));
+      }
+    };
+  }
+
+  public static AvroData getInt(AvroSchema schema, Object datum) {
+    if (schema.getType() != Schema.Type.INTEGER || !(datum instanceof Integer)) {
+      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
+          + typeName(datum));
+    }
+    return new AvroData(schema, datum) {
+      @Override
+      public int intValue() {
+        return ((Integer) datum).intValue();
+      }
+    };
+  }
+
+  public static AvroData getLong(AvroSchema schema, Object datum) {
+    if (schema.getType() != Schema.Type.LONG || !(datum instanceof Long)) {
+      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
+          + typeName(datum));
+    }
+    return new AvroData(schema, datum) {
+      @Override
+      public long longValue() {
+        return ((Long) datum).longValue();
+      }
+    };
+  }
+
+  public static AvroData getFloat(AvroSchema schema, Object datum) {
+    if (schema.getType() != Schema.Type.FLOAT || !(datum instanceof Float)) {
+      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
+          + typeName(datum));
+    }
+    return new AvroData(schema, datum) {
+      @Override
+      public float floatValue() {
+        return ((Float) datum).floatValue();
+      }
+    };
+  }
+
+  public static AvroData getDouble(AvroSchema schema, Object datum) {
+    if (schema.getType() != Schema.Type.DOUBLE || !(datum instanceof Double)) {
+      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
+          + typeName(datum));
+    }
+    return new AvroData(schema, datum) {
+      @Override
+      public double doubleValue() {
+        return ((Double) datum).doubleValue();
+      }
+    };
+  }
+
+  public static AvroData getBoolean(AvroSchema schema, Object datum) {
+    if (schema.getType() != Schema.Type.BOOLEAN || !(datum instanceof Boolean)) {
+      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
+          + typeName(datum));
+    }
+    return new AvroData(schema, datum) {
+      @Override
+      public boolean booleanValue() {
+        return ((Boolean) datum).booleanValue();
+      }
+    };
+  }
+
+  public static AvroData getString(AvroSchema schema, Object datum) {
+    if (schema.getType() != Schema.Type.STRING || !(datum instanceof CharSequence)) {
+      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
+          + typeName(datum));
+    }
+    return new AvroData(schema, datum) {
+      @Override
+      public String strValue() {
+        return ((CharSequence) datum).toString();
+      }
+    };
+  }
+
+  public static AvroData getBytes(AvroSchema schema, Object datum) {
+    if (schema.getType() != Schema.Type.BYTES || !(datum instanceof ByteBuffer)) {
+      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
+          + typeName(datum));
+    }
+    return new AvroData(schema, datum) {
+      @Override
+      public byte[] bytesValue() {
+        // Copy [position, limit): array() ignores buffer offsets and throws for direct or read-only buffers.
+        byte[] bytes = new byte[((ByteBuffer) datum).remaining()];
+        ((ByteBuffer) datum).duplicate().get(bytes);
+        return bytes;
+      }
+    };
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroSchema.java
----------------------------------------------------------------------
diff --git a/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroSchema.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroSchema.java
new file mode 100644
index 0000000..c3bb150
--- /dev/null
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroSchema.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.calcite.data.avro;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.avro.Schema.Field;
+import org.apache.samza.sql.calcite.data.Data;
+import org.apache.samza.sql.calcite.data.Schema;
+
+
+public class AvroSchema implements Schema {
+
+ protected final org.apache.avro.Schema avroSchema;
+ protected final Schema.Type type;
+
+ private final static Map<org.apache.avro.Schema.Type, AvroSchema> primSchemas =
+ new HashMap<org.apache.avro.Schema.Type, AvroSchema>();
+
+ static {
+ primSchemas.put(org.apache.avro.Schema.Type.INT,
+ new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT)) {
+ @Override
+ public Data read(Object datum) {
+ return AvroData.getInt(this, datum);
+ }
+ });
+ primSchemas.put(org.apache.avro.Schema.Type.LONG,
+ new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG)) {
+ @Override
+ public Data read(Object datum) {
+ return AvroData.getLong(this, datum);
+ }
+ });
+ primSchemas.put(org.apache.avro.Schema.Type.FLOAT,
+ new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.FLOAT)) {
+ @Override
+ public Data read(Object datum) {
+ return AvroData.getFloat(this, datum);
+ }
+ });
+ primSchemas.put(org.apache.avro.Schema.Type.DOUBLE,
+ new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.DOUBLE)) {
+ @Override
+ public Data read(Object datum) {
+ return AvroData.getDouble(this, datum);
+ }
+ });
+ primSchemas.put(org.apache.avro.Schema.Type.BOOLEAN,
+ new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BOOLEAN)) {
+ @Override
+ public Data read(Object datum) {
+ return AvroData.getBoolean(this, datum);
+ }
+ });
+ primSchemas.put(org.apache.avro.Schema.Type.STRING,
+ new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING)) {
+ @Override
+ public Data read(Object datum) {
+ return AvroData.getString(this, datum);
+ }
+ });
+ primSchemas.put(org.apache.avro.Schema.Type.BYTES,
+ new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES)) {
+ @Override
+ public Data read(Object datum) {
+ return AvroData.getBytes(this, datum);
+ }
+ });
+ };
+
+ public static AvroSchema getSchema(final org.apache.avro.Schema schema) {
+ Schema.Type type = mapType(schema.getType());
+ if (type != Schema.Type.ARRAY && type != Schema.Type.MAP && type != Schema.Type.STRUCT) {
+ return primSchemas.get(schema.getType());
+ }
+ // otherwise, construct the new schema
+ // TODO: It would be possible to assign each complex schema an ID and cache it w/o repeated create in-memory schema objects
+ switch (type) {
+ case ARRAY:
+ return new AvroSchema(schema) {
+ @Override
+ public Data transform(Data input) {
+ // This would get all the elements until the length of the current schema's array length
+ if (input.schema().getType() != Schema.Type.ARRAY) {
+ throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: "
+ + input.schema().getType());
+ }
+ if (!input.schema().getElementType().equals(this.getElementType())) {
+ throw new IllegalArgumentException("Element schema mismatch. Can't transfer data. input schema: "
+ + input.schema().getElementType().getType());
+ }
+ // input type matches array type
+ return AvroData.getArray(this, input.value());
+ }
+ };
+ case MAP:
+ return new AvroSchema(schema) {
+ @Override
+ public Data transform(Data input) {
+ // This would get all the elements until the length of the current schema's array length
+ if (input.schema().getType() != Schema.Type.MAP) {
+ throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: "
+ + input.schema().getType());
+ }
+ if (!input.schema().getValueType().equals(this.getValueType())) {
+ throw new IllegalArgumentException("Element schema mismatch. Can't transfer data. input schema: "
+ + input.schema().getValueType().getType());
+ }
+ // input type matches map type
+ return AvroData.getMap(this, input.value());
+ }
+ };
+ case STRUCT:
+ return new AvroSchema(schema) {
+ @SuppressWarnings("serial")
+ private final Map<String, Schema> fldSchemas = new HashMap<String, Schema>() {
+ {
+ for (Field field : schema.getFields()) {
+ put(field.name(), getSchema(field.schema()));
+ }
+ }
+ };
+
+ @Override
+ public Map<String, Schema> getFields() {
+ return this.fldSchemas;
+ }
+
+ @Override
+ public Schema getFieldType(String fldName) {
+ return this.fldSchemas.get(fldName);
+ }
+
+ @Override
+ public Data transform(Data input) {
+ // This would get all the elements until the length of the current schema's array length
+ if (input.schema().getType() != Schema.Type.STRUCT) {
+ throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: "
+ + input.schema().getType());
+ }
+ // Note: this particular transform function only implements "projection to a sub-set" concept.
+ // More complex function is needed if some other concepts such as "merge from two sets of data", "allow null if does not exist" are needed
+ for (String fldName : this.fldSchemas.keySet()) {
+ // check each field schema matches input
+ Schema fldSchema = this.fldSchemas.get(fldName);
+ Schema inputFld = input.schema().getFieldType(fldName);
+ if (!fldSchema.equals(inputFld)) {
+ throw new IllegalArgumentException("Field schema mismatch. Can't transfer data for field " + fldName
+ + ". input field schema:" + inputFld.getType() + ", this field schema: " + fldSchema.getType());
+ }
+ }
+ // input type matches struct type
+ return AvroData.getStruct(this, input.value());
+ }
+
+ };
+ default:
+ throw new IllegalArgumentException("Un-recognized complext data type:" + type);
+ }
+ }
+
+ private AvroSchema(org.apache.avro.Schema schema) {
+ this.avroSchema = schema;
+ this.type = mapType(schema.getType());
+ }
+
+ private static Type mapType(org.apache.avro.Schema.Type type) {
+ switch (type) {
+ case ARRAY:
+ return Schema.Type.ARRAY;
+ case RECORD:
+ return Schema.Type.STRUCT;
+ case MAP:
+ return Schema.Type.MAP;
+ case INT:
+ return Schema.Type.INTEGER;
+ case LONG:
+ return Schema.Type.LONG;
+ case BOOLEAN:
+ return Schema.Type.BOOLEAN;
+ case FLOAT:
+ return Schema.Type.FLOAT;
+ case DOUBLE:
+ return Schema.Type.DOUBLE;
+ case STRING:
+ return Schema.Type.STRING;
+ case BYTES:
+ return Schema.Type.BYTES;
+ default:
+ throw new IllegalArgumentException("Avro schema: " + type + " is not supported");
+ }
+ }
+
+ @Override
+ public Type getType() {
+ return this.type;
+ }
+
+ @Override
+ public Schema getElementType() {
+ if (this.type != Schema.Type.ARRAY) {
+ throw new UnsupportedOperationException("Can't getElmentType with non-array schema: " + this.type);
+ }
+ return getSchema(this.avroSchema.getElementType());
+ }
+
+ @Override
+ public Schema getValueType() {
+ if (this.type != Schema.Type.MAP) {
+ throw new UnsupportedOperationException("Can't getValueType with non-map schema: " + this.type);
+ }
+ return getSchema(this.avroSchema.getValueType());
+ }
+
+ @Override
+ public Map<String, Schema> getFields() {
+ throw new UnsupportedOperationException("Can't get field types with unknown schema type:" + this.type);
+ }
+
+ @Override
+ public Schema getFieldType(String fldName) {
+ throw new UnsupportedOperationException("Can't getFieldType with non-map/non-struct schema: " + this.type);
+ }
+
+ @Override
+ public Data read(Object object) {
+ if (this.avroSchema.getType() == org.apache.avro.Schema.Type.ARRAY) {
+ return AvroData.getArray(this, object);
+ } else if (this.avroSchema.getType() == org.apache.avro.Schema.Type.MAP) {
+ return AvroData.getMap(this, object);
+ } else if (this.avroSchema.getType() == org.apache.avro.Schema.Type.RECORD) {
+ return AvroData.getStruct(this, object);
+ }
+ throw new UnsupportedOperationException("Reading unknown complext type:" + this.type + " is not supported");
+ }
+
+ @Override
+ public Data transform(Data inputData) {
+ if (inputData.schema().getType() == Schema.Type.ARRAY || inputData.schema().getType() == Schema.Type.MAP
+ || inputData.schema().getType() == Schema.Type.STRUCT) {
+ throw new IllegalArgumentException("Complex schema should have overriden the default transform() function.");
+ }
+ if (inputData.schema().getType() != this.type) {
+ throw new IllegalArgumentException("Can't transform a mismatched primitive type. this type:" + this.type
+ + ", input type:" + inputData.schema().getType());
+ }
+ return inputData;
+ }
+
+ @Override
+ public boolean equals(Schema other) {
+ // TODO Auto-generated method stub
+ if (this.type != other.getType()) {
+ return false;
+ }
+ switch (this.type) {
+ case ARRAY:
+ // check if element types are the same
+ return this.getElementType().equals(other.getElementType());
+ case MAP:
+ // check if value types are the same
+ return this.getValueType().equals(other.getValueType());
+ case STRUCT:
+ // check if the fields schemas in this equals the other
+ // NOTE: this equals check is in consistent with the "projection to subset" concept implemented in transform()
+ for (String fieldName : this.getFields().keySet()) {
+ if (!this.getFieldType(fieldName).equals(other.getFieldType(fieldName))) {
+ return false;
+ }
+ }
+ return true;
+ default:
+ return true;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerde.java
----------------------------------------------------------------------
diff --git a/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerde.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerde.java
new file mode 100644
index 0000000..97a3b6c
--- /dev/null
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerde.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.calcite.data.serializers;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.samza.SamzaException;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.sql.calcite.data.avro.AvroData;
+import org.apache.samza.sql.calcite.data.avro.AvroSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+public class SqlAvroSerde implements Serde<AvroData> {
+ private static Logger log = LoggerFactory.getLogger(SqlAvroSerde.class);
+
+ private final Schema avroSchema;
+ private final GenericDatumReader<GenericRecord> reader;
+ private final GenericDatumWriter<Object> writer;
+
+ public SqlAvroSerde(Schema avroSchema) {
+ this.avroSchema = avroSchema;
+ this.reader = new GenericDatumReader<GenericRecord>(avroSchema);
+ this.writer = new GenericDatumWriter<Object>(avroSchema);
+ }
+
+ @Override
+ public AvroData fromBytes(byte[] bytes) {
+ GenericRecord data;
+
+ try {
+ data = reader.read(null, DecoderFactory.defaultFactory().createBinaryDecoder(bytes, null));
+ return getAvroData(data, avroSchema);
+ } catch (IOException e) {
+ String errMsg = "Cannot decode message.";
+ log.error(errMsg, e);
+ throw new SamzaException(errMsg, e);
+ }
+ }
+
+ @Override
+ public byte[] toBytes(AvroData object) {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ Encoder encoder = new BinaryEncoder(out);
+
+ try {
+ writer.write(object.value(), encoder);
+ encoder.flush();
+ return out.toByteArray();
+ } catch (IOException e) {
+ String errMsg = "Cannot perform Avro binary encode.";
+ log.error(errMsg, e);
+ throw new SamzaException(errMsg, e);
+ }
+ }
+
+ private AvroData getAvroData(GenericRecord data, Schema type){
+ AvroSchema schema = AvroSchema.getSchema(type);
+ switch (type.getType()){
+ case RECORD:
+ return AvroData.getStruct(schema, data);
+ case ARRAY:
+ return AvroData.getArray(schema, data);
+ case MAP:
+ return AvroData.getMap(schema, data);
+ case INT:
+ return AvroData.getInt(schema, data);
+ case LONG:
+ return AvroData.getLong(schema, data);
+ case BOOLEAN:
+ return AvroData.getBoolean(schema, data);
+ case FLOAT:
+ return AvroData.getFloat(schema, data);
+ case DOUBLE:
+ return AvroData.getDouble(schema, data);
+ case STRING:
+ return AvroData.getString(schema, data);
+ case BYTES:
+ return AvroData.getBytes(schema, data);
+ default:
+ throw new IllegalArgumentException("Avro schema: " + type + " is not supported");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerdeFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerdeFactory.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerdeFactory.java
new file mode 100644
index 0000000..caf4009
--- /dev/null
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerdeFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.calcite.data.serializers;
+
+import org.apache.avro.Schema;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeFactory;
+import org.apache.samza.sql.calcite.data.avro.AvroData;
+
+public class SqlAvroSerdeFactory implements SerdeFactory<AvroData> {
+ public static final String PROP_AVRO_SCHEMA = "serializers.%s.schema";
+
+ @Override
+ public Serde<AvroData> getSerde(String name, Config config) {
+ String avroSchemaStr = config.get(String.format(PROP_AVRO_SCHEMA, name));
+ if (avroSchemaStr == null || avroSchemaStr.isEmpty()) {
+ throw new SamzaException("Cannot find avro schema for SerdeFactory '" + name + "'.");
+ }
+
+ return new SqlAvroSerde(Schema.parse(avroSchemaStr));
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlStringSerde.java
----------------------------------------------------------------------
diff --git a/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlStringSerde.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlStringSerde.java
new file mode 100644
index 0000000..6651e97
--- /dev/null
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlStringSerde.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.calcite.data.serializers;
+
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.sql.calcite.data.string.StringData;
+
+
+public class SqlStringSerde implements Serde<StringData> {
+
+ private final Serde<String> serde;
+
+ public SqlStringSerde(String encoding) {
+ this.serde = new StringSerde(encoding);
+ }
+
+ @Override
+ public StringData fromBytes(byte[] bytes) {
+ return new StringData(serde.fromBytes(bytes));
+ }
+
+ @Override
+ public byte[] toBytes(StringData object) {
+ return serde.toBytes(object.strValue());
+ }
+}