Posted to issues@iceberg.apache.org by "DimitrisStaratzis (via GitHub)" <gi...@apache.org> on 2023/05/08 15:55:07 UTC

[GitHub] [iceberg] DimitrisStaratzis opened a new issue, #7556: Arrow reader fails to filter results

DimitrisStaratzis opened a new issue, #7556:
URL: https://github.com/apache/iceberg/issues/7556

   ### Apache Iceberg version
   
   1.2.1 (latest release)
   
   ### Query engine
   
   Other
   
   ### Please describe the bug 🐞
   
   Hi everyone, 
   
   The code below reproduces an issue I am facing. It creates a simple table through the Java API with a single int column named `log_level`, whose values are drawn at random from 0 to 4. The data file format is Parquet. The `batches` variable determines how many files are written, and `recordsPerBatch` determines the number of records per file. To read the data back I use `VectorizedTableScanIterable`, which is a wrapper around the Iceberg `ArrowReader`.
   
   The problem: when I run the code with a simple filter, in this case `log_level > 2`, the results come back unfiltered. However, if I change
   `builder.add(record.copy(ImmutableMap.of("log_level", rand.nextInt(5))));` to
   `builder.add(record.copy(ImmutableMap.of("log_level", j % 5)));`, so that every record in a file holds the same value (`j` is the index of the current file/batch), the filtering works. So, as far as I understand, the filter is applied only to whole files rather than to individual rows. Is this the expected behavior, or am I missing something? When I use the commented-out part of the code below (`IcebergGenerics.read`) instead, filtering works row by row as I would expect.
   
   Here is the example I wrote to reproduce the issue:
   
   ```java
   package org.example;
   
   import com.google.common.collect.ImmutableList;
   import com.google.common.collect.ImmutableMap;
   import org.apache.commons.io.FileUtils;
   import org.apache.iceberg.DataFile;
   import org.apache.iceberg.PartitionSpec;
   import org.apache.iceberg.Schema;
   import org.apache.iceberg.Table;
   import org.apache.iceberg.TableScan;
   import org.apache.iceberg.arrow.vectorized.ColumnVector;
   import org.apache.iceberg.arrow.vectorized.ColumnarBatch;
   import org.apache.iceberg.arrow.vectorized.VectorizedTableScanIterable;
   import org.apache.iceberg.catalog.TableIdentifier;
   import org.apache.iceberg.data.GenericRecord;
   import org.apache.iceberg.data.IcebergGenerics;
   import org.apache.iceberg.data.Record;
   import org.apache.iceberg.data.parquet.GenericParquetWriter;
   import org.apache.iceberg.expressions.Expressions;
   import org.apache.iceberg.hadoop.HadoopCatalog;
   import org.apache.iceberg.io.CloseableIterable;
   import org.apache.iceberg.io.DataWriter;
   import org.apache.iceberg.io.OutputFile;
   import org.apache.iceberg.parquet.Parquet;
   import org.apache.iceberg.types.Types;
   import org.apache.hadoop.conf.Configuration;
   
   import java.io.File;
   import java.util.Iterator;
   import java.util.Map;
   import java.util.Random;
   
   public class Example {
       private static final int batches = 5;
       private static final int recordsPerBatch = 20;
   
       private static final String tableName = "iceberg_example";
   
       public static void main(String[] args) throws Exception {
           deleteIfExists();
           Table table = createLogArray();
           filteredScan(table);
   
       }
   
       public static Table createLogArray() throws Exception {
           Configuration conf = new Configuration();
           conf.set("fs.defaultFS", "file:///");
           String warehousePath = ".";
           HadoopCatalog catalog = new HadoopCatalog(conf, warehousePath);
   
           Schema schema = new Schema(
                   Types.NestedField.required(1, "log_level", Types.IntegerType.get())
           );
   
           PartitionSpec spec = PartitionSpec.builderFor(schema).build();
   
           // Create a table with the given schema
           TableIdentifier name = TableIdentifier.of(tableName);
           Map<String, String> properties = ImmutableMap.of("write.parquet.page-size-bytes", "1000"); // set the Parquet page size to 1000 bytes
   
           Table table = catalog.createTable(name, schema, spec, properties);
   
           // write data in batches
           Random rand = new Random();
           for (int j = 0; j < batches; j++) {
               // generate data
               GenericRecord record = GenericRecord.create(schema);
               ImmutableList.Builder<GenericRecord> builder = ImmutableList.builder();
               for (int i = 0; i < recordsPerBatch; i++) {
                   // create a random record
                   builder.add(record.copy(ImmutableMap.of("log_level", rand.nextInt(5))));
               }
   
               String filepath = table.location() + "/" + "file_" + j + ".parquet"; //uuid not necessary
               OutputFile file = table.io().newOutputFile(filepath);
               DataWriter<GenericRecord> dataWriter =
                       Parquet.writeData(file)
                               .schema(schema)
                               .createWriterFunc(GenericParquetWriter::buildWriter)
                               .overwrite()
                               .withSpec(PartitionSpec.unpartitioned())
                               .build();
   
               try {
                   for (GenericRecord rec : builder.build()) {
                       dataWriter.write(rec);
                   }
               } finally {
                   dataWriter.close();
               }
   
               DataFile dataFile = dataWriter.toDataFile();
   
               table.newAppend().appendFile(dataFile).commit();
           }
           catalog.close();
           return table;
       }
   
   
       public static void filteredScan(Table table) throws Exception {
   
           TableScan scan = table.newScan().select("log_level").filter(Expressions.greaterThan("log_level", 2));
           VectorizedTableScanIterable vectorizedTableScanIterable = new VectorizedTableScanIterable(scan, (recordsPerBatch * batches), true);
           Iterator<ColumnarBatch> iterator = vectorizedTableScanIterable.iterator();
           while (iterator.hasNext()) {
               ColumnarBatch batch = iterator.next();
               ColumnVector att = batch.column(0);
               for (int i = 0; i < batch.numRows(); i++) {
                   int value = att.getInt(i);
                   System.out.println(value);
               }
           }
   //        CloseableIterable<Record> result = IcebergGenerics.read(table)
   //                .select("log_level")
   //                .where(Expressions.greaterThan("log_level", 2))
   //                .build();
   //        for (Record record : result) {
   //            System.out.println(record.get(0));
   //        }
   //        result.close();
       }
   
       public static void deleteIfExists() throws Exception {
           File directory = new File(tableName);
           if (directory.exists()) {
               if (directory.isDirectory()) {
                   try {
                       FileUtils.deleteDirectory(directory);
                       System.out.println("Directory deleted successfully.");
                   } catch (Exception e) {
                       System.out.println("Failed to delete directory: " + e.getMessage());
                   }
               } else {
                   System.out.println("File with name " + tableName + " exists but is not a directory.");
               }
           } else {
               System.out.println("Directory with name " + tableName + " does not exist.");
           }
       }
   }
   ```
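
   To make the question concrete, here is a minimal sketch in plain Java of the behavior I seem to be observing. It does not use any Iceberg API: `PruningSketch` and its methods are hypothetical names, and the assumption that the scan filter is only checked against per-file min/max values is my guess, not something I have confirmed in the Iceberg source.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   // Hypothetical illustration only: this is NOT Iceberg code.
   // It contrasts pruning whole files by min/max bounds with filtering each row.
   public class PruningSketch {
   
       // File-level check: could a file whose values lie in [min, max]
       // contain at least one row with value > bound?
       public static boolean fileMayMatch(int min, int max, int bound) {
           return max > bound;
       }
   
       // Row-level check for the same predicate.
       public static boolean rowMatches(int value, int bound) {
           return value > bound;
       }
   
       // Emits every row of every file that survives the file-level check.
       public static List<Integer> scanWithFilePruningOnly(int[][] files, int bound) {
           List<Integer> out = new ArrayList<>();
           for (int[] file : files) {
               int min = Integer.MAX_VALUE;
               int max = Integer.MIN_VALUE;
               for (int v : file) {
                   min = Math.min(min, v);
                   max = Math.max(max, v);
               }
               if (fileMayMatch(min, max, bound)) {
                   for (int v : file) {
                       out.add(v); // no per-row check here
                   }
               }
           }
           return out;
       }
   
       // Emits only the rows that satisfy the predicate.
       public static List<Integer> scanWithRowFilter(int[][] files, int bound) {
           List<Integer> out = new ArrayList<>();
           for (int[] file : files) {
               for (int v : file) {
                   if (rowMatches(v, bound)) {
                       out.add(v);
                   }
               }
           }
           return out;
       }
   
       public static void main(String[] args) {
           int[][] mixed = {{0, 3, 1, 4}, {2, 2, 4, 0}}; // like rand.nextInt(5)
           int[][] uniform = {{0, 0}, {3, 3}};           // like j % 5
   
           System.out.println(scanWithFilePruningOnly(mixed, 2));   // [0, 3, 1, 4, 2, 2, 4, 0]
           System.out.println(scanWithRowFilter(mixed, 2));         // [3, 4, 4]
           System.out.println(scanWithFilePruningOnly(uniform, 2)); // [3, 3]
           System.out.println(scanWithRowFilter(uniform, 2));       // [3, 3]
       }
   }
   ```
   
   With mixed values per file, pruning alone returns everything, which matches what I see from `VectorizedTableScanIterable`. With one value per file, the two approaches agree, which would explain why the `j % 5` variant appears to filter correctly.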
   
   Thank you in advance!
   
   Best, 
   Dimitris


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] DimitrisStaratzis commented on issue #7556: Arrow reader fails to filter results

Posted by "DimitrisStaratzis (via GitHub)" <gi...@apache.org>.
DimitrisStaratzis commented on issue #7556:
URL: https://github.com/apache/iceberg/issues/7556#issuecomment-1549475096

   Any updates on this?




[GitHub] [iceberg] DimitrisStaratzis commented on issue #7556: Arrow reader fails to filter results

Posted by "DimitrisStaratzis (via GitHub)" <gi...@apache.org>.
DimitrisStaratzis commented on issue #7556:
URL: https://github.com/apache/iceberg/issues/7556#issuecomment-1538644996

   I believe this issue could be related: https://github.com/apache/iceberg/issues/7022
   




Re: [I] Arrow reader fails to filter results [iceberg]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on issue #7556:
URL: https://github.com/apache/iceberg/issues/7556#issuecomment-1826951581

   This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'




Re: [I] Arrow reader fails to filter results [iceberg]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed issue #7556: Arrow reader fails to filter results
URL: https://github.com/apache/iceberg/issues/7556




Re: [I] Arrow reader fails to filter results [iceberg]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on issue #7556:
URL: https://github.com/apache/iceberg/issues/7556#issuecomment-1807293720

   This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

