You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sr...@apache.org on 2018/10/30 18:25:08 UTC

samza git commit: SAMZA-1968: Samza-sql - Change Calcite sql type for samza sql rel message __key__ to accept any format

Repository: samza
Updated Branches:
  refs/heads/master 64c82634c -> dcd4b558a


SAMZA-1968: Samza-sql - Change Calcite sql type for samza sql rel message __key__ to accept any format

Author: Aditya Toomula <at...@linkedin.com>

Reviewers: Srinivasulu Punuru <sp...@linkedin.com>

Closes #774 from atoomula/keyformat


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/dcd4b558
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/dcd4b558
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/dcd4b558

Branch: refs/heads/master
Commit: dcd4b558a2c702f5b5a320fdb9d0c3fcadabd09b
Parents: 64c8263
Author: Aditya Toomula <at...@linkedin.com>
Authored: Tue Oct 30 11:25:00 2018 -0700
Committer: Srinivasulu Punuru <sp...@linkedin.com>
Committed: Tue Oct 30 11:25:00 2018 -0700

----------------------------------------------------------------------
 .../org/apache/samza/sql/SamzaSqlRelRecord.java |  13 +-
 .../sql/client/impl/SamzaExecutorTest.java      |   2 +-
 .../apache/samza/sql/avro/AvroRelConverter.java |  59 ++++---
 .../samza/sql/avro/AvroRelSchemaProvider.java   |   5 +
 .../samza/sql/data/SamzaSqlCompositeKey.java    |  82 ---------
 .../samza/sql/data/SamzaSqlRelMessage.java      |  52 +++++-
 .../apache/samza/sql/fn/ConvertToStringUdf.java |  39 +++++
 .../org/apache/samza/sql/fn/GetSqlFieldUdf.java |  95 +++++++++++
 .../sql/impl/ConfigBasedIOResolverFactory.java  |  14 +-
 .../apache/samza/sql/planner/QueryPlanner.java  |   2 +-
 .../samza/sql/translator/JoinTranslator.java    |  33 ++--
 .../SamzaSqlRelMessageJoinFunction.java         |  23 ++-
 .../samza/sql/avro/TestAvroRelConversion.java   |  10 +-
 .../samza/sql/data/TestSamzaSqlRelMessage.java  |  30 ++++
 .../samza/sql/fn/TestConvertToStringUdf.java    |  62 +++++++
 .../apache/samza/sql/fn/TestGetSqlFieldUdf.java | 166 +++++++++++++++++++
 .../samza/sql/system/TestAvroSystemFactory.java |   6 +-
 .../samza/sql/testutil/SamzaSqlTestConfig.java  |   1 -
 .../sql/testutil/TestIOResolverFactory.java     |  13 +-
 .../TestSamzaSqlRelMessageJoinFunction.java     |  21 ++-
 .../tools/avro/AvroSchemaGenRelConverter.java   |  12 +-
 21 files changed, 570 insertions(+), 170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-api/src/main/java/org/apache/samza/sql/SamzaSqlRelRecord.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/sql/SamzaSqlRelRecord.java b/samza-api/src/main/java/org/apache/samza/sql/SamzaSqlRelRecord.java
index e17a273..a877e6b 100644
--- a/samza-api/src/main/java/org/apache/samza/sql/SamzaSqlRelRecord.java
+++ b/samza-api/src/main/java/org/apache/samza/sql/SamzaSqlRelRecord.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.sql;
 
+import com.google.common.base.Joiner;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
@@ -40,6 +41,7 @@ public class SamzaSqlRelRecord implements Serializable {
   private final ArrayList<String> fieldNames;
   @JsonProperty("fieldValues")
   private final ArrayList<Object> fieldValues;
+  private final int hashCode;
 
   /**
    * Creates a {@link SamzaSqlRelRecord} from the list of relational fields and values.
@@ -59,6 +61,8 @@ public class SamzaSqlRelRecord implements Serializable {
 
     this.fieldNames.addAll(fieldNames);
     this.fieldValues.addAll(fieldValues);
+
+    hashCode = Objects.hash(fieldNames, fieldValues);
   }
 
   /**
@@ -96,7 +100,7 @@ public class SamzaSqlRelRecord implements Serializable {
 
   @Override
   public int hashCode() {
-    return Objects.hash(fieldNames, fieldValues);
+    return hashCode;
   }
 
   @Override
@@ -110,4 +114,11 @@ public class SamzaSqlRelRecord implements Serializable {
     SamzaSqlRelRecord other = (SamzaSqlRelRecord) obj;
     return Objects.equals(fieldNames, other.fieldNames) && Objects.equals(fieldValues, other.fieldValues);
   }
+
+  @Override
+  public String toString() {
+    String nameStr = Joiner.on(",").join(fieldNames);
+    String valueStr = Joiner.on(",").useForNull("null").join(fieldValues);
+    return "[Names:{" + nameStr + "} Values:{" + valueStr + "}]";
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql-shell/src/test/java/org/apache/samza/sql/client/impl/SamzaExecutorTest.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/test/java/org/apache/samza/sql/client/impl/SamzaExecutorTest.java b/samza-sql-shell/src/test/java/org/apache/samza/sql/client/impl/SamzaExecutorTest.java
index 18fe4b7..91ec7f6 100644
--- a/samza-sql-shell/src/test/java/org/apache/samza/sql/client/impl/SamzaExecutorTest.java
+++ b/samza-sql-shell/src/test/java/org/apache/samza/sql/client/impl/SamzaExecutorTest.java
@@ -61,7 +61,7 @@ public class SamzaExecutorTest {
         Assert.assertEquals("NewCompany", ts.getFieldName(2));
         Assert.assertEquals("OldCompany", ts.getFieldName(3));
         Assert.assertEquals("ProfileChangeTimestamp", ts.getFieldName(4));
-        Assert.assertEquals("VARCHAR", ts.getFieldTypeName(0));
+        Assert.assertEquals("ANY", ts.getFieldTypeName(0));
         Assert.assertEquals("VARCHAR", ts.getFieldTypeName(1));
         Assert.assertEquals("VARCHAR", ts.getFieldTypeName(2));
         Assert.assertEquals("VARCHAR", ts.getFieldTypeName(3));

http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
index 7d97466..89026ee 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
@@ -61,13 +61,13 @@ import org.slf4j.LoggerFactory;
 public class AvroRelConverter implements SamzaRelConverter {
 
   protected final Config config;
-  private final Schema avroSchema;
+  private final Schema payloadSchema;
 
   private static final Logger LOG = LoggerFactory.getLogger(AvroRelConverter.class);
 
   public AvroRelConverter(SystemStream systemStream, AvroRelSchemaProvider schemaProvider, Config config) {
     this.config = config;
-    this.avroSchema = Schema.parse(schemaProvider.getSchema(systemStream));
+    this.payloadSchema = Schema.parse(schemaProvider.getSchema(systemStream));
   }
 
   /**
@@ -76,45 +76,50 @@ public class AvroRelConverter implements SamzaRelConverter {
    */
   @Override
   public SamzaSqlRelMessage convertToRelMessage(KV<Object, Object> samzaMessage) {
-    List<Object> fieldValues = new ArrayList<>();
-    List<String> fieldNames = new ArrayList<>();
+    List<String> payloadFieldNames = new ArrayList<>();
+    List<Object> payloadFieldValues = new ArrayList<>();
     Object value = samzaMessage.getValue();
     if (value instanceof IndexedRecord) {
-      IndexedRecord record = (IndexedRecord) value;
-      // Please note that record schema and cached schema could be different due to schema evolution.
-      // Always represent record schema in the form of cached schema. This approach has the side-effect
-      // of dropping the newly added fields in the scenarios where the record schema has newer version
-      // than the cached schema. [TODO: SAMZA-1679]
-      Schema recordSchema = record.getSchema();
-      fieldNames.addAll(avroSchema.getFields().stream()
-          .map(Schema.Field::name)
-          .collect(Collectors.toList()));
-      fieldValues.addAll(fieldNames.stream()
-          .map(f -> convertToJavaObject(
-              recordSchema.getField(f) != null ? record.get(recordSchema.getField(f).pos()) : null,
-              getNonNullUnionSchema(avroSchema.getField(f).schema())))
-          .collect(Collectors.toList()));
+      fetchFieldNamesAndValuesFromIndexedRecord((IndexedRecord) value, payloadFieldNames, payloadFieldValues,
+          payloadSchema);
     } else if (value == null) {
-      fieldNames.addAll(avroSchema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList()));
-      IntStream.range(0, fieldNames.size()).forEach(x -> fieldValues.add(null));
+      payloadFieldNames.addAll(payloadSchema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList()));
+      IntStream.range(0, payloadFieldNames.size()).forEach(x -> payloadFieldValues.add(null));
     } else {
       String msg = "Avro message converter doesn't support messages of type " + value.getClass();
       LOG.error(msg);
       throw new SamzaException(msg);
     }
 
