You are viewing a plain-text version of this content; the link to the canonical version was lost in the plain-text conversion.
Posted to commits@pinot.apache.org by sa...@apache.org on 2022/11/28 23:26:52 UTC

[pinot] branch master updated: Support Avro's Fixed data type (#9642)

This is an automated email from the ASF dual-hosted git repository.

sajjad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 86f923de45 Support Avro's Fixed data type (#9642)
86f923de45 is described below

commit 86f923de4533c5b1573854fe5bfca73e15d98604
Author: Sajjad Moradi <mo...@gmail.com>
AuthorDate: Mon Nov 28 15:26:44 2022 -0800

    Support Avro's Fixed data type (#9642)
---
 .../queries/JsonIngestionFromAvroQueriesTest.java  |  54 +++--
 .../inputformat/avro/AvroRecordExtractor.java      |   4 +
 .../inputformat/avro/AvroRecordExtractorTest.java  | 226 +++++++++++++++++++++
 .../recordtransformer/DataTypeTransformer.java     |   1 +
 4 files changed, 268 insertions(+), 17 deletions(-)

diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/JsonIngestionFromAvroQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/JsonIngestionFromAvroQueriesTest.java
index 16d17c2cb2..b53f3a35d4 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/JsonIngestionFromAvroQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/JsonIngestionFromAvroQueriesTest.java
@@ -21,6 +21,7 @@ package org.apache.pinot.queries;
 import com.google.common.collect.Lists;
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -30,12 +31,15 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.function.scalar.StringFunctions;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
@@ -75,12 +79,14 @@ public class JsonIngestionFromAvroQueriesTest extends BaseQueriesTest {
   private static final String JSON_COLUMN_1 = "jsonColumn1"; // for testing RECORD, ARRAY, MAP, UNION
   private static final String JSON_COLUMN_2 = "jsonColumn2"; // for testing ENUM
   private static final String JSON_COLUMN_3 = "jsonColumn3"; // for testing FIXED
+  private static final String JSON_COLUMN_4 = "jsonColumn4"; // for testing BYTES (read back as Base64)
   private static final String STRING_COLUMN = "stringColumn";
   private static final org.apache.pinot.spi.data.Schema SCHEMA =
       new org.apache.pinot.spi.data.Schema.SchemaBuilder().addSingleValueDimension(INT_COLUMN, FieldSpec.DataType.INT)
           .addSingleValueDimension(JSON_COLUMN_1, FieldSpec.DataType.JSON)
           .addSingleValueDimension(JSON_COLUMN_2, FieldSpec.DataType.JSON)
           .addSingleValueDimension(JSON_COLUMN_3, FieldSpec.DataType.JSON)
+          .addSingleValueDimension(JSON_COLUMN_4, FieldSpec.DataType.JSON)
           .addSingleValueDimension(STRING_COLUMN, FieldSpec.DataType.STRING).build();
   private static final TableConfig TABLE_CONFIG =
       new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
@@ -105,13 +111,14 @@ public class JsonIngestionFromAvroQueriesTest extends BaseQueriesTest {
 
   /** @return {@link GenericRow} representing a row in Pinot table. */
   private static GenericRow createTableRecord(int intValue, String stringValue, Object jsonValue,
-      GenericData.EnumSymbol enumValue, GenericData.Fixed fixedValue) {
+      GenericData.EnumSymbol enumValue, GenericData.Fixed fixedValue, byte[] bytesValue) {
     GenericRow record = new GenericRow();
     record.putValue(INT_COLUMN, intValue);
     record.putValue(STRING_COLUMN, stringValue);
     record.putValue(JSON_COLUMN_1, jsonValue);
     record.putValue(JSON_COLUMN_2, enumValue);
     record.putValue(JSON_COLUMN_3, fixedValue);
+    record.putValue(JSON_COLUMN_4, ByteBuffer.wrap(bytesValue)); // ByteBuffer for the Avro BYTES field
     return record;
   }
 
@@ -158,37 +165,39 @@ public class JsonIngestionFromAvroQueriesTest extends BaseQueriesTest {
         new Field(JSON_COLUMN_1,
             createUnion(createArray(create(Type.STRING)), createMap(create(Type.STRING)), createRecordSchema(),
                 create(Type.STRING), create(Type.NULL))), new Field(JSON_COLUMN_2, enumSchema),
-        new Field(JSON_COLUMN_3, fixedSchema));
+        new Field(JSON_COLUMN_3, fixedSchema),
+        new Field(JSON_COLUMN_4, create(Type.BYTES)));
     avroSchema.setFields(fields);
     List<GenericRow> inputRecords = new ArrayList<>();
     // Insert ARRAY
     inputRecords.add(
         createTableRecord(1, "daffy duck", Arrays.asList("this", "is", "a", "test"), createEnumField(enumSchema, "UP"),
-            createFixedField(fixedSchema, 1)));
+            createFixedField(fixedSchema, 1), new byte[] {0, 0, 0, 1}));
 
     // Insert MAP
     inputRecords.add(
         createTableRecord(2, "mickey mouse", createMapField(new Pair[]{Pair.of("a", "1"), Pair.of("b", "2")}),
-            createEnumField(enumSchema, "DOWN"), createFixedField(fixedSchema, 2)));
+            createEnumField(enumSchema, "DOWN"), createFixedField(fixedSchema, 2), new byte[] {0, 0, 0, 2}));
     inputRecords.add(
         createTableRecord(3, "donald duck", createMapField(new Pair[]{Pair.of("a", "1"), Pair.of("b", "2")}),
-            createEnumField(enumSchema, "UP"), createFixedField(fixedSchema, 3)));
+            createEnumField(enumSchema, "UP"), createFixedField(fixedSchema, 3), new byte[] {0, 0, 0, 3}));
     inputRecords.add(
         createTableRecord(4, "scrooge mcduck", createMapField(new Pair[]{Pair.of("a", "1"), Pair.of("b", "2")}),
-            createEnumField(enumSchema, "LEFT"), createFixedField(fixedSchema, 4)));
+            createEnumField(enumSchema, "LEFT"), createFixedField(fixedSchema, 4), new byte[] {0, 0, 0, 4}));
 
     // insert RECORD
     inputRecords.add(createTableRecord(5, "minney mouse", createRecordField("id", 1, "name", "minney"),
-        createEnumField(enumSchema, "RIGHT"), createFixedField(fixedSchema, 5)));
+        createEnumField(enumSchema, "RIGHT"), createFixedField(fixedSchema, 5), new byte[] {0, 0, 0, 5}));
 
     // Insert simple Java String (gets converted into JSON value)
     inputRecords.add(
-        createTableRecord(6, "pluto", "test", createEnumField(enumSchema, "DOWN"), createFixedField(fixedSchema, 6)));
+        createTableRecord(6, "pluto", "test", createEnumField(enumSchema, "DOWN"), createFixedField(fixedSchema, 6),
+            new byte[] {0, 0, 0, 6}));
 
     // Insert JSON string (gets converted into JSON document)
     inputRecords.add(
         createTableRecord(7, "scooby doo", "{\"name\":\"scooby\",\"id\":7}", createEnumField(enumSchema, "UP"),
-            createFixedField(fixedSchema, 7)));
+            createFixedField(fixedSchema, 7), new byte[] {0, 0, 0, 7}));
 
     try (DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
       fileWriter.create(avroSchema, AVRO_DATA_FILE);
@@ -199,6 +208,7 @@ public class JsonIngestionFromAvroQueriesTest extends BaseQueriesTest {
         record.put(JSON_COLUMN_1, inputRecord.getValue(JSON_COLUMN_1));
         record.put(JSON_COLUMN_2, inputRecord.getValue(JSON_COLUMN_2));
         record.put(JSON_COLUMN_3, inputRecord.getValue(JSON_COLUMN_3));
+        record.put(JSON_COLUMN_4, inputRecord.getValue(JSON_COLUMN_4));
         fileWriter.append(record);
       }
     }
