You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/12/09 21:15:24 UTC

[GitHub] [pinot] jackjlli commented on a diff in pull request #9951: Improving gz support for avro record readers

jackjlli commented on code in PR #9951:
URL: https://github.com/apache/pinot/pull/9951#discussion_r1044846189


##########
pinot-plugins/pinot-input-format/pinot-thrift/src/test/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordReaderTest.java:
##########
@@ -32,116 +32,79 @@
 import org.apache.commons.io.IOUtils;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.AbstractRecordReaderTest;
+import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.thrift.TSerializer;
 import org.apache.thrift.protocol.TBinaryProtocol;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
 
 
 /**
  * Test {@code org.apache.pinot.plugin.inputformat.thrift.data.ThriftRecordReader} for a given sample thrift
  * data.
  */
-public class ThriftRecordReaderTest {
+public class ThriftRecordReaderTest extends AbstractRecordReaderTest {
   private static final String THRIFT_DATA = "_test_sample_thrift_data.thrift";
 
-  private File _tempFile;
+  private ThriftRecordReaderConfig getThriftRecordReaderConfig() {
+    ThriftRecordReaderConfig config = new ThriftRecordReaderConfig();
+    config.setThriftClass("org.apache.pinot.plugin.inputformat.thrift.ThriftSampleData");
+    return config;
+  }
 
   @BeforeClass
   public void setUp()
       throws Exception {
-    FileUtils.deleteQuietly(_tempFile);
-
-    ThriftSampleData t1 = new ThriftSampleData();
-    t1.setActive(true);
-    t1.setCreated_at(1515541280L);
-    t1.setId(1);
-    t1.setName("name1");
-    List<Short> t1Groups = new ArrayList<>(2);
-    t1Groups.add((short) 1);
-    t1Groups.add((short) 4);
-    t1.setGroups(t1Groups);
-    Map<String, Long> mapValues = new HashMap<>();
-    mapValues.put("name1", 1L);
-    t1.setMap_values(mapValues);
-    Set<String> namesSet = new HashSet<>();
-    namesSet.add("name1");
-    t1.setSet_values(namesSet);
-
-    ThriftSampleData t2 = new ThriftSampleData();
-    t2.setActive(false);
-    t2.setCreated_at(1515541290L);
-    t2.setId(2);
-    t2.setName("name2");
-    List<Short> t2Groups = new ArrayList<>(2);
-    t2Groups.add((short) 2);
-    t2Groups.add((short) 3);
-    t2.setGroups(t2Groups);
-    List<ThriftSampleData> lists = new ArrayList<>(2);
-    lists.add(t1);
-    lists.add(t2);
-    TSerializer binarySerializer = new TSerializer(new TBinaryProtocol.Factory());
-    _tempFile = getSampleDataPath();
-    FileWriter writer = new FileWriter(_tempFile);
-    for (ThriftSampleData d : lists) {
-      IOUtils.write(binarySerializer.serialize(d), writer);
-    }
-    writer.close();
-  }
-
-  @Test
-  public void testReadData()
-      throws IOException {
-    ThriftRecordReader recordReader = new ThriftRecordReader();
-    recordReader.init(_tempFile, getSourceFields(), getThriftRecordReaderConfig());
-    List<GenericRow> genericRows = new ArrayList<>();
-    while (recordReader.hasNext()) {
-      genericRows.add(recordReader.next());
-    }
-    recordReader.close();
-    Assert.assertEquals(genericRows.size(), 2, "The number of rows return is incorrect");
-    int id = 1;
-    for (GenericRow outputRow : genericRows) {
-      Assert.assertEquals(outputRow.getValue("id"), id);
-      Assert.assertNull(outputRow.getValue("map_values"));
-      id++;
+    if (_tempDir.exists()) {
+      FileUtils.cleanDirectory(_tempDir);
     }
+    FileUtils.forceMkdir(_tempDir);
+    // Generate Pinot schema
+    _pinotSchema = getPinotSchema();
+    _sourceFields = getSourceFields(_pinotSchema);
+    // Generate random records based on Pinot schema
+    _records = generateRandomRecords();
+    _primaryKeys = generatePrimaryKeys(_records, getPrimaryKeyColumns());
+    // Write generated random records to file
+    writeRecordsToFile(_records);
+    // Create and init RecordReader
+    _recordReader = createRecordReader();
   }
 
-  @Test
-  public void testRewind()
-      throws IOException {
-    ThriftRecordReader recordReader = new ThriftRecordReader();
-    recordReader.init(_tempFile, getSourceFields(), getThriftRecordReaderConfig());
-    List<GenericRow> genericRows = new ArrayList<>();
-    while (recordReader.hasNext()) {
-      genericRows.add(recordReader.next());
-    }
-
-    recordReader.rewind();
-
-    while (recordReader.hasNext()) {
-      genericRows.add(recordReader.next());
-    }
-    recordReader.close();
-    Assert.assertEquals(genericRows.size(), 4, "The number of rows return after the rewind is incorrect");
+  private List<Map<String, Object>> generateRandomRecords() {

Review Comment:
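   The abstract test class already defines `protected static List<Map<String, Object>> generateRandomRecords(Schema pinotSchema)`. Would it make sense to redeclare (hide) that method here instead of adding a separate no-arg helper? Note that because it is static, it can be hidden in the subclass but not overridden.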



##########
pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java:
##########
@@ -188,17 +206,52 @@ public void testRecordReader()
     checkValue(_recordReader, _records, _primaryKeys);
   }
 
+  @Test
+  public void testGzipRecordReader()
+      throws Exception {
+    // Test Gzipped Avro file that ends with ".gz"
+    File gzipDataFile = new File(_tempDir, _dataFile.getName() + ".gz");
+    compressFileWithGzip(_dataFile.getAbsolutePath(), gzipDataFile.getAbsolutePath());
+    RecordReader recordReader = createRecordReader(gzipDataFile);
+    checkValue(recordReader, _records, _primaryKeys);
+    recordReader.rewind();
+    checkValue(recordReader, _records, _primaryKeys);
+
+    // Test Gzipped Avro file that doesn't end with '.gz'.
+    File gzipDataFile2 = new File(_tempDir, _dataFile.getName() + ".test");
+    compressFileWithGzip(_dataFile.getAbsolutePath(), gzipDataFile2.getAbsolutePath());
+    recordReader = createRecordReader(gzipDataFile2);
+    checkValue(recordReader, _records, _primaryKeys);
+    recordReader.rewind();
+    checkValue(recordReader, _records, _primaryKeys);
+  }
+
   /**
-   * @return an implementation of RecordReader
+   * @return an implementation of RecordReader of the given file

Review Comment:
   nit: could you add an `@param` description for the `file` parameter in this Javadoc?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org