You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sr...@apache.org on 2019/02/20 21:23:42 UTC
[samza] branch master updated: Records with all null values are
transformed to null records (#926)
This is an automated email from the ASF dual-hosted git repository.
srinivasulu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 30b43d9 Records with all null values are transformed to null records (#926)
30b43d9 is described below
commit 30b43d960aeed9a12dd04b4cd1f5b67c5858ba2d
Author: Srinivasulu Punuru <sr...@users.noreply.github.com>
AuthorDate: Wed Feb 20 13:23:38 2019 -0800
Records with all null values are transformed to null records (#926)
* Records with all null values are transformed to null records
* Fix for the tests:
---
.../main/java/org/apache/samza/sql/avro/AvroRelConverter.java | 10 +---------
.../java/org/apache/samza/sql/avro/TestAvroRelConversion.java | 3 +++
.../org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java | 3 ++-
.../apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java | 5 +++++
4 files changed, 11 insertions(+), 10 deletions(-)
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
index ba79e48..d34b94d 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
@@ -153,7 +153,6 @@ public class AvroRelConverter implements SamzaRelConverter {
GenericRecord record = new GenericData.Record(schema);
List<String> fieldNames = relRecord.getFieldNames();
List<Object> values = relRecord.getFieldValues();
- int nullFieldValueCount = 0;
for (int index = 0; index < fieldNames.size(); index++) {
if (!fieldNames.get(index).equalsIgnoreCase(SamzaSqlRelMessage.KEY_NAME)) {
String fieldName = fieldNames.get(index);
@@ -167,26 +166,19 @@ public class AvroRelConverter implements SamzaRelConverter {
* We ignore the fields which doesn't have corresponding schema in the output topic.
*/
if (schema.getField(fieldName) == null) {
- nullFieldValueCount++;
LOG.debug("Schema with Name {} and Namespace {} doesn't contain the fieldName {}, Skipping it.",
schema.getName(), schema.getNamespace(), fieldName);
continue;
}
Object relObj = values.get(index);
- if (relObj == null) {
- nullFieldValueCount++;
- }
Schema fieldSchema = schema.getField(fieldName).schema();
record.put(fieldName, convertToAvroObject(relObj, getNonNullUnionSchema(fieldSchema)));
- } else {
- // Count key towards null field values for accounting purposes.
- nullFieldValueCount++;
}
}
// If all field values in the record are null, return a null record.
- return (nullFieldValueCount == fieldNames.size()) ? null : record;
+ return record;
}
static public Object convertToAvroObject(Object relObj, Schema schema) {
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
index c3f1caf..4f33be1 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
@@ -62,6 +62,7 @@ import org.apache.samza.sql.planner.RelSchemaConverter;
import org.apache.samza.sql.schema.SqlSchema;
import org.apache.samza.system.SystemStream;
import org.junit.Assert;
+import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -280,6 +281,8 @@ public class TestAvroRelConversion {
}
}
+ // SAMZA-2110 We need to enable this when we have a true support for Null records
+ @Ignore
@Test
public void testRecordConversionWithNullPayload() throws IOException {
GenericData.Record record = null;
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index 35c83e9..6ddb68b 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -123,7 +123,8 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
runApplication(new MapConfig(staticConfigs));
List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
- .map(x -> x.getMessage() == null ? null : Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString()))
+ .map(x -> x.getMessage() == null || ((GenericRecord) x.getMessage()).get("id") == null ? null
+ : Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString()))
.filter(Objects::nonNull)
.sorted()
.collect(Collectors.toList());
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
index 2c9556f..6f94d9f 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
@@ -32,6 +32,7 @@ import org.apache.samza.sql.util.JsonUtil;
import org.apache.samza.sql.util.SamzaSqlTestConfig;
import org.apache.samza.sql.util.RemoteStoreIOResolverTestFactory;
import org.junit.Assert;
+import org.junit.Ignore;
import org.junit.Test;
@@ -52,6 +53,8 @@ public class TestSamzaSqlRemoteTable extends SamzaSqlIntegrationTestHarness {
Assert.assertEquals(numMessages, RemoteStoreIOResolverTestFactory.records.size());
}
+ // SAMZA-2110 We need to enable this when we have a true support for Null records
+ @Ignore
@Test
public void testSinkEndToEndWithKeyWithNullRecords() {
int numMessages = 20;
@@ -212,6 +215,8 @@ public class TestSamzaSqlRemoteTable extends SamzaSqlIntegrationTestHarness {
Assert.assertEquals(expectedOutMessages, outMessages);
}
+ // SAMZA-2110 We need to enable this when we have a true support for null records.
+ @Ignore
@Test
public void testSameJoinTargetSinkEndToEndRightOuterJoin() {
int numMessages = 21;