You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2020/09/12 22:02:44 UTC

[incubator-pinot] branch master updated: Fix extract method in AvroRecordExtractor class (#6005)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new cadd61c  Fix extract method in AvroRecordExtractor class (#6005)
cadd61c is described below

commit cadd61c22ba04b83fca7f205e4c0aaae7bde09bc
Author: Jialiang Li <jl...@linkedin.com>
AuthorDate: Sat Sep 12 15:02:31 2020 -0700

    Fix extract method in AvroRecordExtractor class (#6005)
    
    * Fix extract method in AvroRecordExtractor
    
    Co-authored-by: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
---
 .../inputformat/avro/AvroRecordExtractor.java      | 15 +++++----
 .../inputformat/avro/AvroRecordExtractorTest.java  | 37 +++++++++++++++++++++-
 .../java/org/apache/pinot/spi/utils/JsonUtils.java | 15 ---------
 3 files changed, 44 insertions(+), 23 deletions(-)

diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java
index 339ab67..646debc 100644
--- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java
+++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java
@@ -18,14 +18,14 @@
  */
 package org.apache.pinot.plugin.inputformat.avro;
 
-import java.util.Map;
+import java.util.List;
 import java.util.Set;
 import javax.annotation.Nullable;
+import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordExtractor;
 import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
-import org.apache.pinot.spi.utils.JsonUtils;
 
 
 /**
@@ -46,13 +46,14 @@ public class AvroRecordExtractor implements RecordExtractor<GenericRecord> {
   @Override
   public GenericRow extract(GenericRecord from, GenericRow to) {
     if (_extractAll) {
-      Map<String, Object> jsonMap = JsonUtils.genericRecordToJson(from);
-      jsonMap.forEach((fieldName, value) -> to.putValue(fieldName, AvroUtils.convert(value)));
+      List<Schema.Field> fields = from.getSchema().getFields();
+      for (Schema.Field field : fields) {
+        String fieldName = field.name();
+        to.putValue(fieldName, AvroUtils.convert(from.get(fieldName)));
+      }
     } else {
       for (String fieldName : _fields) {
-        Object value = from.get(fieldName);
-        Object convertedValue = AvroUtils.convert(value);
-        to.putValue(fieldName, convertedValue);
+        to.putValue(fieldName, AvroUtils.convert(from.get(fieldName)));
       }
     }
     return to;
diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorTest.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorTest.java
index b985349..833e8fc 100644
--- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorTest.java
@@ -18,9 +18,13 @@
  */
 package org.apache.pinot.plugin.inputformat.avro;
 
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -29,8 +33,13 @@ import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.readers.AbstractRecordExtractorTest;
+import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
+import org.testng.Assert;
+import org.testng.annotations.Test;
 
 import static org.apache.avro.Schema.*;
 
@@ -39,7 +48,7 @@ import static org.apache.avro.Schema.*;
  * Tests the {@link AvroRecordExtractor} using a schema containing groovy transform functions
  */
 public class AvroRecordExtractorTest extends AbstractRecordExtractorTest {
-
+  private static final ObjectMapper DEFAULT_MAPPER = new ObjectMapper();
   private final File _dataFile = new File(_tempDir, "events.avro");
 
   /**
@@ -87,4 +96,30 @@ public class AvroRecordExtractorTest extends AbstractRecordExtractorTest {
   protected boolean testExtractAll() {
     return true;
   }
+
+  @Test
+  public void testDataTypeReturnFromAvroRecordExtractor()
+      throws IOException {
+    String testColumnName = "column1";
+    long columnValue = 999999999L;
+    AvroRecordExtractor avroRecordExtractor = new AvroRecordExtractor();
+    avroRecordExtractor.init(null, null);
+
+    org.apache.pinot.spi.data.Schema pinotSchema = new org.apache.pinot.spi.data.Schema.SchemaBuilder()
+        .addSingleValueDimension(testColumnName, FieldSpec.DataType.LONG).build();
+    Schema schema = AvroUtils.getAvroSchemaFromPinotSchema(pinotSchema);
+    GenericRecord genericRecord = new GenericData.Record(schema);
+    genericRecord.put(testColumnName, columnValue);
+    GenericRow genericRow = new GenericRow();
+
+    avroRecordExtractor.extract(genericRecord, genericRow);
+    Assert.assertEquals(columnValue, genericRow.getValue(testColumnName));
+    Assert.assertEquals("Long", genericRow.getValue(testColumnName).getClass().getSimpleName());
+
+    String jsonString = genericRecord.toString();
+    Map<String, Object> jsonMap = DEFAULT_MAPPER.readValue(jsonString, new TypeReference<Map<String, Object>>() {
+    });
+    // Round-tripping the record through JSON narrows the Long value to an Integer, which would then force the convert method in the DataTypeTransformer class to run.
+    Assert.assertEquals("Integer", jsonMap.get(testColumnName).getClass().getSimpleName());
+  }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
index f5bf9d3..a8b206d 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
@@ -34,9 +34,7 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
-import java.util.Map;
 import javax.annotation.Nullable;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 
@@ -193,17 +191,4 @@ public class JsonUtils {
         throw new IllegalArgumentException(String.format("Unsupported data type %s", dataType));
     }
   }
-
-  /**
-   * Converts from a GenericRecord to a json map
-   */
-  public static Map<String, Object> genericRecordToJson(GenericRecord genericRecord) {
-    try {
-      String jsonString = genericRecord.toString();
-      return DEFAULT_MAPPER.readValue(jsonString, new TypeReference<Map<String, Object>>() {
-      });
-    } catch (IOException e) {
-      throw new IllegalStateException("Caught exception when converting generic record " + genericRecord + " to JSON");
-    }
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org