Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/05/26 16:42:51 UTC

[incubator-iceberg] branch master updated: Fix date and timestamp filters with generics (#983)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ee6f12  Fix date and timestamp filters with generics (#983)
0ee6f12 is described below

commit 0ee6f120a475ee3a7026e689886195d4a9acbf51
Author: Chen, Junjie <ch...@gmail.com>
AuthorDate: Wed May 27 00:42:36 2020 +0800

    Fix date and timestamp filters with generics (#983)
    
    Date and timestamp values were not the correct type for Evaluator.
---
 .../apache/iceberg/data/InternalRecordWrapper.java | 89 ++++++++++++++++++++++
 .../org/apache/iceberg/data/TableScanIterable.java |  5 +-
 .../org/apache/iceberg/data/TestLocalScan.java     | 48 +++++++++++-
 3 files changed, 138 insertions(+), 4 deletions(-)
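
Why the fix takes this shape: Evaluator binds filter literals to Iceberg's
internal representation of each type (dates as int days from epoch, times and
timestamps as long microseconds), while generic Records return those columns
as java.time objects, so the comparison cannot succeed. A minimal sketch of
the mismatch, using DateTimeUtil from org.apache.iceberg.util (the class name
and date value here are illustrative):

    import java.time.LocalDate;
    import org.apache.iceberg.util.DateTimeUtil;

    public class DateRepresentations {
      public static void main(String[] args) {
        // What a generic Record returns for a DATE column...
        LocalDate date = LocalDate.of(2020, 5, 26);
        // ...versus the internal int value Evaluator compares against.
        int days = DateTimeUtil.daysFromDate(date);
        System.out.println(date + " -> " + days + " days since 1970-01-01");
      }
    }

The new InternalRecordWrapper below applies that conversion per field, on
demand, as Evaluator reads values.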

diff --git a/data/src/main/java/org/apache/iceberg/data/InternalRecordWrapper.java b/data/src/main/java/org/apache/iceberg/data/InternalRecordWrapper.java
new file mode 100644
index 0000000..c9efb3d
--- /dev/null
+++ b/data/src/main/java/org/apache/iceberg/data/InternalRecordWrapper.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import java.lang.reflect.Array;
+import java.nio.ByteBuffer;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.util.function.Function;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.DateTimeUtil;
+
+class InternalRecordWrapper implements StructLike {
+  private final Function<Object, Object>[] transforms;
+  private StructLike wrapped = null;
+
+  @SuppressWarnings("unchecked")
+  InternalRecordWrapper(Types.StructType struct) {
+    this.transforms = struct.fields().stream()
+        .map(field -> converter(field.type()))
+        .toArray(length -> (Function<Object, Object>[]) Array.newInstance(Function.class, length));
+  }
+
+  private static Function<Object, Object> converter(Type type) {
+    switch (type.typeId()) {
+      case DATE:
+        return date -> DateTimeUtil.daysFromDate((LocalDate) date);
+      case TIME:
+        return time -> DateTimeUtil.microsFromTime((LocalTime) time);
+      case TIMESTAMP:
+        if (((Types.TimestampType) type).shouldAdjustToUTC()) {
+          return timestamp -> DateTimeUtil.microsFromTimestamptz((OffsetDateTime) timestamp);
+        } else {
+          return timestamp -> DateTimeUtil.microsFromTimestamp((LocalDateTime) timestamp);
+        }
+      case FIXED:
+        return bytes -> ByteBuffer.wrap((byte[]) bytes);
+      case STRUCT:
+        InternalRecordWrapper wrapper = new InternalRecordWrapper(type.asStructType());
+        return struct -> wrapper.wrap((StructLike) struct);
+      default:
+    }
+    return null;
+  }
+
+  public InternalRecordWrapper wrap(StructLike record) {
+    this.wrapped = record;
+    return this;
+  }
+
+  @Override
+  public int size() {
+    return wrapped.size();
+  }
+
+  @Override
+  public <T> T get(int pos, Class<T> javaClass) {
+    if (transforms[pos] != null) {
+      return javaClass.cast(transforms[pos].apply(wrapped.get(pos, Object.class)));
+    }
+    return wrapped.get(pos, javaClass);
+  }
+
+  @Override
+  public <T> void set(int pos, T value) {
+    throw new UnsupportedOperationException("Cannot update InternalRecordWrapper");
+  }
+}
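
The converter() switch above pairs each affected type with its DateTimeUtil
conversion and returns null for types already stored in their internal
representation, which get() then passes through unchanged. For the two
timestamp flavors, a self-contained sketch of the conversions (the timestamp
values are illustrative):

    import java.time.LocalDateTime;
    import java.time.OffsetDateTime;
    import org.apache.iceberg.util.DateTimeUtil;

    public class TimestampRepresentations {
      public static void main(String[] args) {
        // timestamptz columns hold OffsetDateTime in generic records
        OffsetDateTime tz = OffsetDateTime.parse("2020-05-27T00:42:36+08:00");
        long microsTz = DateTimeUtil.microsFromTimestamptz(tz);

        // timestamp (without zone) columns hold LocalDateTime
        LocalDateTime local = LocalDateTime.parse("2020-05-26T16:42:36");
        long microsLocal = DateTimeUtil.microsFromTimestamp(local);

        System.out.println(microsTz + " / " + microsLocal);
      }
    }
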
diff --git a/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java b/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
index 50152ed..c48d0d3 100644
--- a/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
+++ b/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
@@ -135,11 +135,13 @@ class TableScanIterable extends CloseableGroup implements CloseableIterable<Reco
     private final boolean caseSensitive;
     private Closeable currentCloseable = null;
     private Iterator<Record> currentIterator = Collections.emptyIterator();
+    private final InternalRecordWrapper recordWrapper;
 
     private ScanIterator(CloseableIterable<CombinedScanTask> tasks, boolean caseSensitive) {
       this.tasks = Lists.newArrayList(Iterables.concat(
           CloseableIterable.transform(tasks, CombinedScanTask::files))).iterator();
       this.caseSensitive = caseSensitive;
+      this.recordWrapper = new InternalRecordWrapper(projection.asStruct());
     }
 
     @Override
@@ -163,7 +165,8 @@ class TableScanIterable extends CloseableGroup implements CloseableIterable<Reco
 
           if (task.residual() != null && task.residual() != Expressions.alwaysTrue()) {
             Evaluator filter = new Evaluator(projection.asStruct(), task.residual(), caseSensitive);
-            this.currentIterator = Iterables.filter(reader, filter::eval).iterator();
+            this.currentIterator = Iterables.filter(reader,
+                record -> filter.eval(recordWrapper.wrap(record))).iterator();
           } else {
             this.currentIterator = reader.iterator();
           }
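
With this wiring, the wrapper is constructed once per scan and re-wraps each
record before the residual filter runs, so the per-row cost is a field
assignment plus on-demand conversion of the fields the filter touches. The
same pattern in isolation (a sketch: InternalRecordWrapper is package-private,
so this assumes code living in org.apache.iceberg.data; the schema, records,
and predicate are illustrative):

    import java.util.List;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.expressions.Evaluator;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.iceberg.types.Types;

    static void printMatches(Schema schema, List<Record> records) {
      Types.StructType struct = schema.asStruct();
      InternalRecordWrapper wrapper = new InternalRecordWrapper(struct);
      Evaluator filter = new Evaluator(struct,
          Expressions.equal("date", "2020-05-26"), true);
      for (Record record : records) {
        if (filter.eval(wrapper.wrap(record))) {
          System.out.println(record);  // record satisfies the filter
        }
      }
    }
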
diff --git a/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java b/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java
index 6736730..b13a3df 100644
--- a/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java
+++ b/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java
@@ -56,6 +56,7 @@ import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.types.Types;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -67,6 +68,7 @@ import static com.google.common.collect.Iterables.concat;
 import static com.google.common.collect.Iterables.filter;
 import static com.google.common.collect.Iterables.transform;
 import static org.apache.iceberg.DataFiles.fromInputFile;
+import static org.apache.iceberg.expressions.Expressions.equal;
 import static org.apache.iceberg.expressions.Expressions.lessThan;
 import static org.apache.iceberg.expressions.Expressions.lessThanOrEqual;
 import static org.apache.iceberg.hadoop.HadoopOutputFile.fromPath;
@@ -391,13 +393,17 @@ public class TestLocalScan {
   }
 
   private DataFile writeFile(String location, String filename, List<Record> records) throws IOException {
+    return writeFile(location, filename, SCHEMA, records);
+  }
+
+  private DataFile writeFile(String location, String filename, Schema schema, List<Record> records) throws IOException {
     Path path = new Path(location, filename);
     FileFormat fileFormat = FileFormat.fromFileName(filename);
     Preconditions.checkNotNull(fileFormat, "Cannot determine format for file: %s", filename);
     switch (fileFormat) {
       case AVRO:
         FileAppender<Record> avroAppender = Avro.write(fromPath(path, CONF))
-            .schema(SCHEMA)
+            .schema(schema)
             .createWriterFunc(DataWriter::create)
             .named(fileFormat.name())
             .build();
@@ -414,7 +420,7 @@ public class TestLocalScan {
 
       case PARQUET:
         FileAppender<Record> parquetAppender = Parquet.write(fromPath(path, CONF))
-            .schema(SCHEMA)
+            .schema(schema)
             .createWriterFunc(GenericParquetWriter::buildWriter)
             .build();
         try {
@@ -430,7 +436,7 @@ public class TestLocalScan {
 
       case ORC:
         FileAppender<Record> orcAppender = ORC.write(fromPath(path, CONF))
-            .schema(SCHEMA)
+            .schema(schema)
             .createWriterFunc(GenericOrcWriter::buildWriter)
             .build();
         try {
@@ -449,6 +455,42 @@ public class TestLocalScan {
     }
   }
 
+  @Test
+  public void testFilterWithDateAndTimestamp() throws IOException {
+    Assume.assumeFalse(format == FileFormat.ORC);
+    Schema schema = new Schema(
+        required(1, "timestamp_with_zone", Types.TimestampType.withZone()),
+        required(2, "timestamp_without_zone", Types.TimestampType.withoutZone()),
+        required(3, "date", Types.DateType.get()),
+        required(4, "time", Types.TimeType.get())
+    );
+
+    File tableLocation = temp.newFolder("complex_filter_table");
+    Assert.assertTrue(tableLocation.delete());
+
+    Table table = TABLES.create(
+        schema, PartitionSpec.unpartitioned(),
+        ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
+        tableLocation.getAbsolutePath());
+
+    List<Record> expected = RandomGenericData.generate(schema, 100, 435691832918L);
+    DataFile file = writeFile(tableLocation.toString(), format.addExtension("record-file"), schema, expected);
+    table.newFastAppend().appendFile(file).commit();
+
+    for (Record r : expected) {
+      Iterable<Record> filterResult = IcebergGenerics.read(table)
+          .where(equal("timestamp_with_zone", r.getField("timestamp_with_zone").toString()))
+          .where(equal("timestamp_without_zone", r.getField("timestamp_without_zone").toString()))
+          .where(equal("date", r.getField("date").toString()))
+          .where(equal("time", r.getField("time").toString()))
+          .build();
+
+      Assert.assertTrue(filterResult.iterator().hasNext());
+      Record readRecord = filterResult.iterator().next();
+      Assert.assertEquals(r.getField("timestamp_with_zone"), readRecord.getField("timestamp_with_zone"));
+    }
+  }
+
   private static ByteBuffer longToBuffer(long value) {
     return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(0, value);
   }
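
End to end, the test above exercises the user-facing path this commit repairs:
equality filters on date, time, and timestamp columns of a generic scan, with
string literals converted to the column's type when the expression is bound.
A trimmed sketch of that read path (table construction elided; the column name
and literal are illustrative):

    import org.apache.iceberg.Table;
    import org.apache.iceberg.data.IcebergGenerics;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.iceberg.io.CloseableIterable;

    static void scanByDate(Table table) {
      CloseableIterable<Record> result = IcebergGenerics.read(table)
          .where(Expressions.equal("date", "2020-05-26"))
          .build();
      result.forEach(System.out::println);
    }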