You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ti...@apache.org on 2020/08/28 17:52:19 UTC
[incubator-pinot] branch release-0.5.0-rc updated: Fix CSV and JSON
converter on BYTES column (#5931)
This is an automated email from the ASF dual-hosted git repository.
tingchen pushed a commit to branch release-0.5.0-rc
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/release-0.5.0-rc by this push:
new b0d8da0 Fix CSV and JSON converter on BYTES column (#5931)
b0d8da0 is described below
commit b0d8da0e12c0e2fdbaff5da883e92f46ac7b47ea
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Wed Aug 26 15:37:49 2020 -0700
Fix CSV and JSON converter on BYTES column (#5931)
- Fix PinotSegmentToCsvConverter and PinotSegmentToJsonConverter not handling BYTES columns properly: byte[] values are now written as hex strings
- Add a test that runs the Avro, CSV and JSON converters on a segment containing columns of all data types
---
.../converter/PinotSegmentToAvroConverter.java | 13 +-
.../converter/PinotSegmentToCsvConverter.java | 47 ++---
.../converter/PinotSegmentToJsonConverter.java | 16 +-
.../converter/PinotSegmentConverterTest.java | 195 +++++++++++++++++++++
4 files changed, 239 insertions(+), 32 deletions(-)
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToAvroConverter.java b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToAvroConverter.java
index 846302d..d723af8 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToAvroConverter.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToAvroConverter.java
@@ -21,6 +21,7 @@ package org.apache.pinot.tools.segment.converter;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData.Record;
@@ -55,16 +56,14 @@ public class PinotSegmentToAvroConverter implements PinotSegmentConverter {
while (pinotSegmentRecordReader.hasNext()) {
row = pinotSegmentRecordReader.next(row);
Record record = new Record(avroSchema);
-
- for (String field : row.getFieldNames()) {
- Object value = row.getValue(field);
+ for (Map.Entry<String, Object> entry : row.getFieldToValueMap().entrySet()) {
+ String field = entry.getKey();
+ Object value = entry.getValue();
if (value instanceof Object[]) {
record.put(field, Arrays.asList((Object[]) value));
+ } else if (value instanceof byte[]) {
+ record.put(field, ByteBuffer.wrap((byte[]) value));
} else {
- // PinotSegmentRecordReader returns byte[] instead of ByteBuffer.
- if (value instanceof byte[]) {
- value = ByteBuffer.wrap((byte[]) value);
- }
record.put(field, value);
}
}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToCsvConverter.java b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToCsvConverter.java
index 7354194..899f7fc 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToCsvConverter.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToCsvConverter.java
@@ -21,11 +21,11 @@ package org.apache.pinot.tools.segment.converter;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
-import java.util.ArrayList;
-import java.util.List;
+import java.io.IOException;
import org.apache.commons.lang.StringUtils;
-import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.BytesUtils;
/**
@@ -53,31 +53,36 @@ public class PinotSegmentToCsvConverter implements PinotSegmentConverter {
try (PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader(new File(_segmentDir));
BufferedWriter recordWriter = new BufferedWriter(new FileWriter(_outputFile))) {
GenericRow row = new GenericRow();
-
+ row = recordReader.next(row);
+ String[] fields = row.getFieldToValueMap().keySet().toArray(new String[0]);
if (_withHeader) {
- row = recordReader.next(row);
- recordWriter.write(StringUtils.join(row.getFieldNames(), _delimiter));
+ recordWriter.write(StringUtils.join(fields, _delimiter));
recordWriter.newLine();
- recordReader.rewind();
}
-
+ writeRow(recordWriter, row, fields);
while (recordReader.hasNext()) {
row = recordReader.next(row);
- String[] fields = row.getFieldNames();
- List<String> record = new ArrayList<>(fields.length);
-
- for (String field : fields) {
- Object value = row.getValue(field);
- if (value instanceof Object[]) {
- record.add(StringUtils.join((Object[]) value, _listDelimiter));
- } else {
- record.add(value.toString());
- }
- }
+ writeRow(recordWriter, row, fields);
+ }
+ }
+ }
- recordWriter.write(StringUtils.join(record, _delimiter));
- recordWriter.newLine();
+ private void writeRow(BufferedWriter recordWriter, GenericRow row, String[] fields)
+ throws IOException { // Writes one CSV line: the values of `fields`, in that order, joined by _delimiter.
+ int numFields = fields.length;
+ String[] values = new String[numFields];
+ for (int i = 0; i < numFields; i++) {
+ String field = fields[i];
+ Object value = row.getValue(field);
+ if (value instanceof Object[]) { // multi-value column: join the elements with _listDelimiter
+ values[i] = StringUtils.join((Object[]) value, _listDelimiter);
+ } else if (value instanceof byte[]) { // BYTES column: hex-encode, since byte[].toString() does not render the content
+ values[i] = BytesUtils.toHexString((byte[]) value);
+ } else { // NOTE(review): a null value would NPE here; confirm the segment reader never yields null
+ values[i] = value.toString();
}
}
+ recordWriter.write(StringUtils.join(values, _delimiter));
+ recordWriter.newLine();
}
}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToJsonConverter.java b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToJsonConverter.java
index ba36140..91c1213 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToJsonConverter.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToJsonConverter.java
@@ -22,9 +22,11 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
-import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.spi.data.readers.GenericRow;
+import java.util.Map;
import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.BytesUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
/**
@@ -48,8 +50,14 @@ public class PinotSegmentToJsonConverter implements PinotSegmentConverter {
while (recordReader.hasNext()) {
row = recordReader.next(row);
ObjectNode record = JsonUtils.newObjectNode();
- for (String column : row.getFieldNames()) {
- record.set(column, JsonUtils.objectToJsonNode(row.getValue(column)));
+ for (Map.Entry<String, Object> entry : row.getFieldToValueMap().entrySet()) {
+ String field = entry.getKey();
+ Object value = entry.getValue();
+ if (value instanceof byte[]) {
+ record.put(field, BytesUtils.toHexString((byte[]) value));
+ } else {
+ record.set(field, JsonUtils.objectToJsonNode(value));
+ }
}
recordWriter.write(record.toString());
recordWriter.newLine();
diff --git a/pinot-tools/src/test/java/org/apache/pinot/tools/segment/converter/PinotSegmentConverterTest.java b/pinot-tools/src/test/java/org/apache/pinot/tools/segment/converter/PinotSegmentConverterTest.java
new file mode 100644
index 0000000..4788635
--- /dev/null
+++ b/pinot-tools/src/test/java/org/apache/pinot/tools/segment/converter/PinotSegmentConverterTest.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools.segment.converter;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.data.readers.GenericRowRecordReader;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.plugin.inputformat.avro.AvroRecordReader;
+import org.apache.pinot.plugin.inputformat.csv.CSVRecordReader;
+import org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig;
+import org.apache.pinot.plugin.inputformat.json.JSONRecordReader;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.BytesUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+
+/** Round-trip tests: builds a one-row segment, runs the Avro, CSV and JSON converters on it, and reads each output back. */
+public class PinotSegmentConverterTest {
+ private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "PinotSegmentConverterTest");
+ private static final String RAW_TABLE_NAME = "testTable";
+ private static final String SEGMENT_NAME = "testSegment";
+
+ private static final String INT_SV_COLUMN = "intSVColumn";
+ private static final String LONG_SV_COLUMN = "longSVColumn";
+ private static final String FLOAT_SV_COLUMN = "floatSVColumn";
+ private static final String DOUBLE_SV_COLUMN = "doubleSVColumn";
+ private static final String STRING_SV_COLUMN = "stringSVColumn";
+ private static final String BYTES_SV_COLUMN = "bytesSVColumn";
+ private static final String INT_MV_COLUMN = "intMVColumn";
+ private static final String LONG_MV_COLUMN = "longMVColumn";
+ private static final String FLOAT_MV_COLUMN = "floatMVColumn";
+ private static final String DOUBLE_MV_COLUMN = "doubleMVColumn";
+ private static final String STRING_MV_COLUMN = "stringMVColumn";
+ private static final Schema SCHEMA = new Schema.SchemaBuilder().addSingleValueDimension(INT_SV_COLUMN, DataType.INT)
+ .addSingleValueDimension(LONG_SV_COLUMN, DataType.LONG).addSingleValueDimension(FLOAT_SV_COLUMN, DataType.FLOAT)
+ .addSingleValueDimension(DOUBLE_SV_COLUMN, DataType.DOUBLE)
+ .addSingleValueDimension(STRING_SV_COLUMN, DataType.STRING)
+ .addSingleValueDimension(BYTES_SV_COLUMN, DataType.BYTES).addMultiValueDimension(INT_MV_COLUMN, DataType.INT)
+ .addMultiValueDimension(LONG_MV_COLUMN, DataType.LONG).addMultiValueDimension(FLOAT_MV_COLUMN, DataType.FLOAT)
+ .addMultiValueDimension(DOUBLE_MV_COLUMN, DataType.DOUBLE)
+ .addMultiValueDimension(STRING_MV_COLUMN, DataType.STRING).
+ build();
+ private static final TableConfig TABLE_CONFIG =
+ new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+ // Directory of the segment built in setUp(); it is the input to every converter test.
+ private String _segmentDir;
+ // Builds a one-row segment under TEMP_DIR: single-value columns of every type, multi-value of every type except BYTES.
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ FileUtils.deleteDirectory(TEMP_DIR);
+ // The single input row; its values are what the tests below expect to read back.
+ GenericRow record = new GenericRow();
+ record.putValue(INT_SV_COLUMN, 1);
+ record.putValue(LONG_SV_COLUMN, 2L);
+ record.putValue(FLOAT_SV_COLUMN, 3.0f);
+ record.putValue(DOUBLE_SV_COLUMN, 4.0);
+ record.putValue(STRING_SV_COLUMN, "5");
+ record.putValue(BYTES_SV_COLUMN, new byte[]{6, 12, 34, 56});
+ record.putValue(INT_MV_COLUMN, new Object[]{7, 8});
+ record.putValue(LONG_MV_COLUMN, new Object[]{9L, 10L});
+ record.putValue(FLOAT_MV_COLUMN, new Object[]{11.0f, 12.0f});
+ record.putValue(DOUBLE_MV_COLUMN, new Object[]{13.0, 14.0});
+ record.putValue(STRING_MV_COLUMN, new Object[]{"15", "16"});
+
+ SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
+ segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
+ segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
+ segmentGeneratorConfig.setOutDir(TEMP_DIR.getPath());
+
+ SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+ driver.init(segmentGeneratorConfig, new GenericRowRecordReader(Collections.singletonList(record)));
+ driver.build();
+
+ _segmentDir = driver.getOutputDirectory().getPath();
+ }
+ // Expects native types back from Avro: BYTES as byte[], multi-value columns as Object[].
+ @Test
+ public void testAvroConverter()
+ throws Exception {
+ File outputFile = new File(TEMP_DIR, "segment.avro");
+ PinotSegmentToAvroConverter avroConverter = new PinotSegmentToAvroConverter(_segmentDir, outputFile.getPath());
+ avroConverter.convert();
+
+ try (AvroRecordReader recordReader = new AvroRecordReader()) {
+ recordReader.init(outputFile, SCHEMA.getFieldSpecMap().keySet(), null);
+
+ GenericRow record = recordReader.next();
+ assertEquals(record.getValue(INT_SV_COLUMN), 1);
+ assertEquals(record.getValue(LONG_SV_COLUMN), 2L);
+ assertEquals(record.getValue(FLOAT_SV_COLUMN), 3.0f);
+ assertEquals(record.getValue(DOUBLE_SV_COLUMN), 4.0);
+ assertEquals(record.getValue(STRING_SV_COLUMN), "5");
+ assertEquals(record.getValue(BYTES_SV_COLUMN), new byte[]{6, 12, 34, 56});
+ assertEquals(record.getValue(INT_MV_COLUMN), new Object[]{7, 8});
+ assertEquals(record.getValue(LONG_MV_COLUMN), new Object[]{9L, 10L});
+ assertEquals(record.getValue(FLOAT_MV_COLUMN), new Object[]{11.0f, 12.0f});
+ assertEquals(record.getValue(DOUBLE_MV_COLUMN), new Object[]{13.0, 14.0});
+ assertEquals(record.getValue(STRING_MV_COLUMN), new Object[]{"15", "16"});
+
+ assertFalse(recordReader.hasNext());
+ }
+ }
+ // Expects every CSV value back as a String, with BYTES hex-encoded by the converter.
+ @Test
+ public void testCsvConverter()
+ throws Exception {
+ File outputFile = new File(TEMP_DIR, "segment.csv");
+ PinotSegmentToCsvConverter csvConverter =
+ new PinotSegmentToCsvConverter(_segmentDir, outputFile.getPath(), CSVRecordReaderConfig.DEFAULT_DELIMITER,
+ CSVRecordReaderConfig.DEFAULT_MULTI_VALUE_DELIMITER, true);
+ csvConverter.convert();
+
+ try (CSVRecordReader recordReader = new CSVRecordReader()) {
+ recordReader.init(outputFile, SCHEMA.getFieldSpecMap().keySet(), null);
+
+ GenericRow record = recordReader.next();
+ assertEquals(record.getValue(INT_SV_COLUMN), "1");
+ assertEquals(record.getValue(LONG_SV_COLUMN), "2");
+ assertEquals(record.getValue(FLOAT_SV_COLUMN), "3.0");
+ assertEquals(record.getValue(DOUBLE_SV_COLUMN), "4.0");
+ assertEquals(record.getValue(STRING_SV_COLUMN), "5");
+ assertEquals(record.getValue(BYTES_SV_COLUMN), BytesUtils.toHexString(new byte[]{6, 12, 34, 56}));
+ assertEquals(record.getValue(INT_MV_COLUMN), new Object[]{"7", "8"});
+ assertEquals(record.getValue(LONG_MV_COLUMN), new Object[]{"9", "10"});
+ assertEquals(record.getValue(FLOAT_MV_COLUMN), new Object[]{"11.0", "12.0"});
+ assertEquals(record.getValue(DOUBLE_MV_COLUMN), new Object[]{"13.0", "14.0"});
+ assertEquals(record.getValue(STRING_MV_COLUMN), new Object[]{"15", "16"});
+
+ assertFalse(recordReader.hasNext());
+ }
+ }
+ // Expects JSON-parsed numbers (Integer for whole numbers, Double otherwise) and BYTES as a hex string.
+ @Test
+ public void testJsonConverter()
+ throws Exception {
+ File outputFile = new File(TEMP_DIR, "segment.json");
+ PinotSegmentToJsonConverter jsonConverter = new PinotSegmentToJsonConverter(_segmentDir, outputFile.getPath());
+ jsonConverter.convert();
+
+ try (JSONRecordReader recordReader = new JSONRecordReader()) {
+ recordReader.init(outputFile, SCHEMA.getFieldSpecMap().keySet(), null);
+
+ GenericRow record = recordReader.next();
+ assertEquals(record.getValue(INT_SV_COLUMN), 1);
+ assertEquals(record.getValue(LONG_SV_COLUMN), 2);
+ assertEquals(record.getValue(FLOAT_SV_COLUMN), 3.0);
+ assertEquals(record.getValue(DOUBLE_SV_COLUMN), 4.0);
+ assertEquals(record.getValue(STRING_SV_COLUMN), "5");
+ assertEquals(record.getValue(BYTES_SV_COLUMN), BytesUtils.toHexString(new byte[]{6, 12, 34, 56}));
+ assertEquals(record.getValue(INT_MV_COLUMN), new Object[]{7, 8});
+ assertEquals(record.getValue(LONG_MV_COLUMN), new Object[]{9, 10});
+ assertEquals(record.getValue(FLOAT_MV_COLUMN), new Object[]{11.0, 12.0});
+ assertEquals(record.getValue(DOUBLE_MV_COLUMN), new Object[]{13.0, 14.0});
+ assertEquals(record.getValue(STRING_MV_COLUMN), new Object[]{"15", "16"});
+
+ assertFalse(recordReader.hasNext());
+ }
+ }
+ // Removes the segment and every converter output file written under TEMP_DIR.
+ @AfterClass
+ public void tearDown()
+ throws IOException {
+ FileUtils.deleteDirectory(TEMP_DIR);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org