You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/03/24 18:41:29 UTC
[iceberg] 14/18: Hive: Fix predicate pushdown for Date (#2254)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch 0.11.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git
commit 1f02595fb408b8ac12c83a758fa6fe9705f5ce8b
Author: pvary <pv...@cloudera.com>
AuthorDate: Fri Feb 26 12:51:10 2021 +0100
Hive: Fix predicate pushdown for Date (#2254)
---
.../iceberg/mr/hive/HiveIcebergFilterFactory.java | 16 +-
.../iceberg/mr/mapreduce/IcebergInputFormat.java | 6 +-
.../mr/hive/TestHiveIcebergFilterFactory.java | 2 +-
.../TestHiveIcebergStorageHandlerTimezone.java | 172 +++++++++++++++++++++
.../TestHiveIcebergStorageHandlerWithEngine.java | 6 +-
5 files changed, 191 insertions(+), 11 deletions(-)
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java
index 33791c7..a645874 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java
@@ -22,7 +22,6 @@ package org.apache.iceberg.mr.hive;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Timestamp;
-import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree;
@@ -172,15 +171,24 @@ public class HiveIcebergFilterFactory {
return hiveDecimalWritable.getHiveDecimal().bigDecimalValue().setScale(hiveDecimalWritable.scale());
}
+ // Hive turns a literal into a Date with `java.sql.Date.valueOf(lit.toString());`, which builds the object via
+ // `java.util.Date` and so interprets the year/month/day in the JVM default time zone (TimeZone.getDefaultRef()).
+ // Reading the value back through toLocalDate() recovers those same year/month/day fields, so the day count no
+ // longer depends on the default time zone the way the epoch millis from date.getTime() did.
private static int daysFromDate(Date date) {
- return DateTimeUtil.daysFromInstant(Instant.ofEpochMilli(date.getTime()));
+ return DateTimeUtil.daysFromDate(date.toLocalDate());
}
+ // Hive turns a literal into a Timestamp with `java.sql.Timestamp.valueOf(lit.toString());`, which likewise goes
+ // through `java.util.Date` and interprets the fields in the JVM default time zone (TimeZone.getDefaultRef()).
+ // Reading the value back through toLocalDateTime() recovers the year/month/day/hour/min/sec/nanos fields, so
+ // the result no longer depends on the default time zone the way timestamp.toInstant() did.
private static int daysFromTimestamp(Timestamp timestamp) {
- return DateTimeUtil.daysFromInstant(timestamp.toInstant());
+ return DateTimeUtil.daysFromDate(timestamp.toLocalDateTime().toLocalDate());
}
+ // Same reasoning as above: derive the micros from the LocalDateTime fields rather than from an Instant.
private static long microsFromTimestamp(Timestamp timestamp) {
- return DateTimeUtil.microsFromInstant(timestamp.toInstant());
+ return DateTimeUtil.microsFromTimestamp(timestamp.toLocalDateTime());
}
}
diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index 033e507..a67c513 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -49,6 +49,7 @@ import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.data.GenericDeleteFilter;
import org.apache.iceberg.data.IdentityPartitionConverters;
+import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.data.avro.DataReader;
import org.apache.iceberg.data.orc.GenericOrcReader;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
@@ -284,8 +285,11 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
boolean applyResidual = !context.getConfiguration().getBoolean(InputFormatConfig.SKIP_RESIDUAL_FILTERING, false);
if (applyResidual && residual != null && residual != Expressions.alwaysTrue()) {
+ // Evaluator cannot compare the date and timestamp values held by these records directly, so each record is
+ // wrapped to expose them in the expected type. A single wrapper instance is created here and reused for every record.
+ InternalRecordWrapper wrapper = new InternalRecordWrapper(readSchema.asStruct());
Evaluator filter = new Evaluator(readSchema.asStruct(), residual, caseSensitive);
- return CloseableIterable.filter(iter, record -> filter.eval((StructLike) record));
+ return CloseableIterable.filter(iter, record -> filter.eval(wrapper.wrap((StructLike) record)));
} else {
return iter;
}
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergFilterFactory.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergFilterFactory.java
index 3d436dc..3044f04 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergFilterFactory.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergFilterFactory.java
@@ -218,7 +218,7 @@ public class TestHiveIcebergFilterFactory {
@Test
public void testDateType() {
SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
- Date gmtDate = new Date(LocalDate.of(2015, 11, 12).atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli());
+ Date gmtDate = Date.valueOf(LocalDate.of(2015, 11, 12)); // default-zone midnight, like Hive literals; NOTE(review): not GMT despite the name
SearchArgument arg = builder.startAnd().equals("date", PredicateLeaf.Type.DATE, gmtDate).end().build();
UnboundPredicate expected = Expressions.equal("date", Literal.of("2015-11-12").to(Types.DateType.get()).value());
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerTimezone.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerTimezone.java
new file mode 100644
index 0000000..ea6b4d1
--- /dev/null
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerTimezone.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.text.DateFormat;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.common.DynFields;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.TestHelper;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.types.Types;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.junit.runners.Parameterized.Parameter;
+import static org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Runs date and timestamp predicate queries under several JVM default time zones to verify that the query
+ * results (and therefore predicate pushdown) do not depend on the default time zone.
+ */
+@RunWith(Parameterized.class)
+public class TestHiveIcebergStorageHandlerTimezone {
+  // Hive keeps per-thread cached objects that capture the JVM default time zone. These handles are empty when
+  // the hidden field cannot be found in the Hive version on the classpath (defaultAlwaysNull).
+  private static final Optional<ThreadLocal<DateFormat>> dateFormat =
+      Optional.ofNullable((ThreadLocal<DateFormat>) DynFields.builder()
+          .hiddenImpl(TimestampWritable.class, "threadLocalDateFormat")
+          .defaultAlwaysNull()
+          .buildStatic()
+          .get());
+
+  private static final Optional<ThreadLocal<TimeZone>> localTimeZone =
+      Optional.ofNullable((ThreadLocal<TimeZone>) DynFields.builder()
+          .hiddenImpl(DateWritable.class, "LOCAL_TIMEZONE")
+          .defaultAlwaysNull()
+          .buildStatic()
+          .get());
+
+  @Parameters(name = "timezone={0}")
+  public static Collection<Object[]> parameters() {
+    // "UTC" rather than "UTC/Greenwich": the latter is not a valid time zone ID, and
+    // TimeZone.getTimeZone silently falls back to GMT for IDs it does not understand.
+    return ImmutableList.of(
+        new String[] {"America/New_York"},
+        new String[] {"Asia/Kolkata"},
+        new String[] {"UTC"}
+    );
+  }
+
+  private static TestHiveShell shell;
+
+  // The JVM default time zone before this class changed it; restored after every test.
+  private static TimeZone originalTimeZone;
+
+  private TestTables testTables;
+
+  @Parameter(0)
+  public String timezoneString;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @BeforeClass
+  public static void beforeClass() {
+    originalTimeZone = TimeZone.getDefault();
+    shell = HiveIcebergStorageHandlerTestUtils.shell();
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    shell.stop();
+  }
+
+  @Before
+  public void before() throws IOException {
+    TimeZone.setDefault(TimeZone.getTimeZone(timezoneString));
+    // Hive caches objects built from the default time zone, so drop them to pick up the new zone
+    resetHiveTimeZoneCaches();
+
+    this.testTables = HiveIcebergStorageHandlerTestUtils.testTables(shell, TestTables.TestTableType.HIVE_CATALOG, temp);
+    // Uses spark as an engine so we can detect if we unintentionally try to use any execution engines
+    HiveIcebergStorageHandlerTestUtils.init(shell, testTables, temp, "spark");
+  }
+
+  @After
+  public void after() throws Exception {
+    try {
+      HiveIcebergStorageHandlerTestUtils.close(shell);
+    } finally {
+      // Undo the default time zone change so it does not leak into other tests running in this JVM
+      TimeZone.setDefault(originalTimeZone);
+      resetHiveTimeZoneCaches();
+    }
+  }
+
+  // Clears Hive's per-thread cached DateFormat and local TimeZone (when present) so they are rebuilt
+  // from the current JVM default time zone on next use.
+  private static void resetHiveTimeZoneCaches() {
+    dateFormat.ifPresent(ThreadLocal::remove);
+    localTimeZone.ifPresent(ThreadLocal::remove);
+  }
+
+  @Test
+  public void testDateQuery() throws IOException {
+    Schema dateSchema = new Schema(optional(1, "d_date", Types.DateType.get()));
+
+    List<Record> records = TestHelper.RecordsBuilder.newInstance(dateSchema)
+        .add(LocalDate.of(2020, 1, 21))
+        .add(LocalDate.of(2020, 1, 24))
+        .build();
+
+    testTables.createTable(shell, "date_test", dateSchema, FileFormat.PARQUET, records);
+
+    List<Object[]> result = shell.executeStatement("SELECT * from date_test WHERE d_date='2020-01-21'");
+    Assert.assertEquals(1, result.size());
+    Assert.assertEquals("2020-01-21", result.get(0)[0]);
+
+    result = shell.executeStatement("SELECT * from date_test WHERE d_date in ('2020-01-21', '2020-01-22')");
+    Assert.assertEquals(1, result.size());
+    Assert.assertEquals("2020-01-21", result.get(0)[0]);
+
+    result = shell.executeStatement("SELECT * from date_test WHERE d_date > '2020-01-21'");
+    Assert.assertEquals(1, result.size());
+    Assert.assertEquals("2020-01-24", result.get(0)[0]);
+
+    result = shell.executeStatement("SELECT * from date_test WHERE d_date='2020-01-20'");
+    Assert.assertEquals(0, result.size());
+  }
+
+  @Test
+  public void testTimestampQuery() throws IOException {
+    Schema timestampSchema = new Schema(optional(1, "d_ts", Types.TimestampType.withoutZone()));
+
+    List<Record> records = TestHelper.RecordsBuilder.newInstance(timestampSchema)
+        .add(LocalDateTime.of(2019, 1, 22, 9, 44, 54, 100000000))
+        .add(LocalDateTime.of(2019, 2, 22, 9, 44, 54, 200000000))
+        .build();
+
+    testTables.createTable(shell, "ts_test", timestampSchema, FileFormat.PARQUET, records);
+
+    List<Object[]> result = shell.executeStatement("SELECT d_ts FROM ts_test WHERE d_ts='2019-02-22 09:44:54.2'");
+    Assert.assertEquals(1, result.size());
+    Assert.assertEquals("2019-02-22 09:44:54.2", result.get(0)[0]);
+
+    result = shell.executeStatement(
+        "SELECT * FROM ts_test WHERE d_ts in ('2017-01-01 22:30:57.1', '2019-02-22 09:44:54.2')");
+    Assert.assertEquals(1, result.size());
+    Assert.assertEquals("2019-02-22 09:44:54.2", result.get(0)[0]);
+
+    result = shell.executeStatement("SELECT d_ts FROM ts_test WHERE d_ts < '2019-02-22 09:44:54.2'");
+    Assert.assertEquals(1, result.size());
+    Assert.assertEquals("2019-01-22 09:44:54.1", result.get(0)[0]);
+
+    result = shell.executeStatement("SELECT * FROM ts_test WHERE d_ts='2017-01-01 22:30:57.3'");
+    Assert.assertEquals(0, result.size());
+  }
+}
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
index ec79c12..9decb0c 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
@@ -150,11 +150,7 @@ public class TestHiveIcebergStorageHandlerWithEngine {
@After
public void after() throws Exception {
- shell.closeSession();
- shell.metastore().reset();
- // HiveServer2 thread pools are using thread local Hive -> HMSClient objects. These are not cleaned up when the
- // HiveServer2 is stopped. Only Finalizer closes the HMS connections.
- System.gc();
+ HiveIcebergStorageHandlerTestUtils.close(shell); // shared teardown helper; presumably performs the removed steps above — confirm
}
@Test