You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2015/02/11 18:57:31 UTC

samza git commit: SAMZA-484; define serialization for tuples in samza-sql

Repository: samza
Updated Branches:
  refs/heads/master efb9795a2 -> eedf2e720


SAMZA-484; define serialization for tuples in samza-sql


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/eedf2e72
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/eedf2e72
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/eedf2e72

Branch: refs/heads/master
Commit: eedf2e7204fc01e32bd21c454ff83a36a23f6105
Parents: efb9795
Author: Navina Ramesh <na...@gmail.com>
Authored: Wed Feb 11 09:56:32 2015 -0800
Committer: Chris Riccomini <cr...@apache.org>
Committed: Wed Feb 11 09:56:32 2015 -0800

----------------------------------------------------------------------
 build.gradle                                    |  10 +-
 gradle/dependency-versions.gradle               |   1 +
 samza-sql/README                                |   1 -
 samza-sql/README.md                             |   1 +
 .../org/apache/samza/sql/api/data/Data.java     |  54 ++++
 .../org/apache/samza/sql/api/data/Schema.java   |  55 ++++
 .../org/apache/samza/sql/api/data/Tuple.java    |   4 +-
 .../samza/sql/data/IncomingMessageTuple.java    |   9 +-
 .../apache/samza/sql/data/avro/AvroData.java    | 262 ++++++++++++++++
 .../apache/samza/sql/data/avro/AvroSchema.java  | 296 +++++++++++++++++++
 .../sql/data/serializers/SqlStringSerde.java    |  45 +++
 .../data/serializers/SqlStringSerdeFactory.java |  33 +++
 .../samza/sql/data/string/StringData.java       | 101 +++++++
 .../samza/sql/data/string/StringSchema.java     |  73 +++++
 .../sql/operators/partition/PartitionOp.java    |   5 +-
 15 files changed, 932 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/eedf2e72/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index e6b10fc..b49c313 100644
--- a/build.gradle
+++ b/build.gradle
@@ -78,7 +78,7 @@ rat {
     'gradlew',
     'gradlew.bat',
     'samza-test/state/mystore/**',
-    'README.md',
+    '**/README.md',
     'RELEASE.md',
   ]
 }
