You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sr...@apache.org on 2019/02/20 21:23:42 UTC

[samza] branch master updated: Records with all null values are transformed to null records (#926)

This is an automated email from the ASF dual-hosted git repository.

srinivasulu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 30b43d9  Records with all null values are transformed to null records (#926)
30b43d9 is described below

commit 30b43d960aeed9a12dd04b4cd1f5b67c5858ba2d
Author: Srinivasulu Punuru <sr...@users.noreply.github.com>
AuthorDate: Wed Feb 20 13:23:38 2019 -0800

    Records with all null values are transformed to null records (#926)
    
    * Records with all null values are transformed to null records
    
    * Fix for the tests
---
 .../main/java/org/apache/samza/sql/avro/AvroRelConverter.java  | 10 +---------
 .../java/org/apache/samza/sql/avro/TestAvroRelConversion.java  |  3 +++
 .../org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java   |  3 ++-
 .../apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java    |  5 +++++
 4 files changed, 11 insertions(+), 10 deletions(-)

diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
index ba79e48..d34b94d 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
@@ -153,7 +153,6 @@ public class AvroRelConverter implements SamzaRelConverter {
     GenericRecord record = new GenericData.Record(schema);
     List<String> fieldNames = relRecord.getFieldNames();
     List<Object> values = relRecord.getFieldValues();
-    int nullFieldValueCount = 0;
     for (int index = 0; index < fieldNames.size(); index++) {
       if (!fieldNames.get(index).equalsIgnoreCase(SamzaSqlRelMessage.KEY_NAME)) {
         String fieldName = fieldNames.get(index);
@@ -167,26 +166,19 @@ public class AvroRelConverter implements SamzaRelConverter {
          * We ignore the fields which doesn't have corresponding schema in the output topic.
          */
         if (schema.getField(fieldName) == null) {
-          nullFieldValueCount++;
           LOG.debug("Schema with Name {} and Namespace {} doesn't contain the fieldName {}, Skipping it.",
               schema.getName(), schema.getNamespace(), fieldName);
           continue;
         }
 
         Object relObj = values.get(index);
-        if (relObj == null) {
-          nullFieldValueCount++;
-        }
         Schema fieldSchema = schema.getField(fieldName).schema();
         record.put(fieldName, convertToAvroObject(relObj, getNonNullUnionSchema(fieldSchema)));
-      } else {
-        // Count key towards null field values for accounting purposes.
-        nullFieldValueCount++;
       }
     }
 
     // If all field values in the record are null, return a null record.
-    return (nullFieldValueCount == fieldNames.size()) ? null : record;
+    return record;
   }
 
   static public Object convertToAvroObject(Object relObj, Schema schema) {
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
index c3f1caf..4f33be1 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
@@ -62,6 +62,7 @@ import org.apache.samza.sql.planner.RelSchemaConverter;
 import org.apache.samza.sql.schema.SqlSchema;
 import org.apache.samza.system.SystemStream;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -280,6 +281,8 @@ public class TestAvroRelConversion {
     }
   }
 
+  // SAMZA-2110: Re-enable this test once null records are truly supported.
+  @Ignore
   @Test
   public void testRecordConversionWithNullPayload() throws IOException {
     GenericData.Record record = null;
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index 35c83e9..6ddb68b 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -123,7 +123,8 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
     runApplication(new MapConfig(staticConfigs));
 
     List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
-        .map(x -> x.getMessage() == null ? null : Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString()))
+        .map(x -> x.getMessage() == null || ((GenericRecord) x.getMessage()).get("id") == null ? null
+            : Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString()))
         .filter(Objects::nonNull)
         .sorted()
         .collect(Collectors.toList());
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
index 2c9556f..6f94d9f 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
@@ -32,6 +32,7 @@ import org.apache.samza.sql.util.JsonUtil;
 import org.apache.samza.sql.util.SamzaSqlTestConfig;
 import org.apache.samza.sql.util.RemoteStoreIOResolverTestFactory;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 
 
@@ -52,6 +53,8 @@ public class TestSamzaSqlRemoteTable extends SamzaSqlIntegrationTestHarness {
     Assert.assertEquals(numMessages, RemoteStoreIOResolverTestFactory.records.size());
   }
 
+  // SAMZA-2110: Re-enable this test once null records are truly supported.
+  @Ignore
   @Test
   public void testSinkEndToEndWithKeyWithNullRecords() {
     int numMessages = 20;
@@ -212,6 +215,8 @@ public class TestSamzaSqlRemoteTable extends SamzaSqlIntegrationTestHarness {
     Assert.assertEquals(expectedOutMessages, outMessages);
   }
 
+  // SAMZA-2110: Re-enable this test once null records are truly supported.
+  @Ignore
   @Test
   public void testSameJoinTargetSinkEndToEndRightOuterJoin() {
     int numMessages = 21;