You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/07/13 22:26:42 UTC

samza git commit: SAMZA-1755: Fix JsonRelConverter to recursively convert relRecords.

Repository: samza
Updated Branches:
  refs/heads/master 93b397e84 -> 7055ce670


SAMZA-1755: Fix JsonRelConverter to recursively convert relRecords.

Author: Aditya Toomula <at...@linkedin.com>

Reviewers: Srini P <sp...@linkedin.com>

Closes #558 from atoomula/sql1


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7055ce67
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7055ce67
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7055ce67

Branch: refs/heads/master
Commit: 7055ce670a270a14c1833ee0bd05fbd74cb93911
Parents: 93b397e
Author: Aditya Toomula <at...@linkedin.com>
Authored: Fri Jul 13 15:26:34 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Fri Jul 13 15:26:34 2018 -0700

----------------------------------------------------------------------
 .../samza/tools/json/JsonRelConverterFactory.java     | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/7055ce67/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java
----------------------------------------------------------------------
diff --git a/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java b/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java
index ce257f1..4db066a 100644
--- a/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java
+++ b/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java
@@ -25,6 +25,7 @@ import org.apache.commons.lang.NotImplementedException;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.KV;
+import org.apache.samza.sql.SamzaSqlRelRecord;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.RelSchemaProvider;
 import org.apache.samza.sql.interfaces.SamzaRelConverter;
@@ -55,12 +56,16 @@ public class JsonRelConverterFactory implements SamzaRelConverterFactory {
 
     @Override
     public KV<Object, Object> convertToSamzaMessage(SamzaSqlRelMessage relMessage) {
+      String jsonValue = convertToSamzaMessage(relMessage.getSamzaSqlRelRecord());
+      return new KV<>(relMessage.getKey(), jsonValue.getBytes());
+    }
 
+    private String convertToSamzaMessage(SamzaSqlRelRecord relRecord) {
       String jsonValue;
       ObjectNode node = mapper.createObjectNode();
 
-      List<String> fieldNames = relMessage.getSamzaSqlRelRecord().getFieldNames();
-      List<Object> values = relMessage.getSamzaSqlRelRecord().getFieldValues();
+      List<String> fieldNames = relRecord.getFieldNames();
+      List<Object> values = relRecord.getFieldValues();
 
       for (int index = 0; index < fieldNames.size(); index++) {
         Object value = values.get(index);
@@ -77,6 +82,9 @@ public class JsonRelConverterFactory implements SamzaRelConverterFactory {
           node.put(fieldNames.get(index), (Double) value);
         } else if (String.class.isAssignableFrom(value.getClass())) {
           node.put(fieldNames.get(index), (String) value);
+        } else if (SamzaSqlRelRecord.class.isAssignableFrom(value.getClass())) {
+          // If the value is a SamzaSqlRelRecord, call convertToSamzaMessage to convert the record to json string.
+          node.put(fieldNames.get(index), convertToSamzaMessage((SamzaSqlRelRecord) value));
         } else {
           node.put(fieldNames.get(index), value.toString());
         }
@@ -87,7 +95,7 @@ public class JsonRelConverterFactory implements SamzaRelConverterFactory {
         throw new SamzaException("Error json serializing object", e);
       }
 
-      return new KV<>(relMessage.getKey(), jsonValue.getBytes());
+      return jsonValue;
     }
   }
 }