You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sr...@apache.org on 2018/10/30 18:25:08 UTC
samza git commit: SAMZA-1968: Samza-sql - Change Calcite sql type for
samza sql rel message __key__ to accept any format
Repository: samza
Updated Branches:
refs/heads/master 64c82634c -> dcd4b558a
SAMZA-1968: Samza-sql - Change Calcite sql type for samza sql rel message __key__ to accept any format
Author: Aditya Toomula <at...@linkedin.com>
Reviewers: Srinivasulu Punuru <sp...@linkedin.com>
Closes #774 from atoomula/keyformat
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/dcd4b558
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/dcd4b558
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/dcd4b558
Branch: refs/heads/master
Commit: dcd4b558a2c702f5b5a320fdb9d0c3fcadabd09b
Parents: 64c8263
Author: Aditya Toomula <at...@linkedin.com>
Authored: Tue Oct 30 11:25:00 2018 -0700
Committer: Srinivasulu Punuru <sp...@linkedin.com>
Committed: Tue Oct 30 11:25:00 2018 -0700
----------------------------------------------------------------------
.../org/apache/samza/sql/SamzaSqlRelRecord.java | 13 +-
.../sql/client/impl/SamzaExecutorTest.java | 2 +-
.../apache/samza/sql/avro/AvroRelConverter.java | 59 ++++---
.../samza/sql/avro/AvroRelSchemaProvider.java | 5 +
.../samza/sql/data/SamzaSqlCompositeKey.java | 82 ---------
.../samza/sql/data/SamzaSqlRelMessage.java | 52 +++++-
.../apache/samza/sql/fn/ConvertToStringUdf.java | 39 +++++
.../org/apache/samza/sql/fn/GetSqlFieldUdf.java | 95 +++++++++++
.../sql/impl/ConfigBasedIOResolverFactory.java | 14 +-
.../apache/samza/sql/planner/QueryPlanner.java | 2 +-
.../samza/sql/translator/JoinTranslator.java | 33 ++--
.../SamzaSqlRelMessageJoinFunction.java | 23 ++-
.../samza/sql/avro/TestAvroRelConversion.java | 10 +-
.../samza/sql/data/TestSamzaSqlRelMessage.java | 30 ++++
.../samza/sql/fn/TestConvertToStringUdf.java | 62 +++++++
.../apache/samza/sql/fn/TestGetSqlFieldUdf.java | 166 +++++++++++++++++++
.../samza/sql/system/TestAvroSystemFactory.java | 6 +-
.../samza/sql/testutil/SamzaSqlTestConfig.java | 1 -
.../sql/testutil/TestIOResolverFactory.java | 13 +-
.../TestSamzaSqlRelMessageJoinFunction.java | 21 ++-
.../tools/avro/AvroSchemaGenRelConverter.java | 12 +-
21 files changed, 570 insertions(+), 170 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-api/src/main/java/org/apache/samza/sql/SamzaSqlRelRecord.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/sql/SamzaSqlRelRecord.java b/samza-api/src/main/java/org/apache/samza/sql/SamzaSqlRelRecord.java
index e17a273..a877e6b 100644
--- a/samza-api/src/main/java/org/apache/samza/sql/SamzaSqlRelRecord.java
+++ b/samza-api/src/main/java/org/apache/samza/sql/SamzaSqlRelRecord.java
@@ -19,6 +19,7 @@
package org.apache.samza.sql;
+import com.google.common.base.Joiner;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
@@ -40,6 +41,7 @@ public class SamzaSqlRelRecord implements Serializable {
private final ArrayList<String> fieldNames;
@JsonProperty("fieldValues")
private final ArrayList<Object> fieldValues;
+ private final int hashCode;
/**
* Creates a {@link SamzaSqlRelRecord} from the list of relational fields and values.
@@ -59,6 +61,8 @@ public class SamzaSqlRelRecord implements Serializable {
this.fieldNames.addAll(fieldNames);
this.fieldValues.addAll(fieldValues);
+
+ hashCode = Objects.hash(fieldNames, fieldValues);
}
/**
@@ -96,7 +100,7 @@ public class SamzaSqlRelRecord implements Serializable {
@Override
public int hashCode() {
- return Objects.hash(fieldNames, fieldValues);
+ return hashCode;
}
@Override
@@ -110,4 +114,11 @@ public class SamzaSqlRelRecord implements Serializable {
SamzaSqlRelRecord other = (SamzaSqlRelRecord) obj;
return Objects.equals(fieldNames, other.fieldNames) && Objects.equals(fieldValues, other.fieldValues);
}
+
+ @Override
+ public String toString() {
+ String nameStr = Joiner.on(",").join(fieldNames);
+ String valueStr = Joiner.on(",").useForNull("null").join(fieldValues);
+ return "[Names:{" + nameStr + "} Values:{" + valueStr + "}]";
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql-shell/src/test/java/org/apache/samza/sql/client/impl/SamzaExecutorTest.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/test/java/org/apache/samza/sql/client/impl/SamzaExecutorTest.java b/samza-sql-shell/src/test/java/org/apache/samza/sql/client/impl/SamzaExecutorTest.java
index 18fe4b7..91ec7f6 100644
--- a/samza-sql-shell/src/test/java/org/apache/samza/sql/client/impl/SamzaExecutorTest.java
+++ b/samza-sql-shell/src/test/java/org/apache/samza/sql/client/impl/SamzaExecutorTest.java
@@ -61,7 +61,7 @@ public class SamzaExecutorTest {
Assert.assertEquals("NewCompany", ts.getFieldName(2));
Assert.assertEquals("OldCompany", ts.getFieldName(3));
Assert.assertEquals("ProfileChangeTimestamp", ts.getFieldName(4));
- Assert.assertEquals("VARCHAR", ts.getFieldTypeName(0));
+ Assert.assertEquals("ANY", ts.getFieldTypeName(0));
Assert.assertEquals("VARCHAR", ts.getFieldTypeName(1));
Assert.assertEquals("VARCHAR", ts.getFieldTypeName(2));
Assert.assertEquals("VARCHAR", ts.getFieldTypeName(3));
http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
index 7d97466..89026ee 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
@@ -61,13 +61,13 @@ import org.slf4j.LoggerFactory;
public class AvroRelConverter implements SamzaRelConverter {
protected final Config config;
- private final Schema avroSchema;
+ private final Schema payloadSchema;
private static final Logger LOG = LoggerFactory.getLogger(AvroRelConverter.class);
public AvroRelConverter(SystemStream systemStream, AvroRelSchemaProvider schemaProvider, Config config) {
this.config = config;
- this.avroSchema = Schema.parse(schemaProvider.getSchema(systemStream));
+ this.payloadSchema = Schema.parse(schemaProvider.getSchema(systemStream));
}
/**
@@ -76,45 +76,50 @@ public class AvroRelConverter implements SamzaRelConverter {
*/
@Override
public SamzaSqlRelMessage convertToRelMessage(KV<Object, Object> samzaMessage) {
- List<Object> fieldValues = new ArrayList<>();
- List<String> fieldNames = new ArrayList<>();
+ List<String> payloadFieldNames = new ArrayList<>();
+ List<Object> payloadFieldValues = new ArrayList<>();
Object value = samzaMessage.getValue();
if (value instanceof IndexedRecord) {
- IndexedRecord record = (IndexedRecord) value;
- // Please note that record schema and cached schema could be different due to schema evolution.
- // Always represent record schema in the form of cached schema. This approach has the side-effect
- // of dropping the newly added fields in the scenarios where the record schema has newer version
- // than the cached schema. [TODO: SAMZA-1679]
- Schema recordSchema = record.getSchema();
- fieldNames.addAll(avroSchema.getFields().stream()
- .map(Schema.Field::name)
- .collect(Collectors.toList()));
- fieldValues.addAll(fieldNames.stream()
- .map(f -> convertToJavaObject(
- recordSchema.getField(f) != null ? record.get(recordSchema.getField(f).pos()) : null,
- getNonNullUnionSchema(avroSchema.getField(f).schema())))
- .collect(Collectors.toList()));
+ fetchFieldNamesAndValuesFromIndexedRecord((IndexedRecord) value, payloadFieldNames, payloadFieldValues,
+ payloadSchema);
} else if (value == null) {
- fieldNames.addAll(avroSchema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList()));
- IntStream.range(0, fieldNames.size()).forEach(x -> fieldValues.add(null));
+ payloadFieldNames.addAll(payloadSchema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList()));
+ IntStream.range(0, payloadFieldNames.size()).forEach(x -> payloadFieldValues.add(null));
} else {
String msg = "Avro message converter doesn't support messages of type " + value.getClass();
LOG.error(msg);
throw new SamzaException(msg);
}
- return new SamzaSqlRelMessage(samzaMessage.getKey(), fieldNames, fieldValues);
+ return new SamzaSqlRelMessage(samzaMessage.getKey(), payloadFieldNames, payloadFieldValues);
+ }
+
+ public void fetchFieldNamesAndValuesFromIndexedRecord(IndexedRecord record, List<String> fieldNames,
+ List<Object> fieldValues, Schema cachedSchema) {
+ // Please note that record schema and cached schema could be different due to schema evolution.
+ // Always represent record schema in the form of cached schema. This approach has the side-effect
+ // of dropping the newly added fields in the scenarios where the record schema has newer version
+ // than the cached schema. [TODO: SAMZA-1679]
+ Schema recordSchema = record.getSchema();
+ fieldNames.addAll(cachedSchema.getFields().stream()
+ .map(Schema.Field::name)
+ .collect(Collectors.toList()));
+ fieldValues.addAll(fieldNames.stream()
+ .map(f -> convertToJavaObject(
+ recordSchema.getField(f) != null ? record.get(recordSchema.getField(f).pos()) : null,
+ getNonNullUnionSchema(payloadSchema.getField(f).schema())))
+ .collect(Collectors.toList()));
}
private SamzaSqlRelRecord convertToRelRecord(IndexedRecord avroRecord) {
- List<Object> values = new ArrayList<>();
+ List<Object> fieldValues = new ArrayList<>();
List<String> fieldNames = new ArrayList<>();
if (avroRecord != null) {
fieldNames.addAll(avroRecord.getSchema().getFields()
.stream()
.map(Schema.Field::name)
.collect(Collectors.toList()));
- values.addAll(avroRecord.getSchema().getFields()
+ fieldValues.addAll(avroRecord.getSchema().getFields()
.stream()
.map(f -> convertToJavaObject(avroRecord.get(avroRecord.getSchema().getField(f.name()).pos()),
getNonNullUnionSchema(avroRecord.getSchema().getField(f.name()).schema())))
@@ -125,7 +130,7 @@ public class AvroRelConverter implements SamzaRelConverter {
throw new SamzaException(msg);
}
- return new SamzaSqlRelRecord(fieldNames, values);
+ return new SamzaSqlRelRecord(fieldNames, fieldValues);
}
/**
@@ -133,11 +138,11 @@ public class AvroRelConverter implements SamzaRelConverter {
*/
@Override
public KV<Object, Object> convertToSamzaMessage(SamzaSqlRelMessage relMessage) {
- return convertToSamzaMessage(relMessage, this.avroSchema);
+ return convertToSamzaMessage(relMessage, this.payloadSchema);
}
- protected KV<Object, Object> convertToSamzaMessage(SamzaSqlRelMessage relMessage, Schema avroSchema) {
- return new KV<>(relMessage.getKey(), convertToGenericRecord(relMessage.getSamzaSqlRelRecord(), avroSchema));
+ protected KV<Object, Object> convertToSamzaMessage(SamzaSqlRelMessage relMessage, Schema payloadSchema) {
+ return new KV<>(relMessage.getKey(), convertToGenericRecord(relMessage.getSamzaSqlRelRecord(), payloadSchema));
}
private GenericRecord convertToGenericRecord(SamzaSqlRelRecord relRecord, Schema schema) {
http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelSchemaProvider.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelSchemaProvider.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelSchemaProvider.java
index fb11624..f37c740 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelSchemaProvider.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelSchemaProvider.java
@@ -24,5 +24,10 @@ import org.apache.samza.system.SystemStream;
public interface AvroRelSchemaProvider extends RelSchemaProvider {
+ /**
+ * Get payload schema corresponding to the system stream.
+ * @param systemStream system stream for which payload schema needs to be obtained.
+ * @return schema in the form of string
+ */
String getSchema(SystemStream systemStream);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlCompositeKey.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlCompositeKey.java b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlCompositeKey.java
deleted file mode 100644
index 4b4b8f2..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlCompositeKey.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.data;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import org.codehaus.jackson.annotate.JsonCreator;
-import org.codehaus.jackson.annotate.JsonProperty;
-
-
-/**
- * A serializable class that holds different key parts.
- */
-public class SamzaSqlCompositeKey implements Serializable {
-
- @JsonProperty("keyParts")
- private ArrayList<Object> keyParts;
- private int hashCode;
-
- @JsonCreator
- public SamzaSqlCompositeKey(@JsonProperty("keyParts") List<Object> keyParts) {
- this.keyParts = new ArrayList<>(keyParts);
- hashCode = keyParts.hashCode();
- }
-
- /**
- * Get the keyParts of all the columns in the relational message.
- * @return the keyParts of all the columns
- */
- @JsonProperty("keyParts")
- public ArrayList<Object> getKeyParts() {
- return keyParts;
- }
-
- @Override
- public String toString() {
- return String.join(", ", Arrays.toString(keyParts.toArray()));
- }
-
- @Override
- public int hashCode() {
- return hashCode;
- }
-
- @Override
- public boolean equals(Object o) {
- return this == o || o != null && getClass() == o.getClass() && keyParts.equals(((SamzaSqlCompositeKey) o).keyParts);
- }
-
- /**
- * Create the SamzaSqlCompositeKey from the rel message.
- * @param message Represents the samza sql rel message.
- * @param relIdx list of keys in the form of field indices within the rel message.
- * @return the composite key of the rel message
- */
- public static SamzaSqlCompositeKey createSamzaSqlCompositeKey(SamzaSqlRelMessage message, List<Integer> relIdx) {
- ArrayList<Object> keyParts = new ArrayList<>();
- for (int idx : relIdx) {
- keyParts.add(message.getSamzaSqlRelRecord().getFieldValues().get(idx));
- }
- return new SamzaSqlCompositeKey(keyParts);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
index 3ebbb23..55ce7b0 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
@@ -31,12 +31,14 @@ import org.codehaus.jackson.annotate.JsonProperty;
/**
* Samza sql relational message. Each Samza sql relational message represents a relational row in a table.
* Each row of the relational table consists of a primary key and {@link SamzaSqlRelRecord}, which consists of a list
- * of column values and the associated column names.
+ * of column values and the associated column names. Please note that the primary key itself could be a
+ * {@link SamzaSqlRelRecord}.
*/
public class SamzaSqlRelMessage implements Serializable {
public static final String KEY_NAME = "__key__";
+ // key could be a record in itself.
private final Object key;
@JsonProperty("samzaSqlRelRecord")
@@ -122,4 +124,52 @@ public class SamzaSqlRelMessage implements Serializable {
SamzaSqlRelMessage other = (SamzaSqlRelMessage) obj;
return Objects.equals(key, other.key) && Objects.equals(samzaSqlRelRecord, other.samzaSqlRelRecord);
}
+
+ @Override
+ public String toString() {
+ return "RelMessage: {" + samzaSqlRelRecord + "}";
+ }
+
+ /**
+ * Create composite key from the rel message.
+ * @param message Represents the samza sql rel message to extract the key values from.
+ * @param keyValueIdx list of key values in the form of field indices within the rel message.
+ * @param keyPartNames Represents the key field names.
+ * @return the composite key of the rel message
+ */
+ public static SamzaSqlRelRecord createSamzaSqlCompositeKey(SamzaSqlRelMessage message, List<Integer> keyValueIdx,
+ List<String> keyPartNames) {
+ Validate.isTrue(keyValueIdx.size() == keyPartNames.size(), "Key part name and value list sizes are different");
+ ArrayList<Object> keyPartValues = new ArrayList<>();
+ for (int idx : keyValueIdx) {
+ keyPartValues.add(message.getSamzaSqlRelRecord().getFieldValues().get(idx));
+ }
+ return new SamzaSqlRelRecord(keyPartNames, keyPartValues);
+ }
+
+ /**
+ * Create composite key from the rel message.
+ * @param message Represents the samza sql rel message to extract the key values and names from.
+ * @param relIdx list of keys in the form of field indices within the rel message.
+ * @return the composite key of the rel message
+ */
+ public static SamzaSqlRelRecord createSamzaSqlCompositeKey(SamzaSqlRelMessage message, List<Integer> relIdx) {
+ return createSamzaSqlCompositeKey(message, relIdx,
+ getSamzaSqlCompositeKeyFieldNames(message.getSamzaSqlRelRecord().getFieldNames(), relIdx));
+ }
+
+ /**
+ * Get composite key field names.
+ * @param fieldNames list of field names to extract the key names from.
+ * @param nameIds indices within the field names.
+ * @return list of composite key field names
+ */
+ public static List<String> getSamzaSqlCompositeKeyFieldNames(List<String> fieldNames,
+ List<Integer> nameIds) {
+ List<String> keyPartNames = new ArrayList<>();
+ for (int idx : nameIds) {
+ keyPartNames.add(fieldNames.get(idx));
+ }
+ return keyPartNames;
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/main/java/org/apache/samza/sql/fn/ConvertToStringUdf.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/fn/ConvertToStringUdf.java b/samza-sql/src/main/java/org/apache/samza/sql/fn/ConvertToStringUdf.java
new file mode 100644
index 0000000..e31c2bb
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/fn/ConvertToStringUdf.java
@@ -0,0 +1,39 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.fn;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.udfs.ScalarUdf;
+
+
+/**
+ * UDF that converts an object to its string representation.
+ */
+public class ConvertToStringUdf implements ScalarUdf<String> {
+ @Override
+ public void init(Config udfConfig) {
+ }
+
+ @Override
+ public String execute(Object... args) {
+ return args[0].toString();
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/main/java/org/apache/samza/sql/fn/GetSqlFieldUdf.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/fn/GetSqlFieldUdf.java b/samza-sql/src/main/java/org/apache/samza/sql/fn/GetSqlFieldUdf.java
new file mode 100644
index 0000000..58b5c99
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/fn/GetSqlFieldUdf.java
@@ -0,0 +1,95 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.fn;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang.Validate;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.SamzaSqlRelRecord;
+import org.apache.samza.sql.udfs.ScalarUdf;
+
+
+/**
+ * UDF that extracts a field value from a nested SamzaSqlRelRecord by recursively following a query path.
+ * Note that the root object must be a SamzaSqlRelRecord.
+ *
+ * Syntax for field specification:
+ * <ul>
+ * <li> SamzaSqlRelRecord/Map: <code> field.subfield </code> </li>
+ * <li> Array: <code> field[index] </code> </li>
+ * <li> Scalar types: <code> field </code> </li>
+ * </ul>
+ *
+ * Example query: <code> pageViewEvent.requestHeader.properties.cookies[3].sessionKey </code>
+ *
+ * The above query extracts the sessionKey field from the nested record below:
+ *
+ * pageViewEvent (SamzaSqlRelRecord)
+ * - requestHeader (SamzaSqlRelRecord)
+ * - properties (Map)
+ * - cookies (Array)
+ * - sessionKey (Scalar)
+ *
+ */
+public class GetSqlFieldUdf implements ScalarUdf<String> {
+ @Override
+ public void init(Config udfConfig) {
+ }
+
+ @Override
+ public String execute(Object... args) {
+ Object currentFieldOrValue = args[0];
+ Validate.isTrue(currentFieldOrValue == null
+ || currentFieldOrValue instanceof SamzaSqlRelRecord);
+ if (currentFieldOrValue != null && args.length > 1) {
+ String[] fieldNameChain = ((String) args[1]).split("\\.");
+ for (int i = 0; i < fieldNameChain.length && currentFieldOrValue != null; i++) {
+ currentFieldOrValue = extractField(fieldNameChain[i], currentFieldOrValue);
+ }
+ }
+
+ if (currentFieldOrValue != null) {
+ return currentFieldOrValue.toString();
+ }
+
+ return null;
+ }
+
+ static Object extractField(String fieldName, Object current) {
+ if (current instanceof SamzaSqlRelRecord) {
+ SamzaSqlRelRecord record = (SamzaSqlRelRecord) current;
+ Validate.isTrue(record.getFieldNames().contains(fieldName),
+ String.format("Invalid field %s in %s", fieldName, record));
+ return record.getField(fieldName).orElse(null);
+ } else if (current instanceof Map) {
+ Map map = (Map) current;
+ Validate.isTrue(map.containsKey(fieldName), String.format("Invalid field %s in %s", fieldName, map));
+ return map.get(fieldName);
+ } else if (current instanceof List && fieldName.endsWith("]")) {
+ List list = (List) current;
+ int index = Integer.parseInt(fieldName.substring(fieldName.indexOf("[") + 1, fieldName.length() - 1));
+ return list.get(index);
+ }
+
+ throw new IllegalArgumentException(String.format(
+ "Unsupported accessing operation for data type: %s with field: %s.", current.getClass(), fieldName));
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
index 2514d30..80cb789 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
@@ -21,14 +21,15 @@ package org.apache.samza.sql.impl;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
+import org.apache.samza.sql.SamzaSqlRelRecord;
+import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
+import org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory;
import org.apache.samza.table.descriptors.TableDescriptor;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.sql.data.SamzaSqlCompositeKey;
import org.apache.samza.sql.interfaces.SqlIOResolver;
import org.apache.samza.sql.interfaces.SqlIOResolverFactory;
import org.apache.samza.sql.interfaces.SqlIOConfig;
-import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -107,9 +108,12 @@ public class ConfigBasedIOResolverFactory implements SqlIOResolverFactory {
TableDescriptor tableDescriptor = null;
if (isTable) {
String tableId = changeLogStorePrefix + "InputTable-" + name.replace(".", "-").replace("$", "-");
- tableDescriptor = new RocksDbTableDescriptor(tableId, KVSerde.of(
- new JsonSerdeV2<>(SamzaSqlCompositeKey.class),
- new SamzaSqlRelMessageSerdeFactory().getSerde(null, null))).withChangelogEnabled();
+ SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde keySerde =
+ (SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde) new SamzaSqlRelRecordSerdeFactory().getSerde(null, null);
+ SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde valueSerde =
+ (SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde) new SamzaSqlRelMessageSerdeFactory().getSerde(null, null);
+ tableDescriptor = new RocksDbTableDescriptor(tableId, KVSerde.of(keySerde, valueSerde))
+ .withChangelogEnabled();
}
return new SqlIOConfig(systemName, streamName, fetchSystemConfigs(systemName), tableDescriptor);
http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
index f36d990..d83ca7f 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
@@ -150,7 +150,7 @@ public class QueryPlanner {
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
List<RelDataTypeField> fieldsList = new ArrayList<>();
fieldsList.add(new RelDataTypeFieldImpl(SamzaSqlRelMessage.KEY_NAME, 0,
- typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.VARCHAR), true)));
+ typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.ANY), true)));
fieldsList.addAll(relationalSchema.getFieldList());
return new RelRecordType(fieldsList);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
index 0939f7b..5f44ff9 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
@@ -19,6 +19,7 @@
package org.apache.samza.sql.translator;
+import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
@@ -39,19 +40,19 @@ import org.apache.commons.lang.Validate;
import org.apache.samza.SamzaException;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
-import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.sql.data.SamzaSqlCompositeKey;
+import org.apache.samza.sql.SamzaSqlRelRecord;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.interfaces.SqlIOResolver;
import org.apache.samza.sql.interfaces.SqlIOConfig;
import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
+import org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory;
import org.apache.samza.table.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.samza.sql.data.SamzaSqlCompositeKey.createSamzaSqlCompositeKey;
+import static org.apache.samza.sql.data.SamzaSqlRelMessage.getSamzaSqlCompositeKeyFieldNames;
+import static org.apache.samza.sql.data.SamzaSqlRelMessage.createSamzaSqlCompositeKey;
/**
@@ -103,8 +104,10 @@ class JoinTranslator {
isTablePosOnRight ?
context.getMessageStream(join.getLeft().getId()) : context.getMessageStream(join.getRight().getId());
- List<String> streamFieldNames = (isTablePosOnRight ? join.getLeft() : join.getRight()).getRowType().getFieldNames();
- List<String> tableFieldNames = (isTablePosOnRight ? join.getRight() : join.getLeft()).getRowType().getFieldNames();
+ List<String> streamFieldNames =
+ new ArrayList<>((isTablePosOnRight ? join.getLeft() : join.getRight()).getRowType().getFieldNames());
+ List<String> tableFieldNames =
+ new ArrayList<>((isTablePosOnRight ? join.getRight() : join.getLeft()).getRowType().getFieldNames());
Validate.isTrue(streamKeyIds.size() == tableKeyIds.size());
log.info("Joining on the following Stream and Table field(s): ");
for (int i = 0; i < streamKeyIds.size(); i++) {
@@ -113,23 +116,25 @@ class JoinTranslator {
SamzaSqlRelMessageJoinFunction joinFn =
new SamzaSqlRelMessageJoinFunction(join.getJoinType(), isTablePosOnRight, streamKeyIds, streamFieldNames,
- tableFieldNames);
+ tableKeyIds, tableFieldNames);
- Serde<SamzaSqlCompositeKey> keySerde = new JsonSerdeV2<>(SamzaSqlCompositeKey.class);
+ SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde keySerde =
+ (SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde) new SamzaSqlRelRecordSerdeFactory().getSerde(null, null);
SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde valueSerde =
(SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde) new SamzaSqlRelMessageSerdeFactory().getSerde(null, null);
// Always re-partition the messages from the input stream by the composite key and then join the messages
- // with the table.
+ // with the table. For the composite key, provide the corresponding table names in the key instead of using
+ // the names from the stream as the lookup needs to be done based on what is stored in the local table.
MessageStream<SamzaSqlRelMessage> outputStream =
inputStream
- .partitionBy(m -> createSamzaSqlCompositeKey(m, streamKeyIds),
+ .partitionBy(m -> createSamzaSqlCompositeKey(m, streamKeyIds,
+ getSamzaSqlCompositeKeyFieldNames(tableFieldNames, tableKeyIds)),
m -> m,
KVSerde.of(keySerde, valueSerde),
intermediateStreamPrefix + "stream_" + joinId)
.map(KV::getValue)
.join(table, joinFn);
- // MessageStream<SamzaSqlRelMessage> outputStream = inputStream.join(table, joinFn);
context.registerMessageStream(join.getId(), outputStream);
}
@@ -299,13 +304,13 @@ class JoinTranslator {
// Create a table backed by RocksDb store with the fields in the join condition as composite key and relational
// message as the value. Send the messages from the input stream denoted as 'table' to the created table store.
- Table<KV<SamzaSqlCompositeKey, SamzaSqlRelMessage>> table =
+ Table<KV<SamzaSqlRelRecord, SamzaSqlRelMessage>> table =
context.getStreamAppDescriptor().getTable(sourceTableConfig.getTableDescriptor().get());
- Serde<SamzaSqlCompositeKey> keySerde = new JsonSerdeV2<>(SamzaSqlCompositeKey.class);
+ SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde keySerde =
+ (SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde) new SamzaSqlRelRecordSerdeFactory().getSerde(null, null);
SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde valueSerde =
(SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde) new SamzaSqlRelMessageSerdeFactory().getSerde(null, null);
-
// Let's always repartition by the join fields as key before sending the key and value to the table.
// We need to repartition the stream denoted as table to ensure that both the stream and table that are joined
// have the same partitioning scheme and partition key.
http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java
index 889ea97..d0a3d11 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java
@@ -25,20 +25,21 @@ import org.apache.calcite.rel.core.JoinRelType;
import org.apache.commons.lang.Validate;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.functions.StreamTableJoinFunction;
-import org.apache.samza.sql.data.SamzaSqlCompositeKey;
+import org.apache.samza.sql.SamzaSqlRelRecord;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.samza.sql.data.SamzaSqlCompositeKey.*;
+import static org.apache.samza.sql.data.SamzaSqlRelMessage.createSamzaSqlCompositeKey;
+import static org.apache.samza.sql.data.SamzaSqlRelMessage.getSamzaSqlCompositeKeyFieldNames;
/**
* This class joins incoming {@link SamzaSqlRelMessage} from a stream with the records in a table with the join key
- * being {@link SamzaSqlCompositeKey}
+ * being {@link SamzaSqlRelRecord}
*/
public class SamzaSqlRelMessageJoinFunction
- implements StreamTableJoinFunction<SamzaSqlCompositeKey, SamzaSqlRelMessage, KV<SamzaSqlCompositeKey, SamzaSqlRelMessage>, SamzaSqlRelMessage> {
+ implements StreamTableJoinFunction<SamzaSqlRelRecord, SamzaSqlRelMessage, KV<SamzaSqlRelRecord, SamzaSqlRelMessage>, SamzaSqlRelMessage> {
private static final Logger log = LoggerFactory.getLogger(SamzaSqlRelMessageJoinFunction.class);
@@ -46,17 +47,20 @@ public class SamzaSqlRelMessageJoinFunction
private final boolean isTablePosOnRight;
private final ArrayList<Integer> streamFieldIds;
// Table field names are used in the outer join when the table record is not found.
+ private final ArrayList<Integer> tableKeyIds;
private final ArrayList<String> tableFieldNames;
private final ArrayList<String> outFieldNames;
SamzaSqlRelMessageJoinFunction(JoinRelType joinRelType, boolean isTablePosOnRight,
- List<Integer> streamFieldIds, List<String> streamFieldNames, List<String> tableFieldNames) {
+ List<Integer> streamFieldIds, List<String> streamFieldNames, List<Integer> tableKeyIds,
+ List<String> tableFieldNames) {
this.joinRelType = joinRelType;
this.isTablePosOnRight = isTablePosOnRight;
Validate.isTrue((joinRelType.compareTo(JoinRelType.LEFT) == 0 && isTablePosOnRight) ||
(joinRelType.compareTo(JoinRelType.RIGHT) == 0 && !isTablePosOnRight) ||
joinRelType.compareTo(JoinRelType.INNER) == 0);
this.streamFieldIds = new ArrayList<>(streamFieldIds);
+ this.tableKeyIds = new ArrayList<>(tableKeyIds);
this.tableFieldNames = new ArrayList<>(tableFieldNames);
this.outFieldNames = new ArrayList<>();
if (isTablePosOnRight) {
@@ -69,7 +73,7 @@ public class SamzaSqlRelMessageJoinFunction
}
@Override
- public SamzaSqlRelMessage apply(SamzaSqlRelMessage message, KV<SamzaSqlCompositeKey, SamzaSqlRelMessage> record) {
+ public SamzaSqlRelMessage apply(SamzaSqlRelMessage message, KV<SamzaSqlRelRecord, SamzaSqlRelMessage> record) {
if (joinRelType.compareTo(JoinRelType.INNER) == 0 && record == null) {
log.debug("Inner Join: Record not found for the message with key: " + getMessageKey(message));
@@ -106,12 +110,13 @@ public class SamzaSqlRelMessageJoinFunction
}
@Override
- public SamzaSqlCompositeKey getMessageKey(SamzaSqlRelMessage message) {
- return createSamzaSqlCompositeKey(message, streamFieldIds);
+ public SamzaSqlRelRecord getMessageKey(SamzaSqlRelMessage message) {
+ return createSamzaSqlCompositeKey(message, streamFieldIds,
+ getSamzaSqlCompositeKeyFieldNames(tableFieldNames, tableKeyIds));
}
@Override
- public SamzaSqlCompositeKey getRecordKey(KV<SamzaSqlCompositeKey, SamzaSqlRelMessage> record) {
+ public SamzaSqlRelRecord getRecordKey(KV<SamzaSqlRelRecord, SamzaSqlRelMessage> record) {
return record.getKey();
}
http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
index 708eb3e..40aa791 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
@@ -19,7 +19,6 @@
package org.apache.samza.sql.avro;
-import com.google.common.base.Joiner;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -160,8 +159,7 @@ public class TestAvroRelConversion {
record.put("name", "name1");
SamzaSqlRelMessage message = simpleRecordAvroRelConverter.convertToRelMessage(new KV<>("key", record));
- LOG.info(Joiner.on(",").join(message.getSamzaSqlRelRecord().getFieldValues()));
- LOG.info(Joiner.on(",").join(message.getSamzaSqlRelRecord().getFieldNames()));
+ LOG.info(message.toString());
}
@Test
@@ -263,8 +261,7 @@ public class TestAvroRelConversion {
SamzaSqlRelMessage relMessage = nestedRecordAvroRelConverter.convertToRelMessage(new KV<>("key", record));
- LOG.info(Joiner.on(",").join(relMessage.getSamzaSqlRelRecord().getFieldValues()));
- LOG.info(Joiner.on(",").join(relMessage.getSamzaSqlRelRecord().getFieldNames()));
+ LOG.info(relMessage.toString());
KV<Object, Object> samzaMessage = nestedRecordAvroRelConverter.convertToSamzaMessage(relMessage);
GenericRecord recordPostConversion = (GenericRecord) samzaMessage.getValue();
@@ -321,8 +318,7 @@ public class TestAvroRelConversion {
Arrays.equals(((ByteString) message.getSamzaSqlRelRecord().getField("fixed_value").get()).getBytes(),
DEFAULT_TRACKING_ID_BYTES));
- LOG.info(Joiner.on(",").useForNull("null").join(message.getSamzaSqlRelRecord().getFieldValues()));
- LOG.info(Joiner.on(",").join(message.getSamzaSqlRelRecord().getFieldNames()));
+ LOG.info(message.toString());
KV<Object, Object> samzaMessage = complexRecordAvroRelConverter.convertToSamzaMessage(message);
GenericRecord record = (GenericRecord) samzaMessage.getValue();
http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/test/java/org/apache/samza/sql/data/TestSamzaSqlRelMessage.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/data/TestSamzaSqlRelMessage.java b/samza-sql/src/test/java/org/apache/samza/sql/data/TestSamzaSqlRelMessage.java
index d0a2f59..a4f3afc 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/data/TestSamzaSqlRelMessage.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/data/TestSamzaSqlRelMessage.java
@@ -20,7 +20,9 @@
package org.apache.samza.sql.data;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
+import org.apache.samza.sql.SamzaSqlRelRecord;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.junit.Assert;
import org.junit.Test;
@@ -61,4 +63,32 @@ public class TestSamzaSqlRelMessage {
Assert.assertNotEquals(message1, message2);
Assert.assertNotEquals(message1.hashCode(), message2.hashCode());
}
+
+ /**
+  * Verifies composite-key creation from a {@link SamzaSqlRelMessage}, both with the
+  * two-argument overload (key ids only) and with the three-argument overload that also
+  * receives explicit key field names.
+  */
+ @Test
+ public void testCompositeKeyCreation() {
+   List<String> keyPartNames = Arrays.asList("kfield1", "kfield2");
+   SamzaSqlRelMessage message = new SamzaSqlRelMessage(names, values);
+
+   // Key ids only: the key is expected to carry the message's own name/value at index 0.
+   // Arguments follow JUnit's (expected, actual) order so failure messages read correctly.
+   SamzaSqlRelRecord relRecord1 = SamzaSqlRelMessage.createSamzaSqlCompositeKey(message, Collections.singletonList(0));
+   Assert.assertEquals(1, relRecord1.getFieldNames().size());
+   Assert.assertEquals("field1", relRecord1.getFieldNames().get(0));
+   Assert.assertEquals("value1", relRecord1.getFieldValues().get(0));
+
+   // Explicit key names, ids given in reverse order (1, 0): names come from keyPartNames,
+   // values come from the message, both in the order the ids were supplied.
+   SamzaSqlRelRecord relRecord2 = SamzaSqlRelMessage.createSamzaSqlCompositeKey(message, Arrays.asList(1, 0),
+       SamzaSqlRelMessage.getSamzaSqlCompositeKeyFieldNames(keyPartNames, Arrays.asList(1, 0)));
+   Assert.assertEquals(2, relRecord2.getFieldNames().size());
+   Assert.assertEquals("kfield2", relRecord2.getFieldNames().get(0));
+   Assert.assertEquals("value2", relRecord2.getFieldValues().get(0));
+   Assert.assertEquals("kfield1", relRecord2.getFieldNames().get(1));
+   Assert.assertEquals("value1", relRecord2.getFieldValues().get(1));
+ }
+
+ /**
+  * Passes two key ids but key names derived from a single id; the test expects
+  * {@code createSamzaSqlCompositeKey} to reject the mismatch with an
+  * {@link IllegalArgumentException}.
+  */
+ @Test (expected = IllegalArgumentException.class)
+ public void testCompositeKeyCreationWithInEqualKeyNameValues() {
+   List<String> keyPartNames = Arrays.asList("kfield1", "kfield2");
+   SamzaSqlRelMessage message = new SamzaSqlRelMessage(names, values);
+
+   // The result is intentionally discarded: the call is made only for the expected exception.
+   SamzaSqlRelMessage.createSamzaSqlCompositeKey(message, Arrays.asList(1, 0),
+       SamzaSqlRelMessage.getSamzaSqlCompositeKeyFieldNames(keyPartNames, Arrays.asList(1)));
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/test/java/org/apache/samza/sql/fn/TestConvertToStringUdf.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/fn/TestConvertToStringUdf.java b/samza-sql/src/test/java/org/apache/samza/sql/fn/TestConvertToStringUdf.java
new file mode 100644
index 0000000..9b80549
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/fn/TestConvertToStringUdf.java
@@ -0,0 +1,62 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.fn;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Unit tests for {@link ConvertToStringUdf}: each case passes a single value to
+ * {@code execute} and compares the result with the expected string.
+ *
+ * <p>Assertions use JUnit's {@code assertEquals(expected, actual)} argument order so that
+ * a failure reports the expected and actual values the right way round.
+ */
+public class TestConvertToStringUdf {
+
+  /** Minimal enum used to exercise the enum conversion case. */
+  private enum LightSwitch {
+    On,
+    Off
+  }
+
+  /** An int argument is expected to convert to its decimal string. */
+  @Test
+  public void testConvertIntegerToString() {
+    ConvertToStringUdf convertToStringUdf = new ConvertToStringUdf();
+    Assert.assertEquals("10", convertToStringUdf.execute(10));
+  }
+
+  /** A long beyond the int range is expected to convert without truncation. */
+  @Test
+  public void testConvertLongToString() {
+    ConvertToStringUdf convertToStringUdf = new ConvertToStringUdf();
+    Assert.assertEquals("10000000000", convertToStringUdf.execute(10000000000L));
+  }
+
+  /** A double is expected to convert to its plain decimal representation. */
+  @Test
+  public void testConvertDoubleToString() {
+    ConvertToStringUdf convertToStringUdf = new ConvertToStringUdf();
+    Assert.assertEquals("10.0000345", convertToStringUdf.execute(10.0000345));
+  }
+
+  /** A boolean is expected to convert to {@code "true"}. */
+  @Test
+  public void testConvertBooleanToString() {
+    ConvertToStringUdf convertToStringUdf = new ConvertToStringUdf();
+    Assert.assertEquals("true", convertToStringUdf.execute(true));
+  }
+
+  /** An enum constant is expected to convert to its constant name. */
+  @Test
+  public void testConvertEnumToString() {
+    ConvertToStringUdf convertToStringUdf = new ConvertToStringUdf();
+    Assert.assertEquals("On", convertToStringUdf.execute(LightSwitch.On));
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/test/java/org/apache/samza/sql/fn/TestGetSqlFieldUdf.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/fn/TestGetSqlFieldUdf.java b/samza-sql/src/test/java/org/apache/samza/sql/fn/TestGetSqlFieldUdf.java
new file mode 100644
index 0000000..b084fd1
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/fn/TestGetSqlFieldUdf.java
@@ -0,0 +1,166 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.fn;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.samza.sql.SamzaSqlRelRecord;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Unit tests for {@link GetSqlFieldUdf}. Test inputs are built by {@link #createRecord(String)}
+ * from a small path mini-language, then queried through {@code execute(record, path)}.
+ *
+ * <p>Assertions use JUnit's {@code assertEquals(expected, actual)} argument order.
+ */
+public class TestGetSqlFieldUdf {
+  /**
+   * Recursively builds a nested test structure from path segments, starting at {@code level}.
+   * The innermost value is always the string {@code "bar"}.
+   *
+   * <p>Segment syntax:
+   * <ul>
+   *   <li>{@code map:name} - a {@link HashMap} with the single entry {@code name -> child}</li>
+   *   <li>{@code name[i]} - a fixed-size {@link List} holding the child at index {@code i},
+   *       all other slots {@code null}</li>
+   *   <li>{@code name} - a single-field {@link SamzaSqlRelRecord} {@code name -> child}</li>
+   * </ul>
+   *
+   * @param fieldNames the path segments
+   * @param level index of the segment to build at this recursion depth
+   * @return the structure for the segment at {@code level}, containing all deeper segments
+   */
+  static Object createRecord(List<String> fieldNames, int level) {
+    String fieldName = fieldNames.get(level);
+    Object child = (level == fieldNames.size() - 1) ? "bar" : createRecord(fieldNames, level + 1);
+    boolean isMap = false;
+    int arrayIndex = -1;
+    if (fieldName.startsWith("map:")) {
+      isMap = true;
+      fieldName = fieldName.substring(4); // strip "map:"
+    } else if (fieldName.endsWith("]")) {
+      arrayIndex = Integer.parseInt(fieldName.substring(fieldName.indexOf("[") + 1, fieldName.length() - 1));
+      fieldName = fieldName.substring(0, fieldName.indexOf("["));
+    }
+
+    if (isMap) {
+      Map<String, Object> retMap = new HashMap<>();
+      retMap.put(fieldName, child);
+      return retMap;
+    } else if (arrayIndex >= 0) {
+      // Arrays.asList gives a fixed-size list, so the backing array must already contain
+      // slot arrayIndex. "2 * arrayIndex" is zero-length for index 0 and made set() throw;
+      // the "+ 1" keeps the trailing null padding while making index 0 valid.
+      // NOTE(review): the segment name is parsed off above but not stored in the list.
+      List<Object> list = Arrays.asList(new Object[2 * arrayIndex + 1]);
+      list.set(arrayIndex, child);
+      return list;
+    } else {
+      return new SamzaSqlRelRecord(Collections.singletonList(fieldName), Collections.singletonList(child));
+    }
+  }
+
+  /**
+   * Builds a record from a dot-separated path. The first segment must be a plain name,
+   * because the result is cast to {@link SamzaSqlRelRecord}.
+   */
+  private SamzaSqlRelRecord createRecord(String path) {
+    return (SamzaSqlRelRecord) createRecord(Arrays.asList(path.split("\\.")), 0);
+  }
+
+  @Test
+  public void testSingleLevel() {
+    SamzaSqlRelRecord record = createRecord("foo");
+    GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf();
+    Assert.assertEquals("bar", getSqlFieldUdf.execute(record, "foo"));
+  }
+
+  @Test
+  public void testMultiLevel() {
+    SamzaSqlRelRecord record = createRecord("bar.baz.baf.foo");
+    GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf();
+    Assert.assertEquals("bar", getSqlFieldUdf.execute(record, "bar.baz.baf.foo"));
+  }
+
+  /** A null record is expected to yield null rather than an exception. */
+  @Test
+  public void testNullRecord() {
+    GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf();
+    Assert.assertNull(getSqlFieldUdf.execute(null, "bar.baz.baf.foo"));
+  }
+
+  /** A null field path is expected to raise a NullPointerException. */
+  @Test (expected = NullPointerException.class)
+  public void testNullFields() {
+    SamzaSqlRelRecord record = createRecord("bar.baz.baf.foo");
+    GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf();
+    getSqlFieldUdf.execute(record, null);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testSingleLevelInvalidField() {
+    SamzaSqlRelRecord record = createRecord("foo");
+    GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf();
+    getSqlFieldUdf.execute(record, "bar");
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testMultiLevelInvalidIntermediateField() {
+    SamzaSqlRelRecord record = createRecord("bar.baz.baf.foo");
+    GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf();
+    getSqlFieldUdf.execute(record, "bar.baz.bacon");
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testMultiLevelInvalidFinalField() {
+    SamzaSqlRelRecord record = createRecord("bar.baz.baf.foo");
+    GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf();
+    getSqlFieldUdf.execute(record, "bar.baz.baf.funny");
+  }
+
+  /** The queried path has more segments than the record has levels. */
+  @Test(expected = IllegalArgumentException.class)
+  public void testPathTooDeep() {
+    SamzaSqlRelRecord record = createRecord("bar.foo");
+    GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf();
+    getSqlFieldUdf.execute(record, "bar.baz.baf.funny");
+  }
+
+  @Test
+  public void testMapAtLastField() {
+    SamzaSqlRelRecord record = createRecord("bar.baz.baf.map:foo");
+    GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf();
+    Assert.assertEquals("bar", getSqlFieldUdf.execute(record, "bar.baz.baf.foo"));
+  }
+
+  @Test
+  public void testMapAtIntermediateFields() {
+    SamzaSqlRelRecord record = createRecord("bar.map:baz.map:baf.foo");
+    GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf();
+    Assert.assertEquals("bar", getSqlFieldUdf.execute(record, "bar.baz.baf.foo"));
+  }
+
+  @Test
+  public void testMapAtAllIntermediateFields() {
+    SamzaSqlRelRecord record = createRecord("bar.map:baz.map:baf.map:foo");
+    GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf();
+    Assert.assertEquals("bar", getSqlFieldUdf.execute(record, "bar.baz.baf.foo"));
+  }
+
+  @Test
+  public void testArrayAtLastField() {
+    SamzaSqlRelRecord record = createRecord("bar.baz.baf.foo[3]");
+    GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf();
+    Assert.assertEquals("bar", getSqlFieldUdf.execute(record, "bar.baz.baf.foo[3]"));
+  }
+
+  @Test
+  public void testArrayAtIntermediateFields() {
+    SamzaSqlRelRecord record = createRecord("bar.baz[3].baf[2].foo");
+    GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf();
+    Assert.assertEquals("bar", getSqlFieldUdf.execute(record, "bar.baz[3].baf[2].foo"));
+  }
+
+  @Test
+  public void testArrayAtAllIntermediateFields() {
+    SamzaSqlRelRecord record = createRecord("bar.baz[2].baf[3].foo[5]");
+    GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf();
+    Assert.assertEquals("bar", getSqlFieldUdf.execute(record, "bar.baz[2].baf[3].foo[5]"));
+  }
+
+  /** Mixes record, map and array segments in a single path. */
+  @Test
+  public void testAllFieldTypes() {
+    SamzaSqlRelRecord record = createRecord("bar.map:baz.baf.foo[3].fun");
+    GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf();
+    Assert.assertEquals("bar", getSqlFieldUdf.execute(record, "bar.baz.baf.foo[3].fun"));
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
index fd811cd..41c809e 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
@@ -188,7 +188,7 @@ public class TestAvroSystemFactory implements SystemFactory {
// We send num Messages and an end of stream message following that.
List<IncomingMessageEnvelope> envelopes =
IntStream.range(curMessages, curMessages + numMessages/4)
- .mapToObj(i -> i < numMessages ? new IncomingMessageEnvelope(ssp, null, "key" + i,
+ .mapToObj(i -> i < numMessages ? new IncomingMessageEnvelope(ssp, null, getKey(i, ssp),
getData(i, ssp)) : IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp))
.collect(Collectors.toList());
envelopeMap.put(ssp, envelopes);
@@ -201,6 +201,10 @@ public class TestAvroSystemFactory implements SystemFactory {
return envelopeMap;
}
+ /**
+  * Produces the message key for the envelope at the given index: the literal
+  * {@code "key"} followed by the index. The {@code ssp} argument is not consulted.
+  */
+ private Object getKey(int index, SystemStreamPartition ssp) {
+   return "key".concat(Integer.toString(index));
+ }
+
private Object getData(int index, SystemStreamPartition ssp) {
if (simpleRecordMap.contains(ssp)) {
return createSimpleRecord(index);
http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
index e54223c..76ebfc2 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
@@ -42,7 +42,6 @@ import org.apache.samza.sql.impl.ConfigBasedUdfResolver;
import org.apache.samza.sql.interfaces.SqlIOConfig;
import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
import org.apache.samza.sql.system.TestAvroSystemFactory;
-import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory;
import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
index 818e33d..5e7dd4c 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
@@ -26,16 +26,15 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang.NotImplementedException;
import org.apache.samza.config.Config;
+import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
+import org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory;
import org.apache.samza.table.descriptors.BaseTableDescriptor;
import org.apache.samza.table.descriptors.TableDescriptor;
-import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.sql.data.SamzaSqlCompositeKey;
import org.apache.samza.sql.interfaces.SqlIOConfig;
import org.apache.samza.sql.interfaces.SqlIOResolver;
import org.apache.samza.sql.interfaces.SqlIOResolverFactory;
-import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
import org.apache.samza.table.ReadWriteTable;
import org.apache.samza.table.Table;
@@ -207,9 +206,11 @@ public class TestIOResolverFactory implements SqlIOResolverFactory {
tableDescriptor = new TestTableDescriptor(TEST_TABLE_ID + tableDescMap.size());
} else {
String tableId = changeLogStorePrefix + "InputTable-" + ioName.replace(".", "-").replace("$", "-");
- tableDescriptor = new RocksDbTableDescriptor(tableId, KVSerde.of(
- new JsonSerdeV2<>(SamzaSqlCompositeKey.class),
- new SamzaSqlRelMessageSerdeFactory().getSerde(null, null))).withChangelogEnabled();
+ SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde keySerde =
+ (SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde) new SamzaSqlRelRecordSerdeFactory().getSerde(null, null);
+ SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde valueSerde =
+ (SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde) new SamzaSqlRelMessageSerdeFactory().getSerde(null, null);
+ tableDescriptor = new RocksDbTableDescriptor(tableId, KVSerde.of(keySerde, valueSerde)).withChangelogEnabled();
}
tableDescMap.put(ioName, tableDescriptor);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRelMessageJoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRelMessageJoinFunction.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRelMessageJoinFunction.java
index 5dd2d21..6362155 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRelMessageJoinFunction.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRelMessageJoinFunction.java
@@ -25,7 +25,7 @@ import java.util.List;
import java.util.stream.Collectors;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.samza.operators.KV;
-import org.apache.samza.sql.data.SamzaSqlCompositeKey;
+import org.apache.samza.sql.SamzaSqlRelRecord;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.junit.Assert;
import org.junit.Test;
@@ -45,11 +45,11 @@ public class TestSamzaSqlRelMessageJoinFunction {
JoinRelType joinRelType = JoinRelType.INNER;
List<Integer> streamKeyIds = Arrays.asList(0, 1);
List<Integer> tableKeyIds = Arrays.asList(0, 1);
- SamzaSqlCompositeKey compositeKey = SamzaSqlCompositeKey.createSamzaSqlCompositeKey(tableMsg, tableKeyIds);
- KV<SamzaSqlCompositeKey, SamzaSqlRelMessage> record = KV.of(compositeKey, tableMsg);
+ SamzaSqlRelRecord compositeKey = SamzaSqlRelMessage.createSamzaSqlCompositeKey(tableMsg, tableKeyIds);
+ KV<SamzaSqlRelRecord, SamzaSqlRelMessage> record = KV.of(compositeKey, tableMsg);
SamzaSqlRelMessageJoinFunction joinFn =
- new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, streamFieldNames, tableFieldNames);
+ new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, streamFieldNames, tableKeyIds, tableFieldNames);
SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, record);
Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues().size(),
@@ -68,11 +68,12 @@ public class TestSamzaSqlRelMessageJoinFunction {
JoinRelType joinRelType = JoinRelType.INNER;
List<Integer> streamKeyIds = Arrays.asList(0, 2);
List<Integer> tableKeyIds = Arrays.asList(0, 2);
- SamzaSqlCompositeKey compositeKey = SamzaSqlCompositeKey.createSamzaSqlCompositeKey(tableMsg, tableKeyIds);
- KV<SamzaSqlCompositeKey, SamzaSqlRelMessage> record = KV.of(compositeKey, tableMsg);
+ SamzaSqlRelRecord compositeKey = SamzaSqlRelMessage.createSamzaSqlCompositeKey(tableMsg, tableKeyIds);
+ KV<SamzaSqlRelRecord, SamzaSqlRelMessage> record = KV.of(compositeKey, tableMsg);
SamzaSqlRelMessageJoinFunction joinFn =
- new SamzaSqlRelMessageJoinFunction(joinRelType, false, streamKeyIds, streamFieldNames, tableFieldNames);
+ new SamzaSqlRelMessageJoinFunction(joinRelType, false, streamKeyIds, streamFieldNames,
+ tableKeyIds, tableFieldNames);
SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, record);
Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues().size(),
@@ -89,9 +90,10 @@ public class TestSamzaSqlRelMessageJoinFunction {
SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues);
JoinRelType joinRelType = JoinRelType.INNER;
List<Integer> streamKeyIds = Arrays.asList(0, 1);
+ List<Integer> tableKeyIds = Arrays.asList(2, 3);
SamzaSqlRelMessageJoinFunction joinFn =
- new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, streamFieldNames, tableFieldNames);
+ new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, streamFieldNames, tableKeyIds, tableFieldNames);
SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, null);
Assert.assertNull(outMsg);
}
@@ -101,10 +103,11 @@ public class TestSamzaSqlRelMessageJoinFunction {
SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues);
JoinRelType joinRelType = JoinRelType.LEFT;
List<Integer> streamKeyIds = Arrays.asList(0, 1);
+ List<Integer> tableKeyIds = Arrays.asList(2, 3);
SamzaSqlRelMessageJoinFunction joinFn =
new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, streamFieldNames,
- tableFieldNames);
+ tableKeyIds, tableFieldNames);
SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, null);
Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues().size(),
http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSchemaGenRelConverter.java
----------------------------------------------------------------------
diff --git a/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSchemaGenRelConverter.java b/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSchemaGenRelConverter.java
index 6c7a8c2..1b1edbc 100644
--- a/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSchemaGenRelConverter.java
+++ b/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSchemaGenRelConverter.java
@@ -28,6 +28,7 @@ import org.apache.avro.generic.GenericData;
import org.apache.avro.reflect.ReflectData;
import org.apache.samza.config.Config;
import org.apache.samza.operators.KV;
+import org.apache.samza.sql.SamzaSqlRelRecord;
import org.apache.samza.sql.avro.AvroRelConverter;
import org.apache.samza.sql.avro.AvroRelSchemaProvider;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
@@ -52,14 +53,15 @@ public class AvroSchemaGenRelConverter extends AvroRelConverter {
@Override
public KV<Object, Object> convertToSamzaMessage(SamzaSqlRelMessage relMessage) {
- Schema schema = computeSchema(streamName, relMessage);
- return convertToSamzaMessage(relMessage, schema);
+ Schema payloadSchema = computePayloadSchema(streamName, relMessage);
+ return convertToSamzaMessage(relMessage, payloadSchema);
}
- private Schema computeSchema(String streamName, SamzaSqlRelMessage relMessage) {
+ private Schema computePayloadSchema(String streamName, SamzaSqlRelMessage relMessage) {
+ SamzaSqlRelRecord relRecord = relMessage.getSamzaSqlRelRecord();
List<Schema.Field> keyFields = new ArrayList<>();
- List<String> fieldNames = relMessage.getSamzaSqlRelRecord().getFieldNames();
- List<Object> values = relMessage.getSamzaSqlRelRecord().getFieldValues();
+ List<String> fieldNames = relRecord.getFieldNames();
+ List<Object> values = relRecord.getFieldValues();
for (int index = 0; index < fieldNames.size(); index++) {
if (fieldNames.get(index).equals(SamzaSqlRelMessage.KEY_NAME) || values.get(index) == null) {