You are viewing a plain text version of this content; the canonical HTML version (with its original hyperlink) is available in the Apache mailing-list archive.
Posted to commits@pinot.apache.org by jl...@apache.org on 2020/09/12 22:02:44 UTC
[incubator-pinot] branch master updated: Fix extract method in
AvroRecordExtractor class (#6005)
This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new cadd61c Fix extract method in AvroRecordExtractor class (#6005)
cadd61c is described below
commit cadd61c22ba04b83fca7f205e4c0aaae7bde09bc
Author: Jialiang Li <jl...@linkedin.com>
AuthorDate: Sat Sep 12 15:02:31 2020 -0700
Fix extract method in AvroRecordExtractor class (#6005)
* Fix extract method in AvroRecordExtractor
Co-authored-by: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
---
.../inputformat/avro/AvroRecordExtractor.java | 15 +++++----
.../inputformat/avro/AvroRecordExtractorTest.java | 37 +++++++++++++++++++++-
.../java/org/apache/pinot/spi/utils/JsonUtils.java | 15 ---------
3 files changed, 44 insertions(+), 23 deletions(-)
diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java
index 339ab67..646debc 100644
--- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java
+++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java
@@ -18,14 +18,14 @@
*/
package org.apache.pinot.plugin.inputformat.avro;
-import java.util.Map;
+import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
+import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordExtractor;
import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
-import org.apache.pinot.spi.utils.JsonUtils;
/**
@@ -46,13 +46,14 @@ public class AvroRecordExtractor implements RecordExtractor<GenericRecord> {
@Override
public GenericRow extract(GenericRecord from, GenericRow to) {
if (_extractAll) {
- Map<String, Object> jsonMap = JsonUtils.genericRecordToJson(from);
- jsonMap.forEach((fieldName, value) -> to.putValue(fieldName, AvroUtils.convert(value)));
+ List<Schema.Field> fields = from.getSchema().getFields();
+ for (Schema.Field field : fields) {
+ String fieldName = field.name();
+ to.putValue(fieldName, AvroUtils.convert(from.get(fieldName)));
+ }
} else {
for (String fieldName : _fields) {
- Object value = from.get(fieldName);
- Object convertedValue = AvroUtils.convert(value);
- to.putValue(fieldName, convertedValue);
+ to.putValue(fieldName, AvroUtils.convert(from.get(fieldName)));
}
}
return to;
diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorTest.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorTest.java
index b985349..833e8fc 100644
--- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorTest.java
@@ -18,9 +18,13 @@
*/
package org.apache.pinot.plugin.inputformat.avro;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -29,8 +33,13 @@ import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.readers.AbstractRecordExtractorTest;
+import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
+import org.testng.Assert;
+import org.testng.annotations.Test;
import static org.apache.avro.Schema.*;
@@ -39,7 +48,7 @@ import static org.apache.avro.Schema.*;
* Tests the {@link AvroRecordExtractor} using a schema containing groovy transform functions
*/
public class AvroRecordExtractorTest extends AbstractRecordExtractorTest {
-
+ private static final ObjectMapper DEFAULT_MAPPER = new ObjectMapper();
private final File _dataFile = new File(_tempDir, "events.avro");
/**
@@ -87,4 +96,30 @@ public class AvroRecordExtractorTest extends AbstractRecordExtractorTest {
protected boolean testExtractAll() {
return true;
}
+
+ @Test
+ public void testDataTypeReturnFromAvroRecordExtractor()
+ throws IOException {
+ String testColumnName = "column1";
+ long columnValue = 999999999L;
+ AvroRecordExtractor avroRecordExtractor = new AvroRecordExtractor();
+ avroRecordExtractor.init(null, null);
+
+ org.apache.pinot.spi.data.Schema pinotSchema = new org.apache.pinot.spi.data.Schema.SchemaBuilder()
+ .addSingleValueDimension(testColumnName, FieldSpec.DataType.LONG).build();
+ Schema schema = AvroUtils.getAvroSchemaFromPinotSchema(pinotSchema);
+ GenericRecord genericRecord = new GenericData.Record(schema);
+ genericRecord.put(testColumnName, columnValue);
+ GenericRow genericRow = new GenericRow();
+
+ avroRecordExtractor.extract(genericRecord, genericRow);
+ Assert.assertEquals(columnValue, genericRow.getValue(testColumnName));
+ Assert.assertEquals("Long", genericRow.getValue(testColumnName).getClass().getSimpleName());
+
+ String jsonString = genericRecord.toString();
+ Map<String, Object> jsonMap = DEFAULT_MAPPER.readValue(jsonString, new TypeReference<Map<String, Object>>() {
+ });
+ // The data type got changed to Integer, which will then have to trigger the convert method in DataTypeTransformer class.
+ Assert.assertEquals("Integer", jsonMap.get(testColumnName).getClass().getSimpleName());
+ }
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
index f5bf9d3..a8b206d 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
@@ -34,9 +34,7 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
-import java.util.Map;
import javax.annotation.Nullable;
-import org.apache.avro.generic.GenericRecord;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -193,17 +191,4 @@ public class JsonUtils {
throw new IllegalArgumentException(String.format("Unsupported data type %s", dataType));
}
}
-
- /**
- * Converts from a GenericRecord to a json map
- */
- public static Map<String, Object> genericRecordToJson(GenericRecord genericRecord) {
- try {
- String jsonString = genericRecord.toString();
- return DEFAULT_MAPPER.readValue(jsonString, new TypeReference<Map<String, Object>>() {
- });
- } catch (IOException e) {
- throw new IllegalStateException("Caught exception when converting generic record " + genericRecord + " to JSON");
- }
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org