You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by at...@apache.org on 2020/08/05 17:55:25 UTC

[samza] branch master updated: SAMZA-2575: Fix the flatten output values and add more tests (#1409)

This is an automated email from the ASF dual-hosted git repository.

atoomula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c4834e  SAMZA-2575: Fix the flatten output values and add more tests (#1409)
5c4834e is described below

commit 5c4834e6b6952a8d66329a0671a002fbd2a38aa0
Author: Slim Bouguerra <b-...@users.noreply.github.com>
AuthorDate: Wed Aug 5 10:55:15 2020 -0700

    SAMZA-2575: Fix the flatten output values and add more tests (#1409)
---
 .../samza/sql/translator/ProjectTranslator.java    | 16 ++--
 .../samza/test/samzasql/TestSamzaSqlEndToEnd.java  | 94 +++++++++++++++++-----
 2 files changed, 82 insertions(+), 28 deletions(-)

diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
index 16a320e..79f58e8 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
@@ -21,7 +21,6 @@ package org.apache.samza.sql.translator;
 
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -269,26 +268,27 @@ public class ProjectTranslator {
   private MessageStream<SamzaSqlRelMessage> translateFlatten(Integer flattenIndex,
       MessageStream<SamzaSqlRelMessage> inputStream) {
     return inputStream.flatMap(message -> {
-      Object field = message.getSamzaSqlRelRecord().getFieldValues().get(flattenIndex);
-      if (field != null && field instanceof List) {
-        List<SamzaSqlRelMessage> outMessages = new ArrayList<>();
+      Object targetFlattenColumn = message.getSamzaSqlRelRecord().getFieldValues().get(flattenIndex);
+      final List<SamzaSqlRelMessage> outMessages = new ArrayList<>();
+      if (targetFlattenColumn != null && targetFlattenColumn instanceof List) {
+        List<Object> objectList = (List<Object>) targetFlattenColumn;
         SamzaSqlRelMsgMetadata messageMetadata = message.getSamzaSqlRelMsgMetadata();
         SamzaSqlRelMsgMetadata newMetadata =
             new SamzaSqlRelMsgMetadata(messageMetadata.getEventTime(), messageMetadata.getArrivalTime(),
                 messageMetadata.getScanTimeNanos(), messageMetadata.getScanTimeMillis());
-        for (Object fieldValue : (List) field) {
+        for (Object fieldValue : objectList) {
           List<Object> newValues = new ArrayList<>(message.getSamzaSqlRelRecord().getFieldValues());
-          newValues.set(flattenIndex, Collections.singletonList(fieldValue));
+          newValues.set(flattenIndex, fieldValue);
           outMessages.add(
               new SamzaSqlRelMessage(message.getSamzaSqlRelRecord().getFieldNames(), newValues, newMetadata));
           newMetadata = new SamzaSqlRelMsgMetadata(newMetadata.getEventTime(), newMetadata.getArrivalTime(),
               newMetadata.getScanTimeNanos(), newMetadata.getScanTimeMillis());
         }
-        return outMessages;
       } else {
         message.getSamzaSqlRelMsgMetadata().isNewInputMessage = true;
-        return Collections.singletonList(message);
+        outMessages.add(message);
       }
+      return outMessages;
     });
   }
 
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index ca78af2..4a515c0 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -27,11 +27,11 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.calcite.plan.RelOptUtil;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.sql.planner.SamzaSqlValidator;
@@ -422,15 +422,14 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
   }
 
   @Test
-  public void testEndToEndFlatten() throws Exception {
+  public void testEndToEndFlatten() {
     int numMessages = 20;
     TestAvroSystemFactory.messages.clear();
     Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
 
-    LOG.info(" Class Path : " + RelOptUtil.class.getProtectionDomain().getCodeSource().getLocation().toURI().getPath());
     String sql1 =
-        "Insert into testavro.outputTopic(string_value, id, bool_value, bytes_value, fixed_value, float_value0) "
-            + " select Flatten(array_values) as string_value, id, NOT(id = 5) as bool_value, bytes_value, fixed_value, float_value0 "
+        "Insert into testavro.outputTopic(string_value, id, bool_value, bytes_value, fixed_value, float_value0, array_values) "
+            + " select Flatten(array_values) as string_value, id, NOT(id = 5) as bool_value, bytes_value, fixed_value, float_value0, array_values"
             + " from testavro.COMPLEX1";
     List<String> sqlStmts = Collections.singletonList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
@@ -442,12 +441,29 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
 
     List<OutgoingMessageEnvelope> outMessages = new ArrayList<>(TestAvroSystemFactory.messages);
 
-    int expectedMessages = 0;
-    // Flatten de-normalizes the data. So there is separate record for each entry in the array.
-    for (int index = 1; index < numMessages; index++) {
-      expectedMessages = expectedMessages + Math.max(1, index);
-    }
+    // Test invariant: the input row of rank i has an array_values column with i elements, so flatten emits $\sum_{i=1}^{n-1} i = n(n-1)/2$ rows.
+    int expectedMessages = (numMessages * (numMessages - 1)) / 2;
+    //Assert.assertEquals(outMessages.size(), actualList.size());
     Assert.assertEquals(expectedMessages, outMessages.size());
+
+    // check that values are actually not null and within the expected range
+    Optional<GenericRecord> nullValueRecord = outMessages.stream()
+        .map(x -> (GenericRecord) x.getMessage())
+        .filter(x -> x.get("string_value") == null)
+        .findFirst();
+    // string_value is produced by flattening array_values, so each string_value must be present in its row's array_values column
+    Optional<GenericRecord> missingValue = outMessages.stream().map(x -> (GenericRecord) x.getMessage()).filter(x -> {
+      String value = (String) x.get("string_value");
+      List<Object> arrayValues = (List<Object>) x.get("array_values");
+      if (arrayValues == null) {
+        return true;
+      }
+      Optional<Object> notThere = arrayValues.stream().filter(v -> v.toString().equalsIgnoreCase(value)).findAny();
+      return !notThere.isPresent();
+    }).findFirst();
+
+    Assert.assertFalse("Null value " +  nullValueRecord.orElse(null), nullValueRecord.isPresent());
+    Assert.assertFalse("Absent Value " + missingValue.orElse(null), missingValue.isPresent());
   }
 
 
@@ -475,7 +491,7 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
   }
 
   @Test
-  public void testEndToEndWithFloatToStringConversion() throws Exception {
+  public void testEndToEndWithFloatToStringConversion() {
     int numMessages = 20;
 
     TestAvroSystemFactory.messages.clear();
@@ -609,16 +625,34 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
 
     List<OutgoingMessageEnvelope> outMessages = new ArrayList<>(TestAvroSystemFactory.messages);
 
-    int expectedMessages = 0;
+    // Test invariant: the input row of rank i has an array_values column with i elements, so flatten emits $\sum_{i=1}^{n-1} i = n(n-1)/2$ rows.
+    int expectedMessages = (numMessages * (numMessages - 1)) / 2;
     // Flatten de-normalizes the data. So there is separate record for each entry in the array.
-    for (int index = 1; index < numMessages; index++) {
-      expectedMessages = expectedMessages + Math.max(1, index);
-    }
     Assert.assertEquals(expectedMessages, outMessages.size());
+
+    // check that values are actually not null and within the expected range
+    Optional<GenericRecord> nullValueRecord = outMessages.stream()
+        .map(x -> (GenericRecord) x.getMessage())
+        .filter(x -> x.get("id") == null)
+        .findFirst();
+    Assert.assertFalse("Null value " +  nullValueRecord.orElse(null), nullValueRecord.isPresent());
+    // TODO: this check currently fails because of the UDF weak type system; fixing that is beyond the scope of this change.
+   /* // The String value column is result of dot product thus must be present in the Array column
+    Optional<GenericRecord> missingValue = outMessages.stream().map(x -> (GenericRecord) x.getMessage()).filter(x -> {
+      String value = (String) x.get("string_value");
+      List<Object> arrayValues = (List<Object>) x.get("array_values");
+      if (arrayValues == null) {
+        return true;
+      }
+      Optional<Object> notThere = arrayValues.stream().filter(v -> v.toString().equalsIgnoreCase(value)).findAny();
+      return !notThere.isPresent();
+    }).findFirst();
+    Assert.assertFalse("Absent Value " + missingValue.orElse(null), missingValue.isPresent());
+    */
   }
 
   @Test
-  public void testEndToEndSubQuery() throws Exception {
+  public void testEndToEndSubQuery() {
     int numMessages = 20;
     TestAvroSystemFactory.messages.clear();
     Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
@@ -635,12 +669,32 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
 
     List<OutgoingMessageEnvelope> outMessages = new ArrayList<>(TestAvroSystemFactory.messages);
 
-    int expectedMessages = 0;
+    // Test invariant: the input row of rank i has an array_values column with i elements, so flatten emits $\sum_{i=1}^{n-1} i = n(n-1)/2$ rows.
+    int expectedMessages = (numMessages * (numMessages - 1)) / 2;
     // Flatten de-normalizes the data. So there is separate record for each entry in the array.
-    for (int index = 1; index < numMessages; index++) {
-      expectedMessages = expectedMessages + Math.max(1, index);
-    }
     Assert.assertEquals(expectedMessages, outMessages.size());
+
+    // check that values are actually not null and within the expected range
+    Optional<GenericRecord> nullValueRecord = outMessages.stream()
+        .map(x -> (GenericRecord) x.getMessage())
+        .filter(x -> x.get("id") == null)
+        .findFirst();
+    Assert.assertFalse("Null value " +  nullValueRecord.orElse(null), nullValueRecord.isPresent());
+
+    // TODO: this check currently fails because of the UDF weak type system; fixing that is beyond the scope of this change.
+   /* // The String value column is result of dot product thus must be present in the Array column
+    Optional<GenericRecord> missingValue = outMessages.stream().map(x -> (GenericRecord) x.getMessage()).filter(x -> {
+      String value = (String) x.get("string_value");
+      List<Object> arrayValues = (List<Object>) x.get("array_values");
+      if (arrayValues == null) {
+        return true;
+      }
+      Optional<Object> notThere = arrayValues.stream().filter(v -> v.toString().equalsIgnoreCase(value)).findAny();
+      return !notThere.isPresent();
+    }).findFirst();
+    Assert.assertFalse("Absent Value " + missingValue.orElse(null), missingValue.isPresent());
+    */
+
   }
 
   @Test