You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2018/02/15 23:46:27 UTC

samza git commit: Support source types where the last part of the source is not the streamName

Repository: samza
Updated Branches:
  refs/heads/master 4123b160d -> c3e6b5517


Support source types where the last part of the source is not the streamName

Contains the following fixes:

1. Previously the Samza SQL framework assumed that the last part of the source is the stream name; this assumption has been removed.
2. Made ConsoleLoggingSystemFactory log formatted JSON so that the output is easily readable.
3. Added support in SamzaSqlRelMessage for messages where the key may not be present.

Author: Srinivasulu Punuru <sp...@linkedin.com>

Reviewers: Xinyu Liu <xi...@gmail.com>

Closes #421 from srinipunuru/four-part.1


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c3e6b551
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c3e6b551
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c3e6b551

Branch: refs/heads/master
Commit: c3e6b5517bdf505b9943b763a8aba35b714019af
Parents: 4123b16
Author: Srinivasulu Punuru <sp...@linkedin.com>
Authored: Thu Feb 15 15:46:17 2018 -0800
Committer: xiliu <xi...@linkedin.com>
Committed: Thu Feb 15 15:46:17 2018 -0800

----------------------------------------------------------------------
 .../apache/samza/sql/avro/AvroRelConverter.java |  5 +-
 .../samza/sql/data/SamzaSqlRelMessage.java      | 93 +++++++++-----------
 .../apache/samza/sql/planner/QueryPlanner.java  | 11 ++-
 .../samza/sql/translator/FilterTranslator.java  |  2 +-
 .../samza/sql/translator/ProjectTranslator.java |  8 +-
 .../samza/sql/TestSamzaSqlRelMessage.java       |  4 +-
 .../samza/sql/avro/TestAvroRelConversion.java   |  5 +-
 .../tools/ConsoleLoggingSystemFactory.java      | 27 +++++-
 8 files changed, 81 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/c3e6b551/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