@@ -249,18 +249,12 @@ project(":samza-yarn_$scalaVersion") {
 project(":samza-sql_$scalaVersion") {
   apply plugin: 'java'
 
-  configurations {
-    // Remove transitive dependencies from Zookeeper that we don't want.
-    compile.exclude group: 'javax.jms', module: 'jms'
-    compile.exclude group: 'com.sun.jdmk', module: 'jmxtools'
-    compile.exclude group: 'com.sun.jmx', module: 'jmxri'
-  }
-
   dependencies {
     compile project(':samza-api')
     compile project(":samza-core_$scalaVersion")
     compile project(":samza-kv_$scalaVersion")
     compile "commons-collections:commons-collections:$commonsCollectionVersion"
+    compile "org.apache.avro:avro:$avroVersion"
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-all:$mockitoVersion"
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/eedf2e72/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index 1e76901..2bb24ad 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -35,4 +35,5 @@
   guavaVersion = "17.0"
   commonsCodecVersion = "1.9"
   commonsCollectionVersion = "3.2.1"
+  avroVersion = "1.7.7"
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/eedf2e72/samza-sql/README
----------------------------------------------------------------------
diff --git a/samza-sql/README b/samza-sql/README
deleted file mode 100644
index 65b7558..0000000
--- a/samza-sql/README
+++ /dev/null
@@ -1 +0,0 @@
-samza-sql is an experimental module that is under development (SAMZA-390).
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/eedf2e72/samza-sql/README.md
----------------------------------------------------------------------
diff --git a/samza-sql/README.md b/samza-sql/README.md
new file mode 100644
index 0000000..598670b
--- /dev/null
+++ b/samza-sql/README.md
@@ -0,0 +1 @@
+samza-sql is an experimental module that is under development (SAMZA-390).

http://git-wip-us.apache.org/repos/asf/samza/blob/eedf2e72/samza-sql/src/main/java/org/apache/samza/sql/api/data/Data.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/api/data/Data.java b/samza-sql/src/main/java/org/apache/samza/sql/api/data/Data.java
new file mode 100644
index 0000000..d1b8409
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/api/data/Data.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.api.data;
+
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * A schema-typed data value handled by samza-sql. {@link Tuple} exposes both its key and its message
+ * as {@code Data}.
+ *
+ * <p>The typed accessors ({@link #intValue()}, {@link #strValue()}, {@link #arrayValue()}, ...) are only
+ * meaningful when {@link #schema()} reports the matching {@link Schema.Type}. Implementations such as
+ * {@code AvroData} throw {@link UnsupportedOperationException} for accessors that do not match.
+ */
+public interface Data {
+
+  /** @return the schema describing this value */
+  Schema schema();
+
+  /** @return the underlying raw value object (for {@code AvroData}, the wrapped Avro datum) */
+  Object value();
+
+  /** @return this value as an {@code int}; for {@link Schema.Type#INTEGER} values */
+  int intValue();
+
+  /** @return this value as a {@code long}; for {@link Schema.Type#LONG} values */
+  long longValue();
+
+  /** @return this value as a {@code float}; for {@link Schema.Type#FLOAT} values */
+  float floatValue();
+
+  /** @return this value as a {@code double}; for {@link Schema.Type#DOUBLE} values */
+  double doubleValue();
+
+  /** @return this value as a {@code boolean}; for {@link Schema.Type#BOOLEAN} values */
+  boolean booleanValue();
+
+  /** @return this value as a {@code String}; for {@link Schema.Type#STRING} values */
+  String strValue();
+
+  /** @return this value as a byte array; for {@link Schema.Type#BYTES} values */
+  byte[] bytesValue();
+
+  /** @return the raw elements of an {@link Schema.Type#ARRAY} value */
+  List<Object> arrayValue();
+
+  /** @return the raw entries of a {@link Schema.Type#MAP} value */
+  Map<Object, Object> mapValue();
+
+  /**
+   * Returns one element of an {@link Schema.Type#ARRAY} value, wrapped with the array's element schema.
+   *
+   * @param index zero-based element position
+   */
+  Data getElement(int index);
+
+  /**
+   * Returns a named field of a {@link Schema.Type#STRUCT} value, or the entry stored under key
+   * {@code fldName} of a {@link Schema.Type#MAP} value, wrapped with the corresponding schema.
+   *
+   * @param fldName field name (STRUCT) or map key (MAP)
+   */
+  Data getFieldData(String fldName);
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/eedf2e72/samza-sql/src/main/java/org/apache/samza/sql/api/data/Schema.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/api/data/Schema.java b/samza-sql/src/main/java/org/apache/samza/sql/api/data/Schema.java
new file mode 100644
index 0000000..1e8f192
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/api/data/Schema.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.api.data;
+
+import java.util.Map;
+
+
+/**
+ * Type description for a samza-sql {@link Data} value: either a primitive type or a complex type
+ * (ARRAY, MAP, STRUCT) with nested schemas.
+ */
+public interface Schema {
+
+  /** Logical data types supported by samza-sql. */
+  enum Type {
+    INTEGER,
+    LONG,
+    FLOAT,
+    DOUBLE,
+    BOOLEAN,
+    STRING,
+    BYTES,
+    STRUCT,
+    ARRAY,
+    MAP
+  };
+
+  /** @return the logical type of this schema */
+  Type getType();
+
+  /** @return the schema of the elements; only meaningful for {@link Type#ARRAY} schemas */
+  Schema getElementType();
+
+  /** @return the schema of the map values; only meaningful for {@link Type#MAP} schemas */
+  Schema getValueType();
+
+  /** @return field name to field schema; only meaningful for {@link Type#STRUCT} schemas */
+  Map<String, Schema> getFields();
+
+  /**
+   * @param fldName name of a field
+   * @return the schema of the named field; only meaningful for {@link Type#STRUCT} schemas
+   */
+  Schema getFieldType(String fldName);
+
+  /**
+   * Wraps a raw value object that conforms to this schema as {@link Data}.
+   *
+   * @param object the raw value (e.g. an Avro datum for the Avro implementation)
+   */
+  Data read(Object object);
+
+  /**
+   * Converts {@code inputData} into data described by this schema.
+   * NOTE(review): the Avro implementation only validates the input schema and re-wraps the value
+   * (a "projection to a sub-set" of struct fields); confirm the intended contract for other implementations.
+   */
+  Data transform(Data inputData);
+
+  /**
+   * Compares this schema with another schema.
+   * NOTE(review): this overloads, rather than overrides, {@link Object#equals(Object)}, so collections and
+   * {@code Objects.equals} will not use it. The Avro STRUCT comparison checks only this schema's fields,
+   * so the relation is not symmetric.
+   */
+  boolean equals(Schema other);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/eedf2e72/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java b/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java
index 0c21a53..bc8efcf 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java
@@ -32,7 +32,7 @@ public interface Tuple {
    *
    * @return Message object in the tuple
    */
-  Object getMessage();
+  Data getMessage();
 
   /**
    * Method to indicate whether the tuple is a delete tuple or an insert tuple
@@ -46,7 +46,7 @@ public interface Tuple {
    *
    * @return The <code>key</code> of the tuple
    */
-  Object getKey();
+  Data getKey();
 
   /**
    * Get the stream name of the tuple. Note this stream name should be unique in the system.

http://git-wip-us.apache.org/repos/asf/samza/blob/eedf2e72/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java b/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
index a8a55e2..f868e5c 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.sql.data;
 
+import org.apache.samza.sql.api.data.Data;
 import org.apache.samza.sql.api.data.EntityName;
 import org.apache.samza.sql.api.data.Tuple;
 import org.apache.samza.system.IncomingMessageEnvelope;
@@ -52,8 +53,8 @@ public class IncomingMessageTuple implements Tuple {
 
   // TODO: the return type should be changed to the generic data type
   @Override
-  public Object getMessage() {
-    return this.imsg.getMessage();
+  public Data getMessage() {
+    return (Data) this.imsg.getMessage();
   }
 
   @Override
@@ -62,8 +63,8 @@ public class IncomingMessageTuple implements Tuple {
   }
 
   @Override
-  public Object getKey() {
-    return imsg.getKey();
+  public Data getKey() {
+    return (Data) this.imsg.getKey();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/eedf2e72/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroData.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroData.java b/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroData.java
new file mode 100644
index 0000000..d040be9
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroData.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.data.avro;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.samza.sql.api.data.Data;
+import org.apache.samza.sql.api.data.Schema;
+
+
+/**
+ * {@link Data} backed by a datum from Avro's generic API.
+ *
+ * <p>Instances are created only through the static factories below. Each factory returns a subclass that
+ * implements the accessors matching its schema type; every other accessor throws
+ * {@link UnsupportedOperationException}.
+ */
+public class AvroData implements Data {
+  protected final Object datum;
+  protected final AvroSchema schema;
+
+  private AvroData(AvroSchema schema, Object datum) {
+    this.datum = datum;
+    this.schema = schema;
+  }
+
+  @Override
+  public Schema schema() {
+    return this.schema;
+  }
+
+  /** @return the wrapped Avro datum, unchanged */
+  @Override
+  public Object value() {
+    return this.datum;
+  }
+
+  // The typed accessors below are unsupported by default; each factory subclass overrides the one(s)
+  // matching its schema type.
+
+  @Override
+  public int intValue() {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  @Override
+  public long longValue() {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  @Override
+  public float floatValue() {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  @Override
+  public double doubleValue() {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  @Override
+  public boolean booleanValue() {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  @Override
+  public String strValue() {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  @Override
+  public byte[] bytesValue() {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  @Override
+  public List<Object> arrayValue() {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  @Override
+  public Map<Object, Object> mapValue() {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  @Override
+  public Data getElement(int index) {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  @Override
+  public Data getFieldData(String fldName) {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  /** Builds the schema/datum mismatch error; safe for a {@code null} datum. */
+  private static IllegalArgumentException mismatch(AvroSchema schema, Object datum) {
+    return new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
+        + (datum == null ? "null" : datum.getClass().getName()));
+  }
+
+  /**
+   * Looks up {@code key} in an Avro map. Avro's generic reader materializes map keys as
+   * {@code org.apache.avro.util.Utf8} by default, and a {@code Utf8} never equals a {@link String}.
+   * When the direct lookup misses, keys are therefore compared by their character content.
+   *
+   * @return the mapped value, or {@code null} if no key matches
+   */
+  private static Object lookupMapValue(Map<Object, Object> map, String key) {
+    if (map.containsKey(key)) {
+      return map.get(key);
+    }
+    for (Map.Entry<Object, Object> entry : map.entrySet()) {
+      Object candidate = entry.getKey();
+      if (candidate instanceof CharSequence && candidate.toString().equals(key)) {
+        return entry.getValue();
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Wraps an Avro array datum.
+   *
+   * @param schema an ARRAY schema
+   * @param datum the array value; any {@link List} is accepted ({@link GenericArray} is one)
+   * @throws IllegalArgumentException if the schema is not ARRAY or the datum is not a {@link List}
+   */
+  public static AvroData getArray(AvroSchema schema, Object datum) {
+    if (schema.getType() != Schema.Type.ARRAY) {
+      throw new IllegalArgumentException("Can't create an array object with non-array schema:" + schema.getType());
+    }
+    if (!(datum instanceof List)) {
+      throw mismatch(schema, datum);
+    }
+    @SuppressWarnings("unchecked")
+    final List<Object> array = (List<Object>) datum;
+    return new AvroData(schema, datum) {
+      @Override
+      public List<Object> arrayValue() {
+        return array;
+      }
+
+      @Override
+      public Data getElement(int index) {
+        return this.schema.getElementType().read(array.get(index));
+      }
+    };
+  }
+
+  /**
+   * Wraps an Avro map datum.
+   *
+   * @param schema a MAP schema
+   * @param datum the map value
+   * @throws IllegalArgumentException if the schema is not MAP or the datum is not a {@link Map}
+   */
+  public static AvroData getMap(AvroSchema schema, Object datum) {
+    if (schema.getType() != Schema.Type.MAP) {
+      throw new IllegalArgumentException("Can't create a map object with non-map schema:" + schema.getType());
+    }
+    if (!(datum instanceof Map)) {
+      throw mismatch(schema, datum);
+    }
+    @SuppressWarnings("unchecked")
+    final Map<Object, Object> map = (Map<Object, Object>) datum;
+    return new AvroData(schema, datum) {
+      @Override
+      public Map<Object, Object> mapValue() {
+        return map;
+      }
+
+      @Override
+      public Data getFieldData(String fldName) {
+        return this.schema.getValueType().read(lookupMapValue(map, fldName));
+      }
+    };
+  }
+
+  /**
+   * Wraps an Avro record datum.
+   *
+   * @param schema a STRUCT schema
+   * @param datum the record value
+   * @throws IllegalArgumentException if the schema is not STRUCT or the datum is not a {@link GenericRecord}
+   */
+  public static AvroData getStruct(AvroSchema schema, Object datum) {
+    if (schema.getType() != Schema.Type.STRUCT) {
+      throw new IllegalArgumentException("Can't create a struct object with non-struct schema:" + schema.getType());
+    }
+    if (!(datum instanceof GenericRecord)) {
+      throw mismatch(schema, datum);
+    }
+    final GenericRecord record = (GenericRecord) datum;
+    return new AvroData(schema, datum) {
+      @Override
+      public Data getFieldData(String fldName) {
+        Schema fldSchema = this.schema.getFieldType(fldName);
+        if (fldSchema == null) {
+          // Fail with a clear message instead of an NPE on an unknown field.
+          throw new IllegalArgumentException("Field " + fldName + " does not exist in struct schema.");
+        }
+        return fldSchema.read(record.get(fldName));
+      }
+    };
+  }
+
+  public static AvroData getInt(AvroSchema schema, Object datum) {
+    if (schema.getType() != Schema.Type.INTEGER || !(datum instanceof Integer)) {
+      throw mismatch(schema, datum);
+    }
+    return new AvroData(schema, datum) {
+      @Override
+      public int intValue() {
+        return ((Integer) datum).intValue();
+      }
+    };
+  }
+
+  public static AvroData getLong(AvroSchema schema, Object datum) {
+    if (schema.getType() != Schema.Type.LONG || !(datum instanceof Long)) {
+      throw mismatch(schema, datum);
+    }
+    return new AvroData(schema, datum) {
+      @Override
+      public long longValue() {
+        return ((Long) datum).longValue();
+      }
+    };
+  }
+
+  public static AvroData getFloat(AvroSchema schema, Object datum) {
+    if (schema.getType() != Schema.Type.FLOAT || !(datum instanceof Float)) {
+      throw mismatch(schema, datum);
+    }
+    return new AvroData(schema, datum) {
+      @Override
+      public float floatValue() {
+        return ((Float) datum).floatValue();
+      }
+    };
+  }
+
+  public static AvroData getDouble(AvroSchema schema, Object datum) {
+    if (schema.getType() != Schema.Type.DOUBLE || !(datum instanceof Double)) {
+      throw mismatch(schema, datum);
+    }
+    return new AvroData(schema, datum) {
+      @Override
+      public double doubleValue() {
+        return ((Double) datum).doubleValue();
+      }
+    };
+  }
+
+  public static AvroData getBoolean(AvroSchema schema, Object datum) {
+    if (schema.getType() != Schema.Type.BOOLEAN || !(datum instanceof Boolean)) {
+      throw mismatch(schema, datum);
+    }
+    return new AvroData(schema, datum) {
+      @Override
+      public boolean booleanValue() {
+        return ((Boolean) datum).booleanValue();
+      }
+    };
+  }
+
+  public static AvroData getString(AvroSchema schema, Object datum) {
+    if (schema.getType() != Schema.Type.STRING || !(datum instanceof CharSequence)) {
+      throw mismatch(schema, datum);
+    }
+    return new AvroData(schema, datum) {
+      @Override
+      public String strValue() {
+        // Avro strings may be Utf8 rather than String.
+        return ((CharSequence) datum).toString();
+      }
+    };
+  }
+
+  public static AvroData getBytes(AvroSchema schema, Object datum) {
+    if (schema.getType() != Schema.Type.BYTES || !(datum instanceof ByteBuffer)) {
+      throw mismatch(schema, datum);
+    }
+    return new AvroData(schema, datum) {
+      @Override
+      public byte[] bytesValue() {
+        // Copy only the readable region. array() would expose the whole backing array (ignoring
+        // position/limit/arrayOffset) and throws for read-only or direct buffers. duplicate() leaves
+        // the wrapped buffer's position untouched.
+        ByteBuffer buffer = ((ByteBuffer) datum).duplicate();
+        byte[] bytes = new byte[buffer.remaining()];
+        buffer.get(bytes);
+        return bytes;
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/eedf2e72/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java b/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java
new file mode 100644
index 0000000..577cf74
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.data.avro;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.avro.Schema.Field;
+import org.apache.samza.sql.api.data.Data;
+import org.apache.samza.sql.api.data.Schema;
+
+
+/**
+ * {@link Schema} implementation wrapping an Avro {@link org.apache.avro.Schema}.
+ *
+ * <p>Primitive schemas (INT, LONG, FLOAT, DOUBLE, BOOLEAN, STRING, BYTES) are shared instances. Complex
+ * schemas (ARRAY, MAP, RECORD) are created anew on every {@link #getSchema} call. Any other Avro type is
+ * rejected with {@link IllegalArgumentException}.
+ */
+public class AvroSchema implements Schema {
+
+  protected final org.apache.avro.Schema avroSchema;
+  protected final Schema.Type type;
+
+  private final static Map<org.apache.avro.Schema.Type, AvroSchema> primSchemas =
+      new HashMap<org.apache.avro.Schema.Type, AvroSchema>();
+
+  // One shared instance per primitive type; each overrides read() to wrap datums of that type.
+  static {
+    primSchemas.put(org.apache.avro.Schema.Type.INT,
+        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT)) {
+      @Override
+      public Data read(Object datum) {
+        return AvroData.getInt(this, datum);
+      }
+    });
+    primSchemas.put(org.apache.avro.Schema.Type.LONG,
+        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG)) {
+      @Override
+      public Data read(Object datum) {
+        return AvroData.getLong(this, datum);
+      }
+    });
+    primSchemas.put(org.apache.avro.Schema.Type.FLOAT,
+        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.FLOAT)) {
+      @Override
+      public Data read(Object datum) {
+        return AvroData.getFloat(this, datum);
+      }
+    });
+    primSchemas.put(org.apache.avro.Schema.Type.DOUBLE,
+        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.DOUBLE)) {
+      @Override
+      public Data read(Object datum) {
+        return AvroData.getDouble(this, datum);
+      }
+    });
+    primSchemas.put(org.apache.avro.Schema.Type.BOOLEAN,
+        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BOOLEAN)) {
+      @Override
+      public Data read(Object datum) {
+        return AvroData.getBoolean(this, datum);
+      }
+    });
+    primSchemas.put(org.apache.avro.Schema.Type.STRING,
+        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING)) {
+      @Override
+      public Data read(Object datum) {
+        return AvroData.getString(this, datum);
+      }
+    });
+    primSchemas.put(org.apache.avro.Schema.Type.BYTES,
+        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES)) {
+      @Override
+      public Data read(Object datum) {
+        return AvroData.getBytes(this, datum);
+      }
+    });
+  }
+
+  /**
+   * Returns the samza-sql schema for an Avro schema.
+   *
+   * @param schema the Avro schema to wrap
+   * @return the shared instance for a primitive type, or a new instance for ARRAY, MAP and RECORD
+   * @throws IllegalArgumentException if the Avro type has no samza-sql equivalent
+   */
+  public static AvroSchema getSchema(final org.apache.avro.Schema schema) {
+    Schema.Type type = mapType(schema.getType());
+    if (type != Schema.Type.ARRAY && type != Schema.Type.MAP && type != Schema.Type.STRUCT) {
+      return primSchemas.get(schema.getType());
+    }
+    // otherwise, construct the new schema
+    // TODO: It would be possible to assign each complex schema an ID and cache it w/o repeated create in-memory schema objects
+    switch (type) {
+      case ARRAY:
+        return new AvroSchema(schema) {
+          @Override
+          public Data transform(Data input) {
+            // Only an ARRAY whose element schema matches ours is accepted; the value is re-wrapped as-is.
+            if (input.schema().getType() != Schema.Type.ARRAY) {
+              throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: "
+                  + input.schema().getType());
+            }
+            if (!input.schema().getElementType().equals(this.getElementType())) {
+              throw new IllegalArgumentException("Element schema mismatch. Can't transfer data. input schema: "
+                  + input.schema().getElementType().getType());
+            }
+            return AvroData.getArray(this, input.value());
+          }
+        };
+      case MAP:
+        return new AvroSchema(schema) {
+          @Override
+          public Data transform(Data input) {
+            // Only a MAP whose value schema matches ours is accepted; the value is re-wrapped as-is.
+            if (input.schema().getType() != Schema.Type.MAP) {
+              throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: "
+                  + input.schema().getType());
+            }
+            if (!input.schema().getValueType().equals(this.getValueType())) {
+              throw new IllegalArgumentException("Element schema mismatch. Can't transfer data. input schema: "
+                  + input.schema().getValueType().getType());
+            }
+            return AvroData.getMap(this, input.value());
+          }
+        };
+      case STRUCT:
+        return new AvroSchema(schema) {
+          // Field name -> field schema, in Avro declaration order; unmodifiable.
+          private final Map<String, Schema> fldSchemas = buildFieldSchemas(this.avroSchema);
+
+          @Override
+          public Map<String, Schema> getFields() {
+            return this.fldSchemas;
+          }
+
+          @Override
+          public Schema getFieldType(String fldName) {
+            return this.fldSchemas.get(fldName);
+          }
+
+          @Override
+          public Data transform(Data input) {
+            if (input.schema().getType() != Schema.Type.STRUCT) {
+              throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: "
+                  + input.schema().getType());
+            }
+            // Note: this particular transform function only implements "projection to a sub-set" concept.
+            // More complex function is needed if some other concepts such as "merge from two sets of data", "allow null if does not exist" are needed
+            for (Map.Entry<String, Schema> field : this.fldSchemas.entrySet()) {
+              String fldName = field.getKey();
+              Schema fldSchema = field.getValue();
+              Schema inputFld = input.schema().getFieldType(fldName);
+              if (!fldSchema.equals(inputFld)) {
+                // inputFld is null when the input struct lacks the field.
+                throw new IllegalArgumentException("Field schema mismatch. Can't transfer data for field " + fldName
+                    + ". input field schema:" + (inputFld == null ? "missing" : inputFld.getType())
+                    + ", this field schema: " + fldSchema.getType());
+              }
+            }
+            return AvroData.getStruct(this, input.value());
+          }
+        };
+      default:
+        throw new IllegalArgumentException("Un-recognized complext data type:" + type);
+    }
+  }
+
+  /** Builds an unmodifiable, declaration-ordered map from field name to field schema for an Avro record. */
+  private static Map<String, Schema> buildFieldSchemas(org.apache.avro.Schema recordSchema) {
+    Map<String, Schema> fields = new LinkedHashMap<String, Schema>();
+    for (Field field : recordSchema.getFields()) {
+      fields.put(field.name(), getSchema(field.schema()));
+    }
+    return Collections.unmodifiableMap(fields);
+  }
+
+  private AvroSchema(org.apache.avro.Schema schema) {
+    this.avroSchema = schema;
+    this.type = mapType(schema.getType());
+  }
+
+  /** Maps an Avro type to its samza-sql type; throws IllegalArgumentException for unsupported types. */
+  private static Type mapType(org.apache.avro.Schema.Type type) {
+    switch (type) {
+      case ARRAY:
+        return Schema.Type.ARRAY;
+      case RECORD:
+        return Schema.Type.STRUCT;
+      case MAP:
+        return Schema.Type.MAP;
+      case INT:
+        return Schema.Type.INTEGER;
+      case LONG:
+        return Schema.Type.LONG;
+      case BOOLEAN:
+        return Schema.Type.BOOLEAN;
+      case FLOAT:
+        return Schema.Type.FLOAT;
+      case DOUBLE:
+        return Schema.Type.DOUBLE;
+      case STRING:
+        return Schema.Type.STRING;
+      case BYTES:
+        return Schema.Type.BYTES;
+      default:
+        throw new IllegalArgumentException("Avro schema: " + type + " is not supported");
+    }
+  }
+
+  @Override
+  public Type getType() {
+    return this.type;
+  }
+
+  @Override
+  public Schema getElementType() {
+    if (this.type != Schema.Type.ARRAY) {
+      throw new UnsupportedOperationException("Can't getElmentType with non-array schema: " + this.type);
+    }
+    return getSchema(this.avroSchema.getElementType());
+  }
+
+  @Override
+  public Schema getValueType() {
+    if (this.type != Schema.Type.MAP) {
+      throw new UnsupportedOperationException("Can't getValueType with non-map schema: " + this.type);
+    }
+    return getSchema(this.avroSchema.getValueType());
+  }
+
+  @Override
+  public Map<String, Schema> getFields() {
+    throw new UnsupportedOperationException("Can't get field types with unknown schema type:" + this.type);
+  }
+
+  @Override
+  public Schema getFieldType(String fldName) {
+    throw new UnsupportedOperationException("Can't getFieldType with non-map/non-struct schema: " + this.type);
+  }
+
+  /** Default read for complex schemas; primitive schemas override this in the static initializer. */
+  @Override
+  public Data read(Object object) {
+    if (this.avroSchema.getType() == org.apache.avro.Schema.Type.ARRAY) {
+      return AvroData.getArray(this, object);
+    } else if (this.avroSchema.getType() == org.apache.avro.Schema.Type.MAP) {
+      return AvroData.getMap(this, object);
+    } else if (this.avroSchema.getType() == org.apache.avro.Schema.Type.RECORD) {
+      return AvroData.getStruct(this, object);
+    }
+    throw new UnsupportedOperationException("Reading unknown complext type:" + this.type + " is not supported");
+  }
+
+  /** Default transform for primitive schemas: the input is returned unchanged if its type matches. */
+  @Override
+  public Data transform(Data inputData) {
+    if (inputData.schema().getType() == Schema.Type.ARRAY || inputData.schema().getType() == Schema.Type.MAP
+        || inputData.schema().getType() == Schema.Type.STRUCT) {
+      throw new IllegalArgumentException("Complex schema should have overriden the default transform() function.");
+    }
+    if (inputData.schema().getType() != this.type) {
+      throw new IllegalArgumentException("Can't transform a mismatched primitive type. this type:" + this.type
+          + ", input type:" + inputData.schema().getType());
+    }
+    return inputData;
+  }
+
+  /**
+   * Structural comparison; returns {@code false} for a {@code null} argument (e.g. a struct field missing
+   * from {@code other}). NOTE: this overloads rather than overrides {@link Object#equals(Object)}.
+   */
+  @Override
+  public boolean equals(Schema other) {
+    if (other == null || this.type != other.getType()) {
+      return false;
+    }
+    switch (this.type) {
+      case ARRAY:
+        return this.getElementType().equals(other.getElementType());
+      case MAP:
+        return this.getValueType().equals(other.getValueType());
+      case STRUCT:
+        // Only this schema's fields are compared, consistent with the "projection to subset" concept in
+        // transform(); the relation is therefore not symmetric.
+        for (Map.Entry<String, Schema> field : this.getFields().entrySet()) {
+          if (!field.getValue().equals(other.getFieldType(field.getKey()))) {
+            return false;
+          }
+        }
+        return true;
+      default:
+        return true;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/eedf2e72/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java b/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java
new file mode 100644
index 0000000..1f0c3b2
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.data.serializers;
+
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.sql.data.string.StringData;
+
+import java.io.UnsupportedEncodingException;
+
+public class SqlStringSerde implements Serde<StringData> {
+
+    private final Serde<String> serde;
+
+    public SqlStringSerde(String encoding) {
+        this.serde = new StringSerde(encoding);
+    }
+
+    @Override
+    public StringData fromBytes(byte[] bytes) {
+          return new StringData(serde.fromBytes(bytes));
+    }
+
+    @Override
+    public byte[] toBytes(StringData object) {
+        return serde.toBytes(object.strValue());
+    }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/eedf2e72/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java
new file mode 100644
index 0000000..2564479
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.data.serializers;
+
+
+import org.apache.samza.config.Config;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeFactory;
+import org.apache.samza.sql.data.string.StringData;
+
+/**
+ * Creates {@link SqlStringSerde} instances.
+ *
+ * <p>The character encoding is read from the {@code encoding} key of the supplied config and
+ * defaults to UTF-8 when the key is absent.
+ */
+public class SqlStringSerdeFactory implements SerdeFactory<StringData> {
+    private static final String ENCODING_CONFIG_KEY = "encoding";
+    private static final String DEFAULT_ENCODING = "UTF-8";
+
+    @Override
+    public Serde<StringData> getSerde(String name, Config config) {
+        // The serde name is not used; only the encoding lookup determines the result.
+        final String encoding = config.get(ENCODING_CONFIG_KEY, DEFAULT_ENCODING);
+        return new SqlStringSerde(encoding);
+    }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/eedf2e72/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringData.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringData.java b/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringData.java
new file mode 100644
index 0000000..b81d9fa
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringData.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.data.string;
+
+import org.apache.samza.sql.api.data.Data;
+import org.apache.samza.sql.api.data.Schema;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link Data} implementation for string-typed values.
+ *
+ * <p>The wrapped value is stored as an {@code Object} exactly as given. {@link #strValue()}
+ * renders it with {@link String#valueOf(Object)}. Every accessor for a non-string type throws
+ * {@link UnsupportedOperationException}.
+ */
+public class StringData implements Data {
+    private final Schema stringSchema;
+    private final Object value;
+
+    /**
+     * @param datum the value to wrap; stored as-is without conversion or validation
+     */
+    public StringData(Object datum) {
+        this.stringSchema = new StringSchema();
+        this.value = datum;
+    }
+
+    /** @return the {@link StringSchema} created for this instance */
+    @Override
+    public Schema schema() {
+        return stringSchema;
+    }
+
+    /** @return the wrapped value exactly as passed to the constructor */
+    @Override
+    public Object value() {
+        return value;
+    }
+
+    /** @return the wrapped value rendered via {@link String#valueOf(Object)} ({@code "null"} for a null value) */
+    @Override
+    public String strValue() {
+        return String.valueOf(value);
+    }
+
+    // ----- Accessors below do not apply to string data and always throw. -----
+
+    @Override
+    public int intValue() {
+        throw new UnsupportedOperationException("Can't get int value for a string type data");
+    }
+
+    @Override
+    public long longValue() {
+        throw new UnsupportedOperationException("Can't get long value for a string type data");
+    }
+
+    @Override
+    public float floatValue() {
+        throw new UnsupportedOperationException("Can't get float value for a string type data");
+    }
+
+    @Override
+    public double doubleValue() {
+        throw new UnsupportedOperationException("Can't get double value for a string type data");
+    }
+
+    @Override
+    public boolean booleanValue() {
+        throw new UnsupportedOperationException("Can't get boolean value for a string type data");
+    }
+
+    @Override
+    public byte[] bytesValue() {
+        throw new UnsupportedOperationException("Can't get bytesValue for a string type data");
+    }
+
+    @Override
+    public List<Object> arrayValue() {
+        throw new UnsupportedOperationException("Can't get arrayValue for a string type data");
+    }
+
+    @Override
+    public Map<Object, Object> mapValue() {
+        throw new UnsupportedOperationException("Can't get mapValue for a string type data");
+    }
+
+    @Override
+    public Data getElement(int index) {
+        throw new UnsupportedOperationException("Can't getElement(index) on a string type data");
+    }
+
+    @Override
+    public Data getFieldData(String fldName) {
+        throw new UnsupportedOperationException("Can't getFieldData(fieldName) for a string type data");
+    }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/eedf2e72/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringSchema.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringSchema.java b/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringSchema.java
new file mode 100644
index 0000000..348fc0c
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringSchema.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.data.string;
+
+import org.apache.samza.sql.api.data.Data;
+import org.apache.samza.sql.api.data.Schema;
+
+import java.util.Map;
+
+/**
+ * {@link Schema} for plain string data.
+ *
+ * <p>The type is always {@code Type.STRING}. Element, value and field lookups are unsupported.
+ * Raw objects are read into {@link StringData}.
+ */
+public class StringSchema implements Schema {
+    private final Type type = Type.STRING;
+
+    @Override
+    public Type getType() {
+        return this.type;
+    }
+
+    @Override
+    public Schema getElementType() {
+        throw new UnsupportedOperationException("Can't getElementType with non-array schema: " + this.type);
+    }
+
+    @Override
+    public Schema getValueType() {
+        throw new UnsupportedOperationException("Can't getValueType with non-map schema: " + this.type);
+    }
+
+    @Override
+    public Map<String, Schema> getFields() {
+        throw new UnsupportedOperationException("Can't get field types with unknown schema type:" + this.type);
+    }
+
+    @Override
+    public Schema getFieldType(String fldName) {
+        throw new UnsupportedOperationException("Can't getFieldType with non-map/non-struct schema: " + this.type);
+    }
+
+    /**
+     * Wraps {@code object} in a {@link StringData}.
+     *
+     * <p>The object is not validated or converted.
+     */
+    @Override
+    public Data read(Object object) {
+        return new StringData(object);
+    }
+
+    /**
+     * Returns {@code inputData} unchanged when its schema type is also STRING.
+     *
+     * @throws IllegalArgumentException if the input's schema type is not STRING
+     */
+    @Override
+    public Data transform(Data inputData) {
+        if (inputData.schema().getType() != this.type) {
+            throw new IllegalArgumentException("Can't transform a mismatched primitive type. this type:" + this.type
+                    + ", input type:" + inputData.schema().getType());
+        }
+        return inputData;
+    }
+
+    /**
+     * Two schemas match when {@code other} is non-null and also has type STRING.
+     *
+     * @param other the schema to compare against; may be {@code null}
+     */
+    @Override
+    public boolean equals(Schema other) {
+        return other != null && other.getType() == this.type;
+    }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/eedf2e72/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java b/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
index 7921d4f..986d688 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
@@ -82,9 +82,8 @@ public final class PartitionOp extends SimpleOperator implements TupleOperator {
 
   @Override
   public void process(Tuple tuple, SqlMessageCollector collector) throws Exception {
-    collector.send(new OutgoingMessageEnvelope(PartitionOp.this.spec.getSystemStream(), tuple.getKey().value(),
-        null /* TODO: when merge with Schema API changes, use: tuple
-             .getMessage().getFieldData(PartitionOp.this.spec.getParKey()) */, tuple.getMessage()));
+    collector.send(new OutgoingMessageEnvelope(PartitionOp.this.spec.getSystemStream(), tuple.getKey().value(), // key and message are unwrapped to raw values via Data.value(); NOTE(review): NPE if tuple.getKey() is null - confirm keys are always set
+        tuple.getMessage().getFieldData(PartitionOp.this.spec.getParKey()).value(), tuple.getMessage().value())); // partition key = value of the message field named by spec.getParKey(); NOTE(review): NPE if getFieldData returns null for a missing field - confirm
   }
 
 }