You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@pinot.apache.org by sa...@apache.org on 2022/11/28 23:26:52 UTC
[pinot] branch master updated: Support Avro's Fixed data type (#9642)
This is an automated email from the ASF dual-hosted git repository.
sajjad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 86f923de45 Support Avro's Fixed data type (#9642)
86f923de45 is described below
commit 86f923de4533c5b1573854fe5bfca73e15d98604
Author: Sajjad Moradi <mo...@gmail.com>
AuthorDate: Mon Nov 28 15:26:44 2022 -0800
Support Avro's Fixed data type (#9642)
---
.../queries/JsonIngestionFromAvroQueriesTest.java | 54 +++--
.../inputformat/avro/AvroRecordExtractor.java | 4 +
.../inputformat/avro/AvroRecordExtractorTest.java | 226 +++++++++++++++++++++
.../recordtransformer/DataTypeTransformer.java | 1 +
4 files changed, 268 insertions(+), 17 deletions(-)
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/JsonIngestionFromAvroQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/JsonIngestionFromAvroQueriesTest.java
index 16d17c2cb2..b53f3a35d4 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/JsonIngestionFromAvroQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/JsonIngestionFromAvroQueriesTest.java
@@ -21,6 +21,7 @@ package org.apache.pinot.queries;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -30,12 +31,15 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.function.scalar.StringFunctions;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
@@ -75,12 +79,14 @@ public class JsonIngestionFromAvroQueriesTest extends BaseQueriesTest {
private static final String JSON_COLUMN_1 = "jsonColumn1"; // for testing RECORD, ARRAY, MAP, UNION
private static final String JSON_COLUMN_2 = "jsonColumn2"; // for testing ENUM
private static final String JSON_COLUMN_3 = "jsonColumn3"; // for testing FIXED
+ private static final String JSON_COLUMN_4 = "jsonColumn4"; // for testing BYTES
private static final String STRING_COLUMN = "stringColumn";
private static final org.apache.pinot.spi.data.Schema SCHEMA =
new org.apache.pinot.spi.data.Schema.SchemaBuilder().addSingleValueDimension(INT_COLUMN, FieldSpec.DataType.INT)
.addSingleValueDimension(JSON_COLUMN_1, FieldSpec.DataType.JSON)
.addSingleValueDimension(JSON_COLUMN_2, FieldSpec.DataType.JSON)
.addSingleValueDimension(JSON_COLUMN_3, FieldSpec.DataType.JSON)
+ .addSingleValueDimension(JSON_COLUMN_4, FieldSpec.DataType.JSON)
.addSingleValueDimension(STRING_COLUMN, FieldSpec.DataType.STRING).build();
private static final TableConfig TABLE_CONFIG =
new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
@@ -105,13 +111,14 @@ public class JsonIngestionFromAvroQueriesTest extends BaseQueriesTest {
/** @return {@link GenericRow} representing a row in Pinot table. */
private static GenericRow createTableRecord(int intValue, String stringValue, Object jsonValue,
- GenericData.EnumSymbol enumValue, GenericData.Fixed fixedValue) {
+ GenericData.EnumSymbol enumValue, GenericData.Fixed fixedValue, byte[] bytesValue) {
GenericRow record = new GenericRow();
record.putValue(INT_COLUMN, intValue);
record.putValue(STRING_COLUMN, stringValue);
record.putValue(JSON_COLUMN_1, jsonValue);
record.putValue(JSON_COLUMN_2, enumValue);
record.putValue(JSON_COLUMN_3, fixedValue);
+ record.putValue(JSON_COLUMN_4, ByteBuffer.wrap(bytesValue));
return record;
}
@@ -158,37 +165,39 @@ public class JsonIngestionFromAvroQueriesTest extends BaseQueriesTest {
new Field(JSON_COLUMN_1,
createUnion(createArray(create(Type.STRING)), createMap(create(Type.STRING)), createRecordSchema(),
create(Type.STRING), create(Type.NULL))), new Field(JSON_COLUMN_2, enumSchema),
- new Field(JSON_COLUMN_3, fixedSchema));
+ new Field(JSON_COLUMN_3, fixedSchema),
+ new Field(JSON_COLUMN_4, create(Type.BYTES)));
avroSchema.setFields(fields);
List<GenericRow> inputRecords = new ArrayList<>();
// Insert ARRAY
inputRecords.add(
createTableRecord(1, "daffy duck", Arrays.asList("this", "is", "a", "test"), createEnumField(enumSchema, "UP"),
- createFixedField(fixedSchema, 1)));
+ createFixedField(fixedSchema, 1), new byte[] {0, 0, 0, 1}));
// Insert MAP
inputRecords.add(
createTableRecord(2, "mickey mouse", createMapField(new Pair[]{Pair.of("a", "1"), Pair.of("b", "2")}),
- createEnumField(enumSchema, "DOWN"), createFixedField(fixedSchema, 2)));
+ createEnumField(enumSchema, "DOWN"), createFixedField(fixedSchema, 2), new byte[] {0, 0, 0, 2}));
inputRecords.add(
createTableRecord(3, "donald duck", createMapField(new Pair[]{Pair.of("a", "1"), Pair.of("b", "2")}),
- createEnumField(enumSchema, "UP"), createFixedField(fixedSchema, 3)));
+ createEnumField(enumSchema, "UP"), createFixedField(fixedSchema, 3), new byte[] {0, 0, 0, 3}));
inputRecords.add(
createTableRecord(4, "scrooge mcduck", createMapField(new Pair[]{Pair.of("a", "1"), Pair.of("b", "2")}),
- createEnumField(enumSchema, "LEFT"), createFixedField(fixedSchema, 4)));
+ createEnumField(enumSchema, "LEFT"), createFixedField(fixedSchema, 4), new byte[] {0, 0, 0, 4}));
// insert RECORD
inputRecords.add(createTableRecord(5, "minney mouse", createRecordField("id", 1, "name", "minney"),
- createEnumField(enumSchema, "RIGHT"), createFixedField(fixedSchema, 5)));
+ createEnumField(enumSchema, "RIGHT"), createFixedField(fixedSchema, 5), new byte[] {0, 0, 0, 5}));
// Insert simple Java String (gets converted into JSON value)
inputRecords.add(
- createTableRecord(6, "pluto", "test", createEnumField(enumSchema, "DOWN"), createFixedField(fixedSchema, 6)));
+ createTableRecord(6, "pluto", "test", createEnumField(enumSchema, "DOWN"), createFixedField(fixedSchema, 6),
+ new byte[] {0, 0, 0, 6}));
// Insert JSON string (gets converted into JSON document)
inputRecords.add(
createTableRecord(7, "scooby doo", "{\"name\":\"scooby\",\"id\":7}", createEnumField(enumSchema, "UP"),
- createFixedField(fixedSchema, 7)));
+ createFixedField(fixedSchema, 7), new byte[] {0, 0, 0, 7}));
try (DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
fileWriter.create(avroSchema, AVRO_DATA_FILE);
@@ -199,6 +208,7 @@ public class JsonIngestionFromAvroQueriesTest extends BaseQueriesTest {
record.put(JSON_COLUMN_1, inputRecord.getValue(JSON_COLUMN_1));
record.put(JSON_COLUMN_2, inputRecord.getValue(JSON_COLUMN_2));
record.put(JSON_COLUMN_3, inputRecord.getValue(JSON_COLUMN_3));
+ record.put(JSON_COLUMN_4, inputRecord.getValue(JSON_COLUMN_4));
fileWriter.append(record);
}
}
@@ -212,6 +222,7 @@ public class JsonIngestionFromAvroQueriesTest extends BaseQueriesTest {
set.add(JSON_COLUMN_1);
set.add(JSON_COLUMN_2);
set.add(JSON_COLUMN_3);
+ set.add(JSON_COLUMN_4);
AvroRecordReader avroRecordReader = new AvroRecordReader();
avroRecordReader.init(AVRO_DATA_FILE, set, null);
return avroRecordReader;
@@ -316,19 +327,28 @@ public class JsonIngestionFromAvroQueriesTest extends BaseQueriesTest {
/** Verify that ingestion from avro FIXED type field (jsonColumn3) to Pinot JSON column worked fine. */
@Test
public void testSimpleSelectOnFixedJsonColumn() {
- Operator<SelectionResultsBlock> operator = getOperator("select jsonColumn3 FROM testTable");
+ testByteArray("select jsonColumn3 FROM testTable");
+ }
+
+ /** Verify that ingestion from avro BYTES type field (jsonColumn4) to Pinot JSON column worked fine. */
+ @Test
+ public void testSimpleSelectOnBytesJsonColumn() {
+ testByteArray("select jsonColumn4 FROM testTable");
+ }
+
+ private void testByteArray(String query) {
+ Operator<SelectionResultsBlock> operator = getOperator(query);
SelectionResultsBlock block = operator.nextBlock();
Collection<Object[]> rows = block.getRows();
Assert.assertEquals(block.getDataSchema().getColumnDataType(0), DataSchema.ColumnDataType.JSON);
- List<String> expecteds =
- Arrays.asList("[[0,0,0,1]]", "[[0,0,0,2]]", "[[0,0,0,3]]", "[[0,0,0,4]]", "[[0,0,0,5]]", "[[0,0,0,6]]",
- "[[0,0,0,7]]");
- int index = 0;
+ List<String> expecteds = IntStream.range(1, 8)
+ .mapToObj(i -> new byte[] {0, 0, 0, (byte) i})
+ .map(byteArray -> "[\"" + StringFunctions.toBase64(byteArray) + "\"]")
+ .collect(Collectors.toList());
- Iterator<Object[]> iterator = rows.iterator();
- while (iterator.hasNext()) {
- Object[] row = iterator.next();
+ int index = 0;
+ for (Object[] row : rows) {
Assert.assertEquals(Arrays.toString(row), expecteds.get(index++));
}
}
diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java
index 1cb4d04721..d8a757def8 100644
--- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java
+++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java
@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
import org.apache.pinot.spi.data.readers.GenericRow;
@@ -136,6 +137,9 @@ public class AvroRecordExtractor extends BaseRecordExtractor<GenericRecord> {
if (value instanceof Instant) {
return Timestamp.from((Instant) value);
}
+ if (value instanceof GenericFixed) {
+ return ((GenericFixed) value).bytes();
+ }
// LocalDate, LocalTime and UUID are returned as the ::toString version of the logical type
return super.convertSingleValue(value);
}
diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorTest.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorTest.java
index e748bc68e4..bda7fd9909 100644
--- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorTest.java
@@ -43,6 +43,10 @@ import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.message.BinaryMessageDecoder;
+import org.apache.avro.message.BinaryMessageEncoder;
+import org.apache.avro.message.SchemaStore;
+import org.apache.avro.specific.SpecificData;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.readers.AbstractRecordExtractorTest;
import org.apache.pinot.spi.data.readers.GenericRow;
@@ -243,4 +247,226 @@ public class AvroRecordExtractorTest extends AbstractRecordExtractorTest {
Assert.assertEquals(avroRecordExtractor.convertSingleValue(byteBuffer), content);
}
}
+
+ @Test
+ public void testGenericFixedDataType() {
+ Schema avroSchema = createRecord("EventRecord", null, null, false);
+ Schema fixedSchema = createFixed("FixedSchema", "", "", 4);
+ avroSchema.setFields(Lists.newArrayList(new Schema.Field("fixedData", fixedSchema)));
+ GenericRecord genericRecord = new GenericData.Record(avroSchema);
+ genericRecord.put("fixedData", new GenericData.Fixed(fixedSchema, new byte[]{0, 1, 2, 3}));
+ GenericRow genericRow = new GenericRow();
+ AvroRecordExtractor avroRecordExtractor = new AvroRecordExtractor();
+ avroRecordExtractor.init(null, null);
+ avroRecordExtractor.extract(genericRecord, genericRow);
+ Assert.assertEquals(genericRow.getValue("fixedData"), new byte[]{0, 1, 2, 3});
+ }
+
+ @Test
+ public void testSpecificFixedDataType() {
+ EventRecord specificRecord = new EventRecord(new FixedSchema(new byte[]{0, 1, 2, 3}));
+ GenericRow outputGenericRow = new GenericRow();
+ AvroRecordExtractor avroRecordExtractor = new AvroRecordExtractor();
+ avroRecordExtractor.init(null, null);
+ avroRecordExtractor.extract(specificRecord, outputGenericRow);
+ Assert.assertEquals(outputGenericRow.getValue("fixedData"), new byte[]{0, 1, 2, 3});
+ }
+
+ /**
+ * SpecificRecord created for testing Fixed data type
+ */
+ static class EventRecord extends org.apache.avro.specific.SpecificRecordBase
+ implements org.apache.avro.specific.SpecificRecord {
+ private static final long serialVersionUID = 5451592186784305712L;
+ public static final org.apache.avro.Schema SCHEMA = new org.apache.avro.Schema.Parser().parse(
+ "{\"type\":\"record\",\"name\":\"EventRecord\",\"fields\":[{"
+ + "\"name\":\"fixedData\",\"type\":{\"type\":\"fixed\",\"name\":\"FixedSchema\",\"doc\":\"\",\"size\":4}}"
+ + "]}");
+
+ public static org.apache.avro.Schema getClassSchema() {
+ return SCHEMA;
+ }
+
+ private static final SpecificData MODEL = new SpecificData();
+
+ private static final BinaryMessageEncoder<EventRecord> ENCODER =
+ new BinaryMessageEncoder<EventRecord>(MODEL, SCHEMA);
+
+ private static final BinaryMessageDecoder<EventRecord> DECODER =
+ new BinaryMessageDecoder<EventRecord>(MODEL, SCHEMA);
+
+ public static BinaryMessageEncoder<EventRecord> getEncoder() {
+ return ENCODER;
+ }
+
+ public static BinaryMessageDecoder<EventRecord> getDecoder() {
+ return DECODER;
+ }
+
+ public static BinaryMessageDecoder<EventRecord> createDecoder(SchemaStore resolver) {
+ return new BinaryMessageDecoder<EventRecord>(MODEL, SCHEMA, resolver);
+ }
+
+ public java.nio.ByteBuffer toByteBuffer()
+ throws java.io.IOException {
+ return ENCODER.encode(this);
+ }
+
+ public static EventRecord fromByteBuffer(java.nio.ByteBuffer b)
+ throws java.io.IOException {
+ return DECODER.decode(b);
+ }
+
+ private FixedSchema _fixedData;
+
+ public EventRecord() {
+ }
+
+ public EventRecord(FixedSchema fixedData) {
+ _fixedData = fixedData;
+ }
+
+ public org.apache.avro.specific.SpecificData getSpecificData() {
+ return MODEL;
+ }
+
+ public org.apache.avro.Schema getSchema() {
+ return SCHEMA;
+ }
+
+ // Used by DatumWriter. Applications should not call.
+ public java.lang.Object get(int field) {
+ switch (field) {
+ case 0:
+ return _fixedData;
+ default:
+ throw new org.apache.avro.AvroRuntimeException("Bad index");
+ }
+ }
+
+ // Used by DatumReader. Applications should not call.
+ @SuppressWarnings(value = "unchecked")
+ public void put(int field, java.lang.Object value) {
+ switch (field) {
+ case 0:
+ _fixedData = (FixedSchema) value;
+ break;
+ default:
+ throw new org.apache.avro.AvroRuntimeException("Bad index");
+ }
+ }
+
+ public FixedSchema getFixedData() {
+ return _fixedData;
+ }
+
+ public void setFixedData(FixedSchema value) {
+ _fixedData = value;
+ }
+
+ @SuppressWarnings("unchecked")
+ private static final org.apache.avro.io.DatumWriter<EventRecord> WRITER =
+ (org.apache.avro.io.DatumWriter<EventRecord>) MODEL.createDatumWriter(SCHEMA);
+
+ @Override
+ public void writeExternal(java.io.ObjectOutput out)
+ throws java.io.IOException {
+ WRITER.write(this, SpecificData.getEncoder(out));
+ }
+
+ @SuppressWarnings("unchecked")
+ private static final org.apache.avro.io.DatumReader<EventRecord> READER =
+ (org.apache.avro.io.DatumReader<EventRecord>) MODEL.createDatumReader(SCHEMA);
+
+ @Override
+ public void readExternal(java.io.ObjectInput in)
+ throws java.io.IOException {
+ READER.read(this, SpecificData.getDecoder(in));
+ }
+
+ @Override
+ protected boolean hasCustomCoders() {
+ return true;
+ }
+
+ @Override
+ public void customEncode(org.apache.avro.io.Encoder out)
+ throws java.io.IOException {
+ out.writeFixed(_fixedData.bytes(), 0, 4);
+ }
+
+ @Override
+ public void customDecode(org.apache.avro.io.ResolvingDecoder in)
+ throws java.io.IOException {
+ org.apache.avro.Schema.Field[] fieldOrder = in.readFieldOrderIfDiff();
+ if (fieldOrder == null) {
+ if (_fixedData == null) {
+ _fixedData = new FixedSchema();
+ }
+ in.readFixed(_fixedData.bytes(), 0, 4);
+ } else {
+ for (int i = 0; i < 1; i++) {
+ switch (fieldOrder[i].pos()) {
+ case 0:
+ if (_fixedData == null) {
+ _fixedData = new FixedSchema();
+ }
+ in.readFixed(_fixedData.bytes(), 0, 4);
+ break;
+
+ default:
+ throw new java.io.IOException("Corrupt ResolvingDecoder.");
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * SpecificFixed created for testing Fixed data type
+ */
+ static class FixedSchema extends org.apache.avro.specific.SpecificFixed {
+ private static final long serialVersionUID = -1121289150751596161L;
+ public static final org.apache.avro.Schema SCHEMA = new org.apache.avro.Schema.Parser().parse(
+ "{\"type\":\"fixed\",\"name\":\"FixedSchema\",\"doc\":\"\",\"size\":4}");
+
+ public static org.apache.avro.Schema getClassSchema() {
+ return SCHEMA;
+ }
+
+ public org.apache.avro.Schema getSchema() {
+ return SCHEMA;
+ }
+
+ /** Creates a new FixedSchema */
+ public FixedSchema() {
+ super();
+ }
+
+ /**
+ * Creates a new FixedSchema with the given bytes.
+ * @param bytes The bytes to create the new FixedSchema.
+ */
+ public FixedSchema(byte[] bytes) {
+ super(bytes);
+ }
+
+ private static final org.apache.avro.io.DatumWriter<FixedSchema> WRITER =
+ new org.apache.avro.specific.SpecificDatumWriter<FixedSchema>(SCHEMA);
+
+ @Override
+ public void writeExternal(java.io.ObjectOutput out)
+ throws java.io.IOException {
+ WRITER.write(this, org.apache.avro.specific.SpecificData.getEncoder(out));
+ }
+
+ private static final org.apache.avro.io.DatumReader<FixedSchema> READER =
+ new org.apache.avro.specific.SpecificDatumReader<FixedSchema>(SCHEMA);
+
+ @Override
+ public void readExternal(java.io.ObjectInput in)
+ throws java.io.IOException {
+ READER.read(this, org.apache.avro.specific.SpecificData.getDecoder(in));
+ }
+ }
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java
index 3838b6a56f..1ed69c9065 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java
@@ -92,6 +92,7 @@ public class DataTypeTransformer implements RecordTransformer {
// Single-value column
source = PinotDataType.getSingleValueType(value.getClass());
}
+
// Skipping conversion when srcType!=destType is speculative, and can be unsafe when
// the array for MV column contains values of mixing types. Mixing types can lead
// to ClassCastException during conversion, often aborting data ingestion jobs.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org