You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/11/22 23:39:54 UTC

[2/2] samza git commit: SAMZA-1050: Make samza-operator independent of avro version

SAMZA-1050: Make samza-operator independent of avro version

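Preparation for merging the samza-operator APIs to master: remove samza-operator's direct dependency on Avro by moving the Avro- and String-based data, schema, and serde classes from org.apache.samza.operators into the org.apache.samza.sql.calcite.data package, and by moving the Avro compile dependency in build.gradle to the project that depends on calcite-core.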

Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>

Reviewers: Jagadish <ja...@gmail.com>, Prateek Maheshiwari <pm...@linkedin.com>

Closes #22 from nickpan47/SAMZA-1050


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a7de7359
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a7de7359
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a7de7359

Branch: refs/heads/samza-sql
Commit: a7de73594b3b85b3ad4132cc33dff1f5efcd95bf
Parents: 1dac25e
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Authored: Tue Nov 22 15:39:27 2016 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Tue Nov 22 15:39:27 2016 -0800

----------------------------------------------------------------------
 build.gradle                                    |  18 +-
 .../apache/samza/operators/api/data/Data.java   |  57 ----
 .../apache/samza/operators/api/data/Schema.java |  58 ----
 .../operators/impl/data/avro/AvroData.java      | 262 ----------------
 .../operators/impl/data/avro/AvroSchema.java    | 296 -------------------
 .../impl/data/serializers/SqlAvroSerde.java     | 108 -------
 .../data/serializers/SqlAvroSerdeFactory.java   |  40 ---
 .../impl/data/serializers/SqlStringSerde.java   |  44 ---
 .../data/serializers/SqlStringSerdeFactory.java |  33 ---
 .../operators/impl/data/string/StringData.java  | 101 -------
 .../impl/data/string/StringSchema.java          |  73 -----
 .../impl/data/serializers/SqlAvroSerdeTest.java | 103 -------
 .../samza/task/BroadcastOperatorTask.java       |  14 +-
 .../org/apache/samza/sql/calcite/data/Data.java |  57 ++++
 .../apache/samza/sql/calcite/data/Schema.java   |  58 ++++
 .../samza/sql/calcite/data/avro/AvroData.java   | 262 ++++++++++++++++
 .../samza/sql/calcite/data/avro/AvroSchema.java | 296 +++++++++++++++++++
 .../calcite/data/serializers/SqlAvroSerde.java  | 108 +++++++
 .../data/serializers/SqlAvroSerdeFactory.java   |  40 +++
 .../data/serializers/SqlStringSerde.java        |  44 +++
 .../data/serializers/SqlStringSerdeFactory.java |  33 +++
 .../sql/calcite/data/string/StringData.java     | 101 +++++++
 .../sql/calcite/data/string/StringSchema.java   |  73 +++++
 .../data/serializers/SqlAvroSerdeTest.java      | 103 +++++++
 24 files changed, 1194 insertions(+), 1188 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 28c2dcf..1b3c278 100644
--- a/build.gradle
+++ b/build.gradle
@@ -278,7 +278,12 @@ project(":samza-yarn_$scalaVersion") {
 
   // Force scala joint compilation
   sourceSets.main.scala.srcDir "src/main/java"
+  sourceSets.test.scala.srcDir "src/test/java"
+
+  // Disable the Javac compiler by forcing joint compilation by scalac. This is equivalent to setting
+  // tasks.compileTestJava.enabled = false
   sourceSets.main.java.srcDirs = []
+  sourceSets.test.java.srcDirs = []
 
   dependencies {
     compile project(':samza-api')
@@ -304,6 +309,10 @@ project(":samza-yarn_$scalaVersion") {
       // Exclude because YARN's 3.4.5 ZK version is incompatbile with Kafka's 3.3.4.
       exclude module: 'zookeeper'
     }
+    compile("org.apache.hadoop:hadoop-hdfs:$yarnVersion") {
+      exclude module: 'slf4j-log4j12'
+      exclude module: 'servlet-api'
+    }
     compile("org.scalatra:scalatra_$scalaVersion:$scalatraVersion") {
       exclude module: 'scala-compiler'
       exclude module: 'slf4j-api'
@@ -317,6 +326,7 @@ project(":samza-yarn_$scalaVersion") {
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-all:$mockitoVersion"
     testCompile project(":samza-core_$scalaVersion").sourceSets.test.output
+    testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
   }
 
   repositories {
@@ -346,7 +356,6 @@ if (JavaVersion.current().isJava8Compatible()) {
       compile project(":samza-core_$scalaVersion")
       compile "commons-collections:commons-collections:$commonsCollectionVersion"
       compile "org.apache.commons:commons-lang3:$commonsLang3Version"
-      compile "org.apache.avro:avro:$avroVersion"
       compile "org.reactivestreams:reactive-streams:$reactiveStreamVersion"
 
       testCompile project(":samza-api").sourceSets.test.output
@@ -368,6 +377,7 @@ if (JavaVersion.current().isJava8Compatible()) {
 
     dependencies {
       compile project(":samza-operator")
+      compile "org.apache.avro:avro:$avroVersion"
       compile "org.apache.calcite:calcite-core:$calciteVersion"
       testCompile "junit:junit:$junitVersion"
       testCompile "org.mockito:mockito-all:$mockitoVersion"
@@ -470,7 +480,13 @@ project(":samza-kv-inmemory_$scalaVersion") {
 project(":samza-kv-rocksdb_$scalaVersion") {
   apply plugin: 'scala'
 
+  // Force scala joint compilation
+  sourceSets.main.scala.srcDir "src/main/java"
   sourceSets.test.scala.srcDir "src/test/java"
+
+  // Disable the Javac compiler by forcing joint compilation by scalac. This is equivalent to setting
+  // tasks.compileTestJava.enabled = false
+  sourceSets.main.java.srcDirs = []
   sourceSets.test.java.srcDirs = []
 
   dependencies {

http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-operator/src/main/java/org/apache/samza/operators/api/data/Data.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/data/Data.java b/samza-operator/src/main/java/org/apache/samza/operators/api/data/Data.java
deleted file mode 100644
index 69a3bee..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/api/data/Data.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.api.data;
-
-import java.util.List;
-import java.util.Map;
-
-
-/**
- * A generic data interface that allows to implement data access / deserialization w/ {@link Schema}
- */
-public interface Data {
-
-  Schema schema();
-
-  Object value();
-
-  int intValue();
-
-  long longValue();
-
-  float floatValue();
-
-  double doubleValue();
-
-  boolean booleanValue();
-
-  String strValue();
-
-  byte[] bytesValue();
-
-  List<Object> arrayValue();
-
-  Map<Object, Object> mapValue();
-
-  Data getElement(int index);
-
-  Data getFieldData(String fldName);
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-operator/src/main/java/org/apache/samza/operators/api/data/Schema.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/data/Schema.java b/samza-operator/src/main/java/org/apache/samza/operators/api/data/Schema.java
deleted file mode 100644
index dc3f8f4..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/api/data/Schema.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.api.data;
-
-import java.util.Map;
-
-
-/**
- * This defines an interface for generic schema access methods
- */
-public interface Schema {
-
-  enum Type {
-    INTEGER,
-    LONG,
-    FLOAT,
-    DOUBLE,
-    BOOLEAN,
-    STRING,
-    BYTES,
-    STRUCT,
-    ARRAY,
-    MAP
-  };
-
-  Type getType();
-
-  Schema getElementType();
-
-  Schema getValueType();
-
-  Map<String, Schema> getFields();
-
-  Schema getFieldType(String fldName);
-
-  Data read(Object object);
-
-  Data transform(Data inputData);
-
-  boolean equals(Schema other);
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroData.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroData.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroData.java
deleted file mode 100644
index e4f5d79..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroData.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.impl.data.avro;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.avro.generic.GenericArray;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.samza.operators.api.data.Data;
-import org.apache.samza.operators.api.data.Schema;
-
-
-public class AvroData implements Data {
-  protected final Object datum;
-  protected final AvroSchema schema;
-
-  private AvroData(AvroSchema schema, Object datum) {
-    this.datum = datum;
-    this.schema = schema;
-  }
-
-  @Override
-  public Schema schema() {
-    return this.schema;
-  }
-
-  @Override
-  public Object value() {
-    return this.datum;
-  }
-
-  @Override
-  public int intValue() {
-    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
-  }
-
-  @Override
-  public long longValue() {
-    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
-  }
-
-  @Override
-  public float floatValue() {
-    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
-  }
-
-  @Override
-  public double doubleValue() {
-    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
-  }
-
-  @Override
-  public boolean booleanValue() {
-    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
-  }
-
-  @Override
-  public String strValue() {
-    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
-  }
-
-  @Override
-  public byte[] bytesValue() {
-    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
-  }
-
-  @Override
-  public List<Object> arrayValue() {
-    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
-  }
-
-  @Override
-  public Map<Object, Object> mapValue() {
-    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
-  }
-
-  @Override
-  public Data getElement(int index) {
-    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
-  }
-
-  @Override
-  public Data getFieldData(String fldName) {
-    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
-  }
-
-  public static AvroData getArray(AvroSchema schema, Object datum) {
-    if (schema.getType() != Schema.Type.ARRAY) {
-      throw new IllegalArgumentException("Can't create an array object with non-array schema:" + schema.getType());
-    }
-    return new AvroData(schema, datum) {
-      @SuppressWarnings("unchecked")
-      private final GenericArray<Object> array = (GenericArray<Object>) this.datum;
-
-      @Override
-      public List<Object> arrayValue() {
-        return this.array;
-      }
-
-      @Override
-      public Data getElement(int index) {
-        return this.schema.getElementType().read(array.get(index));
-      }
-
-    };
-  }
-
-  public static AvroData getMap(AvroSchema schema, Object datum) {
-    if (schema.getType() != Schema.Type.MAP) {
-      throw new IllegalArgumentException("Can't create a map object with non-map schema:" + schema.getType());
-    }
-    return new AvroData(schema, datum) {
-      @SuppressWarnings("unchecked")
-      private final Map<Object, Object> map = (Map<Object, Object>) datum;
-
-      @Override
-      public Map<Object, Object> mapValue() {
-        return this.map;
-      }
-
-      @Override
-      public Data getFieldData(String fldName) {
-        // TODO Auto-generated method stub
-        return this.schema.getValueType().read(map.get(fldName));
-      }
-
-    };
-  }
-
-  public static AvroData getStruct(AvroSchema schema, Object datum) {
-    if (schema.getType() != Schema.Type.STRUCT) {
-      throw new IllegalArgumentException("Can't create a struct object with non-struct schema:" + schema.getType());
-    }
-    return new AvroData(schema, datum) {
-      private final GenericRecord record = (GenericRecord) datum;
-
-      @Override
-      public Data getFieldData(String fldName) {
-        // TODO Auto-generated method stub
-        return this.schema.getFieldType(fldName).read(record.get(fldName));
-      }
-
-    };
-  }
-
-  public static AvroData getInt(AvroSchema schema, Object datum) {
-    if (schema.getType() != Schema.Type.INTEGER || !(datum instanceof Integer)) {
-      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
-          + datum.getClass().getName());
-    }
-    return new AvroData(schema, datum) {
-      @Override
-      public int intValue() {
-        return ((Integer) datum).intValue();
-      }
-
-    };
-  }
-
-  public static AvroData getLong(AvroSchema schema, Object datum) {
-    if (schema.getType() != Schema.Type.LONG || !(datum instanceof Long)) {
-      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
-          + datum.getClass().getName());
-    }
-    return new AvroData(schema, datum) {
-      @Override
-      public long longValue() {
-        return ((Long) datum).longValue();
-      }
-
-    };
-  }
-
-  public static AvroData getFloat(AvroSchema schema, Object datum) {
-    if (schema.getType() != Schema.Type.FLOAT || !(datum instanceof Float)) {
-      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
-          + datum.getClass().getName());
-    }
-    return new AvroData(schema, datum) {
-      @Override
-      public float floatValue() {
-        return ((Float) datum).floatValue();
-      }
-
-    };
-  }
-
-  public static AvroData getDouble(AvroSchema schema, Object datum) {
-    if (schema.getType() != Schema.Type.DOUBLE || !(datum instanceof Double)) {
-      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
-          + datum.getClass().getName());
-    }
-    return new AvroData(schema, datum) {
-      @Override
-      public double doubleValue() {
-        return ((Double) datum).doubleValue();
-      }
-
-    };
-  }
-
-  public static AvroData getBoolean(AvroSchema schema, Object datum) {
-    if (schema.getType() != Schema.Type.BOOLEAN || !(datum instanceof Boolean)) {
-      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
-          + datum.getClass().getName());
-    }
-    return new AvroData(schema, datum) {
-      @Override
-      public boolean booleanValue() {
-        return ((Boolean) datum).booleanValue();
-      }
-
-    };
-  }
-
-  public static AvroData getString(AvroSchema schema, Object datum) {
-    if (schema.getType() != Schema.Type.STRING || !(datum instanceof CharSequence)) {
-      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
-          + datum.getClass().getName());
-    }
-    return new AvroData(schema, datum) {
-      @Override
-      public String strValue() {
-        return ((CharSequence) datum).toString();
-      }
-
-    };
-  }
-
-  public static AvroData getBytes(AvroSchema schema, Object datum) {
-    if (schema.getType() != Schema.Type.BYTES || !(datum instanceof ByteBuffer)) {
-      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
-          + datum.getClass().getName());
-    }
-    return new AvroData(schema, datum) {
-      @Override
-      public byte[] bytesValue() {
-        return ((ByteBuffer) datum).array();
-      }
-
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroSchema.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroSchema.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroSchema.java
deleted file mode 100644
index c04e4f6..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroSchema.java
+++ /dev/null
@@ -1,296 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.impl.data.avro;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.avro.Schema.Field;
-import org.apache.samza.operators.api.data.Data;
-import org.apache.samza.operators.api.data.Schema;
-
-
-public class AvroSchema implements Schema {
-
-  protected final org.apache.avro.Schema avroSchema;
-  protected final Schema.Type type;
-
-  private final static Map<org.apache.avro.Schema.Type, AvroSchema> primSchemas =
-      new HashMap<org.apache.avro.Schema.Type, AvroSchema>();
-
-  static {
-    primSchemas.put(org.apache.avro.Schema.Type.INT,
-        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT)) {
-      @Override
-      public Data read(Object datum) {
-        return AvroData.getInt(this, datum);
-      }
-    });
-    primSchemas.put(org.apache.avro.Schema.Type.LONG,
-        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG)) {
-      @Override
-      public Data read(Object datum) {
-        return AvroData.getLong(this, datum);
-      }
-    });
-    primSchemas.put(org.apache.avro.Schema.Type.FLOAT,
-        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.FLOAT)) {
-      @Override
-      public Data read(Object datum) {
-        return AvroData.getFloat(this, datum);
-      }
-    });
-    primSchemas.put(org.apache.avro.Schema.Type.DOUBLE,
-        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.DOUBLE)) {
-      @Override
-      public Data read(Object datum) {
-        return AvroData.getDouble(this, datum);
-      }
-    });
-    primSchemas.put(org.apache.avro.Schema.Type.BOOLEAN,
-        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BOOLEAN)) {
-      @Override
-      public Data read(Object datum) {
-        return AvroData.getBoolean(this, datum);
-      }
-    });
-    primSchemas.put(org.apache.avro.Schema.Type.STRING,
-        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING)) {
-      @Override
-      public Data read(Object datum) {
-        return AvroData.getString(this, datum);
-      }
-    });
-    primSchemas.put(org.apache.avro.Schema.Type.BYTES,
-        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES)) {
-      @Override
-      public Data read(Object datum) {
-        return AvroData.getBytes(this, datum);
-      }
-    });
-  };
-
-  public static AvroSchema getSchema(final org.apache.avro.Schema schema) {
-    Schema.Type type = mapType(schema.getType());
-    if (type != Schema.Type.ARRAY && type != Schema.Type.MAP && type != Schema.Type.STRUCT) {
-      return primSchemas.get(schema.getType());
-    }
-    // otherwise, construct the new schema
-    // TODO: It would be possible to assign each complex schema an ID and cache it w/o repeated create in-memory schema objects
-    switch (type) {
-      case ARRAY:
-        return new AvroSchema(schema) {
-          @Override
-          public Data transform(Data input) {
-            // This would get all the elements until the length of the current schema's array length
-            if (input.schema().getType() != Schema.Type.ARRAY) {
-              throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: "
-                  + input.schema().getType());
-            }
-            if (!input.schema().getElementType().equals(this.getElementType())) {
-              throw new IllegalArgumentException("Element schema mismatch. Can't transfer data. input schema: "
-                  + input.schema().getElementType().getType());
-            }
-            // input type matches array type
-            return AvroData.getArray(this, input.value());
-          }
-        };
-      case MAP:
-        return new AvroSchema(schema) {
-          @Override
-          public Data transform(Data input) {
-            // This would get all the elements until the length of the current schema's array length
-            if (input.schema().getType() != Schema.Type.MAP) {
-              throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: "
-                  + input.schema().getType());
-            }
-            if (!input.schema().getValueType().equals(this.getValueType())) {
-              throw new IllegalArgumentException("Element schema mismatch. Can't transfer data. input schema: "
-                  + input.schema().getValueType().getType());
-            }
-            // input type matches map type
-            return AvroData.getMap(this, input.value());
-          }
-        };
-      case STRUCT:
-        return new AvroSchema(schema) {
-          @SuppressWarnings("serial")
-          private final Map<String, Schema> fldSchemas = new HashMap<String, Schema>() {
-            {
-              for (Field field : schema.getFields()) {
-                put(field.name(), getSchema(field.schema()));
-              }
-            }
-          };
-
-          @Override
-          public Map<String, Schema> getFields() {
-            return this.fldSchemas;
-          }
-
-          @Override
-          public Schema getFieldType(String fldName) {
-            return this.fldSchemas.get(fldName);
-          }
-
-          @Override
-          public Data transform(Data input) {
-            // This would get all the elements until the length of the current schema's array length
-            if (input.schema().getType() != Schema.Type.STRUCT) {
-              throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: "
-                  + input.schema().getType());
-            }
-            // Note: this particular transform function only implements "projection to a sub-set" concept.
-            // More complex function is needed if some other concepts such as "merge from two sets of data", "allow null if does not exist" are needed
-            for (String fldName : this.fldSchemas.keySet()) {
-              // check each field schema matches input
-              Schema fldSchema = this.fldSchemas.get(fldName);
-              Schema inputFld = input.schema().getFieldType(fldName);
-              if (!fldSchema.equals(inputFld)) {
-                throw new IllegalArgumentException("Field schema mismatch. Can't transfer data for field " + fldName
-                    + ". input field schema:" + inputFld.getType() + ", this field schema: " + fldSchema.getType());
-              }
-            }
-            // input type matches struct type
-            return AvroData.getStruct(this, input.value());
-          }
-
-        };
-      default:
-        throw new IllegalArgumentException("Un-recognized complext data type:" + type);
-    }
-  }
-
-  private AvroSchema(org.apache.avro.Schema schema) {
-    this.avroSchema = schema;
-    this.type = mapType(schema.getType());
-  }
-
-  private static Type mapType(org.apache.avro.Schema.Type type) {
-    switch (type) {
-      case ARRAY:
-        return Schema.Type.ARRAY;
-      case RECORD:
-        return Schema.Type.STRUCT;
-      case MAP:
-        return Schema.Type.MAP;
-      case INT:
-        return Schema.Type.INTEGER;
-      case LONG:
-        return Schema.Type.LONG;
-      case BOOLEAN:
-        return Schema.Type.BOOLEAN;
-      case FLOAT:
-        return Schema.Type.FLOAT;
-      case DOUBLE:
-        return Schema.Type.DOUBLE;
-      case STRING:
-        return Schema.Type.STRING;
-      case BYTES:
-        return Schema.Type.BYTES;
-      default:
-        throw new IllegalArgumentException("Avro schema: " + type + " is not supported");
-    }
-  }
-
-  @Override
-  public Type getType() {
-    return this.type;
-  }
-
-  @Override
-  public Schema getElementType() {
-    if (this.type != Schema.Type.ARRAY) {
-      throw new UnsupportedOperationException("Can't getElmentType with non-array schema: " + this.type);
-    }
-    return getSchema(this.avroSchema.getElementType());
-  }
-
-  @Override
-  public Schema getValueType() {
-    if (this.type != Schema.Type.MAP) {
-      throw new UnsupportedOperationException("Can't getValueType with non-map schema: " + this.type);
-    }
-    return getSchema(this.avroSchema.getValueType());
-  }
-
-  @Override
-  public Map<String, Schema> getFields() {
-    throw new UnsupportedOperationException("Can't get field types with unknown schema type:" + this.type);
-  }
-
-  @Override
-  public Schema getFieldType(String fldName) {
-    throw new UnsupportedOperationException("Can't getFieldType with non-map/non-struct schema: " + this.type);
-  }
-
-  @Override
-  public Data read(Object object) {
-    if (this.avroSchema.getType() == org.apache.avro.Schema.Type.ARRAY) {
-      return AvroData.getArray(this, object);
-    } else if (this.avroSchema.getType() == org.apache.avro.Schema.Type.MAP) {
-      return AvroData.getMap(this, object);
-    } else if (this.avroSchema.getType() == org.apache.avro.Schema.Type.RECORD) {
-      return AvroData.getStruct(this, object);
-    }
-    throw new UnsupportedOperationException("Reading unknown complext type:" + this.type + " is not supported");
-  }
-
-  @Override
-  public Data transform(Data inputData) {
-    if (inputData.schema().getType() == Schema.Type.ARRAY || inputData.schema().getType() == Schema.Type.MAP
-        || inputData.schema().getType() == Schema.Type.STRUCT) {
-      throw new IllegalArgumentException("Complex schema should have overriden the default transform() function.");
-    }
-    if (inputData.schema().getType() != this.type) {
-      throw new IllegalArgumentException("Can't transform a mismatched primitive type. this type:" + this.type
-          + ", input type:" + inputData.schema().getType());
-    }
-    return inputData;
-  }
-
-  @Override
-  public boolean equals(Schema other) {
-    // TODO Auto-generated method stub
-    if (this.type != other.getType()) {
-      return false;
-    }
-    switch (this.type) {
-      case ARRAY:
-        // check if element types are the same
-        return this.getElementType().equals(other.getElementType());
-      case MAP:
-        // check if value types are the same
-        return this.getValueType().equals(other.getValueType());
-      case STRUCT:
-        // check if the fields schemas in this equals the other
-        // NOTE: this equals check is in consistent with the "projection to subset" concept implemented in transform()
-        for (String fieldName : this.getFields().keySet()) {
-          if (!this.getFieldType(fieldName).equals(other.getFieldType(fieldName))) {
-            return false;
-          }
-        }
-        return true;
-      default:
-        return true;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerde.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerde.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerde.java
deleted file mode 100644
index 2432aca..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerde.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl.data.serializers;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.Encoder;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.samza.SamzaException;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.operators.impl.data.avro.AvroData;
-import org.apache.samza.operators.impl.data.avro.AvroSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-
-public class SqlAvroSerde implements Serde<AvroData> {
-  private static Logger log = LoggerFactory.getLogger(SqlAvroSerde.class);
-
-  private final Schema avroSchema;
-  private final GenericDatumReader<GenericRecord> reader;
-  private final GenericDatumWriter<Object> writer;
-
-  public SqlAvroSerde(Schema avroSchema) {
-    this.avroSchema = avroSchema;
-    this.reader = new GenericDatumReader<GenericRecord>(avroSchema);
-    this.writer = new GenericDatumWriter<Object>(avroSchema);
-  }
-
-  @Override
-  public AvroData fromBytes(byte[] bytes) {
-    GenericRecord data;
-
-    try {
-      data = reader.read(null, DecoderFactory.defaultFactory().createBinaryDecoder(bytes, null));
-      return getAvroData(data, avroSchema);
-    } catch (IOException e) {
-      String errMsg = "Cannot decode message.";
-      log.error(errMsg, e);
-      throw new SamzaException(errMsg, e);
-    }
-  }
-
-  @Override
-  public byte[] toBytes(AvroData object) {
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    Encoder encoder = new BinaryEncoder(out);
-
-    try {
-      writer.write(object.value(), encoder);
-      encoder.flush();
-      return out.toByteArray();
-    } catch (IOException e) {
-      String errMsg = "Cannot perform Avro binary encode.";
-      log.error(errMsg, e);
-      throw new SamzaException(errMsg, e);
-    }
-  }
-
-  private AvroData getAvroData(GenericRecord data, Schema type){
-    AvroSchema schema = AvroSchema.getSchema(type);
-    switch (type.getType()){
-      case RECORD:
-        return AvroData.getStruct(schema, data);
-      case ARRAY:
-        return AvroData.getArray(schema, data);
-      case MAP:
-        return AvroData.getMap(schema, data);
-      case INT:
-        return AvroData.getInt(schema, data);
-      case LONG:
-        return AvroData.getLong(schema, data);
-      case BOOLEAN:
-        return AvroData.getBoolean(schema, data);
-      case FLOAT:
-        return AvroData.getFloat(schema, data);
-      case DOUBLE:
-        return AvroData.getDouble(schema, data);
-      case STRING:
-        return AvroData.getString(schema, data);
-      case BYTES:
-        return AvroData.getBytes(schema, data);
-      default:
-        throw new IllegalArgumentException("Avro schema: " + type + " is not supported");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeFactory.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeFactory.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeFactory.java
deleted file mode 100644
index edd8859..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeFactory.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl.data.serializers;
-
-import org.apache.avro.Schema;
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.serializers.SerdeFactory;
-import org.apache.samza.operators.impl.data.avro.AvroData;
-
-public class SqlAvroSerdeFactory implements SerdeFactory<AvroData> {
-  public static final String PROP_AVRO_SCHEMA = "serializers.%s.schema";
-
-  @Override
-  public Serde<AvroData> getSerde(String name, Config config) {
-    String avroSchemaStr = config.get(String.format(PROP_AVRO_SCHEMA, name));
-    if (avroSchemaStr == null || avroSchemaStr.isEmpty()) {
-      throw new SamzaException("Cannot find avro schema for SerdeFactory '" + name + "'.");
-    }
-
-    return new SqlAvroSerde(Schema.parse(avroSchemaStr));
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerde.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerde.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerde.java
deleted file mode 100644
index 1267ab6..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerde.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.impl.data.serializers;
-
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.operators.impl.data.string.StringData;
-
-
-public class SqlStringSerde implements Serde<StringData> {
-
-    private final Serde<String> serde;
-
-    public SqlStringSerde(String encoding) {
-        this.serde = new StringSerde(encoding);
-    }
-
-    @Override
-    public StringData fromBytes(byte[] bytes) {
-          return new StringData(serde.fromBytes(bytes));
-    }
-
-    @Override
-    public byte[] toBytes(StringData object) {
-        return serde.toBytes(object.strValue());
-    }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerdeFactory.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerdeFactory.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerdeFactory.java
deleted file mode 100644
index 3b6a3e0..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerdeFactory.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.impl.data.serializers;
-
-
-import org.apache.samza.config.Config;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.serializers.SerdeFactory;
-import org.apache.samza.operators.impl.data.string.StringData;
-
-public class SqlStringSerdeFactory implements SerdeFactory<StringData> {
-    @Override
-    public Serde<StringData> getSerde(String name, Config config) {
-        return new SqlStringSerde(config.get("encoding", "UTF-8"));
-    }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringData.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringData.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringData.java
deleted file mode 100644
index 86e9917..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringData.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.impl.data.string;
-
-import org.apache.samza.operators.api.data.Data;
-import org.apache.samza.operators.api.data.Schema;
-
-import java.util.List;
-import java.util.Map;
-
-public class StringData implements Data {
-    private final Object datum;
-    private final Schema schema;
-
-    public StringData(Object datum) {
-        this.datum = datum;
-        this.schema = new StringSchema();
-    }
-
-    @Override
-    public Schema schema() {
-        return this.schema;
-    }
-
-    @Override
-    public Object value() {
-        return this.datum;
-    }
-
-    @Override
-    public int intValue() {
-        throw new UnsupportedOperationException("Can't get int value for a string type data");
-    }
-
-    @Override
-    public long longValue() {
-        throw new UnsupportedOperationException("Can't get long value for a string type data");
-    }
-
-    @Override
-    public float floatValue() {
-        throw new UnsupportedOperationException("Can't get float value for a string type data");
-    }
-
-    @Override
-    public double doubleValue() {
-        throw new UnsupportedOperationException("Can't get double value for a string type data");
-    }
-
-    @Override
-    public boolean booleanValue() {
-        throw new UnsupportedOperationException("Can't get boolean value for a string type data");
-    }
-
-    @Override
-    public String strValue() {
-        return String.valueOf(datum);
-    }
-
-    @Override
-    public byte[] bytesValue() {
-        throw new UnsupportedOperationException("Can't get bytesValue for a string type data");
-    }
-
-    @Override
-    public List<Object> arrayValue() {
-        throw new UnsupportedOperationException("Can't get arrayValue for a string type data");
-    }
-
-    @Override
-    public Map<Object, Object> mapValue() {
-        throw new UnsupportedOperationException("Can't get mapValue for a string type data");
-    }
-
-    @Override
-    public Data getElement(int index) {
-        throw new UnsupportedOperationException("Can't getElement(index) on a string type data");
-    }
-
-    @Override
-    public Data getFieldData(String fldName) {
-        throw new UnsupportedOperationException("Can't getFieldData(fieldName) for a string type data");
-    }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringSchema.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringSchema.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringSchema.java
deleted file mode 100644
index b19dfeb..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringSchema.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.impl.data.string;
-
-import org.apache.samza.operators.api.data.Data;
-import org.apache.samza.operators.api.data.Schema;
-
-import java.util.Map;
-
-public class StringSchema implements Schema {
-    private Type type = Type.STRING;
-
-    @Override
-    public Type getType() {
-      return Type.STRING;
-    }
-
-    @Override
-    public Schema getElementType() {
-      throw new UnsupportedOperationException("Can't getElmentType with non-array schema: " + this.type);
-    }
-
-    @Override
-    public Schema getValueType() {
-        throw new UnsupportedOperationException("Can't getValueType with non-map schema: " + this.type);
-    }
-
-    @Override
-    public Map<String, Schema> getFields() {
-        throw new UnsupportedOperationException("Can't get field types with unknown schema type:" + this.type);
-    }
-
-    @Override
-    public Schema getFieldType(String fldName) {
-        throw new UnsupportedOperationException("Can't getFieldType with non-map/non-struct schema: " + this.type);
-    }
-
-    @Override
-    public Data read(Object object) {
-        return new StringData(object);
-    }
-
-    @Override
-    public Data transform(Data inputData) {
-        if (inputData.schema().getType() != this.type) {
-            throw new IllegalArgumentException("Can't transform a mismatched primitive type. this type:" + this.type
-                    + ", input type:" + inputData.schema().getType());
-        }
-        return inputData;
-    }
-
-    @Override
-    public boolean equals(Schema other) {
-        return other.getType() == this.type;
-    }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java
deleted file mode 100644
index 5aa28bb..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl.data.serializers;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.DatumWriter;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.operators.impl.data.avro.AvroData;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-public class SqlAvroSerdeTest {
-  public static final String ORDER_SCHEMA = "{\"namespace\": \"org.apache.samza.operators\",\n"+
-      " \"type\": \"record\",\n"+
-      " \"name\": \"Order\",\n"+
-      " \"fields\": [\n"+
-      "     {\"name\": \"id\", \"type\": \"int\"},\n"+
-      "     {\"name\": \"product\",  \"type\": \"string\"},\n"+
-      "     {\"name\": \"quantity\", \"type\": \"int\"}\n"+
-      " ]\n"+
-      "}";
-
-  public static Schema orderSchema = Schema.parse(ORDER_SCHEMA);
-
-  private static Serde serde = new SqlAvroSerdeFactory().getSerde("sqlAvro", sqlAvroSerdeTestConfig());
-
-  @Test
-  public void testSqlAvroSerdeDeserialization() throws IOException {
-    AvroData decodedDatum = (AvroData)serde.fromBytes(encodeMessage(sampleOrderRecord(), orderSchema));
-
-    Assert.assertTrue(decodedDatum.schema().getType() == org.apache.samza.operators.api.data.Schema.Type.STRUCT);
-    Assert.assertTrue(decodedDatum.getFieldData("id").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.INTEGER);
-    Assert.assertTrue(decodedDatum.getFieldData("quantity").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.INTEGER);
-    Assert.assertTrue(decodedDatum.getFieldData("product").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.STRING);
-  }
-
-  @Test
-  public void testSqlAvroSerialization() throws IOException {
-    AvroData decodedDatumOriginal = (AvroData)serde.fromBytes(encodeMessage(sampleOrderRecord(), orderSchema));
-    @SuppressWarnings("unchecked")
-    byte[] encodedDatum = serde.toBytes(decodedDatumOriginal);
-
-    AvroData decodedDatum = (AvroData)serde.fromBytes(encodedDatum);
-
-    Assert.assertTrue(decodedDatum.schema().getType() == org.apache.samza.operators.api.data.Schema.Type.STRUCT);
-    Assert.assertTrue(decodedDatum.getFieldData("id").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.INTEGER);
-    Assert.assertTrue(decodedDatum.getFieldData("quantity").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.INTEGER);
-    Assert.assertTrue(decodedDatum.getFieldData("product").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.STRING);
-  }
-
-  private static Config sqlAvroSerdeTestConfig(){
-    Map<String, String> config = new HashMap<String, String>();
-    config.put("serializers.sqlAvro.schema", ORDER_SCHEMA);
-
-    return new MapConfig(config);
-  }
-
-  private static byte[] encodeMessage(GenericRecord datum, Schema avroSchema) throws IOException {
-    DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(avroSchema);
-    ByteArrayOutputStream output = new ByteArrayOutputStream();
-    BinaryEncoder encoder = new BinaryEncoder(output);
-    writer.write(datum, encoder);
-    encoder.flush();
-
-    return  output.toByteArray();
-  }
-
-  private static GenericRecord sampleOrderRecord(){
-    GenericData.Record datum = new GenericData.Record(orderSchema);
-    datum.put("id", 1);
-    datum.put("product", "paint");
-    datum.put("quantity", 3);
-
-    return datum;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java b/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
index 493a688..a6d57da 100644
--- a/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
+++ b/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
@@ -19,11 +19,10 @@
 
 package org.apache.samza.task;
 
-import org.apache.avro.generic.GenericRecord;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.MessageStreams.SystemMessageStream;
-import org.apache.samza.operators.Windows;
 import org.apache.samza.operators.TriggerBuilder;
+import org.apache.samza.operators.Windows;
 import org.apache.samza.operators.data.IncomingSystemMessage;
 import org.apache.samza.operators.data.Offset;
 import org.apache.samza.operators.task.StreamOperatorTask;
@@ -82,16 +81,7 @@ public class BroadcastOperatorTask implements StreamOperatorTask {
   }
 
   JsonMessage getInputMessage(IncomingSystemMessage m1) {
-    return new JsonMessage(
-        m1.getKey().toString(),
-        (MessageType) m1.getMessage(),
-        m1.getOffset(),
-        this.getEventTime((GenericRecord)m1.getMessage()),
-        m1.getSystemStreamPartition());
-  }
-
-  long getEventTime(GenericRecord msg) {
-    return (Long) msg.get("event_time");
+    return (JsonMessage) m1.getMessage();
   }
 
   boolean myFilter1(JsonMessage m1) {

http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Data.java
----------------------------------------------------------------------
diff --git a/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Data.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Data.java
new file mode 100644
index 0000000..7d6ee79
--- /dev/null
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Data.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.calcite.data;
+
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * A generic data interface that allows to implement data access / deserialization w/ {@link Schema}
+ */
+public interface Data {
+
+  Schema schema();
+
+  Object value();
+
+  int intValue();
+
+  long longValue();
+
+  float floatValue();
+
+  double doubleValue();
+
+  boolean booleanValue();
+
+  String strValue();
+
+  byte[] bytesValue();
+
+  List<Object> arrayValue();
+
+  Map<Object, Object> mapValue();
+
+  Data getElement(int index);
+
+  Data getFieldData(String fldName);
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Schema.java
----------------------------------------------------------------------
diff --git a/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Schema.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Schema.java
new file mode 100644
index 0000000..e2a79cf
--- /dev/null
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Schema.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.calcite.data;
+
+import java.util.Map;
+
+
+/**
+ * This defines an interface for generic schema access methods
+ */
+public interface Schema {
+
+  enum Type {
+    INTEGER,
+    LONG,
+    FLOAT,
+    DOUBLE,
+    BOOLEAN,
+    STRING,
+    BYTES,
+    STRUCT,
+    ARRAY,
+    MAP
+  };
+
+  Type getType();
+
+  Schema getElementType();
+
+  Schema getValueType();
+
+  Map<String, Schema> getFields();
+
+  Schema getFieldType(String fldName);
+
+  Data read(Object object);
+
+  Data transform(Data inputData);
+
+  boolean equals(Schema other);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroData.java
----------------------------------------------------------------------
diff --git a/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroData.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroData.java
new file mode 100644
index 0000000..91d26a2
--- /dev/null
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroData.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.calcite.data.avro;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.samza.sql.calcite.data.Data;
+import org.apache.samza.sql.calcite.data.Schema;
+
+
+public class AvroData implements Data {
+  protected final Object datum;
+  protected final AvroSchema schema;
+
+  private AvroData(AvroSchema schema, Object datum) {
+    this.datum = datum;
+    this.schema = schema;
+  }
+
+  @Override
+  public Schema schema() {
+    return this.schema;
+  }
+
+  @Override
+  public Object value() {
+    return this.datum;
+  }
+
+  @Override
+  public int intValue() {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  @Override
+  public long longValue() {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  @Override
+  public float floatValue() {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  @Override
+  public double doubleValue() {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  @Override
+  public boolean booleanValue() {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  @Override
+  public String strValue() {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  @Override
+  public byte[] bytesValue() {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  @Override
+  public List<Object> arrayValue() {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  @Override
+  public Map<Object, Object> mapValue() {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  @Override
+  public Data getElement(int index) {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  @Override
+  public Data getFieldData(String fldName) {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  public static AvroData getArray(AvroSchema schema, Object datum) {
+    if (schema.getType() != Schema.Type.ARRAY) {
+      throw new IllegalArgumentException("Can't create an array object with non-array schema:" + schema.getType());
+    }
+    return new AvroData(schema, datum) {
+      @SuppressWarnings("unchecked")
+      private final GenericArray<Object> array = (GenericArray<Object>) this.datum;
+
+      @Override
+      public List<Object> arrayValue() {
+        return this.array;
+      }
+
+      @Override
+      public Data getElement(int index) {
+        return this.schema.getElementType().read(array.get(index));
+      }
+
+    };
+  }
+
+  public static AvroData getMap(AvroSchema schema, Object datum) {
+    if (schema.getType() != Schema.Type.MAP) {
+      throw new IllegalArgumentException("Can't create a map object with non-map schema:" + schema.getType());
+    }
+    return new AvroData(schema, datum) {
+      @SuppressWarnings("unchecked")
+      private final Map<Object, Object> map = (Map<Object, Object>) datum;
+
+      @Override
+      public Map<Object, Object> mapValue() {
+        return this.map;
+      }
+
+      @Override
+      public Data getFieldData(String fldName) {
+        // TODO Auto-generated method stub
+        return this.schema.getValueType().read(map.get(fldName));
+      }
+
+    };
+  }
+
+  public static AvroData getStruct(AvroSchema schema, Object datum) {
+    if (schema.getType() != Schema.Type.STRUCT) {
+      throw new IllegalArgumentException("Can't create a struct object with non-struct schema:" + schema.getType());
+    }
+    return new AvroData(schema, datum) {
+      private final GenericRecord record = (GenericRecord) datum;
+
+      @Override
+      public Data getFieldData(String fldName) {
+        // TODO Auto-generated method stub
+        return this.schema.getFieldType(fldName).read(record.get(fldName));
+      }
+
+    };
+  }
+
+  public static AvroData getInt(AvroSchema schema, Object datum) {
+    if (schema.getType() != Schema.Type.INTEGER || !(datum instanceof Integer)) {
+      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
+          + datum.getClass().getName());
+    }
+    return new AvroData(schema, datum) {
+      @Override
+      public int intValue() {
+        return ((Integer) datum).intValue();
+      }
+
+    };
+  }
+
+  public static AvroData getLong(AvroSchema schema, Object datum) {
+    if (schema.getType() != Schema.Type.LONG || !(datum instanceof Long)) {
+      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
+          + datum.getClass().getName());
+    }
+    return new AvroData(schema, datum) {
+      @Override
+      public long longValue() {
+        return ((Long) datum).longValue();
+      }
+
+    };
+  }
+
+  public static AvroData getFloat(AvroSchema schema, Object datum) {
+    if (schema.getType() != Schema.Type.FLOAT || !(datum instanceof Float)) {
+      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
+          + datum.getClass().getName());
+    }
+    return new AvroData(schema, datum) {
+      @Override
+      public float floatValue() {
+        return ((Float) datum).floatValue();
+      }
+
+    };
+  }
+
+  public static AvroData getDouble(AvroSchema schema, Object datum) {
+    if (schema.getType() != Schema.Type.DOUBLE || !(datum instanceof Double)) {
+      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
+          + datum.getClass().getName());
+    }
+    return new AvroData(schema, datum) {
+      @Override
+      public double doubleValue() {
+        return ((Double) datum).doubleValue();
+      }
+
+    };
+  }
+
+  public static AvroData getBoolean(AvroSchema schema, Object datum) {
+    if (schema.getType() != Schema.Type.BOOLEAN || !(datum instanceof Boolean)) {
+      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
+          + datum.getClass().getName());
+    }
+    return new AvroData(schema, datum) {
+      @Override
+      public boolean booleanValue() {
+        return ((Boolean) datum).booleanValue();
+      }
+
+    };
+  }
+
+  public static AvroData getString(AvroSchema schema, Object datum) {
+    if (schema.getType() != Schema.Type.STRING || !(datum instanceof CharSequence)) {
+      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
+          + datum.getClass().getName());
+    }
+    return new AvroData(schema, datum) {
+      @Override
+      public String strValue() {
+        return ((CharSequence) datum).toString();
+      }
+
+    };
+  }
+
+  public static AvroData getBytes(AvroSchema schema, Object datum) {
+    if (schema.getType() != Schema.Type.BYTES || !(datum instanceof ByteBuffer)) {
+      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
+          + datum.getClass().getName());
+    }
+    return new AvroData(schema, datum) {
+      @Override
+      public byte[] bytesValue() {
+        return ((ByteBuffer) datum).array();
+      }
+
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroSchema.java
----------------------------------------------------------------------
diff --git a/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroSchema.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroSchema.java
new file mode 100644
index 0000000..c3bb150
--- /dev/null
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroSchema.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.calcite.data.avro;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.avro.Schema.Field;
+import org.apache.samza.sql.calcite.data.Data;
+import org.apache.samza.sql.calcite.data.Schema;
+
+
+/**
+ * {@link Schema} implementation backed by an Avro schema.
+ *
+ * <p>Instances are obtained through {@link #getSchema(org.apache.avro.Schema)}. Primitive schemas
+ * are shared singletons whose {@code read()} delegates to the matching {@link AvroData} factory;
+ * array, map and record schemas are created on each call and override {@code transform()}.
+ */
+public class AvroSchema implements Schema {
+
+  /** The wrapped Avro schema. */
+  protected final org.apache.avro.Schema avroSchema;
+  /** The generic type that {@link #avroSchema} maps to. */
+  protected final Schema.Type type;
+
+  /** Shared schema instances for the supported Avro primitive types. */
+  private static final Map<org.apache.avro.Schema.Type, AvroSchema> primSchemas =
+      new HashMap<org.apache.avro.Schema.Type, AvroSchema>();
+
+  static {
+    primSchemas.put(org.apache.avro.Schema.Type.INT,
+        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT)) {
+      @Override
+      public Data read(Object datum) {
+        return AvroData.getInt(this, datum);
+      }
+    });
+    primSchemas.put(org.apache.avro.Schema.Type.LONG,
+        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG)) {
+      @Override
+      public Data read(Object datum) {
+        return AvroData.getLong(this, datum);
+      }
+    });
+    primSchemas.put(org.apache.avro.Schema.Type.FLOAT,
+        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.FLOAT)) {
+      @Override
+      public Data read(Object datum) {
+        return AvroData.getFloat(this, datum);
+      }
+    });
+    primSchemas.put(org.apache.avro.Schema.Type.DOUBLE,
+        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.DOUBLE)) {
+      @Override
+      public Data read(Object datum) {
+        return AvroData.getDouble(this, datum);
+      }
+    });
+    primSchemas.put(org.apache.avro.Schema.Type.BOOLEAN,
+        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BOOLEAN)) {
+      @Override
+      public Data read(Object datum) {
+        return AvroData.getBoolean(this, datum);
+      }
+    });
+    primSchemas.put(org.apache.avro.Schema.Type.STRING,
+        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING)) {
+      @Override
+      public Data read(Object datum) {
+        return AvroData.getString(this, datum);
+      }
+    });
+    primSchemas.put(org.apache.avro.Schema.Type.BYTES,
+        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES)) {
+      @Override
+      public Data read(Object datum) {
+        return AvroData.getBytes(this, datum);
+      }
+    });
+  }
+
+  /**
+   * Returns the {@link AvroSchema} corresponding to an Avro schema.
+   *
+   * @param schema the Avro schema to wrap
+   * @return the shared instance for a primitive schema, or a newly built instance for an array,
+   *         map or record schema
+   * @throws IllegalArgumentException if the Avro type is not one of the supported types
+   */
+  public static AvroSchema getSchema(final org.apache.avro.Schema schema) {
+    Schema.Type type = mapType(schema.getType());
+    if (type != Schema.Type.ARRAY && type != Schema.Type.MAP && type != Schema.Type.STRUCT) {
+      return primSchemas.get(schema.getType());
+    }
+    // otherwise, construct the new schema
+    // TODO: It would be possible to assign each complex schema an ID and cache it w/o repeated create in-memory schema objects
+    switch (type) {
+      case ARRAY:
+        return new AvroSchema(schema) {
+          @Override
+          public Data transform(Data input) {
+            // The input must be an array whose element schema equals this schema's element schema.
+            if (input.schema().getType() != Schema.Type.ARRAY) {
+              throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: "
+                  + input.schema().getType());
+            }
+            if (!input.schema().getElementType().equals(this.getElementType())) {
+              throw new IllegalArgumentException("Element schema mismatch. Can't transfer data. input schema: "
+                  + input.schema().getElementType().getType());
+            }
+            // input type matches array type
+            return AvroData.getArray(this, input.value());
+          }
+        };
+      case MAP:
+        return new AvroSchema(schema) {
+          @Override
+          public Data transform(Data input) {
+            // The input must be a map whose value schema equals this schema's value schema.
+            if (input.schema().getType() != Schema.Type.MAP) {
+              throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: "
+                  + input.schema().getType());
+            }
+            if (!input.schema().getValueType().equals(this.getValueType())) {
+              throw new IllegalArgumentException("Element schema mismatch. Can't transfer data. input schema: "
+                  + input.schema().getValueType().getType());
+            }
+            // input type matches map type
+            return AvroData.getMap(this, input.value());
+          }
+        };
+      case STRUCT:
+        return new AvroSchema(schema) {
+          private final Map<String, Schema> fldSchemas = fieldSchemasOf(schema);
+
+          @Override
+          public Map<String, Schema> getFields() {
+            return this.fldSchemas;
+          }
+
+          @Override
+          public Schema getFieldType(String fldName) {
+            return this.fldSchemas.get(fldName);
+          }
+
+          @Override
+          public Data transform(Data input) {
+            if (input.schema().getType() != Schema.Type.STRUCT) {
+              throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: "
+                  + input.schema().getType());
+            }
+            // Note: this particular transform function only implements "projection to a sub-set" concept.
+            // More complex function is needed if some other concepts such as "merge from two sets of data", "allow null if does not exist" are needed
+            for (Map.Entry<String, Schema> entry : this.fldSchemas.entrySet()) {
+              // check each field schema matches input
+              String fldName = entry.getKey();
+              Schema fldSchema = entry.getValue();
+              Schema inputFld = input.schema().getFieldType(fldName);
+              // A field that is absent from the input is a mismatch, not a NullPointerException.
+              if (inputFld == null || !fldSchema.equals(inputFld)) {
+                Schema.Type inputFldType = (inputFld == null) ? null : inputFld.getType();
+                throw new IllegalArgumentException("Field schema mismatch. Can't transfer data for field " + fldName
+                    + ". input field schema:" + inputFldType + ", this field schema: " + fldSchema.getType());
+              }
+            }
+            // input type matches struct type
+            return AvroData.getStruct(this, input.value());
+          }
+
+        };
+      default:
+        throw new IllegalArgumentException("Un-recognized complext data type:" + type);
+    }
+  }
+
+  /**
+   * Builds the field-name to field-schema map for an Avro record schema, converting each field's
+   * Avro schema through {@link #getSchema(org.apache.avro.Schema)}.
+   */
+  private static Map<String, Schema> fieldSchemasOf(org.apache.avro.Schema record) {
+    Map<String, Schema> fields = new HashMap<String, Schema>();
+    for (Field field : record.getFields()) {
+      fields.put(field.name(), getSchema(field.schema()));
+    }
+    return fields;
+  }
+
+  private AvroSchema(org.apache.avro.Schema schema) {
+    this.avroSchema = schema;
+    this.type = mapType(schema.getType());
+  }
+
+  /**
+   * Maps an Avro type to the generic {@link Schema.Type}.
+   *
+   * @throws IllegalArgumentException for Avro types that have no mapping
+   */
+  private static Type mapType(org.apache.avro.Schema.Type type) {
+    switch (type) {
+      case ARRAY:
+        return Schema.Type.ARRAY;
+      case RECORD:
+        return Schema.Type.STRUCT;
+      case MAP:
+        return Schema.Type.MAP;
+      case INT:
+        return Schema.Type.INTEGER;
+      case LONG:
+        return Schema.Type.LONG;
+      case BOOLEAN:
+        return Schema.Type.BOOLEAN;
+      case FLOAT:
+        return Schema.Type.FLOAT;
+      case DOUBLE:
+        return Schema.Type.DOUBLE;
+      case STRING:
+        return Schema.Type.STRING;
+      case BYTES:
+        return Schema.Type.BYTES;
+      default:
+        throw new IllegalArgumentException("Avro schema: " + type + " is not supported");
+    }
+  }
+
+  @Override
+  public Type getType() {
+    return this.type;
+  }
+
+  /**
+   * @throws UnsupportedOperationException if this is not an array schema
+   */
+  @Override
+  public Schema getElementType() {
+    if (this.type != Schema.Type.ARRAY) {
+      throw new UnsupportedOperationException("Can't getElmentType with non-array schema: " + this.type);
+    }
+    return getSchema(this.avroSchema.getElementType());
+  }
+
+  /**
+   * @throws UnsupportedOperationException if this is not a map schema
+   */
+  @Override
+  public Schema getValueType() {
+    if (this.type != Schema.Type.MAP) {
+      throw new UnsupportedOperationException("Can't getValueType with non-map schema: " + this.type);
+    }
+    return getSchema(this.avroSchema.getValueType());
+  }
+
+  /**
+   * Default implementation; the record schema built by {@link #getSchema(org.apache.avro.Schema)}
+   * overrides it.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public Map<String, Schema> getFields() {
+    throw new UnsupportedOperationException("Can't get field types with unknown schema type:" + this.type);
+  }
+
+  /**
+   * Default implementation; the record schema built by {@link #getSchema(org.apache.avro.Schema)}
+   * overrides it.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public Schema getFieldType(String fldName) {
+    throw new UnsupportedOperationException("Can't getFieldType with non-map/non-struct schema: " + this.type);
+  }
+
+  /**
+   * Wraps a raw array, map or record value. The shared primitive schemas override this method
+   * with their own type-specific wrapping.
+   *
+   * @throws UnsupportedOperationException if the wrapped Avro schema is not an array, map or record
+   */
+  @Override
+  public Data read(Object object) {
+    if (this.avroSchema.getType() == org.apache.avro.Schema.Type.ARRAY) {
+      return AvroData.getArray(this, object);
+    } else if (this.avroSchema.getType() == org.apache.avro.Schema.Type.MAP) {
+      return AvroData.getMap(this, object);
+    } else if (this.avroSchema.getType() == org.apache.avro.Schema.Type.RECORD) {
+      return AvroData.getStruct(this, object);
+    }
+    throw new UnsupportedOperationException("Reading unknown complext type:" + this.type + " is not supported");
+  }
+
+  /**
+   * Primitive transform: returns the input unchanged when its type matches this schema's type.
+   * The array, map and record schemas override this method.
+   *
+   * @throws IllegalArgumentException if the input is of a complex type or of a different
+   *         primitive type
+   */
+  @Override
+  public Data transform(Data inputData) {
+    if (inputData.schema().getType() == Schema.Type.ARRAY || inputData.schema().getType() == Schema.Type.MAP
+        || inputData.schema().getType() == Schema.Type.STRUCT) {
+      throw new IllegalArgumentException("Complex schema should have overriden the default transform() function.");
+    }
+    if (inputData.schema().getType() != this.type) {
+      throw new IllegalArgumentException("Can't transform a mismatched primitive type. this type:" + this.type
+          + ", input type:" + inputData.schema().getType());
+    }
+    return inputData;
+  }
+
+  /**
+   * Structural comparison. Primitive schemas are equal when their types match; arrays and maps
+   * additionally compare element/value schemas. For records, every field of this schema must
+   * have an equal counterpart in {@code other}; extra fields in {@code other} are ignored.
+   *
+   * <p>This is an overload taking a {@link Schema}; it does not override
+   * {@link Object#equals(Object)}.
+   *
+   * @param other the schema to compare with; {@code null} is never equal
+   */
+  @Override
+  public boolean equals(Schema other) {
+    // A null schema arises when the other record lacks a field that this one has.
+    if (other == null || this.type != other.getType()) {
+      return false;
+    }
+    switch (this.type) {
+      case ARRAY:
+        // check if element types are the same
+        return this.getElementType().equals(other.getElementType());
+      case MAP:
+        // check if value types are the same
+        return this.getValueType().equals(other.getValueType());
+      case STRUCT:
+        // check if the fields schemas in this equals the other
+        // NOTE: this equals check is consistent with the "projection to subset" concept implemented in transform()
+        for (String fieldName : this.getFields().keySet()) {
+          if (!this.getFieldType(fieldName).equals(other.getFieldType(fieldName))) {
+            return false;
+          }
+        }
+        return true;
+      default:
+        return true;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerde.java
----------------------------------------------------------------------
diff --git a/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerde.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerde.java
new file mode 100644
index 0000000..97a3b6c
--- /dev/null
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerde.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.calcite.data.serializers;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.samza.SamzaException;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.sql.calcite.data.avro.AvroData;
+import org.apache.samza.sql.calcite.data.avro.AvroSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+/**
+ * {@link Serde} that converts between Avro binary encoding and {@link AvroData}, using one fixed
+ * Avro schema for both decoding and encoding.
+ */
+public class SqlAvroSerde implements Serde<AvroData> {
+  private static final Logger log = LoggerFactory.getLogger(SqlAvroSerde.class);
+
+  private final Schema avroSchema;
+  // Typed on Object rather than GenericRecord: only a record schema decodes to a GenericRecord,
+  // and getAvroData() is written to handle every supported top-level schema type.
+  private final GenericDatumReader<Object> reader;
+  private final GenericDatumWriter<Object> writer;
+
+  /**
+   * @param avroSchema the Avro schema used to decode and encode every message
+   */
+  public SqlAvroSerde(Schema avroSchema) {
+    this.avroSchema = avroSchema;
+    this.reader = new GenericDatumReader<Object>(avroSchema);
+    this.writer = new GenericDatumWriter<Object>(avroSchema);
+  }
+
+  /**
+   * Decodes Avro binary data and wraps the result as {@link AvroData}.
+   *
+   * @param bytes the Avro binary encoded message
+   * @return the decoded value wrapped with its schema
+   * @throws SamzaException if decoding fails with an {@link IOException}
+   */
+  @Override
+  public AvroData fromBytes(byte[] bytes) {
+    try {
+      Object data = reader.read(null, DecoderFactory.defaultFactory().createBinaryDecoder(bytes, null));
+      return getAvroData(data, avroSchema);
+    } catch (IOException e) {
+      String errMsg = "Cannot decode message.";
+      log.error(errMsg, e);
+      throw new SamzaException(errMsg, e);
+    }
+  }
+
+  /**
+   * Encodes the raw value held by {@code object} as Avro binary data.
+   *
+   * @param object the data to encode
+   * @return the Avro binary encoding of {@code object.value()}
+   * @throws SamzaException if encoding fails with an {@link IOException}
+   */
+  @Override
+  public byte[] toBytes(AvroData object) {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    Encoder encoder = new BinaryEncoder(out);
+
+    try {
+      writer.write(object.value(), encoder);
+      encoder.flush();
+      return out.toByteArray();
+    } catch (IOException e) {
+      String errMsg = "Cannot perform Avro binary encode.";
+      log.error(errMsg, e);
+      throw new SamzaException(errMsg, e);
+    }
+  }
+
+  /**
+   * Wraps a decoded value with the {@link AvroData} factory matching the Avro schema type.
+   *
+   * @throws IllegalArgumentException if the schema type is not supported
+   */
+  private AvroData getAvroData(Object data, Schema type) {
+    AvroSchema schema = AvroSchema.getSchema(type);
+    switch (type.getType()) {
+      case RECORD:
+        return AvroData.getStruct(schema, (GenericRecord) data);
+      case ARRAY:
+        return AvroData.getArray(schema, data);
+      case MAP:
+        return AvroData.getMap(schema, data);
+      case INT:
+        return AvroData.getInt(schema, data);
+      case LONG:
+        return AvroData.getLong(schema, data);
+      case BOOLEAN:
+        return AvroData.getBoolean(schema, data);
+      case FLOAT:
+        return AvroData.getFloat(schema, data);
+      case DOUBLE:
+        return AvroData.getDouble(schema, data);
+      case STRING:
+        return AvroData.getString(schema, data);
+      case BYTES:
+        return AvroData.getBytes(schema, data);
+      default:
+        throw new IllegalArgumentException("Avro schema: " + type + " is not supported");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerdeFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerdeFactory.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerdeFactory.java
new file mode 100644
index 0000000..caf4009
--- /dev/null
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerdeFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.calcite.data.serializers;
+
+import org.apache.avro.Schema;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeFactory;
+import org.apache.samza.sql.calcite.data.avro.AvroData;
+
+/**
+ * {@link SerdeFactory} that builds a {@link SqlAvroSerde} from an Avro schema stored in the job
+ * configuration under {@code serializers.<name>.schema}.
+ */
+public class SqlAvroSerdeFactory implements SerdeFactory<AvroData> {
+  /** Format of the configuration key holding the schema; {@code %s} is the serde name. */
+  public static final String PROP_AVRO_SCHEMA = "serializers.%s.schema";
+
+  /**
+   * @param name the serde name, substituted into {@link #PROP_AVRO_SCHEMA}
+   * @param config the configuration to read the schema text from
+   * @return a serde bound to the configured schema
+   * @throws SamzaException if the schema entry is absent or empty
+   */
+  @Override
+  public Serde<AvroData> getSerde(String name, Config config) {
+    final String schemaKey = String.format(PROP_AVRO_SCHEMA, name);
+    final String schemaText = config.get(schemaKey);
+
+    final boolean schemaPresent = schemaText != null && !schemaText.isEmpty();
+    if (!schemaPresent) {
+      throw new SamzaException("Cannot find avro schema for SerdeFactory '" + name + "'.");
+    }
+
+    Schema parsed = Schema.parse(schemaText);
+    return new SqlAvroSerde(parsed);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlStringSerde.java
----------------------------------------------------------------------
diff --git a/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlStringSerde.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlStringSerde.java
new file mode 100644
index 0000000..6651e97
--- /dev/null
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlStringSerde.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.calcite.data.serializers;
+
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.sql.calcite.data.string.StringData;
+
+
+public class SqlStringSerde implements Serde<StringData> {
+
+    private final Serde<String> serde;
+
+    public SqlStringSerde(String encoding) {
+        this.serde = new StringSerde(encoding);
+    }
+
+    @Override
+    public StringData fromBytes(byte[] bytes) {
+          return new StringData(serde.fromBytes(bytes));
+    }
+
+    @Override
+    public byte[] toBytes(StringData object) {
+        return serde.toBytes(object.strValue());
+    }
+}