You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/07/13 22:26:42 UTC
samza git commit: SAMZA-1755: Fix JsonRelConverter to recursively
convert relRecords.
Repository: samza
Updated Branches:
refs/heads/master 93b397e84 -> 7055ce670
SAMZA-1755: Fix JsonRelConverter to recursively convert relRecords.
Author: Aditya Toomula <at...@linkedin.com>
Reviewers: Srini P<sp...@linkedin.com>
Closes #558 from atoomula/sql1
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7055ce67
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7055ce67
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7055ce67
Branch: refs/heads/master
Commit: 7055ce670a270a14c1833ee0bd05fbd74cb93911
Parents: 93b397e
Author: Aditya Toomula <at...@linkedin.com>
Authored: Fri Jul 13 15:26:34 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Fri Jul 13 15:26:34 2018 -0700
----------------------------------------------------------------------
.../samza/tools/json/JsonRelConverterFactory.java | 14 +++++++++++---
1 file changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/7055ce67/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java
----------------------------------------------------------------------
diff --git a/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java b/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java
index ce257f1..4db066a 100644
--- a/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java
+++ b/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java
@@ -25,6 +25,7 @@ import org.apache.commons.lang.NotImplementedException;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.operators.KV;
+import org.apache.samza.sql.SamzaSqlRelRecord;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.interfaces.RelSchemaProvider;
import org.apache.samza.sql.interfaces.SamzaRelConverter;
@@ -55,12 +56,16 @@ public class JsonRelConverterFactory implements SamzaRelConverterFactory {
@Override
public KV<Object, Object> convertToSamzaMessage(SamzaSqlRelMessage relMessage) {
+ String jsonValue = convertToSamzaMessage(relMessage.getSamzaSqlRelRecord());
+ return new KV<>(relMessage.getKey(), jsonValue.getBytes());
+ }
+ private String convertToSamzaMessage(SamzaSqlRelRecord relRecord) {
String jsonValue;
ObjectNode node = mapper.createObjectNode();
- List<String> fieldNames = relMessage.getSamzaSqlRelRecord().getFieldNames();
- List<Object> values = relMessage.getSamzaSqlRelRecord().getFieldValues();
+ List<String> fieldNames = relRecord.getFieldNames();
+ List<Object> values = relRecord.getFieldValues();
for (int index = 0; index < fieldNames.size(); index++) {
Object value = values.get(index);
@@ -77,6 +82,9 @@ public class JsonRelConverterFactory implements SamzaRelConverterFactory {
node.put(fieldNames.get(index), (Double) value);
} else if (String.class.isAssignableFrom(value.getClass())) {
node.put(fieldNames.get(index), (String) value);
+ } else if (SamzaSqlRelRecord.class.isAssignableFrom(value.getClass())) {
+ // If the value is a SamzaSqlRelRecord, call convertToSamzaMessage to convert the record to json string.
+ node.put(fieldNames.get(index), convertToSamzaMessage((SamzaSqlRelRecord) value));
} else {
node.put(fieldNames.get(index), value.toString());
}
@@ -87,7 +95,7 @@ public class JsonRelConverterFactory implements SamzaRelConverterFactory {
throw new SamzaException("Error json serializing object", e);
}
- return new KV<>(relMessage.getKey(), jsonValue.getBytes());
+ return jsonValue;
}
}
}