Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/05/26 16:42:51 UTC
[incubator-iceberg] branch master updated: Fix date and timestamp filters with generics (#983)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 0ee6f12 Fix date and timestamp filters with generics (#983)
0ee6f12 is described below
commit 0ee6f120a475ee3a7026e689886195d4a9acbf51
Author: Chen, Junjie <ch...@gmail.com>
AuthorDate: Wed May 27 00:42:36 2020 +0800
Fix date and timestamp filters with generics (#983)
Date and timestamp values in generic records were not the correct type for
Evaluator: the generic readers produce java.time objects (LocalDate, LocalTime,
LocalDateTime, OffsetDateTime), while Evaluator's comparators expect Iceberg's
internal representations (int days and long microseconds).
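For context on the failure mode: when a filter such as equal("date", "2020-05-26")
is bound, the literal becomes an int (days from 1970-01-01), so evaluating it
against a record that returns LocalDate for that position fails. A minimal sketch
of the mismatch using Iceberg's public expression API (the OneField stub below is
illustrative, not part of this patch):

    import java.time.LocalDate;
    import org.apache.iceberg.StructLike;
    import org.apache.iceberg.expressions.Evaluator;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.iceberg.types.Types;

    public class DateFilterMismatch {
      // Illustrative single-field struct that returns whatever value it was given.
      static class OneField implements StructLike {
        private final Object value;
        OneField(Object value) { this.value = value; }
        @Override public int size() { return 1; }
        @Override public <T> T get(int pos, Class<T> javaClass) { return javaClass.cast(value); }
        @Override public <T> void set(int pos, T v) { throw new UnsupportedOperationException(); }
      }

      public static void main(String[] args) {
        Types.StructType struct = Types.StructType.of(
            Types.NestedField.required(1, "date", Types.DateType.get()));
        Evaluator filter = new Evaluator(struct, Expressions.equal("date", "2020-05-26"));

        // The evaluator fetches the field as Integer (DATE's internal class), so a
        // record holding a LocalDate fails with a ClassCastException.
        filter.eval(new OneField(LocalDate.of(2020, 5, 26)));
      }
    }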
---
.../apache/iceberg/data/InternalRecordWrapper.java | 88 ++++++++++++++++++++++
.../org/apache/iceberg/data/TableScanIterable.java | 5 +-
.../org/apache/iceberg/data/TestLocalScan.java | 48 +++++++++++-
3 files changed, 137 insertions(+), 4 deletions(-)
diff --git a/data/src/main/java/org/apache/iceberg/data/InternalRecordWrapper.java b/data/src/main/java/org/apache/iceberg/data/InternalRecordWrapper.java
new file mode 100644
index 0000000..c9efb3d
--- /dev/null
+++ b/data/src/main/java/org/apache/iceberg/data/InternalRecordWrapper.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import java.lang.reflect.Array;
+import java.nio.ByteBuffer;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.util.function.Function;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+class InternalRecordWrapper implements StructLike {
+ private final Function<Object, Object>[] transforms;
+ private StructLike wrapped = null;
+
+ @SuppressWarnings("unchecked")
+ InternalRecordWrapper(Types.StructType struct) {
+ this.transforms = struct.fields().stream()
+ .map(field -> converter(field.type()))
+ .toArray(length -> (Function<Object, Object>[]) Array.newInstance(Function.class, length));
+ }
+
+ private static Function<Object, Object> converter(Type type) {
+ switch (type.typeId()) {
+ case DATE:
+ return date -> DateTimeUtil.daysFromDate((LocalDate) date);
+ case TIME:
+ return time -> DateTimeUtil.microsFromTime((LocalTime) time);
+ case TIMESTAMP:
+ if (((Types.TimestampType) type).shouldAdjustToUTC()) {
+ return timestamp -> DateTimeUtil.microsFromTimestamptz((OffsetDateTime) timestamp);
+ } else {
+ return timestamp -> DateTimeUtil.microsFromTimestamp((LocalDateTime) timestamp);
+ }
+ case FIXED:
+ return bytes -> ByteBuffer.wrap((byte[]) bytes);
+ case STRUCT:
+ InternalRecordWrapper wrapper = new InternalRecordWrapper(type.asStructType());
+ return struct -> wrapper.wrap((StructLike) struct);
+ default:
+ }
+ return null;
+ }
+
+ public InternalRecordWrapper wrap(StructLike record) {
+ this.wrapped = record;
+ return this;
+ }
+
+ @Override
+ public int size() {
+ return wrapped.size();
+ }
+
+ @Override
+ public <T> T get(int pos, Class<T> javaClass) {
+ if (transforms[pos] != null) {
+ return javaClass.cast(transforms[pos].apply(wrapped.get(pos, Object.class)));
+ }
+ return wrapped.get(pos, javaClass);
+ }
+
+ @Override
+ public <T> void set(int pos, T value) {
+ throw new UnsupportedOperationException("Cannot update InternalRecordWrapper");
+ }
+}
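The converters above translate the generic in-memory classes into the internal
values that Evaluator's comparators operate on. A quick sketch of that mapping,
assuming the DateTimeUtil referenced in the patch resolves to Iceberg's
org.apache.iceberg.util.DateTimeUtil (epoch-based conversions):

    import java.time.LocalDate;
    import java.time.LocalDateTime;
    import java.time.LocalTime;
    import org.apache.iceberg.util.DateTimeUtil;

    public class ConversionSketch {
      public static void main(String[] args) {
        // DATE: LocalDate -> int days from 1970-01-01
        int days = DateTimeUtil.daysFromDate(LocalDate.of(2020, 5, 26));      // 18408
        // TIME: LocalTime -> long microseconds from midnight
        long timeMicros = DateTimeUtil.microsFromTime(LocalTime.of(16, 42, 51));
        // TIMESTAMP (without zone): LocalDateTime -> long microseconds from epoch
        long tsMicros = DateTimeUtil.microsFromTimestamp(
            LocalDateTime.of(2020, 5, 26, 16, 42, 51));
        System.out.printf("%d %d %d%n", days, timeMicros, tsMicros);
      }
    }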
diff --git a/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java b/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
index 50152ed..c48d0d3 100644
--- a/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
+++ b/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
@@ -135,11 +135,13 @@ class TableScanIterable extends CloseableGroup implements CloseableIterable<Reco
private final boolean caseSensitive;
private Closeable currentCloseable = null;
private Iterator<Record> currentIterator = Collections.emptyIterator();
+ private final InternalRecordWrapper recordWrapper;
private ScanIterator(CloseableIterable<CombinedScanTask> tasks, boolean caseSensitive) {
this.tasks = Lists.newArrayList(Iterables.concat(
CloseableIterable.transform(tasks, CombinedScanTask::files))).iterator();
this.caseSensitive = caseSensitive;
+ this.recordWrapper = new InternalRecordWrapper(projection.asStruct());
}
@Override
@@ -163,7 +165,8 @@ class TableScanIterable extends CloseableGroup implements CloseableIterable<Reco
if (task.residual() != null && task.residual() != Expressions.alwaysTrue()) {
Evaluator filter = new Evaluator(projection.asStruct(), task.residual(), caseSensitive);
- this.currentIterator = Iterables.filter(reader, filter::eval).iterator();
+ this.currentIterator = Iterables.filter(reader,
+ record -> filter.eval(recordWrapper.wrap(record))).iterator();
} else {
this.currentIterator = reader.iterator();
}
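Taken together, the change boils down to the call pattern below: one wrapper is
built per scan from the projected struct, and wrap() only swaps the delegate
record, so fields are converted lazily as the evaluator reads them. A minimal
sketch (the ResidualFilterSketch class and its parameters are hypothetical; it
must live in org.apache.iceberg.data because InternalRecordWrapper is
package-private):

    package org.apache.iceberg.data;

    import com.google.common.collect.Iterables;
    import org.apache.iceberg.expressions.Evaluator;
    import org.apache.iceberg.expressions.Expression;
    import org.apache.iceberg.types.Types;

    class ResidualFilterSketch {
      // Mirrors ScanIterator: build the evaluator and wrapper once, reuse per record.
      static Iterable<Record> filter(Iterable<Record> reader, Types.StructType projection,
                                     Expression residual, boolean caseSensitive) {
        Evaluator filter = new Evaluator(projection, residual, caseSensitive);
        InternalRecordWrapper wrapper = new InternalRecordWrapper(projection);
        return Iterables.filter(reader, record -> filter.eval(wrapper.wrap(record)));
      }
    }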
diff --git a/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java b/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java
index 6736730..b13a3df 100644
--- a/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java
+++ b/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java
@@ -56,6 +56,7 @@ import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
+import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -67,6 +68,7 @@ import static com.google.common.collect.Iterables.concat;
import static com.google.common.collect.Iterables.filter;
import static com.google.common.collect.Iterables.transform;
import static org.apache.iceberg.DataFiles.fromInputFile;
+import static org.apache.iceberg.expressions.Expressions.equal;
import static org.apache.iceberg.expressions.Expressions.lessThan;
import static org.apache.iceberg.expressions.Expressions.lessThanOrEqual;
import static org.apache.iceberg.hadoop.HadoopOutputFile.fromPath;
@@ -391,13 +393,17 @@ public class TestLocalScan {
}
private DataFile writeFile(String location, String filename, List<Record> records) throws IOException {
+ return writeFile(location, filename, SCHEMA, records);
+ }
+
+ private DataFile writeFile(String location, String filename, Schema schema, List<Record> records) throws IOException {
Path path = new Path(location, filename);
FileFormat fileFormat = FileFormat.fromFileName(filename);
Preconditions.checkNotNull(fileFormat, "Cannot determine format for file: %s", filename);
switch (fileFormat) {
case AVRO:
FileAppender<Record> avroAppender = Avro.write(fromPath(path, CONF))
- .schema(SCHEMA)
+ .schema(schema)
.createWriterFunc(DataWriter::create)
.named(fileFormat.name())
.build();
@@ -414,7 +420,7 @@ public class TestLocalScan {
case PARQUET:
FileAppender<Record> parquetAppender = Parquet.write(fromPath(path, CONF))
- .schema(SCHEMA)
+ .schema(schema)
.createWriterFunc(GenericParquetWriter::buildWriter)
.build();
try {
@@ -430,7 +436,7 @@ public class TestLocalScan {
case ORC:
FileAppender<Record> orcAppender = ORC.write(fromPath(path, CONF))
- .schema(SCHEMA)
+ .schema(schema)
.createWriterFunc(GenericOrcWriter::buildWriter)
.build();
try {
@@ -449,6 +455,42 @@ public class TestLocalScan {
}
}
+ @Test
+ public void testFilterWithDateAndTimestamp() throws IOException {
+ Assume.assumeFalse(format == FileFormat.ORC);
+ Schema schema = new Schema(
+ required(1, "timestamp_with_zone", Types.TimestampType.withZone()),
+ required(2, "timestamp_without_zone", Types.TimestampType.withoutZone()),
+ required(3, "date", Types.DateType.get()),
+ required(4, "time", Types.TimeType.get())
+ );
+
+ File tableLocation = temp.newFolder("complex_filter_table");
+ Assert.assertTrue(tableLocation.delete());
+
+ Table table = TABLES.create(
+ schema, PartitionSpec.unpartitioned(),
+ ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
+ tableLocation.getAbsolutePath());
+
+ List<Record> expected = RandomGenericData.generate(schema, 100, 435691832918L);
+ DataFile file = writeFile(tableLocation.toString(), format.addExtension("record-file"), schema, expected);
+ table.newFastAppend().appendFile(file).commit();
+
+ for (Record r : expected) {
+ Iterable<Record> filterResult = IcebergGenerics.read(table)
+ .where(equal("timestamp_with_zone", r.getField("timestamp_with_zone").toString()))
+ .where(equal("timestamp_without_zone", r.getField("timestamp_without_zone").toString()))
+ .where(equal("date", r.getField("date").toString()))
+ .where(equal("time", r.getField("time").toString()))
+ .build();
+
+ Assert.assertTrue(filterResult.iterator().hasNext());
+ Record readRecord = filterResult.iterator().next();
+ Assert.assertEquals(r.getField("timestamp_with_zone"), readRecord.getField("timestamp_with_zone"));
+ }
+ }
+
private static ByteBuffer longToBuffer(long value) {
return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(0, value);
}