You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sr...@apache.org on 2019/03/25 17:54:09 UTC
[samza] branch master updated: Moving the Fixes from internal AvroRelConverter (#966)
This is an automated email from the ASF dual-hosted git repository.
srinivasulu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new f0fc481 Moving the Fixes from internal AvroRelConverter (#966)
f0fc481 is described below
commit f0fc4818a8241b3dd6c97a5654c2c1732ae9c380
Author: Srinivasulu Punuru <sr...@users.noreply.github.com>
AuthorDate: Mon Mar 25 10:54:05 2019 -0700
Moving the Fixes from internal AvroRelConverter (#966)
* Fixes to AvroRelConverter
* review fixes
---
.../apache/samza/sql/avro/AvroRelConverter.java | 144 +++++++++++----------
1 file changed, 78 insertions(+), 66 deletions(-)
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
index d34b94d..fcb0c72 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
@@ -1,26 +1,25 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.samza.sql.avro;
import java.nio.ByteBuffer;
-import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -32,6 +31,7 @@ import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.calcite.avatica.util.ByteString;
+import org.apache.commons.lang3.Validate;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.operators.KV;
@@ -69,7 +69,8 @@ public class AvroRelConverter implements SamzaRelConverter {
public AvroRelConverter(SystemStream systemStream, AvroRelSchemaProvider schemaProvider, Config config) {
this.config = config;
- this.payloadSchema = Schema.parse(schemaProvider.getSchema(systemStream));
+ String schema = schemaProvider.getSchema(systemStream);
+ this.payloadSchema = schema == null ? null : Schema.parse(schema);
}
/**
@@ -85,7 +86,7 @@ public class AvroRelConverter implements SamzaRelConverter {
fetchFieldNamesAndValuesFromIndexedRecord((IndexedRecord) value, payloadFieldNames, payloadFieldValues,
payloadSchema);
} else if (value == null) {
- // If the payload is null, set each record value as null.
+ // If the payload is null, set each record value as null
payloadFieldNames.addAll(payloadSchema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList()));
IntStream.range(0, payloadFieldNames.size()).forEach(x -> payloadFieldValues.add(null));
} else {
@@ -98,32 +99,41 @@ public class AvroRelConverter implements SamzaRelConverter {
new SamzaSqlRelMsgMetadata("", "", ""));
}
- public void fetchFieldNamesAndValuesFromIndexedRecord(IndexedRecord record, List<String> fieldNames,
+ /**
+ * Create a SamzaSqlRelMessage for the specified key and Avro record, with fields taken from the supplied schema.
+ *
+ */
+ public static SamzaSqlRelMessage convertToRelMessage(Object key, IndexedRecord record, Schema schema) {
+ List<String> payloadFieldNames = new ArrayList<>();
+ List<Object> payloadFieldValues = new ArrayList<>();
+ fetchFieldNamesAndValuesFromIndexedRecord(record, payloadFieldNames, payloadFieldValues, schema);
+ return new SamzaSqlRelMessage(key, payloadFieldNames, payloadFieldValues,
+ new SamzaSqlRelMsgMetadata("", "", ""));
+ }
+
+ public static void fetchFieldNamesAndValuesFromIndexedRecord(IndexedRecord record, List<String> fieldNames,
List<Object> fieldValues, Schema cachedSchema) {
// Please note that record schema and cached schema could be different due to schema evolution.
// Always represent record schema in the form of cached schema. This approach has the side-effect
// of dropping the newly added fields in the scenarios where the record schema has newer version
// than the cached schema. [TODO: SAMZA-1679]
Schema recordSchema = record.getSchema();
- fieldNames.addAll(cachedSchema.getFields().stream()
- .map(Schema.Field::name)
- .collect(Collectors.toList()));
+ fieldNames.addAll(cachedSchema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList()));
fieldValues.addAll(fieldNames.stream()
.map(f -> convertToJavaObject(
recordSchema.getField(f) != null ? record.get(recordSchema.getField(f).pos()) : null,
- getNonNullUnionSchema(payloadSchema.getField(f).schema())))
+ getNonNullUnionSchema(cachedSchema.getField(f).schema()))) // get schema from cachedSchema
.collect(Collectors.toList()));
}
- private SamzaSqlRelRecord convertToRelRecord(IndexedRecord avroRecord) {
+ private static SamzaSqlRelRecord convertToRelRecord(IndexedRecord avroRecord) {
List<Object> fieldValues = new ArrayList<>();
List<String> fieldNames = new ArrayList<>();
if (avroRecord != null) {
- fieldNames.addAll(avroRecord.getSchema().getFields()
- .stream()
- .map(Schema.Field::name)
- .collect(Collectors.toList()));
- fieldValues.addAll(avroRecord.getSchema().getFields()
+ fieldNames.addAll(
+ avroRecord.getSchema().getFields().stream().map(Schema.Field::name).collect(Collectors.toList()));
+ fieldValues.addAll(avroRecord.getSchema()
+ .getFields()
.stream()
.map(f -> convertToJavaObject(avroRecord.get(avroRecord.getSchema().getField(f.name()).pos()),
getNonNullUnionSchema(avroRecord.getSchema().getField(f.name()).schema())))
@@ -149,7 +159,7 @@ public class AvroRelConverter implements SamzaRelConverter {
return new KV<>(relMessage.getKey(), convertToGenericRecord(relMessage.getSamzaSqlRelRecord(), payloadSchema));
}
- static public GenericRecord convertToGenericRecord(SamzaSqlRelRecord relRecord, Schema schema) {
+ private static GenericRecord convertToGenericRecord(SamzaSqlRelRecord relRecord, Schema schema) {
GenericRecord record = new GenericData.Record(schema);
List<String> fieldNames = relRecord.getFieldNames();
List<Object> values = relRecord.getFieldValues();
@@ -170,22 +180,20 @@ public class AvroRelConverter implements SamzaRelConverter {
schema.getName(), schema.getNamespace(), fieldName);
continue;
}
-
Object relObj = values.get(index);
Schema fieldSchema = schema.getField(fieldName).schema();
record.put(fieldName, convertToAvroObject(relObj, getNonNullUnionSchema(fieldSchema)));
}
}
- // If all field values in the record are null, return a null record.
return record;
}
- static public Object convertToAvroObject(Object relObj, Schema schema) {
+ public static Object convertToAvroObject(Object relObj, Schema schema) {
if (relObj == null) {
return null;
}
- switch(schema.getType()) {
+ switch (schema.getType()) {
case RECORD:
return convertToGenericRecord((SamzaSqlRelRecord) relObj, getNonNullUnionSchema(schema));
case ARRAY:
@@ -196,14 +204,15 @@ public class AvroRelConverter implements SamzaRelConverter {
case MAP:
return ((Map<String, ?>) relObj).entrySet()
.stream()
- .collect(Collectors.toMap(Map.Entry::getKey, e -> convertToAvroObject(e.getValue(),
- getNonNullUnionSchema(schema).getValueType())));
+ .collect(Collectors.toMap(Map.Entry::getKey,
+ e -> convertToAvroObject(e.getValue(), getNonNullUnionSchema(schema).getValueType())));
case UNION:
for (Schema unionSchema : schema.getTypes()) {
if (isSchemaCompatibleWithRelObj(relObj, unionSchema)) {
return convertToAvroObject(relObj, unionSchema);
}
}
+ return null;
case ENUM:
return new GenericData.EnumSymbol(schema, (String) relObj);
case FIXED:
@@ -217,11 +226,11 @@ public class AvroRelConverter implements SamzaRelConverter {
// Not doing any validations of data types with Avro schema considering the resource cost per message.
// Casting would fail if the data types are not in sync with the schema.
- public Object convertToJavaObject(Object avroObj, Schema schema) {
+ public static Object convertToJavaObject(Object avroObj, Schema schema) {
if (avroObj == null) {
return null;
}
- switch(schema.getType()) {
+ switch (schema.getType()) {
case RECORD:
return convertToRelRecord((IndexedRecord) avroObj);
case ARRAY: {
@@ -235,26 +244,26 @@ public class AvroRelConverter implements SamzaRelConverter {
throw new SamzaException("Unsupported array type " + avroObj.getClass().getSimpleName());
}
- retVal.addAll(
- avroArray.stream()
- .map(v -> convertToJavaObject(v, getNonNullUnionSchema(schema).getElementType()))
- .collect(Collectors.toList()));
+ retVal.addAll(avroArray.stream()
+ .map(v -> convertToJavaObject(v, getNonNullUnionSchema(schema).getElementType()))
+ .collect(Collectors.toList()));
return retVal;
}
case MAP: {
Map<String, Object> retVal = new HashMap<>();
- retVal.putAll(((Map<String, ?>) avroObj).entrySet().stream()
- .collect(Collectors.toMap(
- Map.Entry::getKey,
+ retVal.putAll(((Map<String, ?>) avroObj).entrySet()
+ .stream()
+ .collect(Collectors.toMap(Map.Entry::getKey,
e -> convertToJavaObject(e.getValue(), getNonNullUnionSchema(schema).getValueType()))));
return retVal;
}
case UNION:
for (Schema unionSchema : schema.getTypes()) {
- if (isSchemaCompatible(avroObj, unionSchema)) {
+ if (isSchemaCompatibleWithAvroObj(avroObj, unionSchema)) {
return convertToJavaObject(avroObj, unionSchema);
}
}
+ return null;
case ENUM:
return avroObj.toString();
case FIXED:
@@ -271,45 +280,48 @@ public class AvroRelConverter implements SamzaRelConverter {
}
}
- private boolean isSchemaCompatible(Object avroObj, Schema unionSchema) {
- if (unionSchema.getType() == Schema.Type.NULL && avroObj == null) {
- return true;
+ private static boolean isSchemaCompatibleWithRelObj(Object relObj, Schema unionSchema) {
+ Validate.notNull(unionSchema, "Schema cannot be null");
+ if (unionSchema.getType() == Schema.Type.NULL) {
+ return relObj == null;
}
+
switch (unionSchema.getType()) {
case RECORD:
- return avroObj instanceof IndexedRecord;
+ return relObj instanceof SamzaSqlRelRecord;
case ARRAY:
- return avroObj instanceof GenericData.Array || avroObj instanceof List;
+ return relObj instanceof List;
case MAP:
- return avroObj instanceof Map;
+ return relObj instanceof Map;
case FIXED:
- return avroObj instanceof GenericData.Fixed;
+ return relObj instanceof ByteString;
case BYTES:
- return avroObj instanceof ByteBuffer;
+ return relObj instanceof ByteString;
case FLOAT:
- return avroObj instanceof Float;
+ return relObj instanceof Float || relObj instanceof Double;
default:
return true;
}
}
- private static boolean isSchemaCompatibleWithRelObj(Object relObj, Schema unionSchema) {
- if (unionSchema.getType() == Schema.Type.NULL && relObj == null) {
- return true;
+ private static boolean isSchemaCompatibleWithAvroObj(Object avroObj, Schema unionSchema) {
+ Validate.notNull(unionSchema, "Schema cannot be null");
+ if (unionSchema.getType() == Schema.Type.NULL) {
+ return avroObj == null;
}
switch (unionSchema.getType()) {
case RECORD:
- return relObj instanceof SamzaSqlRelRecord;
+ return avroObj instanceof IndexedRecord;
case ARRAY:
- return relObj instanceof List;
+ return avroObj instanceof GenericData.Array || avroObj instanceof List;
case MAP:
- return relObj instanceof Map;
+ return avroObj instanceof Map;
case FIXED:
- return relObj instanceof ByteString;
+ return avroObj instanceof GenericData.Fixed;
case BYTES:
- return relObj instanceof ByteString;
+ return avroObj instanceof ByteBuffer;
case FLOAT:
- return relObj instanceof Float || relObj instanceof Double;
+ return avroObj instanceof Float;
default:
return true;
}