You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/03/24 18:41:29 UTC

[iceberg] 14/18: Hive: Fix predicate pushdown for Date (#2254)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch 0.11.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 1f02595fb408b8ac12c83a758fa6fe9705f5ce8b
Author: pvary <pv...@cloudera.com>
AuthorDate: Fri Feb 26 12:51:10 2021 +0100

    Hive: Fix predicate pushdown for Date (#2254)
---
 .../iceberg/mr/hive/HiveIcebergFilterFactory.java  |  16 +-
 .../iceberg/mr/mapreduce/IcebergInputFormat.java   |   6 +-
 .../mr/hive/TestHiveIcebergFilterFactory.java      |   2 +-
 .../TestHiveIcebergStorageHandlerTimezone.java     | 172 +++++++++++++++++++++
 .../TestHiveIcebergStorageHandlerWithEngine.java   |   6 +-
 5 files changed, 191 insertions(+), 11 deletions(-)

diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java
index 33791c7..a645874 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java
@@ -22,7 +22,6 @@ package org.apache.iceberg.mr.hive;
 import java.math.BigDecimal;
 import java.sql.Date;
 import java.sql.Timestamp;
-import java.time.Instant;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree;
@@ -172,15 +171,24 @@ public class HiveIcebergFilterFactory {
     return hiveDecimalWritable.getHiveDecimal().bigDecimalValue().setScale(hiveDecimalWritable.scale());
   }
 
+  // Hive uses `java.sql.Date.valueOf(lit.toString());` to convert a literal to a Date,
+  // which uses `java.util.Date()` internally to create the object, and that uses TimeZone.getDefaultRef().
+  // To get back the expected date we have to use LocalDate, which gets rid of the TimeZone misery because it
+  // uses the year/month/day to generate the object.
   private static int daysFromDate(Date date) {
-    return DateTimeUtil.daysFromInstant(Instant.ofEpochMilli(date.getTime()));
+    return DateTimeUtil.daysFromDate(date.toLocalDate());
   }
 
+  // Hive uses `java.sql.Timestamp.valueOf(lit.toString());` to convert a literal to a Timestamp,
+  // which again uses `java.util.Date()` internally to create the object, and that uses TimeZone.getDefaultRef().
+  // To get back the expected timestamp we have to use LocalDateTime, which gets rid of the TimeZone misery
+  // because it uses the year/month/day/hour/min/sec/nanos to generate the object.
   private static int daysFromTimestamp(Timestamp timestamp) {
-    return DateTimeUtil.daysFromInstant(timestamp.toInstant());
+    return DateTimeUtil.daysFromDate(timestamp.toLocalDateTime().toLocalDate());
   }
 
+  // We have to use LocalDateTime to get the micros; see the comment on daysFromTimestamp for the reason.
   private static long microsFromTimestamp(Timestamp timestamp) {
-    return DateTimeUtil.microsFromInstant(timestamp.toInstant());
+    return DateTimeUtil.microsFromTimestamp(timestamp.toLocalDateTime());
   }
 }
diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index 033e507..a67c513 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -49,6 +49,7 @@ import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.data.DeleteFilter;
 import org.apache.iceberg.data.GenericDeleteFilter;
 import org.apache.iceberg.data.IdentityPartitionConverters;
+import org.apache.iceberg.data.InternalRecordWrapper;
 import org.apache.iceberg.data.avro.DataReader;
 import org.apache.iceberg.data.orc.GenericOrcReader;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
@@ -284,8 +285,11 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
       boolean applyResidual = !context.getConfiguration().getBoolean(InputFormatConfig.SKIP_RESIDUAL_FILTERING, false);
 
       if (applyResidual && residual != null && residual != Expressions.alwaysTrue()) {
+        // Date and timestamp values in the record are not of the type the Evaluator expects.
+        // Wrap the record so that they are returned as the expected type.
+        InternalRecordWrapper wrapper = new InternalRecordWrapper(readSchema.asStruct());
         Evaluator filter = new Evaluator(readSchema.asStruct(), residual, caseSensitive);
-        return CloseableIterable.filter(iter, record -> filter.eval((StructLike) record));
+        return CloseableIterable.filter(iter, record -> filter.eval(wrapper.wrap((StructLike) record)));
       } else {
         return iter;
       }
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergFilterFactory.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergFilterFactory.java
index 3d436dc..3044f04 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergFilterFactory.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergFilterFactory.java
@@ -218,7 +218,7 @@ public class TestHiveIcebergFilterFactory {
   @Test
   public void testDateType() {
     SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
-    Date gmtDate = new Date(LocalDate.of(2015, 11, 12).atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli());
+    Date gmtDate = Date.valueOf(LocalDate.of(2015, 11, 12));
     SearchArgument arg = builder.startAnd().equals("date", PredicateLeaf.Type.DATE, gmtDate).end().build();
 
     UnboundPredicate expected = Expressions.equal("date", Literal.of("2015-11-12").to(Types.DateType.get()).value());
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerTimezone.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerTimezone.java
new file mode 100644
index 0000000..ea6b4d1
--- /dev/null
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerTimezone.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.text.DateFormat;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.common.DynFields;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.TestHelper;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.types.Types;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.junit.runners.Parameterized.Parameter;
+import static org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class TestHiveIcebergStorageHandlerTimezone {
+  private static final Optional<ThreadLocal<DateFormat>> dateFormat =
+      Optional.ofNullable((ThreadLocal<DateFormat>) DynFields.builder()
+          .hiddenImpl(TimestampWritable.class, "threadLocalDateFormat")
+          .defaultAlwaysNull()
+          .buildStatic()
+          .get());
+
+  private static final Optional<ThreadLocal<TimeZone>> localTimeZone =
+      Optional.ofNullable((ThreadLocal<TimeZone>) DynFields.builder()
+          .hiddenImpl(DateWritable.class, "LOCAL_TIMEZONE")
+          .defaultAlwaysNull()
+          .buildStatic()
+          .get());
+
+  @Parameters(name = "timezone={0}")
+  public static Collection<Object[]> parameters() {
+    return ImmutableList.of(
+        new String[] {"America/New_York"},
+        new String[] {"Asia/Kolkata"},
+        new String[] {"UTC/Greenwich"}
+    );
+  }
+
+  private static TestHiveShell shell;
+
+  private TestTables testTables;
+
+  @Parameter(0)
+  public String timezoneString;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @BeforeClass
+  public static void beforeClass() {
+    shell = HiveIcebergStorageHandlerTestUtils.shell();
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    shell.stop();
+  }
+
+  @Before
+  public void before() throws IOException {
+    TimeZone.setDefault(TimeZone.getTimeZone(timezoneString));
+
+    // Clear the date format and local timezone cached by Hive in thread locals, because the default timezone is
+    // used/stored in the cached objects
+    dateFormat.ifPresent(ThreadLocal::remove);
+    localTimeZone.ifPresent(ThreadLocal::remove);
+
+    this.testTables = HiveIcebergStorageHandlerTestUtils.testTables(shell, TestTables.TestTableType.HIVE_CATALOG, temp);
+    // Uses spark as an engine so we can detect if we unintentionally try to use any execution engines
+    HiveIcebergStorageHandlerTestUtils.init(shell, testTables, temp, "spark");
+  }
+
+  @After
+  public void after() throws Exception {
+    HiveIcebergStorageHandlerTestUtils.close(shell);
+  }
+
+  @Test
+  public void testDateQuery() throws IOException {
+    Schema dateSchema = new Schema(optional(1, "d_date", Types.DateType.get()));
+
+    List<Record> records = TestHelper.RecordsBuilder.newInstance(dateSchema)
+        .add(LocalDate.of(2020, 1, 21))
+        .add(LocalDate.of(2020, 1, 24))
+        .build();
+
+    testTables.createTable(shell, "date_test", dateSchema, FileFormat.PARQUET, records);
+
+    List<Object[]> result = shell.executeStatement("SELECT * from date_test WHERE d_date='2020-01-21'");
+    Assert.assertEquals(1, result.size());
+    Assert.assertEquals("2020-01-21", result.get(0)[0]);
+
+    result = shell.executeStatement("SELECT * from date_test WHERE d_date in ('2020-01-21', '2020-01-22')");
+    Assert.assertEquals(1, result.size());
+    Assert.assertEquals("2020-01-21", result.get(0)[0]);
+
+    result = shell.executeStatement("SELECT * from date_test WHERE d_date > '2020-01-21'");
+    Assert.assertEquals(1, result.size());
+    Assert.assertEquals("2020-01-24", result.get(0)[0]);
+
+    result = shell.executeStatement("SELECT * from date_test WHERE d_date='2020-01-20'");
+    Assert.assertEquals(0, result.size());
+  }
+
+  @Test
+  public void testTimestampQuery() throws IOException {
+    Schema timestampSchema = new Schema(optional(1, "d_ts", Types.TimestampType.withoutZone()));
+
+    List<Record> records = TestHelper.RecordsBuilder.newInstance(timestampSchema)
+        .add(LocalDateTime.of(2019, 1, 22, 9, 44, 54, 100000000))
+        .add(LocalDateTime.of(2019, 2, 22, 9, 44, 54, 200000000))
+        .build();
+
+    testTables.createTable(shell, "ts_test", timestampSchema, FileFormat.PARQUET, records);
+
+    List<Object[]> result = shell.executeStatement("SELECT d_ts FROM ts_test WHERE d_ts='2019-02-22 09:44:54.2'");
+    Assert.assertEquals(1, result.size());
+    Assert.assertEquals("2019-02-22 09:44:54.2", result.get(0)[0]);
+
+    result = shell.executeStatement(
+        "SELECT * FROM ts_test WHERE d_ts in ('2017-01-01 22:30:57.1', '2019-02-22 09:44:54.2')");
+    Assert.assertEquals(1, result.size());
+    Assert.assertEquals("2019-02-22 09:44:54.2", result.get(0)[0]);
+
+    result = shell.executeStatement("SELECT d_ts FROM ts_test WHERE d_ts < '2019-02-22 09:44:54.2'");
+    Assert.assertEquals(1, result.size());
+    Assert.assertEquals("2019-01-22 09:44:54.1", result.get(0)[0]);
+
+    result = shell.executeStatement("SELECT * FROM ts_test WHERE d_ts='2017-01-01 22:30:57.3'");
+    Assert.assertEquals(0, result.size());
+  }
+}
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
index ec79c12..9decb0c 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
@@ -150,11 +150,7 @@ public class TestHiveIcebergStorageHandlerWithEngine {
 
   @After
   public void after() throws Exception {
-    shell.closeSession();
-    shell.metastore().reset();
-    // HiveServer2 thread pools are using thread local Hive -> HMSClient objects. These are not cleaned up when the
-    // HiveServer2 is stopped. Only Finalizer closes the HMS connections.
-    System.gc();
+    HiveIcebergStorageHandlerTestUtils.close(shell);
   }
 
   @Test