index 086bb93..e247415 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
@@ -134,7 +134,6 @@ public class AvroRelConverter implements SamzaRelConverter {
   private final Schema mapSchema = Schema.parse(
       "{\"type\":\"map\",\"values\":{\"type\":\"record\",\"name\":\"Object\",\"namespace\":\"java.lang\",\"fields\":[]}}");
 
-
   public AvroRelConverter(SystemStream systemStream, AvroRelSchemaProvider schemaProvider, Config config) {
     this.config = config;
     this.relationalSchema = schemaProvider.getRelationalSchema();
@@ -175,7 +174,9 @@ public class AvroRelConverter implements SamzaRelConverter {
     List<String> fieldNames = relMessage.getFieldNames();
     List<Object> values = relMessage.getFieldValues();
     for (int index = 0; index < fieldNames.size(); index++) {
-      record.put(fieldNames.get(index), values.get(index));
+      if (!fieldNames.get(index).equalsIgnoreCase(SamzaSqlRelMessage.KEY_NAME)) {
+        record.put(fieldNames.get(index), values.get(index));
+      }
     }
 
     return new KV<>(relMessage.getKey(), record);

http://git-wip-us.apache.org/repos/asf/samza/blob/c3e6b551/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
index bf945a0..452a32c 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
@@ -21,7 +21,6 @@ package org.apache.samza.sql.data;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Objects;
 import java.util.Optional;
 import org.apache.commons.lang.Validate;
 
@@ -37,29 +36,49 @@ public class SamzaSqlRelMessage {
 
   public static final String KEY_NAME = "__key__";
 
-  private final List<Object> value = new ArrayList<>();
-  private final List<Object> relFieldValues = new ArrayList<>();
-  private final List<String> names = new ArrayList<>();
+  private final List<Object> fieldValues = new ArrayList<>();
+  private final List<String> fieldNames = new ArrayList<>();
   private final Object key;
 
   /**
+   * Creates a {@link SamzaSqlRelMessage} from the list of relational fields and values.
+   * If the field list contains KEY, then it extracts the key out of the fields to create a
+   * RelMessage with key and values; otherwise it creates a RelMessage without the key.
+   * @param fieldNames Ordered list of field names in the row.
+   * @param fieldValues  Ordered list of all the values in the row. Some of the fields can be null, This could be result of
+   *               delete change capture event in the stream or because of the result of the outer join or the fields
+   *               themselves are null in the original stream.
+   */
+  public SamzaSqlRelMessage(List<String> fieldNames, List<Object> fieldValues) {
+    Validate.isTrue(fieldNames.size() == fieldValues.size(), "Field Names and values are not of same length.");
+
+    int keyIndex = fieldNames.indexOf(KEY_NAME);
+    Object key = null;
+    if (keyIndex != -1) {
+      key = fieldValues.get(keyIndex);
+    }
+
+    this.key = key;
+    this.fieldNames.addAll(fieldNames);
+    this.fieldValues.addAll(fieldValues);
+  }
+
+  /**
    * Create the SamzaSqlRelMessage, Each rel message represents a row in the table.
    * So it can contain a key and a list of fields in the row.
-   * @param key Represents the key in the row, Key is optional, in which case it can be null.
-   * @param names Ordered list of field names in the row.
-   * @param values Ordered list of all the values in the row. Since the samzaSqlRelMessage can represent
-   *               the row in a change capture event stream, It can contain delete messages in which case
-   *               all the fields in the row can be null.
+   * @param key Represents the key in the row, Key can be null.
+   * @param fieldNames Ordered list of field names in the row.
+   * @param fieldValues Ordered list of all the values in the row. Some of the fields can be null, This could be result of
+   *               delete change capture event in the stream or because of the result of the outer join or the fields
+   *               themselves are null in the original stream.
    */
-  public SamzaSqlRelMessage(Object key, List<String> names, List<Object> values) {
-    Validate.isTrue(names.size() == values.size(), "Field Names and values are not of same length.");
+  public SamzaSqlRelMessage(Object key, List<String> fieldNames, List<Object> fieldValues) {
+    Validate.isTrue(fieldNames.size() == fieldValues.size(), "Field Names and values are not of same length.");
     this.key = key;
-    this.value.addAll(values);
-    this.names.addAll(names);
-    if (key != null) {
-      this.relFieldValues.add(key);
-    }
-    this.relFieldValues.addAll(values);
+    this.fieldNames.add(KEY_NAME);
+    this.fieldNames.addAll(fieldNames);
+    this.fieldValues.add(key);
+    this.fieldValues.addAll(fieldValues);
   }
 
   /**
@@ -67,19 +86,11 @@ public class SamzaSqlRelMessage {
    * @return the field names of all columns.
    */
   public List<String> getFieldNames() {
-    return names;
+    return fieldNames;
   }
 
-  /**
-   * Get the values of all the columns in the relational message.
-   * @return the values of all the columns
-   */
   public List<Object> getFieldValues() {
-    return value;
-  }
-
-  public List<Object> getRelFieldValues() {
-    return this.relFieldValues;
+    return this.fieldValues;
   }
 
   public Object getKey() {
@@ -92,34 +103,12 @@ public class SamzaSqlRelMessage {
    * @return returns the value of the field.
    */
   public Optional<Object> getField(String name) {
-    for (int index = 0; index < names.size(); index++) {
-      if (names.get(index).equals(name)) {
-        return Optional.ofNullable(value.get(index));
+    for (int index = 0; index < fieldNames.size(); index++) {
+      if (fieldNames.get(index).equals(name)) {
+        return Optional.ofNullable(fieldValues.get(index));
       }
     }
 
     return Optional.empty();
   }
-
-  /**
-   * Creates a {@link SamzaSqlRelMessage} from the list of relational fields and values.
-   * If the field list contains KEY, then it extracts the key out of the fields to create the
-   * RelMessage with key and values.
-   * @param fieldValues Field values that can include the key as well.
-   * @param fieldNames Field names in the rel message that can include the special __key__
-   * @return Created SamzaSqlRelMessage.
-   */
-  public static SamzaSqlRelMessage createRelMessage(List<Object> fieldValues, List<String> fieldNames) {
-    int keyIndex = fieldNames.indexOf(KEY_NAME);
-    fieldNames = new ArrayList<>(fieldNames);
-    fieldValues = new ArrayList<>(fieldValues);
-    Object key = null;
-    if (keyIndex != -1) {
-      key = fieldValues.get(keyIndex);
-      fieldValues.remove(keyIndex);
-      fieldNames.remove(keyIndex);
-    }
-
-    return new SamzaSqlRelMessage(key, fieldNames, fieldValues);
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/c3e6b551/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
index 061c03f..2b67f18 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
@@ -57,7 +57,6 @@ import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.RelSchemaProvider;
 import org.apache.samza.sql.interfaces.SqlSystemStreamConfig;
 import org.apache.samza.sql.interfaces.UdfMetadata;
-import org.apache.samza.system.SystemStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -76,7 +75,6 @@ public class QueryPlanner {
   // Mapping between the source to the SqlSystemStreamConfig corresponding to the source.
   private final Map<String, SqlSystemStreamConfig> systemStreamConfigBySource;
 
-
   public QueryPlanner(Map<String, RelSchemaProvider> relSchemaProviders,
       Map<String, SqlSystemStreamConfig> systemStreamConfigBySource, Collection<UdfMetadata> udfMetadata) {
     this.relSchemaProviders = relSchemaProviders;
@@ -95,17 +93,18 @@ public class QueryPlanner {
         List<String> sourceParts = ssc.getSourceParts();
         RelSchemaProvider relSchemaProvider = relSchemaProviders.get(ssc.getSource());
 
-        for (String sourcePart : sourceParts) {
-          if (!sourcePart.equalsIgnoreCase(ssc.getStreamName())) {
+        for (int sourcePartIndex = 0; sourcePartIndex < sourceParts.size(); sourcePartIndex++) {
+          String sourcePart = sourceParts.get(sourcePartIndex);
+          if (sourcePartIndex < sourceParts.size() - 1) {
             SchemaPlus sourcePartSchema = rootSchema.getSubSchema(sourcePart);
             if (sourcePartSchema == null) {
               sourcePartSchema = previousLevelSchema.add(sourcePart, new AbstractSchema());
             }
             previousLevelSchema = sourcePartSchema;
           } else {
-            // If the source part is the streamName, then fetch the schema corresponding to the stream and register.
+            // If the source part is the last one, then fetch the schema corresponding to the stream and register.
             RelDataType relationalSchema = relSchemaProvider.getRelationalSchema();
-            previousLevelSchema.add(ssc.getStreamName(), createTableFromRelSchema(relationalSchema));
+            previousLevelSchema.add(sourcePart, createTableFromRelSchema(relationalSchema));
             break;
           }
         }

http://git-wip-us.apache.org/repos/asf/samza/blob/c3e6b551/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
index e4bfcae..686ac15 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
@@ -45,7 +45,7 @@ public class FilterTranslator {
 
     MessageStream<SamzaSqlRelMessage> outputStream = inputStream.filter(message -> {
       Object[] result = new Object[1];
-      expr.execute(context.getExecutionContext(), context.getDataContext(), message.getRelFieldValues().toArray(), result);
+      expr.execute(context.getExecutionContext(), context.getDataContext(), message.getFieldValues().toArray(), result);
       if (result.length > 0 && result[0] instanceof Boolean) {
         boolean retVal = (Boolean) result[0];
         log.debug(

http://git-wip-us.apache.org/repos/asf/samza/blob/c3e6b551/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
index c0387ad..f5cc525 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
@@ -66,13 +66,13 @@ public class ProjectTranslator {
     MessageStream<SamzaSqlRelMessage> outputStream = messageStream.map(m -> {
       RelDataType type = project.getRowType();
       Object[] output = new Object[type.getFieldCount()];
-      expr.execute(context.getExecutionContext(), context.getDataContext(), m.getRelFieldValues().toArray(), output);
+      expr.execute(context.getExecutionContext(), context.getDataContext(), m.getFieldValues().toArray(), output);
       List<String> names = new ArrayList<>();
       for (int index = 0; index < output.length; index++) {
         names.add(index, project.getNamedProjects().get(index).getValue());
       }
 
-      return SamzaSqlRelMessage.createRelMessage(Arrays.asList(output), names);
+      return new SamzaSqlRelMessage(names, Arrays.asList(output));
     });
 
     context.registerMessageStream(project.getId(), outputStream);
@@ -81,14 +81,14 @@ public class ProjectTranslator {
   private MessageStream<SamzaSqlRelMessage> translateFlatten(Integer flattenIndex,
       MessageStream<SamzaSqlRelMessage> inputStream) {
     return inputStream.flatMap(message -> {
-      Object field = message.getRelFieldValues().get(flattenIndex);
+      Object field = message.getFieldValues().get(flattenIndex);
 
       if (field != null && field instanceof List) {
         List<SamzaSqlRelMessage> outMessages = new ArrayList<>();
         for (Object fieldValue : (List) field) {
           List<Object> newValues = new ArrayList<>(message.getFieldValues());
           newValues.set(flattenIndex, Collections.singletonList(fieldValue));
-          outMessages.add(new SamzaSqlRelMessage(message.getKey(), message.getFieldNames(), newValues));
+          outMessages.add(new SamzaSqlRelMessage(message.getFieldNames(), newValues));
         }
         return outMessages;
       } else {

http://git-wip-us.apache.org/repos/asf/samza/blob/c3e6b551/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessage.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessage.java b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessage.java
index 3290b96..e58563c 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessage.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessage.java
@@ -33,14 +33,14 @@ public class TestSamzaSqlRelMessage {
 
   @Test
   public void testGetField() {
-    SamzaSqlRelMessage message = SamzaSqlRelMessage.createRelMessage(values, names);
+    SamzaSqlRelMessage message = new SamzaSqlRelMessage(names, values);
     Assert.assertEquals(values.get(0), message.getField(names.get(0)).get());
     Assert.assertEquals(values.get(1), message.getField(names.get(1)).get());
   }
 
   @Test
   public void testGetNonExistentField() {
-    SamzaSqlRelMessage message = SamzaSqlRelMessage.createRelMessage(values, names);
+    SamzaSqlRelMessage message = new SamzaSqlRelMessage(names, values);
     Assert.assertFalse(message.getField("field3").isPresent());
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/c3e6b551/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
index 44b4213..21c666b 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
@@ -28,7 +28,6 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.stream.Collectors;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
@@ -144,7 +143,6 @@ public class TestAvroRelConversion {
     Assert.assertEquals(message.getFieldNames().size(), message.getFieldValues().size());
   }
 
-
   @Test
   public void testNullRecordConversion() {
     SamzaSqlRelMessage message = simpleRecordAvroRelConverter.convertToRelMessage(new KV<>("key", null));
@@ -218,8 +216,7 @@ public class TestAvroRelConversion {
     RelDataType dataType = complexRecordSchemProvider.getRelationalSchema();
 
     SamzaSqlRelMessage message = complexRecordAvroRelConverter.convertToRelMessage(new KV<>("key", complexRecordValue));
-    Assert.assertEquals(message.getFieldNames().size(),
-        ComplexRecord.SCHEMA$.getFields().size());
+    Assert.assertEquals(message.getFieldNames().size(), ComplexRecord.SCHEMA$.getFields().size() + 1);
 
     Assert.assertEquals(message.getField("id").get(), id);
     Assert.assertEquals(message.getField("bool_value").get(), boolValue);

http://git-wip-us.apache.org/repos/asf/samza/blob/c3e6b551/samza-tools/src/main/java/org/apache/samza/tools/ConsoleLoggingSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-tools/src/main/java/org/apache/samza/tools/ConsoleLoggingSystemFactory.java b/samza-tools/src/main/java/org/apache/samza/tools/ConsoleLoggingSystemFactory.java
index 87abc76..b271ccd 100644
--- a/samza-tools/src/main/java/org/apache/samza/tools/ConsoleLoggingSystemFactory.java
+++ b/samza-tools/src/main/java/org/apache/samza/tools/ConsoleLoggingSystemFactory.java
@@ -19,9 +19,11 @@
 
 package org.apache.samza.tools;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.commons.lang.NotImplementedException;
@@ -35,6 +37,7 @@ import org.apache.samza.system.SystemFactory;
 import org.apache.samza.system.SystemProducer;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamPartition;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,6 +50,9 @@ public class ConsoleLoggingSystemFactory implements SystemFactory {
 
   private static final Logger LOG = LoggerFactory.getLogger(ConsoleLoggingSystemFactory.class);
 
+  public static AtomicInteger messageCounter = new AtomicInteger();
+  private static ObjectMapper mapper = new ObjectMapper();
+
   @Override
   public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
     throw new NotImplementedException();
@@ -82,12 +88,27 @@ public class ConsoleLoggingSystemFactory implements SystemFactory {
           new String((byte[]) envelope.getMessage()));
       LOG.info(msg);
 
+      System.out.println(String.format("Message %d :", messageCounter.incrementAndGet()));
       if (envelope.getKey() != null) {
-        System.out.println(String.format("Key:%s Value:%s", envelope.getKey(),
-            new String((byte[]) envelope.getMessage())));
+        System.out.println(String.format("Key:%s Value:%s", envelope.getKey(), getFormattedValue(envelope)));
       } else {
-        System.out.println(new String((byte[]) envelope.getMessage()));
+        System.out.println(getFormattedValue(envelope));
+      }
+    }
+
+    private String getFormattedValue(OutgoingMessageEnvelope envelope) {
+      String value = new String((byte[]) envelope.getMessage());
+      String formattedValue;
+
+      try {
+        Object json = mapper.readValue(value, Object.class);
+        formattedValue = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(json);
+      } catch (IOException e) {
+        formattedValue = value;
+        LOG.error("Error while formatting json", e);
       }
+
+      return formattedValue;
     }
 
     @Override