@@ -212,6 +222,7 @@ public class JsonIngestionFromAvroQueriesTest extends BaseQueriesTest {
     set.add(JSON_COLUMN_1);
     set.add(JSON_COLUMN_2);
     set.add(JSON_COLUMN_3);
+    set.add(JSON_COLUMN_4);
     AvroRecordReader avroRecordReader = new AvroRecordReader();
     avroRecordReader.init(AVRO_DATA_FILE, set, null);
     return avroRecordReader;
@@ -316,19 +327,28 @@ public class JsonIngestionFromAvroQueriesTest extends BaseQueriesTest {
   /** Verify that ingestion from avro FIXED type field (jsonColumn3) to Pinot JSON column worked fine. */
   @Test
   public void testSimpleSelectOnFixedJsonColumn() {
-    Operator<SelectionResultsBlock> operator = getOperator("select jsonColumn3 FROM testTable");
+    testByteArray("select jsonColumn3 FROM testTable"); // FIXED values are expected back as Base64 strings
+  }
+
+  /** Verifies that an Avro BYTES field (jsonColumn4) ingested into a Pinot JSON column reads back as Base64. */
+  @Test
+  public void testSimpleSelectOnBytesJsonColumn() {
+    testByteArray("select jsonColumn4 FROM testTable");
+  }
+
+  private void testByteArray(String query) { // one row per record i=1..7: ["Base64({0,0,0,i})"]
+    Operator<SelectionResultsBlock> operator = getOperator(query);
     SelectionResultsBlock block = operator.nextBlock();
     Collection<Object[]> rows = block.getRows();
     Assert.assertEquals(block.getDataSchema().getColumnDataType(0), DataSchema.ColumnDataType.JSON);

-    List<String> expecteds =
-        Arrays.asList("[[0,0,0,1]]", "[[0,0,0,2]]", "[[0,0,0,3]]", "[[0,0,0,4]]", "[[0,0,0,5]]", "[[0,0,0,6]]",
-            "[[0,0,0,7]]");
-    int index = 0;
+    List<String> expecteds = IntStream.range(1, 8)
+        .mapToObj(i -> "[\"" + StringFunctions.toBase64(new byte[] {0, 0, 0, (byte) i}) + "\"]")
+        .collect(Collectors.toList());
+    Assert.assertEquals(rows.size(), expecteds.size()); // fail on missing rows, not just wrong ones

-    Iterator<Object[]> iterator = rows.iterator();
-    while (iterator.hasNext()) {
-      Object[] row = iterator.next();
+    int index = 0;
+    for (Object[] row : rows) {
       Assert.assertEquals(Arrays.toString(row), expecteds.get(index++));
     }
   }
diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java
index 1cb4d04721..d8a757def8 100644
--- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java
+++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
 import org.apache.pinot.spi.data.readers.GenericRow;
@@ -136,6 +137,9 @@ public class AvroRecordExtractor extends BaseRecordExtractor<GenericRecord> {
     if (value instanceof Instant) {
       return Timestamp.from((Instant) value);
     }
+    if (value instanceof GenericFixed) {
+      return ((GenericFixed) value).bytes();
+    }
     // LocalDate, LocalTime and UUID are returned as the ::toString version of the logical type
     return super.convertSingleValue(value);
   }
diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorTest.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorTest.java
index e748bc68e4..bda7fd9909 100644
--- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorTest.java
@@ -43,6 +43,10 @@ import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.message.BinaryMessageDecoder;
+import org.apache.avro.message.BinaryMessageEncoder;
+import org.apache.avro.message.SchemaStore;
+import org.apache.avro.specific.SpecificData;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.readers.AbstractRecordExtractorTest;
 import org.apache.pinot.spi.data.readers.GenericRow;
@@ -243,4 +247,226 @@ public class AvroRecordExtractorTest extends AbstractRecordExtractorTest {
       Assert.assertEquals(avroRecordExtractor.convertSingleValue(byteBuffer), content);
     }
   }
+
+  @Test
+  public void testGenericFixedDataType() { // a GenericData.Fixed value must be extracted as its byte[]
+    Schema avroSchema = createRecord("EventRecord", null, null, false);
+    Schema fixedSchema = createFixed("FixedSchema", "", "", 4); // FIXED type of size 4
+    avroSchema.setFields(Lists.newArrayList(new Schema.Field("fixedData", fixedSchema)));
+    GenericRecord genericRecord = new GenericData.Record(avroSchema);
+    genericRecord.put("fixedData", new GenericData.Fixed(fixedSchema, new byte[]{0, 1, 2, 3}));
+    GenericRow genericRow = new GenericRow();
+    AvroRecordExtractor avroRecordExtractor = new AvroRecordExtractor();
+    avroRecordExtractor.init(null, null);
+    avroRecordExtractor.extract(genericRecord, genericRow);
+    Assert.assertEquals(genericRow.getValue("fixedData"), new byte[]{0, 1, 2, 3});
+  }
+
+  @Test
+  public void testSpecificFixedDataType() { // a SpecificFixed value must be extracted as its byte[]
+    EventRecord specificRecord = new EventRecord(new FixedSchema(new byte[]{0, 1, 2, 3}));
+    GenericRow outputGenericRow = new GenericRow();
+    AvroRecordExtractor avroRecordExtractor = new AvroRecordExtractor();
+    avroRecordExtractor.init(null, null);
+    avroRecordExtractor.extract(specificRecord, outputGenericRow);
+    Assert.assertEquals(outputGenericRow.getValue("fixedData"), new byte[]{0, 1, 2, 3});
+  }
+
+  /**
+   * Avro SpecificRecord with a single FIXED field, fixedData (size 4), used for testing Fixed data type
+   */
+  static class EventRecord extends org.apache.avro.specific.SpecificRecordBase
+      implements org.apache.avro.specific.SpecificRecord {
+    private static final long serialVersionUID = 5451592186784305712L;
+    public static final org.apache.avro.Schema SCHEMA = new org.apache.avro.Schema.Parser().parse(
+        "{\"type\":\"record\",\"name\":\"EventRecord\",\"fields\":[{"
+            + "\"name\":\"fixedData\",\"type\":{\"type\":\"fixed\",\"name\":\"FixedSchema\",\"doc\":\"\",\"size\":4}}"
+            + "]}");
+
+    public static org.apache.avro.Schema getClassSchema() {
+      return SCHEMA;
+    }
+
+    private static final SpecificData MODEL = new SpecificData();
+
+    private static final BinaryMessageEncoder<EventRecord> ENCODER =
+        new BinaryMessageEncoder<EventRecord>(MODEL, SCHEMA);
+
+    private static final BinaryMessageDecoder<EventRecord> DECODER =
+        new BinaryMessageDecoder<EventRecord>(MODEL, SCHEMA);
+
+    public static BinaryMessageEncoder<EventRecord> getEncoder() {
+      return ENCODER;
+    }
+
+    public static BinaryMessageDecoder<EventRecord> getDecoder() {
+      return DECODER;
+    }
+
+    public static BinaryMessageDecoder<EventRecord> createDecoder(SchemaStore resolver) {
+      return new BinaryMessageDecoder<EventRecord>(MODEL, SCHEMA, resolver);
+    }
+
+    public java.nio.ByteBuffer toByteBuffer()
+        throws java.io.IOException {
+      return ENCODER.encode(this);
+    }
+
+    public static EventRecord fromByteBuffer(java.nio.ByteBuffer b)
+        throws java.io.IOException {
+      return DECODER.decode(b);
+    }
+
+    private FixedSchema _fixedData; // the only field, at schema position 0
+
+    public EventRecord() {
+    }
+
+    public EventRecord(FixedSchema fixedData) {
+      _fixedData = fixedData;
+    }
+
+    public org.apache.avro.specific.SpecificData getSpecificData() {
+      return MODEL;
+    }
+
+    public org.apache.avro.Schema getSchema() {
+      return SCHEMA;
+    }
+
+    // Positional field getter used by DatumWriter; index 0 is fixedData. Applications should not call.
+    public java.lang.Object get(int field) {
+      switch (field) {
+        case 0:
+          return _fixedData;
+        default:
+          throw new org.apache.avro.AvroRuntimeException("Bad index");
+      }
+    }
+
+    // Positional field setter used by DatumReader; index 0 is fixedData. Applications should not call.
+    @SuppressWarnings(value = "unchecked")
+    public void put(int field, java.lang.Object value) {
+      switch (field) {
+        case 0:
+          _fixedData = (FixedSchema) value;
+          break;
+        default:
+          throw new org.apache.avro.AvroRuntimeException("Bad index");
+      }
+    }
+
+    public FixedSchema getFixedData() {
+      return _fixedData;
+    }
+
+    public void setFixedData(FixedSchema value) {
+      _fixedData = value;
+    }
+
+    @SuppressWarnings("unchecked")
+    private static final org.apache.avro.io.DatumWriter<EventRecord> WRITER =
+        (org.apache.avro.io.DatumWriter<EventRecord>) MODEL.createDatumWriter(SCHEMA);
+
+    @Override
+    public void writeExternal(java.io.ObjectOutput out)
+        throws java.io.IOException {
+      WRITER.write(this, SpecificData.getEncoder(out));
+    }
+
+    @SuppressWarnings("unchecked")
+    private static final org.apache.avro.io.DatumReader<EventRecord> READER =
+        (org.apache.avro.io.DatumReader<EventRecord>) MODEL.createDatumReader(SCHEMA);
+
+    @Override
+    public void readExternal(java.io.ObjectInput in)
+        throws java.io.IOException {
+      READER.read(this, SpecificData.getDecoder(in));
+    }
+
+    @Override
+    protected boolean hasCustomCoders() {
+      return true;
+    }
+
+    @Override
+    public void customEncode(org.apache.avro.io.Encoder out)
+        throws java.io.IOException {
+      out.writeFixed(_fixedData.bytes(), 0, 4); // 4 = FixedSchema size declared in SCHEMA
+    }
+
+    @Override
+    public void customDecode(org.apache.avro.io.ResolvingDecoder in)
+        throws java.io.IOException {
+      org.apache.avro.Schema.Field[] fieldOrder = in.readFieldOrderIfDiff(); // null: SCHEMA field order
+      if (fieldOrder == null) {
+        if (_fixedData == null) {
+          _fixedData = new FixedSchema();
+        }
+        in.readFixed(_fixedData.bytes(), 0, 4); // 4 = FixedSchema size
+      } else {
+        for (int i = 0; i < 1; i++) { // 1 = number of fields in SCHEMA
+          switch (fieldOrder[i].pos()) {
+            case 0:
+              if (_fixedData == null) {
+                _fixedData = new FixedSchema();
+              }
+              in.readFixed(_fixedData.bytes(), 0, 4); // 4 = FixedSchema size
+              break;
+
+            default:
+              throw new java.io.IOException("Corrupt ResolvingDecoder.");
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Avro SpecificFixed of size 4 (type of EventRecord.fixedData), used for testing Fixed data type
+   */
+  static class FixedSchema extends org.apache.avro.specific.SpecificFixed {
+    private static final long serialVersionUID = -1121289150751596161L;
+    public static final org.apache.avro.Schema SCHEMA = new org.apache.avro.Schema.Parser().parse(
+        "{\"type\":\"fixed\",\"name\":\"FixedSchema\",\"doc\":\"\",\"size\":4}");
+
+    public static org.apache.avro.Schema getClassSchema() {
+      return SCHEMA;
+    }
+
+    public org.apache.avro.Schema getSchema() {
+      return SCHEMA;
+    }
+
+    /** Creates an empty FixedSchema; EventRecord.customDecode reads 4 bytes into its bytes(). */
+    public FixedSchema() {
+      super();
+    }
+
+    /**
+     * Creates a new FixedSchema with the given bytes.
+     * @param bytes The bytes to create the new FixedSchema.
+     */
+    public FixedSchema(byte[] bytes) {
+      super(bytes);
+    }
+
+    private static final org.apache.avro.io.DatumWriter<FixedSchema> WRITER =
+        new org.apache.avro.specific.SpecificDatumWriter<FixedSchema>(SCHEMA);
+
+    @Override
+    public void writeExternal(java.io.ObjectOutput out)
+        throws java.io.IOException {
+      WRITER.write(this, org.apache.avro.specific.SpecificData.getEncoder(out));
+    }
+
+    private static final org.apache.avro.io.DatumReader<FixedSchema> READER =
+        new org.apache.avro.specific.SpecificDatumReader<FixedSchema>(SCHEMA);
+
+    @Override
+    public void readExternal(java.io.ObjectInput in)
+        throws java.io.IOException {
+      READER.read(this, org.apache.avro.specific.SpecificData.getDecoder(in));
+    }
+  }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java
index 3838b6a56f..1ed69c9065 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java
@@ -92,6 +92,7 @@ public class DataTypeTransformer implements RecordTransformer {
           // Single-value column
           source = PinotDataType.getSingleValueType(value.getClass());
         }
+
         // Skipping conversion when srcType!=destType is speculative, and can be unsafe when
         // the array for MV column contains values of mixing types. Mixing types can lead
         // to ClassCastException during conversion, often aborting data ingestion jobs.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org