Posted to commits@pinot.apache.org by ti...@apache.org on 2020/08/28 17:52:19 UTC

[incubator-pinot] branch release-0.5.0-rc updated: Fix CSV and JSON converter on BYTES column (#5931)

This is an automated email from the ASF dual-hosted git repository.

tingchen pushed a commit to branch release-0.5.0-rc
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/release-0.5.0-rc by this push:
     new b0d8da0  Fix CSV and JSON converter on BYTES column (#5931)
b0d8da0 is described below

commit b0d8da0e12c0e2fdbaff5da883e92f46ac7b47ea
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Wed Aug 26 15:37:49 2020 -0700

    Fix CSV and JSON converter on BYTES column (#5931)
    
    - Fix PinotSegmentToCsvConverter and PinotSegmentToJsonConverter not handling BYTES columns properly
    - Add a test that exercises the converters for all data types
---
 .../converter/PinotSegmentToAvroConverter.java     |  13 +-
 .../converter/PinotSegmentToCsvConverter.java      |  47 ++---
 .../converter/PinotSegmentToJsonConverter.java     |  16 +-
 .../converter/PinotSegmentConverterTest.java       | 195 +++++++++++++++++++++
 4 files changed, 239 insertions(+), 32 deletions(-)
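
The root cause: PinotSegmentRecordReader returns BYTES columns as byte[].
The CSV converter previously fell through to value.toString(), which for a
byte[] prints the array identity (something like "[B@1a2b3c") rather than the
column value, and the JSON converter passed the raw array straight to
JsonUtils.objectToJsonNode. The fix serializes byte[] as a hex string via
BytesUtils.toHexString. A minimal sketch of that encoding, assuming the usual
lowercase two-digits-per-byte convention (the real utility lives in pinot-spi
and may differ in details):

    // Hypothetical standalone equivalent of BytesUtils.toHexString
    static String toHexString(byte[] bytes) {
      StringBuilder sb = new StringBuilder(bytes.length * 2);
      for (byte b : bytes) {
        sb.append(String.format("%02x", b & 0xff)); // each byte -> two hex digits
      }
      return sb.toString();
    }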

diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToAvroConverter.java b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToAvroConverter.java
index 846302d..d723af8 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToAvroConverter.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToAvroConverter.java
@@ -21,6 +21,7 @@ package org.apache.pinot.tools.segment.converter;
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Map;
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData.Record;
@@ -55,16 +56,14 @@ public class PinotSegmentToAvroConverter implements PinotSegmentConverter {
         while (pinotSegmentRecordReader.hasNext()) {
           row = pinotSegmentRecordReader.next(row);
           Record record = new Record(avroSchema);
-
-          for (String field : row.getFieldNames()) {
-            Object value = row.getValue(field);
+          for (Map.Entry<String, Object> entry : row.getFieldToValueMap().entrySet()) {
+            String field = entry.getKey();
+            Object value = entry.getValue();
             if (value instanceof Object[]) {
               record.put(field, Arrays.asList((Object[]) value));
+            } else if (value instanceof byte[]) {
+              record.put(field, ByteBuffer.wrap((byte[]) value));
             } else {
-              // PinotSegmentRecordReader returns byte[] instead of ByteBuffer.
-              if (value instanceof byte[]) {
-                value = ByteBuffer.wrap((byte[]) value);
-              }
               record.put(field, value);
             }
           }
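
The Avro path was already correct for BYTES: byte[] is wrapped in a ByteBuffer,
which is what Avro's GenericData.Record expects for a bytes field. The hunk
above only flattens the nested check into an else-if chain and iterates
getFieldToValueMap() directly instead of looking up each field by name. A
hedged usage sketch, with the constructor arguments as in the new test below
and placeholder paths:

    // Paths are placeholders; constructor signature taken from the test below
    PinotSegmentToAvroConverter converter =
        new PinotSegmentToAvroConverter("/path/to/segmentDir", "/path/to/segment.avro");
    converter.convert();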
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToCsvConverter.java b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToCsvConverter.java
index 7354194..899f7fc 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToCsvConverter.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToCsvConverter.java
@@ -21,11 +21,11 @@ package org.apache.pinot.tools.segment.converter;
 import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileWriter;
-import java.util.ArrayList;
-import java.util.List;
+import java.io.IOException;
 import org.apache.commons.lang.StringUtils;
-import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.BytesUtils;
 
 
 /**
@@ -53,31 +53,36 @@ public class PinotSegmentToCsvConverter implements PinotSegmentConverter {
     try (PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader(new File(_segmentDir));
         BufferedWriter recordWriter = new BufferedWriter(new FileWriter(_outputFile))) {
       GenericRow row = new GenericRow();
-
+      row = recordReader.next(row);
+      String[] fields = row.getFieldToValueMap().keySet().toArray(new String[0]);
       if (_withHeader) {
-        row = recordReader.next(row);
-        recordWriter.write(StringUtils.join(row.getFieldNames(), _delimiter));
+        recordWriter.write(StringUtils.join(fields, _delimiter));
         recordWriter.newLine();
-        recordReader.rewind();
       }
-
+      writeRow(recordWriter, row, fields);
       while (recordReader.hasNext()) {
         row = recordReader.next(row);
-        String[] fields = row.getFieldNames();
-        List<String> record = new ArrayList<>(fields.length);
-
-        for (String field : fields) {
-          Object value = row.getValue(field);
-          if (value instanceof Object[]) {
-            record.add(StringUtils.join((Object[]) value, _listDelimiter));
-          } else {
-            record.add(value.toString());
-          }
-        }
+        writeRow(recordWriter, row, fields);
+      }
+    }
+  }
 
-        recordWriter.write(StringUtils.join(record, _delimiter));
-        recordWriter.newLine();
+  private void writeRow(BufferedWriter recordWriter, GenericRow row, String[] fields)
+      throws IOException {
+    int numFields = fields.length;
+    String[] values = new String[numFields];
+    for (int i = 0; i < numFields; i++) {
+      String field = fields[i];
+      Object value = row.getValue(field);
+      if (value instanceof Object[]) {
+        values[i] = StringUtils.join((Object[]) value, _listDelimiter);
+      } else if (value instanceof byte[]) {
+        values[i] = BytesUtils.toHexString((byte[]) value);
+      } else {
+        values[i] = value.toString();
       }
     }
+    recordWriter.write(StringUtils.join(values, _delimiter));
+    recordWriter.newLine();
   }
 }
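
Two things change in the CSV converter: byte[] values are now hex-encoded via
BytesUtils.toHexString instead of hitting toString(), and per-row serialization
is factored into a writeRow helper. The header and column order are now taken
from the first row's getFieldToValueMap(), and that first row is read before
the hasNext() loop, so the new code implicitly assumes the segment contains at
least one row. A hedged usage sketch mirroring the test below (',' and ';' are
the common defaults; the test pulls the actual values from
CSVRecordReaderConfig):

    // Paths are placeholders; delimiters assumed to match the defaults
    PinotSegmentToCsvConverter converter =
        new PinotSegmentToCsvConverter("/path/to/segmentDir", "/path/to/segment.csv",
            ',', ';', /* withHeader */ true);
    converter.convert();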
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToJsonConverter.java b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToJsonConverter.java
index ba36140..91c1213 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToJsonConverter.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToJsonConverter.java
@@ -22,9 +22,11 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileWriter;
-import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.spi.data.readers.GenericRow;
+import java.util.Map;
 import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.BytesUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
 
 
 /**
@@ -48,8 +50,14 @@ public class PinotSegmentToJsonConverter implements PinotSegmentConverter {
       while (recordReader.hasNext()) {
         row = recordReader.next(row);
         ObjectNode record = JsonUtils.newObjectNode();
-        for (String column : row.getFieldNames()) {
-          record.set(column, JsonUtils.objectToJsonNode(row.getValue(column)));
+        for (Map.Entry<String, Object> entry : row.getFieldToValueMap().entrySet()) {
+          String field = entry.getKey();
+          Object value = entry.getValue();
+          if (value instanceof byte[]) {
+            record.put(field, BytesUtils.toHexString((byte[]) value));
+          } else {
+            record.set(field, JsonUtils.objectToJsonNode(value));
+          }
         }
         recordWriter.write(record.toString());
         recordWriter.newLine();
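
For JSON, a raw byte[] does not survive JsonUtils.objectToJsonNode in a useful
form, so the converter now writes it as a hex text node via
ObjectNode.put(String, String). With the test record below, the BYTES column
{6, 12, 34, 56} would show up in the output line roughly as follows (field
order depends on the underlying map, and lowercase hex is assumed):

    {"intSVColumn":1,...,"bytesSVColumn":"060c2238",...}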
diff --git a/pinot-tools/src/test/java/org/apache/pinot/tools/segment/converter/PinotSegmentConverterTest.java b/pinot-tools/src/test/java/org/apache/pinot/tools/segment/converter/PinotSegmentConverterTest.java
new file mode 100644
index 0000000..4788635
--- /dev/null
+++ b/pinot-tools/src/test/java/org/apache/pinot/tools/segment/converter/PinotSegmentConverterTest.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools.segment.converter;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.data.readers.GenericRowRecordReader;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.plugin.inputformat.avro.AvroRecordReader;
+import org.apache.pinot.plugin.inputformat.csv.CSVRecordReader;
+import org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig;
+import org.apache.pinot.plugin.inputformat.json.JSONRecordReader;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.BytesUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+
+
+public class PinotSegmentConverterTest {
+  private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "PinotSegmentConverterTest");
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String SEGMENT_NAME = "testSegment";
+
+  private static final String INT_SV_COLUMN = "intSVColumn";
+  private static final String LONG_SV_COLUMN = "longSVColumn";
+  private static final String FLOAT_SV_COLUMN = "floatSVColumn";
+  private static final String DOUBLE_SV_COLUMN = "doubleSVColumn";
+  private static final String STRING_SV_COLUMN = "stringSVColumn";
+  private static final String BYTES_SV_COLUMN = "bytesSVColumn";
+  private static final String INT_MV_COLUMN = "intMVColumn";
+  private static final String LONG_MV_COLUMN = "longMVColumn";
+  private static final String FLOAT_MV_COLUMN = "floatMVColumn";
+  private static final String DOUBLE_MV_COLUMN = "doubleMVColumn";
+  private static final String STRING_MV_COLUMN = "stringMVColumn";
+  private static final Schema SCHEMA = new Schema.SchemaBuilder().addSingleValueDimension(INT_SV_COLUMN, DataType.INT)
+      .addSingleValueDimension(LONG_SV_COLUMN, DataType.LONG).addSingleValueDimension(FLOAT_SV_COLUMN, DataType.FLOAT)
+      .addSingleValueDimension(DOUBLE_SV_COLUMN, DataType.DOUBLE)
+      .addSingleValueDimension(STRING_SV_COLUMN, DataType.STRING)
+      .addSingleValueDimension(BYTES_SV_COLUMN, DataType.BYTES).addMultiValueDimension(INT_MV_COLUMN, DataType.INT)
+      .addMultiValueDimension(LONG_MV_COLUMN, DataType.LONG).addMultiValueDimension(FLOAT_MV_COLUMN, DataType.FLOAT)
+      .addMultiValueDimension(DOUBLE_MV_COLUMN, DataType.DOUBLE)
+      .addMultiValueDimension(STRING_MV_COLUMN, DataType.STRING).
+          build();
+  private static final TableConfig TABLE_CONFIG =
+      new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+
+  private String _segmentDir;
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteDirectory(TEMP_DIR);
+
+    GenericRow record = new GenericRow();
+    record.putValue(INT_SV_COLUMN, 1);
+    record.putValue(LONG_SV_COLUMN, 2L);
+    record.putValue(FLOAT_SV_COLUMN, 3.0f);
+    record.putValue(DOUBLE_SV_COLUMN, 4.0);
+    record.putValue(STRING_SV_COLUMN, "5");
+    record.putValue(BYTES_SV_COLUMN, new byte[]{6, 12, 34, 56});
+    record.putValue(INT_MV_COLUMN, new Object[]{7, 8});
+    record.putValue(LONG_MV_COLUMN, new Object[]{9L, 10L});
+    record.putValue(FLOAT_MV_COLUMN, new Object[]{11.0f, 12.0f});
+    record.putValue(DOUBLE_MV_COLUMN, new Object[]{13.0, 14.0});
+    record.putValue(STRING_MV_COLUMN, new Object[]{"15", "16"});
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
+    segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
+    segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
+    segmentGeneratorConfig.setOutDir(TEMP_DIR.getPath());
+
+    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig, new GenericRowRecordReader(Collections.singletonList(record)));
+    driver.build();
+
+    _segmentDir = driver.getOutputDirectory().getPath();
+  }
+
+  @Test
+  public void testAvroConverter()
+      throws Exception {
+    File outputFile = new File(TEMP_DIR, "segment.avro");
+    PinotSegmentToAvroConverter avroConverter = new PinotSegmentToAvroConverter(_segmentDir, outputFile.getPath());
+    avroConverter.convert();
+
+    try (AvroRecordReader recordReader = new AvroRecordReader()) {
+      recordReader.init(outputFile, SCHEMA.getFieldSpecMap().keySet(), null);
+
+      GenericRow record = recordReader.next();
+      assertEquals(record.getValue(INT_SV_COLUMN), 1);
+      assertEquals(record.getValue(LONG_SV_COLUMN), 2L);
+      assertEquals(record.getValue(FLOAT_SV_COLUMN), 3.0f);
+      assertEquals(record.getValue(DOUBLE_SV_COLUMN), 4.0);
+      assertEquals(record.getValue(STRING_SV_COLUMN), "5");
+      assertEquals(record.getValue(BYTES_SV_COLUMN), new byte[]{6, 12, 34, 56});
+      assertEquals(record.getValue(INT_MV_COLUMN), new Object[]{7, 8});
+      assertEquals(record.getValue(LONG_MV_COLUMN), new Object[]{9L, 10L});
+      assertEquals(record.getValue(FLOAT_MV_COLUMN), new Object[]{11.0f, 12.0f});
+      assertEquals(record.getValue(DOUBLE_MV_COLUMN), new Object[]{13.0, 14.0});
+      assertEquals(record.getValue(STRING_MV_COLUMN), new Object[]{"15", "16"});
+
+      assertFalse(recordReader.hasNext());
+    }
+  }
+
+  @Test
+  public void testCsvConverter()
+      throws Exception {
+    File outputFile = new File(TEMP_DIR, "segment.csv");
+    PinotSegmentToCsvConverter csvConverter =
+        new PinotSegmentToCsvConverter(_segmentDir, outputFile.getPath(), CSVRecordReaderConfig.DEFAULT_DELIMITER,
+            CSVRecordReaderConfig.DEFAULT_MULTI_VALUE_DELIMITER, true);
+    csvConverter.convert();
+
+    try (CSVRecordReader recordReader = new CSVRecordReader()) {
+      recordReader.init(outputFile, SCHEMA.getFieldSpecMap().keySet(), null);
+
+      GenericRow record = recordReader.next();
+      assertEquals(record.getValue(INT_SV_COLUMN), "1");
+      assertEquals(record.getValue(LONG_SV_COLUMN), "2");
+      assertEquals(record.getValue(FLOAT_SV_COLUMN), "3.0");
+      assertEquals(record.getValue(DOUBLE_SV_COLUMN), "4.0");
+      assertEquals(record.getValue(STRING_SV_COLUMN), "5");
+      assertEquals(record.getValue(BYTES_SV_COLUMN), BytesUtils.toHexString(new byte[]{6, 12, 34, 56}));
+      assertEquals(record.getValue(INT_MV_COLUMN), new Object[]{"7", "8"});
+      assertEquals(record.getValue(LONG_MV_COLUMN), new Object[]{"9", "10"});
+      assertEquals(record.getValue(FLOAT_MV_COLUMN), new Object[]{"11.0", "12.0"});
+      assertEquals(record.getValue(DOUBLE_MV_COLUMN), new Object[]{"13.0", "14.0"});
+      assertEquals(record.getValue(STRING_MV_COLUMN), new Object[]{"15", "16"});
+
+      assertFalse(recordReader.hasNext());
+    }
+  }
+
+  @Test
+  public void testJsonConverter()
+      throws Exception {
+    File outputFile = new File(TEMP_DIR, "segment.json");
+    PinotSegmentToJsonConverter jsonConverter = new PinotSegmentToJsonConverter(_segmentDir, outputFile.getPath());
+    jsonConverter.convert();
+
+    try (JSONRecordReader recordReader = new JSONRecordReader()) {
+      recordReader.init(outputFile, SCHEMA.getFieldSpecMap().keySet(), null);
+
+      GenericRow record = recordReader.next();
+      assertEquals(record.getValue(INT_SV_COLUMN), 1);
+      assertEquals(record.getValue(LONG_SV_COLUMN), 2);
+      assertEquals(record.getValue(FLOAT_SV_COLUMN), 3.0);
+      assertEquals(record.getValue(DOUBLE_SV_COLUMN), 4.0);
+      assertEquals(record.getValue(STRING_SV_COLUMN), "5");
+      assertEquals(record.getValue(BYTES_SV_COLUMN), BytesUtils.toHexString(new byte[]{6, 12, 34, 56}));
+      assertEquals(record.getValue(INT_MV_COLUMN), new Object[]{7, 8});
+      assertEquals(record.getValue(LONG_MV_COLUMN), new Object[]{9, 10});
+      assertEquals(record.getValue(FLOAT_MV_COLUMN), new Object[]{11.0, 12.0});
+      assertEquals(record.getValue(DOUBLE_MV_COLUMN), new Object[]{13.0, 14.0});
+      assertEquals(record.getValue(STRING_MV_COLUMN), new Object[]{"15", "16"});
+
+      assertFalse(recordReader.hasNext());
+    }
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws IOException {
+    FileUtils.deleteDirectory(TEMP_DIR);
+  }
+}
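
As a sanity check on the BYTES expectation shared by the CSV and JSON tests:
the test bytes {6, 12, 34, 56} hex-encode byte by byte as 6 -> 06, 12 -> 0c,
34 -> 22, 56 -> 38, i.e. the string "060c2238" (hand-computed, assuming
lowercase two-digit encoding):

    // Hand-computed expectation; assumes BytesUtils.toHexString emits lowercase hex
    assertEquals(BytesUtils.toHexString(new byte[]{6, 12, 34, 56}), "060c2238");

To run just the new test locally, something like
"mvn -pl pinot-tools test -Dtest=PinotSegmentConverterTest" from the repository
root should work with the standard Maven setup.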


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org