-    return new SamzaSqlRelMessage(samzaMessage.getKey(), fieldNames, fieldValues);
+    return new SamzaSqlRelMessage(samzaMessage.getKey(), payloadFieldNames, payloadFieldValues);
+  }
+
+  public void fetchFieldNamesAndValuesFromIndexedRecord(IndexedRecord record, List<String> fieldNames,
+      List<Object> fieldValues, Schema cachedSchema) {
+    // Appends the field names of cachedSchema to fieldNames and the converted record values to fieldValues.
+    // The record schema and the cached schema could differ due to schema evolution, so the record is always
+    // represented in the form of the cached schema: fields only present in a newer record schema are dropped
+    // and fields missing from the record are converted from null. [TODO: SAMZA-1679]
+    Schema recordSchema = record.getSchema();
+    fieldNames.addAll(cachedSchema.getFields().stream()
+        .map(Schema.Field::name)
+        .collect(Collectors.toList()));
+    fieldValues.addAll(fieldNames.stream()
+        .map(f -> convertToJavaObject(
+            recordSchema.getField(f) != null ? record.get(recordSchema.getField(f).pos()) : null,
+            getNonNullUnionSchema(cachedSchema.getField(f).schema())))
+        .collect(Collectors.toList()));
+  }
 
   private SamzaSqlRelRecord convertToRelRecord(IndexedRecord avroRecord) {
-    List<Object> values = new ArrayList<>();
+    List<Object> fieldValues = new ArrayList<>();
     List<String> fieldNames = new ArrayList<>();
     if (avroRecord != null) {
       fieldNames.addAll(avroRecord.getSchema().getFields()
           .stream()
           .map(Schema.Field::name)
           .collect(Collectors.toList()));
-      values.addAll(avroRecord.getSchema().getFields()
+      fieldValues.addAll(avroRecord.getSchema().getFields()
           .stream()
           .map(f -> convertToJavaObject(avroRecord.get(avroRecord.getSchema().getField(f.name()).pos()),
               getNonNullUnionSchema(avroRecord.getSchema().getField(f.name()).schema())))
@@ -125,7 +130,7 @@ public class AvroRelConverter implements SamzaRelConverter {
       throw new SamzaException(msg);
     }
 
-    return new SamzaSqlRelRecord(fieldNames, values);
+    return new SamzaSqlRelRecord(fieldNames, fieldValues);
   }
 
   /**
@@ -133,11 +138,11 @@ public class AvroRelConverter implements SamzaRelConverter {
    */
   @Override
   public KV<Object, Object> convertToSamzaMessage(SamzaSqlRelMessage relMessage) {
-    return convertToSamzaMessage(relMessage, this.avroSchema);
+    return convertToSamzaMessage(relMessage, this.payloadSchema);
   }
 
-  protected KV<Object, Object> convertToSamzaMessage(SamzaSqlRelMessage relMessage, Schema avroSchema) {
-    return new KV<>(relMessage.getKey(), convertToGenericRecord(relMessage.getSamzaSqlRelRecord(), avroSchema));
+  protected KV<Object, Object> convertToSamzaMessage(SamzaSqlRelMessage relMessage, Schema payloadSchema) {
+    return new KV<>(relMessage.getKey(), convertToGenericRecord(relMessage.getSamzaSqlRelRecord(), payloadSchema));
   }
 
   private GenericRecord convertToGenericRecord(SamzaSqlRelRecord relRecord, Schema schema) {

http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelSchemaProvider.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelSchemaProvider.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelSchemaProvider.java
index fb11624..f37c740 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelSchemaProvider.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelSchemaProvider.java
@@ -24,5 +24,10 @@ import org.apache.samza.system.SystemStream;
 
 
 public interface AvroRelSchemaProvider extends RelSchemaProvider {
+  /**
+   * Get payload schema corresponding to the system stream.
+   * @param systemStream system stream for which payload schema needs to be obtained.
+   * @return schema in the form of string
+   */
   String getSchema(SystemStream systemStream);
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlCompositeKey.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlCompositeKey.java b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlCompositeKey.java
deleted file mode 100644
index 4b4b8f2..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlCompositeKey.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.data;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import org.codehaus.jackson.annotate.JsonCreator;
-import org.codehaus.jackson.annotate.JsonProperty;
-
-
-/**
- * A serializable class that holds different key parts.
- */
-public class SamzaSqlCompositeKey implements Serializable {
-
-  @JsonProperty("keyParts")
-  private ArrayList<Object> keyParts;
-  private int hashCode;
-
-  @JsonCreator
-  public SamzaSqlCompositeKey(@JsonProperty("keyParts") List<Object> keyParts) {
-    this.keyParts = new ArrayList<>(keyParts);
-    hashCode = keyParts.hashCode();
-  }
-
-  /**
-   * Get the keyParts of all the columns in the relational message.
-   * @return the keyParts of all the columns
-   */
-  @JsonProperty("keyParts")
-  public ArrayList<Object> getKeyParts() {
-    return keyParts;
-  }
-
-  @Override
-  public String toString() {
-    return String.join(", ", Arrays.toString(keyParts.toArray()));
-  }
-
-  @Override
-  public int hashCode() {
-    return hashCode;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    return this == o || o != null && getClass() == o.getClass() && keyParts.equals(((SamzaSqlCompositeKey) o).keyParts);
-  }
-
-  /**
-   * Create the SamzaSqlCompositeKey from the rel message.
-   * @param message Represents the samza sql rel message.
-   * @param relIdx list of keys in the form of field indices within the rel message.
-   * @return the composite key of the rel message
-   */
-  public static SamzaSqlCompositeKey createSamzaSqlCompositeKey(SamzaSqlRelMessage message, List<Integer> relIdx) {
-    ArrayList<Object> keyParts = new ArrayList<>();
-    for (int idx : relIdx) {
-      keyParts.add(message.getSamzaSqlRelRecord().getFieldValues().get(idx));
-    }
-    return new SamzaSqlCompositeKey(keyParts);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
index 3ebbb23..55ce7b0 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
@@ -31,12 +31,14 @@ import org.codehaus.jackson.annotate.JsonProperty;
 /**
  * Samza sql relational message. Each Samza sql relational message represents a relational row in a table.
  * Each row of the relational table consists of a primary key and {@link SamzaSqlRelRecord}, which consists of a list
- * of column values and the associated column names.
+ * of column values and the associated column names. Please note that the primary key itself could be a
+ * {@link SamzaSqlRelRecord}.
  */
 public class SamzaSqlRelMessage implements Serializable {
 
   public static final String KEY_NAME = "__key__";
 
+  // The key could itself be a record, i.e. a SamzaSqlRelRecord.
   private final Object key;
 
   @JsonProperty("samzaSqlRelRecord")
@@ -122,4 +124,52 @@ public class SamzaSqlRelMessage implements Serializable {
     SamzaSqlRelMessage other = (SamzaSqlRelMessage) obj;
     return Objects.equals(key, other.key) && Objects.equals(samzaSqlRelRecord, other.samzaSqlRelRecord);
   }
+
+  @Override
+  public String toString() {
+    return "RelMessage: {" + samzaSqlRelRecord + "}";
+  }
+
+  /**
+   * Create composite key from the rel message.
+   * @param message Represents the samza sql rel message to extract the key values from.
+   * @param keyValueIdx list of key values in the form of field indices within the rel message.
+   * @param keyPartNames Represents the key field names.
+   * @return the composite key of the rel message
+   */
+  public static SamzaSqlRelRecord createSamzaSqlCompositeKey(SamzaSqlRelMessage message, List<Integer> keyValueIdx,
+      List<String> keyPartNames) {
+    Validate.isTrue(keyValueIdx.size() == keyPartNames.size(), "Key part name and value list sizes are different");
+    ArrayList<Object> keyPartValues = new ArrayList<>();
+    for (int idx : keyValueIdx) {
+      keyPartValues.add(message.getSamzaSqlRelRecord().getFieldValues().get(idx));
+    }
+    return new SamzaSqlRelRecord(keyPartNames, keyPartValues);
+  }
+
+  /**
+   * Create composite key from the rel message.
+   * @param message Represents the samza sql rel message to extract the key values and names from.
+   * @param relIdx list of keys in the form of field indices within the rel message.
+   * @return the composite key of the rel message
+   */
+  public static SamzaSqlRelRecord createSamzaSqlCompositeKey(SamzaSqlRelMessage message, List<Integer> relIdx) {
+    return createSamzaSqlCompositeKey(message, relIdx,
+        getSamzaSqlCompositeKeyFieldNames(message.getSamzaSqlRelRecord().getFieldNames(), relIdx));
+  }
+
+  /**
+   * Get composite key field names.
+   * @param fieldNames list of field names to extract the key names from.
+   * @param nameIds indices within the field names.
+   * @return list of composite key field names
+   */
+  public static List<String> getSamzaSqlCompositeKeyFieldNames(List<String> fieldNames,
+      List<Integer> nameIds) {
+    List<String> keyPartNames = new ArrayList<>();
+    for (int idx : nameIds) {
+      keyPartNames.add(fieldNames.get(idx));
+    }
+    return keyPartNames;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/main/java/org/apache/samza/sql/fn/ConvertToStringUdf.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/fn/ConvertToStringUdf.java b/samza-sql/src/main/java/org/apache/samza/sql/fn/ConvertToStringUdf.java
new file mode 100644
index 0000000..e31c2bb
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/fn/ConvertToStringUdf.java
@@ -0,0 +1,39 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.fn;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.udfs.ScalarUdf;
+
+
+/**
+ * UDF that converts an object to its string representation; a null argument is returned as null.
+ */
+public class ConvertToStringUdf implements ScalarUdf<String> {
+  @Override
+  public void init(Config udfConfig) {
+  }
+
+  @Override
+  public String execute(Object... args) {
+    return args[0] == null ? null : args[0].toString();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/main/java/org/apache/samza/sql/fn/GetSqlFieldUdf.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/fn/GetSqlFieldUdf.java b/samza-sql/src/main/java/org/apache/samza/sql/fn/GetSqlFieldUdf.java
new file mode 100644
index 0000000..58b5c99
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/fn/GetSqlFieldUdf.java
@@ -0,0 +1,95 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.fn;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang.Validate;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.SamzaSqlRelRecord;
+import org.apache.samza.sql.udfs.ScalarUdf;
+
+
+/**
+ * UDF that extracts a field value from a nested SamzaSqlRelRecord by recursively following a query path.
+ * Note that the root object must be a SamzaSqlRelRecord.
+ *
+ * Syntax for field specification:
+ * <ul>
+ *  <li> SamzaSqlRelRecord/Map: <code> field.subfield </code> </li>
+ *  <li> Array: <code> field[index] </code> </li>
+ *  <li> Scalar types: <code> field </code> </li>
+ * </ul>
+ *
+ * Example query: <code> pageViewEvent.requestHeader.properties.cookies[3].sessionKey </code>
+ *
+ * Above query extracts the sessionKey field from below nested record:
+ *
+ *   pageViewEvent (SamzaSqlRelRecord)
+ *     - requestHeader (SamzaSqlRelRecord)
+ *       - properties (Map)
+ *         - cookies (Array)
+ *           - sessionKey (Scalar)
+ *
+ */
+public class GetSqlFieldUdf implements ScalarUdf<String> {
+  @Override
+  public void init(Config udfConfig) {
+  }
+
+  @Override
+  public String execute(Object... args) {
+    Object currentFieldOrValue = args[0];
+    Validate.isTrue(currentFieldOrValue == null
+        || currentFieldOrValue instanceof SamzaSqlRelRecord);
+    if (currentFieldOrValue != null && args.length > 1) {
+      String[] fieldNameChain = ((String) args[1]).split("\\.");
+      for (int i = 0; i < fieldNameChain.length && currentFieldOrValue != null; i++) {
+        currentFieldOrValue = extractField(fieldNameChain[i], currentFieldOrValue);
+      }
+    }
+
+    if (currentFieldOrValue != null) {
+      return currentFieldOrValue.toString();
+    }
+
+    return null;
+  }
+
+  static Object extractField(String fieldName, Object current) {
+    if (current instanceof SamzaSqlRelRecord) {
+      SamzaSqlRelRecord record = (SamzaSqlRelRecord) current;
+      Validate.isTrue(record.getFieldNames().contains(fieldName),
+          String.format("Invalid field %s in %s", fieldName, record));
+      return record.getField(fieldName).orElse(null);
+    } else if (current instanceof Map) {
+      Map map = (Map) current;
+      Validate.isTrue(map.containsKey(fieldName), String.format("Invalid field %s in %s", fieldName, map));
+      return map.get(fieldName);
+    } else if (current instanceof List && fieldName.endsWith("]")) {
+      List list = (List) current;
+      int index = Integer.parseInt(fieldName.substring(fieldName.indexOf("[") + 1, fieldName.length() - 1));
+      return list.get(index);
+    }
+
+    throw new IllegalArgumentException(String.format(
+        "Unsupported accessing operation for data type: %s with field: %s.", current.getClass(), fieldName));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
index 2514d30..80cb789 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
@@ -21,14 +21,15 @@ package org.apache.samza.sql.impl;
 
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
+import org.apache.samza.sql.SamzaSqlRelRecord;
+import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
+import org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory;
 import org.apache.samza.table.descriptors.TableDescriptor;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.sql.data.SamzaSqlCompositeKey;
 import org.apache.samza.sql.interfaces.SqlIOResolver;
 import org.apache.samza.sql.interfaces.SqlIOResolverFactory;
 import org.apache.samza.sql.interfaces.SqlIOConfig;
-import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
 import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -107,9 +108,12 @@ public class ConfigBasedIOResolverFactory implements SqlIOResolverFactory {
       TableDescriptor tableDescriptor = null;
       if (isTable) {
         String tableId = changeLogStorePrefix + "InputTable-" + name.replace(".", "-").replace("$", "-");
-        tableDescriptor = new RocksDbTableDescriptor(tableId, KVSerde.of(
-            new JsonSerdeV2<>(SamzaSqlCompositeKey.class),
-            new SamzaSqlRelMessageSerdeFactory().getSerde(null, null))).withChangelogEnabled();
+        SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde keySerde =
+            (SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde) new SamzaSqlRelRecordSerdeFactory().getSerde(null, null);
+        SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde valueSerde =
+            (SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde) new SamzaSqlRelMessageSerdeFactory().getSerde(null, null);
+        tableDescriptor = new RocksDbTableDescriptor(tableId, KVSerde.of(keySerde, valueSerde))
+            .withChangelogEnabled();
       }
 
       return new SqlIOConfig(systemName, streamName, fetchSystemConfigs(systemName), tableDescriptor);

http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
index f36d990..d83ca7f 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
@@ -150,7 +150,7 @@ public class QueryPlanner {
       public RelDataType getRowType(RelDataTypeFactory typeFactory) {
         List<RelDataTypeField> fieldsList = new ArrayList<>();
         fieldsList.add(new RelDataTypeFieldImpl(SamzaSqlRelMessage.KEY_NAME, 0,
-            typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.VARCHAR), true)));
+            typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.ANY), true)));
         fieldsList.addAll(relationalSchema.getFieldList());
         return new RelRecordType(fieldsList);
       }

http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
index 0939f7b..5f44ff9 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.sql.translator;
 
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 
@@ -39,19 +40,19 @@ import org.apache.commons.lang.Validate;
 import org.apache.samza.SamzaException;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
-import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.sql.data.SamzaSqlCompositeKey;
+import org.apache.samza.sql.SamzaSqlRelRecord;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.SqlIOResolver;
 import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
+import org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory;
 import org.apache.samza.table.Table;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.samza.sql.data.SamzaSqlCompositeKey.createSamzaSqlCompositeKey;
+import static org.apache.samza.sql.data.SamzaSqlRelMessage.getSamzaSqlCompositeKeyFieldNames;
+import static org.apache.samza.sql.data.SamzaSqlRelMessage.createSamzaSqlCompositeKey;
 
 
 /**
@@ -103,8 +104,10 @@ class JoinTranslator {
         isTablePosOnRight ?
             context.getMessageStream(join.getLeft().getId()) : context.getMessageStream(join.getRight().getId());
 
-    List<String> streamFieldNames = (isTablePosOnRight ? join.getLeft() : join.getRight()).getRowType().getFieldNames();
-    List<String> tableFieldNames = (isTablePosOnRight ? join.getRight() : join.getLeft()).getRowType().getFieldNames();
+    List<String> streamFieldNames =
+        new ArrayList<>((isTablePosOnRight ? join.getLeft() : join.getRight()).getRowType().getFieldNames());
+    List<String> tableFieldNames =
+        new ArrayList<>((isTablePosOnRight ? join.getRight() : join.getLeft()).getRowType().getFieldNames());
     Validate.isTrue(streamKeyIds.size() == tableKeyIds.size());
     log.info("Joining on the following Stream and Table field(s): ");
     for (int i = 0; i < streamKeyIds.size(); i++) {
@@ -113,23 +116,25 @@ class JoinTranslator {
 
     SamzaSqlRelMessageJoinFunction joinFn =
         new SamzaSqlRelMessageJoinFunction(join.getJoinType(), isTablePosOnRight, streamKeyIds, streamFieldNames,
-            tableFieldNames);
+            tableKeyIds, tableFieldNames);
 
-    Serde<SamzaSqlCompositeKey> keySerde = new JsonSerdeV2<>(SamzaSqlCompositeKey.class);
+    SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde keySerde =
+        (SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde) new SamzaSqlRelRecordSerdeFactory().getSerde(null, null);
     SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde valueSerde =
         (SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde) new SamzaSqlRelMessageSerdeFactory().getSerde(null, null);
 
     // Always re-partition the messages from the input stream by the composite key and then join the messages
-    // with the table.
+    // with the table. For the composite key, use the table's key field names rather than the stream's field
+    // names, because the lookup has to match the keys that are stored in the local table.
     MessageStream<SamzaSqlRelMessage> outputStream =
         inputStream
-            .partitionBy(m -> createSamzaSqlCompositeKey(m, streamKeyIds),
+            .partitionBy(m -> createSamzaSqlCompositeKey(m, streamKeyIds,
+                getSamzaSqlCompositeKeyFieldNames(tableFieldNames, tableKeyIds)),
                 m -> m,
                 KVSerde.of(keySerde, valueSerde),
                 intermediateStreamPrefix + "stream_" + joinId)
             .map(KV::getValue)
             .join(table, joinFn);
-    // MessageStream<SamzaSqlRelMessage> outputStream = inputStream.join(table, joinFn);
 
     context.registerMessageStream(join.getId(), outputStream);
   }
@@ -299,13 +304,13 @@ class JoinTranslator {
 
     // Create a table backed by RocksDb store with the fields in the join condition as composite key and relational
     // message as the value. Send the messages from the input stream denoted as 'table' to the created table store.
-    Table<KV<SamzaSqlCompositeKey, SamzaSqlRelMessage>> table =
+    Table<KV<SamzaSqlRelRecord, SamzaSqlRelMessage>> table =
         context.getStreamAppDescriptor().getTable(sourceTableConfig.getTableDescriptor().get());
 
-    Serde<SamzaSqlCompositeKey> keySerde = new JsonSerdeV2<>(SamzaSqlCompositeKey.class);
+    SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde keySerde =
+        (SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde) new SamzaSqlRelRecordSerdeFactory().getSerde(null, null);
     SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde valueSerde =
         (SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde) new SamzaSqlRelMessageSerdeFactory().getSerde(null, null);
-
     // Let's always repartition by the join fields as key before sending the key and value to the table.
     // We need to repartition the stream denoted as table to ensure that both the stream and table that are joined
     // have the same partitioning scheme and partition key.

http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java
index 889ea97..d0a3d11 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java
@@ -25,20 +25,21 @@ import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.commons.lang.Validate;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.functions.StreamTableJoinFunction;
-import org.apache.samza.sql.data.SamzaSqlCompositeKey;
+import org.apache.samza.sql.SamzaSqlRelRecord;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.samza.sql.data.SamzaSqlCompositeKey.*;
+import static org.apache.samza.sql.data.SamzaSqlRelMessage.createSamzaSqlCompositeKey;
+import static org.apache.samza.sql.data.SamzaSqlRelMessage.getSamzaSqlCompositeKeyFieldNames;
 
 
 /**
  * This class joins incoming {@link SamzaSqlRelMessage} from a stream with the records in a table with the join key
- * being {@link SamzaSqlCompositeKey}
+ * being {@link SamzaSqlRelRecord}
  */
 public class SamzaSqlRelMessageJoinFunction
-    implements StreamTableJoinFunction<SamzaSqlCompositeKey, SamzaSqlRelMessage, KV<SamzaSqlCompositeKey, SamzaSqlRelMessage>, SamzaSqlRelMessage> {
+    implements StreamTableJoinFunction<SamzaSqlRelRecord, SamzaSqlRelMessage, KV<SamzaSqlRelRecord, SamzaSqlRelMessage>, SamzaSqlRelMessage> {
 
   private static final Logger log = LoggerFactory.getLogger(SamzaSqlRelMessageJoinFunction.class);
 
@@ -46,17 +47,20 @@ public class SamzaSqlRelMessageJoinFunction
   private final boolean isTablePosOnRight;
   private final ArrayList<Integer> streamFieldIds;
   // Table field names are used in the outer join when the table record is not found.
+  private final ArrayList<Integer> tableKeyIds;
   private final ArrayList<String> tableFieldNames;
   private final ArrayList<String> outFieldNames;
 
   SamzaSqlRelMessageJoinFunction(JoinRelType joinRelType, boolean isTablePosOnRight,
-      List<Integer> streamFieldIds, List<String> streamFieldNames, List<String> tableFieldNames) {
+      List<Integer> streamFieldIds, List<String> streamFieldNames, List<Integer> tableKeyIds,
+      List<String> tableFieldNames) {
     this.joinRelType = joinRelType;
     this.isTablePosOnRight = isTablePosOnRight;
     Validate.isTrue((joinRelType.compareTo(JoinRelType.LEFT) == 0 && isTablePosOnRight) ||
         (joinRelType.compareTo(JoinRelType.RIGHT) == 0 && !isTablePosOnRight) ||
         joinRelType.compareTo(JoinRelType.INNER) == 0);
     this.streamFieldIds = new ArrayList<>(streamFieldIds);
+    this.tableKeyIds = new ArrayList<>(tableKeyIds);
     this.tableFieldNames = new ArrayList<>(tableFieldNames);
     this.outFieldNames = new ArrayList<>();
     if (isTablePosOnRight) {
@@ -69,7 +73,7 @@ public class SamzaSqlRelMessageJoinFunction
   }
 
   @Override
-  public SamzaSqlRelMessage apply(SamzaSqlRelMessage message, KV<SamzaSqlCompositeKey, SamzaSqlRelMessage> record) {
+  public SamzaSqlRelMessage apply(SamzaSqlRelMessage message, KV<SamzaSqlRelRecord, SamzaSqlRelMessage> record) {
 
     if (joinRelType.compareTo(JoinRelType.INNER) == 0 && record == null) {
       log.debug("Inner Join: Record not found for the message with key: " + getMessageKey(message));
@@ -106,12 +110,13 @@ public class SamzaSqlRelMessageJoinFunction
   }
 
   @Override
-  public SamzaSqlCompositeKey getMessageKey(SamzaSqlRelMessage message) {
-    return createSamzaSqlCompositeKey(message, streamFieldIds);
+  public SamzaSqlRelRecord getMessageKey(SamzaSqlRelMessage message) {
+    return createSamzaSqlCompositeKey(message, streamFieldIds,
+        getSamzaSqlCompositeKeyFieldNames(tableFieldNames, tableKeyIds));
   }
 
   @Override
-  public SamzaSqlCompositeKey getRecordKey(KV<SamzaSqlCompositeKey, SamzaSqlRelMessage> record) {
+  public SamzaSqlRelRecord getRecordKey(KV<SamzaSqlRelRecord, SamzaSqlRelMessage> record) {
     return record.getKey();
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
index 708eb3e..40aa791 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
@@ -19,7 +19,6 @@
 
 package org.apache.samza.sql.avro;
 
-import com.google.common.base.Joiner;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -160,8 +159,7 @@ public class TestAvroRelConversion {
     record.put("name", "name1");
 
     SamzaSqlRelMessage message = simpleRecordAvroRelConverter.convertToRelMessage(new KV<>("key", record));
-    LOG.info(Joiner.on(",").join(message.getSamzaSqlRelRecord().getFieldValues()));
-    LOG.info(Joiner.on(",").join(message.getSamzaSqlRelRecord().getFieldNames()));
+    LOG.info(message.toString());
   }
 
   @Test
@@ -263,8 +261,7 @@ public class TestAvroRelConversion {
 
     SamzaSqlRelMessage relMessage = nestedRecordAvroRelConverter.convertToRelMessage(new KV<>("key", record));
 
-    LOG.info(Joiner.on(",").join(relMessage.getSamzaSqlRelRecord().getFieldValues()));
-    LOG.info(Joiner.on(",").join(relMessage.getSamzaSqlRelRecord().getFieldNames()));
+    LOG.info(relMessage.toString());
 
     KV<Object, Object> samzaMessage = nestedRecordAvroRelConverter.convertToSamzaMessage(relMessage);
     GenericRecord recordPostConversion = (GenericRecord) samzaMessage.getValue();
@@ -321,8 +318,7 @@ public class TestAvroRelConversion {
         Arrays.equals(((ByteString) message.getSamzaSqlRelRecord().getField("fixed_value").get()).getBytes(),
             DEFAULT_TRACKING_ID_BYTES));
 
-    LOG.info(Joiner.on(",").useForNull("null").join(message.getSamzaSqlRelRecord().getFieldValues()));
-    LOG.info(Joiner.on(",").join(message.getSamzaSqlRelRecord().getFieldNames()));
+    LOG.info(message.toString());
 
     KV<Object, Object> samzaMessage = complexRecordAvroRelConverter.convertToSamzaMessage(message);
     GenericRecord record = (GenericRecord) samzaMessage.getValue();

http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/test/java/org/apache/samza/sql/data/TestSamzaSqlRelMessage.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/data/TestSamzaSqlRelMessage.java b/samza-sql/src/test/java/org/apache/samza/sql/data/TestSamzaSqlRelMessage.java
index d0a2f59..a4f3afc 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/data/TestSamzaSqlRelMessage.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/data/TestSamzaSqlRelMessage.java
@@ -20,7 +20,9 @@
 package org.apache.samza.sql.data;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import org.apache.samza.sql.SamzaSqlRelRecord;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.junit.Assert;
 import org.junit.Test;
@@ -61,4 +63,32 @@ public class TestSamzaSqlRelMessage {
     Assert.assertNotEquals(message1, message2);
     Assert.assertNotEquals(message1.hashCode(), message2.hashCode());
   }
+
+  @Test
+  public void testCompositeKeyCreation() {
+    List<String> keyPartNames = Arrays.asList("kfield1", "kfield2");
+    SamzaSqlRelMessage message = new SamzaSqlRelMessage(names, values);
+    // Key from a single field index, no explicit key names: the key field is expected to be named "field1".
+    SamzaSqlRelRecord relRecord1 = SamzaSqlRelMessage.createSamzaSqlCompositeKey(message, Collections.singletonList(0));
+    Assert.assertEquals(1, relRecord1.getFieldNames().size());
+    Assert.assertEquals("field1", relRecord1.getFieldNames().get(0));
+    Assert.assertEquals("value1", relRecord1.getFieldValues().get(0));
+    // Key from indices (1, 0) with explicit key names selected by the same indices, so the order is reversed.
+    SamzaSqlRelRecord relRecord2 = SamzaSqlRelMessage.createSamzaSqlCompositeKey(message, Arrays.asList(1, 0),
+        SamzaSqlRelMessage.getSamzaSqlCompositeKeyFieldNames(keyPartNames, Arrays.asList(1, 0)));
+    Assert.assertEquals(2, relRecord2.getFieldNames().size());
+    Assert.assertEquals("kfield2", relRecord2.getFieldNames().get(0));
+    Assert.assertEquals("value2", relRecord2.getFieldValues().get(0));
+    Assert.assertEquals("kfield1", relRecord2.getFieldNames().get(1));
+    Assert.assertEquals("value1", relRecord2.getFieldValues().get(1));
+  }
+
+  @Test (expected = IllegalArgumentException.class)
+  public void testCompositeKeyCreationWithInEqualKeyNameValues() {
+    List<String> keyPartNames = Arrays.asList("kfield1", "kfield2");
+    SamzaSqlRelMessage message = new SamzaSqlRelMessage(names, values);
+    // Two key field indices but only one key field name: the call must fail with IllegalArgumentException.
+    SamzaSqlRelMessage.createSamzaSqlCompositeKey(message, Arrays.asList(1, 0),
+        SamzaSqlRelMessage.getSamzaSqlCompositeKeyFieldNames(keyPartNames, Arrays.asList(1)));
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/test/java/org/apache/samza/sql/fn/TestConvertToStringUdf.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/fn/TestConvertToStringUdf.java b/samza-sql/src/test/java/org/apache/samza/sql/fn/TestConvertToStringUdf.java
new file mode 100644
index 0000000..9b80549
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/fn/TestConvertToStringUdf.java
@@ -0,0 +1,62 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.fn;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestConvertToStringUdf {
+  // Sample enum used by testConvertEnumToString.
+  private enum LightSwitch {
+    On,
+    Off
+  }
+
+  @Test
+  public void testConvertIntegerToString() {
+    ConvertToStringUdf convertToStringUdf = new ConvertToStringUdf();
+    Assert.assertEquals("10", convertToStringUdf.execute(10));
+  }
+
+  @Test
+  public void testConvertLongToString() {
+    ConvertToStringUdf convertToStringUdf = new ConvertToStringUdf();
+    Assert.assertEquals("10000000000", convertToStringUdf.execute(10000000000L));
+  }
+
+  @Test
+  public void testConvertDoubleToString() {
+    ConvertToStringUdf convertToStringUdf = new ConvertToStringUdf();
+    Assert.assertEquals("10.0000345", convertToStringUdf.execute(10.0000345));
+  }
+
+  @Test
+  public void testConvertBooleanToString() {
+    ConvertToStringUdf convertToStringUdf = new ConvertToStringUdf();
+    Assert.assertEquals("true", convertToStringUdf.execute(true));
+  }
+
+  @Test
+  public void testConvertEnumToString() {
+    ConvertToStringUdf convertToStringUdf = new ConvertToStringUdf();
+    Assert.assertEquals("On", convertToStringUdf.execute(LightSwitch.On));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/test/java/org/apache/samza/sql/fn/TestGetSqlFieldUdf.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/fn/TestGetSqlFieldUdf.java b/samza-sql/src/test/java/org/apache/samza/sql/fn/TestGetSqlFieldUdf.java
new file mode 100644
index 0000000..b084fd1
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/fn/TestGetSqlFieldUdf.java
@@ -0,0 +1,166 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.fn;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.samza.sql.SamzaSqlRelRecord;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestGetSqlFieldUdf {
+  static Object createRecord(List<String> fieldNames, int level) {
+    String fieldName = fieldNames.get(level);
+    Object child = (level == fieldNames.size() - 1) ? "bar" : createRecord(fieldNames, level + 1);
+    boolean isMap = false;
+    int arrayIndex = -1;
+    if (fieldName.startsWith("map:")) {
+      isMap = true;
+      fieldName = fieldName.substring(4); // strip "map:"
+    } else if (fieldName.endsWith("]")) {
+      arrayIndex = Integer.parseInt(fieldName.substring(fieldName.indexOf("[") + 1, fieldName.length() - 1));
+      fieldName = fieldName.substring(0, fieldName.indexOf("["));
+    }
+    // A "map:" segment yields a Map, an "[i]" segment a List holding the child at i, anything else a record.
+    if (isMap) {
+      Map<String, Object> retMap = new HashMap<>();
+      retMap.put(fieldName, child);
+      return retMap;
+    } else if (arrayIndex >= 0) {
+      List<Object> list = Arrays.asList(new Object[Math.max(2 * arrayIndex, 1)]); // index 0 needs a slot too
+      list.set(arrayIndex, child);
+      return list;
+    } else {
+      return new SamzaSqlRelRecord(Collections.singletonList(fieldName), Collections.singletonList(child));
+    }
+  }
+
+  private SamzaSqlRelRecord createRecord(String path) {
+    return (SamzaSqlRelRecord) createRecord(Arrays.asList(path.split("\\.")), 0);
+  }
+
+  @Test
+  public void testSingleLevel() {
+    SamzaSqlRelRecord record = createRecord("foo");
+    GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf();
+    Assert.assertEquals("bar", getSqlFieldUdf.execute(record, "foo"));
+  }
+
+  @Test
+  public void testMultiLevel() {
+    SamzaSqlRelRecord record = createRecord("bar.baz.baf.foo");
+    GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf();
+    Assert.assertEquals("bar", getSqlFieldUdf.execute(record, "bar.baz.baf.foo"));
+  }
+
+  @Test
+  public void testNullRecord() {
+    GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf();
+    Assert.assertNull(getSqlFieldUdf.execute(null, "bar.baz.baf.foo"));
+  }
+
+  @Test (expected = NullPointerException.class)
+  public void testNullFields() {
+    SamzaSqlRelRecord record = createRecord("bar.baz.baf.foo");
+    GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf();
+    getSqlFieldUdf.execute(record, null);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testSingleLevelInvalidField() {
+    SamzaSqlRelRecord record = createRecord("foo");
+    GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf();
+    getSqlFieldUdf.execute(record, "bar");
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testMultiLevelInvalidIntermediateField() {
+    SamzaSqlRelRecord record = createRecord("bar.baz.baf.foo");
+    GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf();
+    getSqlFieldUdf.execute(record, "bar.baz.bacon");
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testMultiLevelInvalidFinalField() {
+    SamzaSqlRelRecord record = createRecord("bar.baz.baf.foo");
+    GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf();
+    getSqlFieldUdf.execute(record, "bar.baz.baf.funny");
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testPathTooDeep() {
+    SamzaSqlRelRecord record = createRecord("bar.foo");
+    GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf();
+    getSqlFieldUdf.execute(record, "bar.baz.baf.funny");
+  }
+
+  @Test
+  public void testMapAtLastField() {
+    SamzaSqlRelRecord record = createRecord("bar.baz.baf.map:foo");
+    GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf();
+    Assert.assertEquals("bar", getSqlFieldUdf.execute(record, "bar.baz.baf.foo"));
+  }
+
+  @Test
+  public void testMapAtIntermediateFields() {
+    SamzaSqlRelRecord record = createRecord("bar.map:baz.map:baf.foo");
+    GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf();
+    Assert.assertEquals("bar", getSqlFieldUdf.execute(record, "bar.baz.baf.foo"));
+  }
+
+  @Test
+  public void testMapAtAllIntermediateFields() {
+    SamzaSqlRelRecord record = createRecord("bar.map:baz.map:baf.map:foo");
+    GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf();
+    Assert.assertEquals("bar", getSqlFieldUdf.execute(record, "bar.baz.baf.foo"));
+  }
+
+  @Test
+  public void testArrayAtLastField() {
+    SamzaSqlRelRecord record = createRecord("bar.baz.baf.foo[3]");
+    GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf();
+    Assert.assertEquals("bar", getSqlFieldUdf.execute(record, "bar.baz.baf.foo[3]"));
+  }
+
+  @Test
+  public void testArrayAtIntermediateFields() {
+    SamzaSqlRelRecord record = createRecord("bar.baz[3].baf[2].foo");
+    GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf();
+    Assert.assertEquals("bar", getSqlFieldUdf.execute(record, "bar.baz[3].baf[2].foo"));
+  }
+
+  @Test
+  public void testArrayAtAllIntermediateFields() {
+    SamzaSqlRelRecord record = createRecord("bar.baz[2].baf[3].foo[5]");
+    GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf();
+    Assert.assertEquals("bar", getSqlFieldUdf.execute(record, "bar.baz[2].baf[3].foo[5]"));
+  }
+
+  @Test
+  public void testAllFieldTypes() {
+    SamzaSqlRelRecord record = createRecord("bar.map:baz.baf.foo[3].fun");
+    GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf();
+    Assert.assertEquals("bar", getSqlFieldUdf.execute(record, "bar.baz.baf.foo[3].fun"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
index fd811cd..41c809e 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
@@ -188,7 +188,7 @@ public class TestAvroSystemFactory implements SystemFactory {
         // We send num Messages and an end of stream message following that.
         List<IncomingMessageEnvelope> envelopes =
             IntStream.range(curMessages, curMessages + numMessages/4)
-                .mapToObj(i -> i < numMessages ? new IncomingMessageEnvelope(ssp, null, "key" + i,
+                .mapToObj(i -> i < numMessages ? new IncomingMessageEnvelope(ssp, null, getKey(i, ssp),
                     getData(i, ssp)) : IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp))
                 .collect(Collectors.toList());
         envelopeMap.put(ssp, envelopes);
@@ -201,6 +201,10 @@ public class TestAvroSystemFactory implements SystemFactory {
       return envelopeMap;
     }
 
+    private Object getKey(int index, SystemStreamPartition ssp) {
+      return "key" + index; // ssp is not consulted: the key is derived from the message index alone
+    }
+
     private Object getData(int index, SystemStreamPartition ssp) {
       if (simpleRecordMap.contains(ssp)) {
         return createSimpleRecord(index);

http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
index e54223c..76ebfc2 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
@@ -42,7 +42,6 @@ import org.apache.samza.sql.impl.ConfigBasedUdfResolver;
 import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
 import org.apache.samza.sql.system.TestAvroSystemFactory;
-import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory;
 import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
 
 

http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
index 818e33d..5e7dd4c 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
@@ -26,16 +26,15 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.samza.config.Config;
+import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
+import org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory;
 import org.apache.samza.table.descriptors.BaseTableDescriptor;
 import org.apache.samza.table.descriptors.TableDescriptor;
-import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.sql.data.SamzaSqlCompositeKey;
 import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.sql.interfaces.SqlIOResolver;
 import org.apache.samza.sql.interfaces.SqlIOResolverFactory;
-import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
 import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
 import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.Table;
@@ -207,9 +206,11 @@ public class TestIOResolverFactory implements SqlIOResolverFactory {
             tableDescriptor = new TestTableDescriptor(TEST_TABLE_ID + tableDescMap.size());
           } else {
             String tableId = changeLogStorePrefix + "InputTable-" + ioName.replace(".", "-").replace("$", "-");
-            tableDescriptor = new RocksDbTableDescriptor(tableId, KVSerde.of(
-                new JsonSerdeV2<>(SamzaSqlCompositeKey.class),
-                new SamzaSqlRelMessageSerdeFactory().getSerde(null, null))).withChangelogEnabled();
+            SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde keySerde =
+                (SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde) new SamzaSqlRelRecordSerdeFactory().getSerde(null, null);
+            SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde valueSerde =
+                (SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde) new SamzaSqlRelMessageSerdeFactory().getSerde(null, null);
+            tableDescriptor = new RocksDbTableDescriptor(tableId, KVSerde.of(keySerde, valueSerde)).withChangelogEnabled();
           }
           tableDescMap.put(ioName, tableDescriptor);
         }

http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRelMessageJoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRelMessageJoinFunction.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRelMessageJoinFunction.java
index 5dd2d21..6362155 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRelMessageJoinFunction.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRelMessageJoinFunction.java
@@ -25,7 +25,7 @@ import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.samza.operators.KV;
-import org.apache.samza.sql.data.SamzaSqlCompositeKey;
+import org.apache.samza.sql.SamzaSqlRelRecord;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.junit.Assert;
 import org.junit.Test;
@@ -45,11 +45,11 @@ public class TestSamzaSqlRelMessageJoinFunction {
     JoinRelType joinRelType = JoinRelType.INNER;
     List<Integer> streamKeyIds = Arrays.asList(0, 1);
     List<Integer> tableKeyIds = Arrays.asList(0, 1);
-    SamzaSqlCompositeKey compositeKey = SamzaSqlCompositeKey.createSamzaSqlCompositeKey(tableMsg, tableKeyIds);
-    KV<SamzaSqlCompositeKey, SamzaSqlRelMessage> record = KV.of(compositeKey, tableMsg);
+    SamzaSqlRelRecord compositeKey = SamzaSqlRelMessage.createSamzaSqlCompositeKey(tableMsg, tableKeyIds);
+    KV<SamzaSqlRelRecord, SamzaSqlRelMessage> record = KV.of(compositeKey, tableMsg);
 
     SamzaSqlRelMessageJoinFunction joinFn =
-        new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, streamFieldNames, tableFieldNames);
+        new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, streamFieldNames, tableKeyIds, tableFieldNames);
     SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, record);
 
     Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues().size(),
@@ -68,11 +68,12 @@ public class TestSamzaSqlRelMessageJoinFunction {
     JoinRelType joinRelType = JoinRelType.INNER;
     List<Integer> streamKeyIds = Arrays.asList(0, 2);
     List<Integer> tableKeyIds = Arrays.asList(0, 2);
-    SamzaSqlCompositeKey compositeKey = SamzaSqlCompositeKey.createSamzaSqlCompositeKey(tableMsg, tableKeyIds);
-    KV<SamzaSqlCompositeKey, SamzaSqlRelMessage> record = KV.of(compositeKey, tableMsg);
+    SamzaSqlRelRecord compositeKey = SamzaSqlRelMessage.createSamzaSqlCompositeKey(tableMsg, tableKeyIds);
+    KV<SamzaSqlRelRecord, SamzaSqlRelMessage> record = KV.of(compositeKey, tableMsg);
 
     SamzaSqlRelMessageJoinFunction joinFn =
-        new SamzaSqlRelMessageJoinFunction(joinRelType, false, streamKeyIds, streamFieldNames, tableFieldNames);
+        new SamzaSqlRelMessageJoinFunction(joinRelType, false, streamKeyIds, streamFieldNames,
+            tableKeyIds, tableFieldNames);
     SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, record);
 
     Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues().size(),
@@ -89,9 +90,10 @@ public class TestSamzaSqlRelMessageJoinFunction {
     SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues);
     JoinRelType joinRelType = JoinRelType.INNER;
     List<Integer> streamKeyIds = Arrays.asList(0, 1);
+    List<Integer> tableKeyIds = Arrays.asList(2, 3);
 
     SamzaSqlRelMessageJoinFunction joinFn =
-        new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, streamFieldNames, tableFieldNames);
+        new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, streamFieldNames, tableKeyIds, tableFieldNames);
     SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, null);
     Assert.assertNull(outMsg);
   }
@@ -101,10 +103,11 @@ public class TestSamzaSqlRelMessageJoinFunction {
     SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues);
     JoinRelType joinRelType = JoinRelType.LEFT;
     List<Integer> streamKeyIds = Arrays.asList(0, 1);
+    List<Integer> tableKeyIds = Arrays.asList(2, 3);
 
     SamzaSqlRelMessageJoinFunction joinFn =
         new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, streamFieldNames,
-            tableFieldNames);
+            tableKeyIds, tableFieldNames);
     SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, null);
 
     Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues().size(),

http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSchemaGenRelConverter.java
----------------------------------------------------------------------
diff --git a/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSchemaGenRelConverter.java b/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSchemaGenRelConverter.java
index 6c7a8c2..1b1edbc 100644
--- a/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSchemaGenRelConverter.java
+++ b/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSchemaGenRelConverter.java
@@ -28,6 +28,7 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.KV;
+import org.apache.samza.sql.SamzaSqlRelRecord;
 import org.apache.samza.sql.avro.AvroRelConverter;
 import org.apache.samza.sql.avro.AvroRelSchemaProvider;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
@@ -52,14 +53,15 @@ public class AvroSchemaGenRelConverter extends AvroRelConverter {
 
   @Override
   public KV<Object, Object> convertToSamzaMessage(SamzaSqlRelMessage relMessage) {
-    Schema schema = computeSchema(streamName, relMessage);
-    return convertToSamzaMessage(relMessage, schema);
+    Schema payloadSchema = computePayloadSchema(streamName, relMessage);
+    return convertToSamzaMessage(relMessage, payloadSchema);
   }
 
-  private Schema computeSchema(String streamName, SamzaSqlRelMessage relMessage) {
+  private Schema computePayloadSchema(String streamName, SamzaSqlRelMessage relMessage) {
+    SamzaSqlRelRecord relRecord = relMessage.getSamzaSqlRelRecord();
     List<Schema.Field> keyFields = new ArrayList<>();
-    List<String> fieldNames = relMessage.getSamzaSqlRelRecord().getFieldNames();
-    List<Object> values = relMessage.getSamzaSqlRelRecord().getFieldValues();
+    List<String> fieldNames = relRecord.getFieldNames();
+    List<Object> values = relRecord.getFieldValues();
 
     for (int index = 0; index < fieldNames.size(); index++) {
       if (fieldNames.get(index).equals(SamzaSqlRelMessage.KEY_NAME) || values.get(index) == null) {