Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/03/24 18:41:15 UTC

[iceberg] branch 0.11.x updated (ad78cc6 -> aba3881)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a change to branch 0.11.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git.


    from ad78cc6  Add version.txt for release 0.11.0
     new e12d468  Hive: Fix writing of Date, Decimal, Time and UUID types. (#2126)
     new 9b49c21  Spark: Refresh relation cache in DELETE and MERGE (#2154)
     new 1554cd6  Hive: Fix file extensions for written files (#2155)
     new d71b249  Hive: Fix identity partitioned writes (#2151)
     new c90d26c  Core: Improve error messages in CatalogUtil.loadCatalog (#2146)
     new 953f92b  Hive: Avoid closing null writer in output committer abortTask (#2150)
     new 4b2f390  Core: Fix data loss in compact action (#2196)
     new 756e9e6  Spark: Fix _pos metadata column in SparkAvroReader (#2215)
     new cb0e587  Hive: Avoid drop table related exceptions in MetaHook (#2191)
     new 49b9442  Hive: Fix connection pool fails to reconnect to the Hive Metastore #1994 (#2119)
     new 2feeab5  Parquet: Fix row group filters with promoted types (#2232)
     new 42018f9  ORC: Fix vectorized reads with metadata columns (#2241)
     new 08ba760  Hive: Ensure unlock is called in HiveTableOperations to fix zombie locking (#2263)
     new 1f02595  Hive: Fix predicate pushdown for Date (#2254)
     new 8c13f2b  Hive: Quick fix for broken TestHiveIcebergFilterFactory.testTimestampType test (#2283)
     new f1a972b  AWS: Do not list non-iceberg table in GlueCatalog (#2267)
     new 05150df  Hive: Avoid reset hive configuration to default value. (#2075)
     new aba3881  Spark: Remove softValues for Spark 2 catalog cache (#2363)

The 18 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../iceberg/aws/glue/GlueCatalogNamespaceTest.java |  24 ++-
 .../org/apache/iceberg/aws/glue/GlueCatalog.java   |  27 +++-
 .../apache/iceberg/aws/glue/GlueCatalogTest.java   |  72 +++++++--
 .../main/java/org/apache/iceberg/CatalogUtil.java  |   2 +-
 .../actions/BaseRewriteDataFilesAction.java        |  11 +-
 .../iceberg/TestCatalogErrorConstructor.java}      |  47 +++---
 .../java/org/apache/iceberg/TestCatalogUtil.java   |  28 +++-
 .../iceberg/data/TestMetricsRowGroupFilter.java    |   9 ++
 .../java/org/apache/iceberg/hive/ClientPool.java   |  12 +-
 .../org/apache/iceberg/hive/HiveClientPool.java    |  13 ++
 .../apache/iceberg/hive/HiveTableOperations.java   |  14 +-
 .../org/apache/iceberg/hive/TestClientPool.java    | 110 ++++++++++---
 .../apache/iceberg/hive/TestHiveClientPool.java    |  70 +++++++++
 .../IcebergDateObjectInspectorHive3.java           |  11 +-
 .../iceberg/mr/hive/HiveIcebergFilterFactory.java  |  16 +-
 .../iceberg/mr/hive/HiveIcebergMetaHook.java       |  20 ++-
 .../mr/hive/HiveIcebergOutputCommitter.java        |  11 +-
 .../iceberg/mr/hive/HiveIcebergOutputFormat.java   |   4 +-
 .../iceberg/mr/hive/HiveIcebergRecordWriter.java   |   4 +
 .../IcebergDecimalObjectInspector.java             |   9 +-
 .../IcebergTimeObjectInspector.java                |   5 +-
 .../IcebergUUIDObjectInspector.java                |   5 +-
 .../iceberg/mr/mapreduce/IcebergInputFormat.java   |   6 +-
 .../iceberg/mr/hive/HiveIcebergTestUtils.java      |  10 +-
 .../mr/hive/TestHiveIcebergFilterFactory.java      |   5 +-
 .../mr/hive/TestHiveIcebergOutputCommitter.java    |  43 +++++-
 .../hive/TestHiveIcebergStorageHandlerNoScan.java  |  42 +++++
 .../TestHiveIcebergStorageHandlerTimezone.java     | 172 +++++++++++++++++++++
 .../TestHiveIcebergStorageHandlerWithEngine.java   |  84 ++++++----
 .../org/apache/iceberg/mr/hive/TestTables.java     |  87 ++++++++++-
 .../TestIcebergTimeObjectInspector.java            |   5 +-
 .../TestIcebergUUIDObjectInspector.java            |  11 +-
 .../apache/iceberg/parquet/ParquetConversions.java |  15 ++
 .../parquet/ParquetDictionaryRowGroupFilter.java   |   6 +-
 .../parquet/ParquetMetricsRowGroupFilter.java      |  20 ++-
 .../parquet/TestDictionaryRowGroupFilter.java      |   8 +
 .../apache/iceberg/spark/data/SparkAvroReader.java |  11 +-
 .../iceberg/spark/source/BatchDataReader.java      |  10 +-
 .../actions/TestRewriteDataFilesAction.java        |  54 +++++++
 .../iceberg/spark/source/CustomCatalogs.java       |   3 +-
 .../v2/ExtendedDataSourceV2Strategy.scala          |   9 +-
 .../execution/datasources/v2/ReplaceDataExec.scala |   9 +-
 .../SparkRowLevelOperationsTestBase.java           |  24 ++-
 .../iceberg/spark/extensions/TestDelete.java       |  35 +++++
 .../apache/iceberg/spark/extensions/TestMerge.java |  29 ++++
 45 files changed, 1068 insertions(+), 154 deletions(-)
 copy core/src/{main/java/org/apache/iceberg/IndexedStructLike.java => test/java/org/apache/iceberg/TestCatalogErrorConstructor.java} (50%)
 create mode 100644 hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveClientPool.java
 create mode 100644 mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerTimezone.java

[iceberg] 04/18: Hive: Fix identity partitioned writes (#2151)

Posted by ao...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch 0.11.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit d71b2493a9007b1e7a16ad6e63d5765f71f486fb
Author: pvary <pv...@cloudera.com>
AuthorDate: Wed Jan 27 15:56:55 2021 +0100

    Hive: Fix identity partitioned writes (#2151)
---
 .../iceberg/mr/hive/HiveIcebergOutputFormat.java   |  2 +-
 .../TestHiveIcebergStorageHandlerWithEngine.java   | 82 +++++++++-------------
 .../org/apache/iceberg/mr/hive/TestTables.java     | 69 +++++++++++++++++-
 3 files changed, 102 insertions(+), 51 deletions(-)

diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
index 6ebc677..823490e 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
@@ -77,7 +77,7 @@ public class HiveIcebergOutputFormat<T> implements OutputFormat<NullWritable, Co
         new OutputFileFactory(spec, fileFormat, location, io, encryption, taskAttemptID.getTaskID().getId(),
             taskAttemptID.getId(), jc.get(HiveConf.ConfVars.HIVEQUERYID.varname) + "-" + taskAttemptID.getJobID());
     HiveIcebergRecordWriter writer = new HiveIcebergRecordWriter(schema, spec, fileFormat,
-        new GenericAppenderFactory(schema), outputFileFactory, io, targetFileSize, taskAttemptID);
+        new GenericAppenderFactory(schema, spec), outputFileFactory, io, targetFileSize, taskAttemptID);
 
     return writer;
   }
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
index 7200e3d..ec79c12 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
@@ -20,9 +20,6 @@
 package org.apache.iceberg.mr.hive;
 
 import java.io.IOException;
-import java.sql.Timestamp;
-import java.time.LocalDateTime;
-import java.time.OffsetDateTime;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -30,16 +27,11 @@ import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.PartitionSpecParser;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.hive.HiveSchemaUtil;
-import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.mr.TestHelper;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.types.Type;
@@ -327,19 +319,14 @@ public class TestHiveIcebergStorageHandlerWithEngine {
       if (type.equals(Types.BinaryType.get()) || type.equals(Types.FixedType.ofLength(5))) {
         continue;
       }
-      String tableName = type.typeId().toString().toLowerCase() + "_table_" + i;
       String columnName = type.typeId().toString().toLowerCase() + "_column";
 
       Schema schema = new Schema(required(1, "id", Types.LongType.get()), required(2, columnName, type));
       List<Record> expected = TestHelper.generateRandomRecords(schema, 5, 0L);
 
-      Table table = testTables.createTable(shell, tableName, schema, fileFormat, ImmutableList.of());
-      StringBuilder query = new StringBuilder("INSERT INTO ").append(tableName).append(" VALUES")
-              .append(expected.stream()
-                      .map(r -> String.format("(%s,%s)", r.get(0),
-                              getStringValueForInsert(r.get(1), type)))
-                      .collect(Collectors.joining(",")));
-      shell.executeStatement(query.toString());
+      Table table = testTables.createTable(shell, type.typeId().toString().toLowerCase() + "_table_" + i,
+          schema, PartitionSpec.unpartitioned(), fileFormat, expected);
+
       HiveIcebergTestUtils.validateData(table, expected, 0);
     }
   }
@@ -527,29 +514,44 @@ public class TestHiveIcebergStorageHandlerWithEngine {
         .bucket("customer_id", 3)
         .build();
 
-    TableIdentifier identifier = TableIdentifier.of("default", "partitioned_customers");
+    List<Record> records = TestHelper.generateRandomRecords(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, 4, 0L);
+
+    Table table = testTables.createTable(shell, "partitioned_customers",
+        HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, spec, fileFormat, records);
+
+    HiveIcebergTestUtils.validateData(table, records, 0);
+  }
+
+  @Test
+  public void testIdentityPartitionedWrite() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
 
-    shell.executeStatement("CREATE EXTERNAL TABLE " + identifier +
-        " STORED BY '" + HiveIcebergStorageHandler.class.getName() + "' " +
-        testTables.locationForCreateTableSQL(identifier) +
-        "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
-        SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "', " +
-        "'" + InputFormatConfig.PARTITION_SPEC + "'='" +
-        PartitionSpecParser.toJson(spec) + "', " +
-        "'" + TableProperties.DEFAULT_FILE_FORMAT + "'='" + fileFormat + "')");
+    PartitionSpec spec = PartitionSpec.builderFor(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+        .identity("customer_id")
+        .build();
 
     List<Record> records = TestHelper.generateRandomRecords(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, 4, 0L);
 
-    StringBuilder query = new StringBuilder().append("INSERT INTO " + identifier + " VALUES ");
-    records.forEach(record -> query.append("(")
-        .append(record.get(0)).append(",'")
-        .append(record.get(1)).append("','")
-        .append(record.get(2)).append("'),"));
-    query.setLength(query.length() - 1);
+    Table table = testTables.createTable(shell, "partitioned_customers",
+        HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, spec, fileFormat, records);
 
-    shell.executeStatement(query.toString());
+    HiveIcebergTestUtils.validateData(table, records, 0);
+  }
+
+  @Test
+  public void testMultilevelIdentityPartitionedWrite() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    PartitionSpec spec = PartitionSpec.builderFor(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+        .identity("customer_id")
+        .identity("last_name")
+        .build();
+
+    List<Record> records = TestHelper.generateRandomRecords(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, 4, 0L);
+
+    Table table = testTables.createTable(shell, "partitioned_customers",
+        HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, spec, fileFormat, records);
 
-    Table table = testTables.loadTable(identifier);
     HiveIcebergTestUtils.validateData(table, records, 0);
   }
 
@@ -613,18 +615,4 @@ public class TestHiveIcebergStorageHandlerWithEngine {
     }
     return query;
   }
-
-  private String getStringValueForInsert(Object value, Type type) {
-    String template = "\'%s\'";
-    if (type.equals(Types.TimestampType.withoutZone())) {
-      return String.format(template, Timestamp.valueOf((LocalDateTime) value).toString());
-    } else if (type.equals(Types.TimestampType.withZone())) {
-      return String.format(template, Timestamp.from(((OffsetDateTime) value).toInstant()).toString());
-    } else if (type.equals(Types.BooleanType.get())) {
-      // in hive2 boolean type values must not be surrounded in apostrophes. Otherwise the value is translated to true.
-      return value.toString();
-    } else {
-      return String.format(template, value.toString());
-    }
-  }
 }
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
index 4bc27a7..375fff3 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
@@ -22,16 +22,23 @@ package org.apache.iceberg.mr.hive;
 import java.io.File;
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionSpecParser;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.Tables;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -47,6 +54,8 @@ import org.apache.iceberg.relocated.com.google.common.base.Joiner;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ObjectArrays;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
 import org.junit.Assert;
 import org.junit.rules.TemporaryFolder;
 
@@ -117,9 +126,9 @@ abstract class TestTables {
   }
 
   /**
-   * Creates a Hive test table. Creates the Iceberg table/data and creates the corresponding Hive table as well when
-   * needed. The table will be in the 'default' database. The table will be populated with the provided List of
-   * {@link Record}s.
+   * Creates a non-partitioned Hive test table. Creates the Iceberg table/data and creates the corresponding Hive
+   * table as well when needed. The table will be in the 'default' database. The table will be populated with the
+   * provided List of {@link Record}s.
    * @param shell The HiveShell used for Hive table creation
    * @param tableName The name of the test table
    * @param schema The schema used for the table creation
@@ -140,6 +149,46 @@ abstract class TestTables {
   }
 
   /**
+   * Creates a partitioned Hive test table using Hive SQL. The table will be in the 'default' database.
+   * The table will be populated with the provided List of {@link Record}s using a Hive insert statement.
+   * @param shell The HiveShell used for Hive table creation
+   * @param tableName The name of the test table
+   * @param schema The schema used for the table creation
+   * @param spec The partition specification for the table
+   * @param fileFormat The file format used for writing the data
+   * @param records The records with which the table is populated
+   * @return The created table
+   * @throws IOException If there is an error writing data
+   */
+  public Table createTable(TestHiveShell shell, String tableName, Schema schema, PartitionSpec spec,
+      FileFormat fileFormat, List<Record> records)  {
+    TableIdentifier identifier = TableIdentifier.of("default", tableName);
+    shell.executeStatement("CREATE EXTERNAL TABLE " + identifier +
+        " STORED BY '" + HiveIcebergStorageHandler.class.getName() + "' " +
+        locationForCreateTableSQL(identifier) +
+        "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
+        SchemaParser.toJson(schema) + "', " +
+        "'" + InputFormatConfig.PARTITION_SPEC + "'='" +
+        PartitionSpecParser.toJson(spec) + "', " +
+        "'" + TableProperties.DEFAULT_FILE_FORMAT + "'='" + fileFormat + "')");
+
+    StringBuilder query = new StringBuilder().append("INSERT INTO " + identifier + " VALUES ");
+
+    records.forEach(record -> {
+      query.append("(");
+      query.append(record.struct().fields().stream()
+          .map(field -> getStringValueForInsert(record.getField(field.name()), field.type()))
+          .collect(Collectors.joining(",")));
+      query.append("),");
+    });
+    query.setLength(query.length() - 1);
+
+    shell.executeStatement(query.toString());
+
+    return loadTable(identifier);
+  }
+
+  /**
    * Creates a Hive test table. Creates the Iceberg table/data and creates the corresponding Hive table as well when
    * needed. The table will be in the 'default' database. The table will be populated with the provided with randomly
    * generated {@link Record}s.
@@ -346,6 +395,20 @@ abstract class TestTables {
     return "/" + Joiner.on("/").join(identifier.namespace().levels()) + "/" + identifier.name();
   }
 
+  private String getStringValueForInsert(Object value, Type type) {
+    String template = "\'%s\'";
+    if (type.equals(Types.TimestampType.withoutZone())) {
+      return String.format(template, Timestamp.valueOf((LocalDateTime) value).toString());
+    } else if (type.equals(Types.TimestampType.withZone())) {
+      return String.format(template, Timestamp.from(((OffsetDateTime) value).toInstant()).toString());
+    } else if (type.equals(Types.BooleanType.get())) {
+      // in hive2 boolean type values must not be surrounded in apostrophes. Otherwise the value is translated to true.
+      return value.toString();
+    } else {
+      return String.format(template, value.toString());
+    }
+  }
+
   enum TestTableType {
     HADOOP_TABLE {
       public TestTables instance(Configuration conf, TemporaryFolder temporaryFolder) {
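
Illustrative sketch of the one-line production change above (not part of the commit; the schema and column names are made up, and only APIs that already appear in this diff are used): the appender factory must be constructed with the partition spec in addition to the schema, otherwise identity-partitioned Hive writes do not carry partition values.

    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.data.GenericAppenderFactory;
    import org.apache.iceberg.types.Types;

    import static org.apache.iceberg.types.Types.NestedField.required;

    public class IdentityPartitionedWriteSketch {
      public static void main(String[] args) {
        Schema schema = new Schema(
            required(1, "customer_id", Types.LongType.get()),
            required(2, "last_name", Types.StringType.get()));
        PartitionSpec spec = PartitionSpec.builderFor(schema)
            .identity("customer_id")
            .build();

        // Before the fix: new GenericAppenderFactory(schema) -- the spec was dropped,
        // so identity partition values never reached the written data files.
        // After the fix the factory is partition-aware:
        GenericAppenderFactory factory = new GenericAppenderFactory(schema, spec);
        System.out.println("partition-aware appender factory created for: " + spec);
      }
    }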

[iceberg] 10/18: Hive: Fix connection pool fails to reconnect to the Hive Metastore #1994 (#2119)

Posted by ao...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch 0.11.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 49b944257ec59e6886119e9b44184ec0e1818d4a
Author: Carm <15...@qq.com>
AuthorDate: Tue Feb 9 16:16:59 2021 +0800

    Hive: Fix connection pool fails to reconnect to the Hive Metastore #1994 (#2119)
---
 .../java/org/apache/iceberg/hive/ClientPool.java   |   6 +-
 .../org/apache/iceberg/hive/HiveClientPool.java    |   6 ++
 .../org/apache/iceberg/hive/TestClientPool.java    | 110 +++++++++++++++++----
 3 files changed, 101 insertions(+), 21 deletions(-)

diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/ClientPool.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/ClientPool.java
index 85ce587..7fe9b66 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/ClientPool.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/ClientPool.java
@@ -54,7 +54,7 @@ public abstract class ClientPool<C, E extends Exception> implements Closeable {
       return action.run(client);
 
     } catch (Exception exc) {
-      if (reconnectExc.isInstance(exc)) {
+      if (isConnectionException(exc)) {
         try {
           client = reconnect(client);
         } catch (Exception ignored) {
@@ -76,6 +76,10 @@ public abstract class ClientPool<C, E extends Exception> implements Closeable {
 
   protected abstract C reconnect(C client);
 
+  protected boolean isConnectionException(Exception exc) {
+    return reconnectExc.isInstance(exc);
+  }
+
   protected abstract void close(C client);
 
   @Override
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveClientPool.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveClientPool.java
index 1df705b..e6d70cb 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveClientPool.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveClientPool.java
@@ -84,6 +84,12 @@ public class HiveClientPool extends ClientPool<HiveMetaStoreClient, TException>
   }
 
   @Override
+  protected boolean isConnectionException(Exception e) {
+    return super.isConnectionException(e) || (e != null && e instanceof MetaException &&
+        e.getMessage().contains("Got exception: org.apache.thrift.transport.TTransportException"));
+  }
+
+  @Override
   protected void close(HiveMetaStoreClient client) {
     client.close();
   }
diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestClientPool.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestClientPool.java
index cdf59d8..f88f385 100644
--- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestClientPool.java
+++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestClientPool.java
@@ -19,36 +19,106 @@
 
 package org.apache.iceberg.hive;
 
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.FunctionType;
+import org.apache.hadoop.hive.metastore.api.GetAllFunctionsResponse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.PrincipalType;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.thrift.transport.TTransportException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 public class TestClientPool {
 
-  @Test(expected = RuntimeException.class)
-  public void testNewClientFailure() throws Exception {
-    try (MockClientPool pool = new MockClientPool(2, Exception.class)) {
-      pool.run(Object::toString);
-    }
+  HiveClientPool clients;
+
+  @Before
+  public void before() {
+    HiveClientPool clientPool = new HiveClientPool(2, new Configuration());
+    clients = Mockito.spy(clientPool);
   }
 
-  private static class MockClientPool extends ClientPool<Object, Exception> {
+  @After
+  public void after() {
+    clients.close();
+    clients = null;
+  }
 
-    MockClientPool(int poolSize, Class<? extends Exception> reconnectExc) {
-      super(poolSize, reconnectExc);
-    }
+  @Test
+  public void testNewClientFailure() {
+    Mockito.doThrow(new RuntimeException("Connection exception")).when(clients).newClient();
+    AssertHelpers.assertThrows("Should throw exception", RuntimeException.class,
+        "Connection exception", () -> clients.run(Object::toString));
+  }
+
+  @Test
+  public void testGetTablesFailsForNonReconnectableException() throws Exception {
+    HiveMetaStoreClient hmsClient = Mockito.mock(HiveMetaStoreClient.class);
+    Mockito.doReturn(hmsClient).when(clients).newClient();
+    Mockito.doThrow(new MetaException("Another meta exception"))
+      .when(hmsClient).getTables(Mockito.anyString(), Mockito.anyString());
+    AssertHelpers.assertThrows("Should throw exception", MetaException.class,
+        "Another meta exception", () -> clients.run(client -> client.getTables("default", "t")));
+  }
 
-    @Override
-    protected Object newClient() {
-      throw new RuntimeException();
-    }
+  @Test
+  public void testConnectionFailureRestoreForMetaException() throws Exception {
+    HiveMetaStoreClient hmsClient = newClient();
 
-    @Override
-    protected Object reconnect(Object client) {
-      return null;
-    }
+    // Throwing an exception may trigger the client to reconnect.
+    String metaMessage = "Got exception: org.apache.thrift.transport.TTransportException";
+    Mockito.doThrow(new MetaException(metaMessage)).when(hmsClient).getAllDatabases();
 
-    @Override
-    protected void close(Object client) {
+    // Create a new client when the reconnect method is called.
+    HiveMetaStoreClient newClient = reconnect(hmsClient);
+
+    List<String> databases = Lists.newArrayList("db1", "db2");
+
+    Mockito.doReturn(databases).when(newClient).getAllDatabases();
+    // The return is OK when the reconnect method is called.
+    Assert.assertEquals(databases, clients.run(client -> client.getAllDatabases()));
+
+    // Verify that the method is called.
+    Mockito.verify(clients).reconnect(hmsClient);
+    Mockito.verify(clients, Mockito.never()).reconnect(newClient);
+  }
+
+  @Test
+  public void testConnectionFailureRestoreForTTransportException() throws Exception {
+    HiveMetaStoreClient hmsClient = newClient();
+    Mockito.doThrow(new TTransportException()).when(hmsClient).getAllFunctions();
+
+    // Create a new client when getAllFunctions() failed.
+    HiveMetaStoreClient newClient = reconnect(hmsClient);
+
+    GetAllFunctionsResponse response = new GetAllFunctionsResponse();
+    response.addToFunctions(
+        new Function("concat", "db1", "classname", "root", PrincipalType.USER, 100, FunctionType.JAVA, null));
+    Mockito.doReturn(response).when(newClient).getAllFunctions();
+
+    Assert.assertEquals(response, clients.run(client -> client.getAllFunctions()));
+
+    Mockito.verify(clients).reconnect(hmsClient);
+    Mockito.verify(clients, Mockito.never()).reconnect(newClient);
+  }
+
+  private HiveMetaStoreClient newClient() {
+    HiveMetaStoreClient hmsClient = Mockito.mock(HiveMetaStoreClient.class);
+    Mockito.doReturn(hmsClient).when(clients).newClient();
+    return hmsClient;
+  }
 
-    }
+  private HiveMetaStoreClient reconnect(HiveMetaStoreClient obsoleteClient) {
+    HiveMetaStoreClient newClient = Mockito.mock(HiveMetaStoreClient.class);
+    Mockito.doReturn(newClient).when(clients).reconnect(obsoleteClient);
+    return newClient;
   }
 }
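
A minimal caller-side sketch of the behaviour this commit fixes (illustrative only; the pool size and the getAllDatabases() call mirror the test above, and a reachable Hive Metastore is assumed): when the connection drops and the failure surfaces as a MetaException wrapping "Got exception: org.apache.thrift.transport.TTransportException", run() now treats it as a connection error and retries on a reconnected client instead of failing.

    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.hive.HiveClientPool;

    public class ClientPoolReconnectSketch {
      public static void main(String[] args) throws Exception {
        HiveClientPool pool = new HiveClientPool(2, new Configuration());
        try {
          // If the pooled client's transport has gone stale, the overridden
          // isConnectionException() now recognises the wrapped TTransportException
          // and run() transparently reconnects before retrying the action.
          List<String> databases = pool.run(client -> client.getAllDatabases());
          databases.forEach(System.out::println);
        } finally {
          pool.close();
        }
      }
    }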

[iceberg] 05/18: Core: Improve error messages in CatalogUtil.loadCatalog (#2146)

Posted by ao...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch 0.11.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit c90d26cac41220c1a6a6140ff3814c671e076d04
Author: Ryan Murray <ry...@dremio.com>
AuthorDate: Thu Jan 28 20:54:19 2021 +0100

    Core: Improve error messages in CatalogUtil.loadCatalog (#2146)
---
 .../main/java/org/apache/iceberg/CatalogUtil.java  |  2 +-
 .../iceberg/TestCatalogErrorConstructor.java       | 70 ++++++++++++++++++++++
 .../java/org/apache/iceberg/TestCatalogUtil.java   | 28 ++++++++-
 3 files changed, 98 insertions(+), 2 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/CatalogUtil.java b/core/src/main/java/org/apache/iceberg/CatalogUtil.java
index f22996d..776cc2f 100644
--- a/core/src/main/java/org/apache/iceberg/CatalogUtil.java
+++ b/core/src/main/java/org/apache/iceberg/CatalogUtil.java
@@ -155,7 +155,7 @@ public class CatalogUtil {
       ctor = DynConstructors.builder(Catalog.class).impl(impl).buildChecked();
     } catch (NoSuchMethodException e) {
       throw new IllegalArgumentException(String.format(
-          "Cannot initialize Catalog, missing no-arg constructor: %s", impl), e);
+          "Cannot initialize Catalog implementation %s: %s", impl, e.getMessage()), e);
     }
 
     Catalog catalog;
diff --git a/core/src/test/java/org/apache/iceberg/TestCatalogErrorConstructor.java b/core/src/test/java/org/apache/iceberg/TestCatalogErrorConstructor.java
new file mode 100644
index 0000000..ade2dd1
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestCatalogErrorConstructor.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+
+
+/**
+ * Throws an error on initialization to simulate class not found error.
+ */
+public class TestCatalogErrorConstructor extends BaseMetastoreCatalog {
+  static {
+    if (true) {
+      throw new NoClassDefFoundError("Error while initializing class");
+    }
+  }
+
+  public TestCatalogErrorConstructor() {
+
+  }
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    return null;
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    return null;
+  }
+
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    return null;
+  }
+
+  @Override
+  public boolean dropTable(TableIdentifier identifier, boolean purge) {
+    return false;
+  }
+
+  @Override
+  public void renameTable(TableIdentifier from, TableIdentifier to) {
+
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/TestCatalogUtil.java b/core/src/test/java/org/apache/iceberg/TestCatalogUtil.java
index 84ab3d4..37d66c5 100644
--- a/core/src/test/java/org/apache/iceberg/TestCatalogUtil.java
+++ b/core/src/test/java/org/apache/iceberg/TestCatalogUtil.java
@@ -70,7 +70,7 @@ public class TestCatalogUtil {
     String name = "custom";
     AssertHelpers.assertThrows("must have no-arg constructor",
         IllegalArgumentException.class,
-        "missing no-arg constructor",
+        "NoSuchMethodException: org.apache.iceberg.TestCatalogUtil$TestCatalogBadConstructor.<init>()",
         () -> CatalogUtil.loadCatalog(TestCatalogBadConstructor.class.getName(), name, options, hadoopConf));
   }
 
@@ -87,6 +87,32 @@ public class TestCatalogUtil {
         () -> CatalogUtil.loadCatalog(TestCatalogNoInterface.class.getName(), name, options, hadoopConf));
   }
 
+  @Test
+  public void loadCustomCatalog_ConstructorErrorCatalog() {
+    Map<String, String> options = new HashMap<>();
+    options.put("key", "val");
+    Configuration hadoopConf = new Configuration();
+    String name = "custom";
+
+    String impl = TestCatalogErrorConstructor.class.getName();
+    AssertHelpers.assertThrows("must be able to initialize catalog",
+        IllegalArgumentException.class,
+        "NoClassDefFoundError: Error while initializing class",
+        () -> CatalogUtil.loadCatalog(impl, name, options, hadoopConf));
+  }
+
+  @Test
+  public void loadCustomCatalog_BadCatalogNameCatalog() {
+    Map<String, String> options = new HashMap<>();
+    options.put("key", "val");
+    Configuration hadoopConf = new Configuration();
+    String name = "custom";
+    String impl = "CatalogDoesNotExist";
+    AssertHelpers.assertThrows("catalog must exist",
+        IllegalArgumentException.class,
+        "java.lang.ClassNotFoundException: CatalogDoesNotExist",
+        () -> CatalogUtil.loadCatalog(impl, name, options, hadoopConf));
+  }
 
   @Test
   public void loadCustomFileIO_noArg() {
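
A small sketch of the caller-visible effect (the catalog implementation class name here is intentionally bogus): the IllegalArgumentException thrown by CatalogUtil.loadCatalog now carries the underlying cause, e.g. the ClassNotFoundException or NoClassDefFoundError text, instead of only reporting a missing no-arg constructor.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.CatalogUtil;

    public class LoadCatalogErrorSketch {
      public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        Configuration hadoopConf = new Configuration();
        try {
          // "com.example.NoSuchCatalog" is a made-up class name used to trigger the failure.
          CatalogUtil.loadCatalog("com.example.NoSuchCatalog", "custom", options, hadoopConf);
        } catch (IllegalArgumentException e) {
          // With this commit the message includes the root cause, e.g.
          // "java.lang.ClassNotFoundException: com.example.NoSuchCatalog".
          System.err.println(e.getMessage());
        }
      }
    }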

[iceberg] 13/18: Hive: Ensure unlock is called in HiveTableOperations to fix zombie locking (#2263)

Posted by ao...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch 0.11.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 08ba76052d9241cb856fac5d87ae7d5081c9a7cb
Author: ZorTsou <sc...@outlook.com>
AuthorDate: Thu Feb 25 02:12:09 2021 +0800

    Hive: Ensure unlock is called in HiveTableOperations to fix zombie locking (#2263)
---
 .../java/org/apache/iceberg/hive/HiveTableOperations.java  | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index 05ca375..e06a5cf 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -315,8 +315,8 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
     long duration = 0;
     boolean timeout = false;
 
-    if (state.get().equals(LockState.WAITING)) {
-      try {
+    try {
+      if (state.get().equals(LockState.WAITING)) {
         // Retry count is the typical "upper bound of retries" for Tasks.run() function. In fact, the maximum number of
         // attempts the Tasks.run() would try is `retries + 1`. Here, for checking locks, we use timeout as the
         // upper bound of retries. So it is just reasonable to set a large retry count. However, if we set
@@ -344,9 +344,13 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
                 LOG.warn("Interrupted while waiting for lock.", e);
               }
             }, TException.class);
-      } catch (WaitingForLockException waitingForLockException) {
-        timeout = true;
-        duration = System.currentTimeMillis() - start;
+      }
+    } catch (WaitingForLockException waitingForLockException) {
+      timeout = true;
+      duration = System.currentTimeMillis() - start;
+    } finally {
+      if (!state.get().equals(LockState.ACQUIRED)) {
+        unlock(Optional.of(lockId));
       }
     }
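
A simplified, self-contained sketch of the cleanup pattern the diff introduces (the lock helpers below are stand-ins, not the real Metastore calls): no matter how the wait for the lock ends, the finally block releases any lock id that never reached ACQUIRED, which is what prevents the zombie locks.

    public class ZombieLockCleanupSketch {
      enum LockState { WAITING, ACQUIRED, NOT_ACQUIRED }

      public static void main(String[] args) {
        long lockId = requestLock();                // stand-in for the HMS lock request
        LockState state = LockState.WAITING;
        try {
          state = waitForLockState(lockId);         // may time out or throw
        } finally {
          // The essence of the fix: if the lock never became ACQUIRED (timeout,
          // interruption, unexpected exception), release it instead of leaking it.
          if (state != LockState.ACQUIRED) {
            unlock(lockId);
          }
        }
      }

      private static long requestLock() { return 42L; }
      private static LockState waitForLockState(long lockId) { return LockState.NOT_ACQUIRED; }
      private static void unlock(long lockId) { System.out.println("released lock " + lockId); }
    }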
 

[iceberg] 12/18: ORC: Fix vectorized reads with metadata columns (#2241)

Posted by ao...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch 0.11.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 42018f97ff1949b3f11c512c15a36b07a84f9551
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Tue Feb 16 10:39:25 2021 -0800

    ORC: Fix vectorized reads with metadata columns (#2241)
---
 .../java/org/apache/iceberg/spark/source/BatchDataReader.java  | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
index 6f49509..d48cf24 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -20,11 +20,13 @@
 package org.apache.iceberg.spark.source;
 
 import java.util.Map;
+import java.util.Set;
 import org.apache.arrow.vector.NullCheckingForGet;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.io.CloseableIterable;
@@ -35,6 +37,7 @@ import org.apache.iceberg.mapping.NameMappingParser;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
 import org.apache.iceberg.types.TypeUtil;
@@ -90,9 +93,12 @@ class BatchDataReader extends BaseDataReader<ColumnarBatch> {
 
       iter = builder.build();
     } else if (task.file().format() == FileFormat.ORC) {
-      Schema schemaWithoutConstants = TypeUtil.selectNot(expectedSchema, idToConstant.keySet());
+      Set<Integer> constantFieldIds = idToConstant.keySet();
+      Set<Integer> metadataFieldIds = MetadataColumns.metadataFieldIds();
+      Sets.SetView<Integer> constantAndMetadataFieldIds = Sets.union(constantFieldIds, metadataFieldIds);
+      Schema schemaWithoutConstantAndMetadataFields = TypeUtil.selectNot(expectedSchema, constantAndMetadataFieldIds);
       ORC.ReadBuilder builder = ORC.read(location)
-          .project(schemaWithoutConstants)
+          .project(schemaWithoutConstantAndMetadataFields)
           .split(task.start(), task.length())
           .createBatchedReaderFunc(fileSchema -> VectorizedSparkOrcReaders.buildReader(expectedSchema, fileSchema,
               idToConstant))
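
A minimal sketch of the projection logic this change fixes (the schema and the constant-field id are illustrative, and MetadataColumns.ROW_POSITION is assumed to be the reserved _pos column): the ORC file projection must exclude Iceberg's reserved metadata columns as well as the identity-partition constants, since both are filled in by the reader rather than read from the file.

    import java.util.Set;
    import org.apache.iceberg.MetadataColumns;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.relocated.com.google.common.collect.Sets;
    import org.apache.iceberg.types.TypeUtil;
    import org.apache.iceberg.types.Types;

    import static org.apache.iceberg.types.Types.NestedField.required;

    public class OrcProjectionSketch {
      public static void main(String[] args) {
        Schema expectedSchema = new Schema(
            required(1, "id", Types.LongType.get()),
            required(2, "data", Types.StringType.get()),
            MetadataColumns.ROW_POSITION);                    // _pos requested by the engine

        Set<Integer> constantFieldIds = Sets.newHashSet(2);   // e.g. an identity-partition column
        Set<Integer> metadataFieldIds = MetadataColumns.metadataFieldIds();

        // The fix: drop metadata fields as well as constants before projecting the ORC file.
        Schema fileProjection =
            TypeUtil.selectNot(expectedSchema, Sets.union(constantFieldIds, metadataFieldIds));
        System.out.println(fileProjection);                   // only "id" remains
      }
    }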

[iceberg] 06/18: Hive: Avoid closing null writer in output committer abortTask (#2150)

Posted by ao...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch 0.11.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 953f92bb91a8a6af7a582ef171367d6e6402a69e
Author: Marton Bod <ma...@gmail.com>
AuthorDate: Wed Feb 3 01:12:34 2021 +0100

    Hive: Avoid closing null writer in output committer abortTask (#2150)
    
    Co-authored-by: Marton Bod <mb...@cloudera.com>
---
 .../mr/hive/HiveIcebergOutputCommitter.java        | 11 ++++--
 .../iceberg/mr/hive/HiveIcebergRecordWriter.java   |  4 ++
 .../mr/hive/TestHiveIcebergOutputCommitter.java    | 43 ++++++++++++++++++++--
 3 files changed, 52 insertions(+), 6 deletions(-)

diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index 7c34f5a..eed0fdc 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -58,7 +58,7 @@ import org.slf4j.LoggerFactory;
  * An Iceberg table committer for adding data files to the Iceberg tables.
  * Currently independent of the Hive ACID transactions.
  */
-public final class HiveIcebergOutputCommitter extends OutputCommitter {
+public class HiveIcebergOutputCommitter extends OutputCommitter {
   private static final String FOR_COMMIT_EXTENSION = ".forCommit";
 
   private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergOutputCommitter.class);
@@ -90,7 +90,7 @@ public final class HiveIcebergOutputCommitter extends OutputCommitter {
     TaskAttemptID attemptID = context.getTaskAttemptID();
     String fileForCommitLocation = generateFileForCommitLocation(context.getJobConf(),
         attemptID.getJobID(), attemptID.getTaskID().getId());
-    HiveIcebergRecordWriter writer = HiveIcebergRecordWriter.removeWriter(attemptID);
+    HiveIcebergRecordWriter writer = HiveIcebergRecordWriter.getWriter(attemptID);
 
     DataFile[] closedFiles;
     if (writer != null) {
@@ -101,6 +101,9 @@ public final class HiveIcebergOutputCommitter extends OutputCommitter {
 
     // Creating the file containing the data files generated by this task
     createFileForCommit(closedFiles, fileForCommitLocation, HiveIcebergStorageHandler.io(context.getJobConf()));
+
+    // remove the writer to release the object
+    HiveIcebergRecordWriter.removeWriter(attemptID);
   }
 
   /**
@@ -114,7 +117,9 @@ public final class HiveIcebergOutputCommitter extends OutputCommitter {
     HiveIcebergRecordWriter writer = HiveIcebergRecordWriter.removeWriter(context.getTaskAttemptID());
 
     // Remove files if it was not done already
-    writer.close(true);
+    if (writer != null) {
+      writer.close(true);
+    }
   }
 
   /**
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java
index 7d94e21..8622d1a 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java
@@ -58,6 +58,10 @@ class HiveIcebergRecordWriter extends PartitionedFanoutWriter<Record>
     return writers.remove(taskAttemptID);
   }
 
+  static HiveIcebergRecordWriter getWriter(TaskAttemptID taskAttemptID) {
+    return writers.get(taskAttemptID);
+  }
+
   HiveIcebergRecordWriter(Schema schema, PartitionSpec spec, FileFormat format,
       FileAppenderFactory<Record> appenderFactory, OutputFileFactory fileFactory, FileIO io, long targetFileSize,
       TaskAttemptID taskAttemptID) {
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
index 96b3b20..73f9786 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobContextImpl;
 import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.OutputCommitter;
 import org.apache.hadoop.mapred.TaskAttemptContextImpl;
 import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.mapreduce.JobStatus;
@@ -54,7 +55,10 @@ import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
 
+import static org.apache.iceberg.mr.hive.HiveIcebergRecordWriter.getWriter;
 import static org.apache.iceberg.types.Types.NestedField.required;
 
 public class TestHiveIcebergOutputCommitter {
@@ -182,6 +186,33 @@ public class TestHiveIcebergOutputCommitter {
     HiveIcebergTestUtils.validateData(table, Collections.emptyList(), 0);
   }
 
+  @Test
+  public void writerIsClosedAfterTaskCommitFailure() throws IOException {
+    HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter();
+    HiveIcebergOutputCommitter failingCommitter = Mockito.spy(committer);
+    ArgumentCaptor<TaskAttemptContextImpl> argumentCaptor = ArgumentCaptor.forClass(TaskAttemptContextImpl.class);
+    String exceptionMessage = "Commit task failed!";
+    Mockito.doThrow(new RuntimeException(exceptionMessage))
+            .when(failingCommitter).commitTask(argumentCaptor.capture());
+
+    Table table = table(temp.getRoot().getPath(), false);
+    JobConf conf = jobConf(table, 1);
+    try {
+      writeRecords(1, 0, true, false, conf, failingCommitter);
+      Assert.fail();
+    } catch (RuntimeException e) {
+      Assert.assertTrue(e.getMessage().contains(exceptionMessage));
+    }
+
+    Assert.assertEquals(1, argumentCaptor.getAllValues().size());
+    TaskAttemptID capturedId = argumentCaptor.getValue().getTaskAttemptID();
+    // writer is still in the map after commitTask failure
+    Assert.assertNotNull(getWriter(capturedId));
+    failingCommitter.abortTask(new TaskAttemptContextImpl(conf, capturedId));
+    // abortTask succeeds and removes writer
+    Assert.assertNull(getWriter(capturedId));
+  }
+
   private Table table(String location, boolean partitioned) {
     HadoopTables tables = new HadoopTables();
     return tables.create(CUSTOMER_SCHEMA, partitioned ? PARTITIONED_SPEC : PartitionSpec.unpartitioned(), location);
@@ -212,11 +243,12 @@ public class TestHiveIcebergOutputCommitter {
    * @param abortTasks If <code>true</code> the tasks will be aborted - needed so we can simulate no commit/no abort
    *                   situation
    * @param conf The job configuration
+   * @param committer The output committer that should be used for committing/aborting the tasks
    * @return The random generated records which were appended to the table
    * @throws IOException Propagating {@link HiveIcebergRecordWriter} exceptions
    */
   private List<Record> writeRecords(int taskNum, int attemptNum, boolean commitTasks, boolean abortTasks,
-                                    JobConf conf) throws IOException {
+                                    JobConf conf, OutputCommitter committer) throws IOException {
     List<Record> expected = new ArrayList<>(RECORD_NUM * taskNum);
 
     FileIO io = HiveIcebergStorageHandler.io(conf);
@@ -243,13 +275,18 @@ public class TestHiveIcebergOutputCommitter {
 
       testWriter.close(false);
       if (commitTasks) {
-        new HiveIcebergOutputCommitter().commitTask(new TaskAttemptContextImpl(conf, taskId));
+        committer.commitTask(new TaskAttemptContextImpl(conf, taskId));
         expected.addAll(records);
       } else if (abortTasks) {
-        new HiveIcebergOutputCommitter().abortTask(new TaskAttemptContextImpl(conf, taskId));
+        committer.abortTask(new TaskAttemptContextImpl(conf, taskId));
       }
     }
 
     return expected;
   }
+
+  private List<Record> writeRecords(int taskNum, int attemptNum, boolean commitTasks, boolean abortTasks,
+                                    JobConf conf) throws IOException {
+    return writeRecords(taskNum, attemptNum, commitTasks, abortTasks, conf, new HiveIcebergOutputCommitter());
+  }
 }
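
A self-contained sketch of the defensive pattern this commit applies (the registry and method names below are stand-ins, not the Hive classes): writers are tracked per task attempt, commitTask only drops its entry after the commit file has been written, and abortTask must tolerate an attempt for which no writer was ever registered.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class WriterRegistrySketch {
      // Stand-in for HiveIcebergRecordWriter's static per-attempt writer map.
      private static final Map<String, AutoCloseable> WRITERS = new ConcurrentHashMap<>();

      static void commitTask(String attemptId) throws Exception {
        AutoCloseable writer = WRITERS.get(attemptId);   // look up without removing (getWriter)
        if (writer != null) {
          writer.close();
        }
        // ... write the ".forCommit" file listing the closed data files ...
        WRITERS.remove(attemptId);                       // only now release the entry (removeWriter)
      }

      static void abortTask(String attemptId) throws Exception {
        AutoCloseable writer = WRITERS.remove(attemptId);
        if (writer != null) {                            // the fix: no NPE when nothing was registered
          writer.close();
        }
      }

      public static void main(String[] args) throws Exception {
        abortTask("attempt_0");                          // aborting an attempt with no writer is now safe
      }
    }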

[iceberg] 16/18: AWS: Do not list non-iceberg table in GlueCatalog (#2267)

Posted by ao...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch 0.11.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit f1a972b01293a78577f272183b4867486c5aa6a0
Author: Jack Ye <yz...@amazon.com>
AuthorDate: Wed Mar 3 16:23:29 2021 -0800

    AWS: Do not list non-iceberg table in GlueCatalog (#2267)
---
 .../iceberg/aws/glue/GlueCatalogNamespaceTest.java | 24 +++++++-
 .../org/apache/iceberg/aws/glue/GlueCatalog.java   | 27 ++++++--
 .../apache/iceberg/aws/glue/GlueCatalogTest.java   | 72 +++++++++++++++++++---
 3 files changed, 106 insertions(+), 17 deletions(-)

diff --git a/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueCatalogNamespaceTest.java b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueCatalogNamespaceTest.java
index 9dc3ce5..d7f1058 100644
--- a/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueCatalogNamespaceTest.java
+++ b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueCatalogNamespaceTest.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.aws.glue;
 
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
@@ -31,9 +32,11 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.junit.Assert;
 import org.junit.Test;
+import software.amazon.awssdk.services.glue.model.CreateTableRequest;
 import software.amazon.awssdk.services.glue.model.Database;
 import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
 import software.amazon.awssdk.services.glue.model.GetDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.TableInput;
 
 public class GlueCatalogNamespaceTest extends GlueTestBase {
 
@@ -131,12 +134,27 @@ public class GlueCatalogNamespaceTest extends GlueTestBase {
   }
 
   @Test
-  public void testDropNamespaceNonEmpty() {
+  public void testDropNamespaceNonEmpty_containsIcebergTable() {
     String namespace = createNamespace();
     createTable(namespace);
-    AssertHelpers.assertThrows("namespace should not be dropped when still has table",
+    AssertHelpers.assertThrows("namespace should not be dropped when still has Iceberg table",
         NamespaceNotEmptyException.class,
-        "it is not empty",
+        "still contains Iceberg tables",
+        () -> glueCatalog.dropNamespace(Namespace.of(namespace)));
+  }
+
+  @Test
+  public void testDropNamespaceNonEmpty_containsNonIcebergTable() {
+    String namespace = createNamespace();
+    glue.createTable(CreateTableRequest.builder()
+        .databaseName(namespace)
+        .tableInput(TableInput.builder()
+            .name(UUID.randomUUID().toString())
+            .build())
+        .build());
+    AssertHelpers.assertThrows("namespace should not be dropped when still has non-Iceberg table",
+        NamespaceNotEmptyException.class,
+        "still contains non-Iceberg tables",
         () -> glueCatalog.dropNamespace(Namespace.of(namespace)));
   }
 
diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java
index a4d407d..197a730 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java
@@ -28,6 +28,7 @@ import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.BaseMetastoreTableOperations;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.TableMetadata;
@@ -176,6 +177,7 @@ public class GlueCatalog extends BaseMetastoreCatalog implements Closeable, Supp
       nextToken = response.nextToken();
       if (response.hasTableList()) {
         results.addAll(response.tableList().stream()
+            .filter(this::isGlueIcebergTable)
             .map(GlueToIcebergConverter::toTableId)
             .collect(Collectors.toList()));
       }
@@ -185,6 +187,12 @@ public class GlueCatalog extends BaseMetastoreCatalog implements Closeable, Supp
     return results;
   }
 
+  private boolean isGlueIcebergTable(Table table) {
+    return table.parameters() != null &&
+        BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(
+            table.parameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP));
+  }
+
   @Override
   public boolean dropTable(TableIdentifier identifier, boolean purge) {
     try {
@@ -336,10 +344,21 @@ public class GlueCatalog extends BaseMetastoreCatalog implements Closeable, Supp
   @Override
   public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException {
     namespaceExists(namespace);
-    List<TableIdentifier> tableIdentifiers = listTables(namespace);
-    if (tableIdentifiers != null && !tableIdentifiers.isEmpty()) {
-      throw new NamespaceNotEmptyException("Cannot drop namespace %s because it is not empty. " +
-          "The following tables still exist under the namespace: %s", namespace, tableIdentifiers);
+
+    GetTablesResponse response = glue.getTables(GetTablesRequest.builder()
+        .catalogId(awsProperties.glueCatalogId())
+        .databaseName(IcebergToGlueConverter.toDatabaseName(namespace))
+        .build());
+
+    if (response.hasTableList() && !response.tableList().isEmpty()) {
+      Table table = response.tableList().get(0);
+      if (isGlueIcebergTable(table)) {
+        throw new NamespaceNotEmptyException(
+            "Cannot drop namespace %s because it still contains Iceberg tables", namespace);
+      } else {
+        throw new NamespaceNotEmptyException(
+            "Cannot drop namespace %s because it still contains non-Iceberg tables", namespace);
+      }
     }
 
     glue.deleteDatabase(DeleteDatabaseRequest.builder()
diff --git a/aws/src/test/java/org/apache/iceberg/aws/glue/GlueCatalogTest.java b/aws/src/test/java/org/apache/iceberg/aws/glue/GlueCatalogTest.java
index 95dfad6..669a73e 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/glue/GlueCatalogTest.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/glue/GlueCatalogTest.java
@@ -31,6 +31,7 @@ import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.junit.Assert;
@@ -123,8 +124,29 @@ public class GlueCatalogTest {
         .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class));
     Mockito.doReturn(GetTablesResponse.builder()
         .tableList(
-            Table.builder().databaseName("db1").name("t1").build(),
-            Table.builder().databaseName("db1").name("t2").build()
+            Table.builder().databaseName("db1").name("t1").parameters(
+                ImmutableMap.of(
+                    BaseMetastoreTableOperations.TABLE_TYPE_PROP, BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE
+                )
+            ).build(),
+            Table.builder().databaseName("db1").name("t2").parameters(
+                ImmutableMap.of(
+                    "key", "val",
+                    BaseMetastoreTableOperations.TABLE_TYPE_PROP, BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE
+                )
+            ).build(),
+            Table.builder().databaseName("db1").name("t3").parameters(
+                ImmutableMap.of(
+                    "key", "val",
+                    BaseMetastoreTableOperations.TABLE_TYPE_PROP, "wrongVal"
+                )
+            ).build(),
+            Table.builder().databaseName("db1").name("t4").parameters(
+                ImmutableMap.of(
+                    "key", "val"
+                )
+            ).build(),
+            Table.builder().databaseName("db1").name("t5").parameters(null).build()
         ).build())
         .when(glue).getTables(Mockito.any(GetTablesRequest.class));
     Assert.assertEquals(
@@ -148,14 +170,23 @@ public class GlueCatalogTest {
         if (counter.decrementAndGet() > 0) {
           return GetTablesResponse.builder()
               .tableList(
-                  Table.builder().databaseName("db1").name(
-                      UUID.randomUUID().toString().replace("-", "")).build()
+                  Table.builder()
+                      .databaseName("db1")
+                      .name(UUID.randomUUID().toString().replace("-", ""))
+                      .parameters(ImmutableMap.of(
+                          BaseMetastoreTableOperations.TABLE_TYPE_PROP,
+                          BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE
+                      ))
+                      .build()
               )
               .nextToken("token")
               .build();
         } else {
           return GetTablesResponse.builder()
-              .tableList(Table.builder().databaseName("db1").name("tb1").build())
+              .tableList(Table.builder().databaseName("db1").name("tb1").parameters(ImmutableMap.of(
+                  BaseMetastoreTableOperations.TABLE_TYPE_PROP,
+                  BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE
+              )).build())
               .build();
         }
       }
@@ -313,11 +344,14 @@ public class GlueCatalogTest {
   }
 
   @Test
-  public void dropNamespace_notEmpty() {
+  public void dropNamespace_notEmpty_containsIcebergTable() {
     Mockito.doReturn(GetTablesResponse.builder()
         .tableList(
-            Table.builder().databaseName("db1").name("t1").build(),
-            Table.builder().databaseName("db1").name("t2").build()
+            Table.builder().databaseName("db1").name("t1").parameters(
+                ImmutableMap.of(
+                    BaseMetastoreTableOperations.TABLE_TYPE_PROP, BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE
+                )
+            ).build()
         ).build())
         .when(glue).getTables(Mockito.any(GetTablesRequest.class));
     Mockito.doReturn(GetDatabaseResponse.builder()
@@ -325,9 +359,27 @@ public class GlueCatalogTest {
         .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class));
     Mockito.doReturn(DeleteDatabaseResponse.builder().build())
         .when(glue).deleteDatabase(Mockito.any(DeleteDatabaseRequest.class));
-    AssertHelpers.assertThrows("namespace should not be dropped when still has table",
+    AssertHelpers.assertThrows("namespace should not be dropped when still has Iceberg table",
         NamespaceNotEmptyException.class,
-        "Cannot drop namespace",
+        "still contains Iceberg tables",
+        () -> glueCatalog.dropNamespace(Namespace.of("db1")));
+  }
+
+  @Test
+  public void dropNamespace_notEmpty_containsNonIcebergTable() {
+    Mockito.doReturn(GetTablesResponse.builder()
+        .tableList(
+            Table.builder().databaseName("db1").name("t1").build()
+        ).build())
+        .when(glue).getTables(Mockito.any(GetTablesRequest.class));
+    Mockito.doReturn(GetDatabaseResponse.builder()
+        .database(Database.builder().name("db1").build()).build())
+        .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class));
+    Mockito.doReturn(DeleteDatabaseResponse.builder().build())
+        .when(glue).deleteDatabase(Mockito.any(DeleteDatabaseRequest.class));
+    AssertHelpers.assertThrows("namespace should not be dropped when still has non-Iceberg table",
+        NamespaceNotEmptyException.class,
+        "still contains non-Iceberg tables",
         () -> glueCatalog.dropNamespace(Namespace.of("db1")));
   }
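
For reference, a minimal sketch of the check these tests exercise: a Glue table is treated as an Iceberg table only when its parameters carry the Iceberg table-type marker. Only the BaseMetastoreTableOperations constants and the AWS SDK v2 Glue Table model come from the patch; the class name and the case-insensitive comparison are illustrative assumptions, not the exact GlueCatalog implementation.

    import java.util.Map;
    import org.apache.iceberg.BaseMetastoreTableOperations;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
    import software.amazon.awssdk.services.glue.model.Table;

    public class GlueIcebergTableCheckSketch {
      // True only when the Glue table advertises itself as an Iceberg table via its parameters.
      static boolean isIcebergTable(Table table) {
        Map<String, String> parameters = table.parameters();
        return parameters != null &&
            BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(
                parameters.get(BaseMetastoreTableOperations.TABLE_TYPE_PROP));
      }

      public static void main(String[] args) {
        Table iceberg = Table.builder().databaseName("db1").name("t1")
            .parameters(ImmutableMap.of(
                BaseMetastoreTableOperations.TABLE_TYPE_PROP,
                BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE))
            .build();
        Table plain = Table.builder().databaseName("db1").name("t4").build();
        System.out.println(isIcebergTable(iceberg));  // true
        System.out.println(isIcebergTable(plain));    // false
      }
    }

Under a check like this, t1 and t2 above pass while t3 (wrong value), t4 (missing key) and t5 (null parameters) do not, which is what lets listTables skip non-Iceberg tables and lets dropNamespace raise the two different error messages tested below.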
 

[iceberg] 17/18: Hive: Avoid reset hive configuration to default value. (#2075)

Posted by ao...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch 0.11.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 05150df95e53dca47af5435f2f56fe0a4db04f4b
Author: dixingxing <di...@yeah.net>
AuthorDate: Thu Mar 18 15:03:16 2021 +0800

    Hive: Avoid reset hive configuration to default value. (#2075)
    
    
    
    * Add test for iceberg-[2070]
    
    * Fix test and codestyle
    
    * Add TestHiveClientPool
    
    * Delete unnecessary test case
    
    * Delete unnecessary test case
    
    * Delete misleading test code.
    
    Co-authored-by: dixingxing <di...@autohome.com.cn>
---
 .../java/org/apache/iceberg/hive/ClientPool.java   |  6 ++
 .../org/apache/iceberg/hive/HiveClientPool.java    |  7 +++
 .../apache/iceberg/hive/TestHiveClientPool.java    | 70 ++++++++++++++++++++++
 3 files changed, 83 insertions(+)

diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/ClientPool.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/ClientPool.java
index 7fe9b66..cccd0da 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/ClientPool.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/ClientPool.java
@@ -22,6 +22,7 @@ package org.apache.iceberg.hive;
 import java.io.Closeable;
 import java.util.ArrayDeque;
 import java.util.Deque;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -110,6 +111,11 @@ public abstract class ClientPool<C, E extends Exception> implements Closeable {
     }
   }
 
+  @VisibleForTesting
+  int poolSize() {
+    return poolSize;
+  }
+
   private C get() throws InterruptedException {
     Preconditions.checkState(!closed, "Cannot get a client from a closed pool");
     while (true) {
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveClientPool.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveClientPool.java
index e6d70cb..1d3ef04 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveClientPool.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveClientPool.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.iceberg.common.DynConstructors;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransportException;
 
@@ -45,6 +46,7 @@ public class HiveClientPool extends ClientPool<HiveMetaStoreClient, TException>
   public HiveClientPool(int poolSize, Configuration conf) {
     super(poolSize, TTransportException.class);
     this.hiveConf = new HiveConf(conf, HiveClientPool.class);
+    this.hiveConf.addResource(conf);
   }
 
   @Override
@@ -93,4 +95,9 @@ public class HiveClientPool extends ClientPool<HiveMetaStoreClient, TException>
   protected void close(HiveMetaStoreClient client) {
     client.close();
   }
+
+  @VisibleForTesting
+  HiveConf hiveConf() {
+    return hiveConf;
+  }
 }
diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveClientPool.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveClientPool.java
new file mode 100644
index 0000000..f5e2ea6
--- /dev/null
+++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveClientPool.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hive;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestHiveClientPool {
+
+  private static final String HIVE_SITE_CONTENT = "<?xml version=\"1.0\"?>\n" +
+          "<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>\n" +
+          "<configuration>\n" +
+          "  <property>\n" +
+          "    <name>hive.metastore.sasl.enabled</name>\n" +
+          "    <value>true</value>\n" +
+          "  </property>\n" +
+          "</configuration>\n";
+
+  @Test
+  public void testConf() {
+    HiveConf conf = createHiveConf();
+    conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, "file:/mywarehouse/");
+    conf.setInt("iceberg.hive.client-pool-size", 10);
+
+    HiveClientPool clientPool = new HiveClientPool(conf);
+    HiveConf clientConf = clientPool.hiveConf();
+
+    Assert.assertEquals(conf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname),
+            clientConf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname));
+    Assert.assertEquals(conf.get("iceberg.hive.client-pool-size"), clientConf.get("iceberg.hive.client-pool-size"));
+    Assert.assertEquals(10, clientPool.poolSize());
+
+    // 'hive.metastore.sasl.enabled' should be 'true' as defined in xml
+    Assert.assertEquals(conf.get(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname),
+            clientConf.get(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname));
+    Assert.assertTrue(clientConf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL));
+  }
+
+  private HiveConf createHiveConf() {
+    HiveConf hiveConf = new HiveConf();
+    try (InputStream inputStream = new ByteArrayInputStream(HIVE_SITE_CONTENT.getBytes(StandardCharsets.UTF_8))) {
+      hiveConf.addResource(inputStream, "for_test");
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return hiveConf;
+  }
+}
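
For context, a minimal sketch of what the extra addResource(conf) call buys, using only the HiveConf and Configuration calls that already appear in this patch; the class name is illustrative and the warehouse value is the same one the new test uses.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.conf.HiveConf;

    public class HiveConfOverrideSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, "file:/mywarehouse/");

        // Wrapping the Configuration in a HiveConf can let hive-site.xml and built-in
        // defaults take precedence again, which is the reset the commit title refers to.
        HiveConf hiveConf = new HiveConf(conf, HiveConfOverrideSketch.class);

        // Re-adding the original Configuration as a resource keeps the caller's overrides;
        // this mirrors what the HiveClientPool constructor now does.
        hiveConf.addResource(conf);

        System.out.println(hiveConf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname));
      }
    }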

[iceberg] 15/18: Hive: Quick fix for broken TestHiveIcebergFilterFactory.testTimestampType test (#2283)

Posted by ao...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch 0.11.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 8c13f2b81ff4a0d7bdb9eee6b47ca4c1da19aff3
Author: pvary <pv...@cloudera.com>
AuthorDate: Sun Feb 28 21:45:57 2021 +0100

    Hive: Quick fix for broken TestHiveIcebergFilterFactory.testTimestampType test (#2283)
---
 .../java/org/apache/iceberg/mr/hive/TestHiveIcebergFilterFactory.java  | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergFilterFactory.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergFilterFactory.java
index 3044f04..dc48501 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergFilterFactory.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergFilterFactory.java
@@ -23,7 +23,6 @@ import java.math.BigDecimal;
 import java.sql.Date;
 import java.sql.Timestamp;
 import java.time.LocalDate;
-import java.time.ZoneOffset;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
@@ -231,7 +230,7 @@ public class TestHiveIcebergFilterFactory {
   public void testTimestampType() {
     Literal<Long> timestampLiteral = Literal.of("2012-10-02T05:16:17.123456").to(Types.TimestampType.withoutZone());
     long timestampMicros = timestampLiteral.value();
-    Timestamp ts = Timestamp.from(DateTimeUtil.timestampFromMicros(timestampMicros).toInstant(ZoneOffset.UTC));
+    Timestamp ts = Timestamp.valueOf(DateTimeUtil.timestampFromMicros(timestampMicros));
 
     SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
     SearchArgument arg = builder.startAnd().equals("timestamp", PredicateLeaf.Type.TIMESTAMP, ts).end().build();
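
The one-line change swaps a UTC-based conversion for a wall-clock one. A small standalone illustration of the difference, using only java.time and java.sql:

    import java.sql.Timestamp;
    import java.time.LocalDateTime;
    import java.time.ZoneOffset;

    public class TimestampConversionSketch {
      public static void main(String[] args) {
        LocalDateTime ldt = LocalDateTime.parse("2012-10-02T05:16:17.123456");

        // Treats the value as local wall-clock time; this matches the zone-less TIMESTAMP
        // literal in the test and is what the fixed assertion uses.
        Timestamp local = Timestamp.valueOf(ldt);

        // Treats the value as UTC and renders it in the JVM's zone; the two results only agree
        // when the JVM runs in UTC, which is presumably why the original assertion broke.
        Timestamp utc = Timestamp.from(ldt.toInstant(ZoneOffset.UTC));

        System.out.println(local + " vs " + utc);
      }
    }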

[iceberg] 03/18: Hive: Fix file extensions for written files (#2155)

Posted by ao...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch 0.11.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 1554cd664e7e633a3ceed2968e3e280d8b1c4fcb
Author: pvary <pv...@cloudera.com>
AuthorDate: Wed Jan 27 09:08:15 2021 +0100

    Hive: Fix file extensions for written files (#2155)
---
 .../main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java  | 2 +-
 .../iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java       | 3 ++-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
index 95b6b69..6ebc677 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
@@ -74,7 +74,7 @@ public class HiveIcebergOutputFormat<T> implements OutputFormat<NullWritable, Co
     LocationProvider location = HiveIcebergStorageHandler.location(jc);
     EncryptionManager encryption = HiveIcebergStorageHandler.encryption(jc);
     OutputFileFactory outputFileFactory =
-        new OutputFileFactory(spec, FileFormat.PARQUET, location, io, encryption, taskAttemptID.getTaskID().getId(),
+        new OutputFileFactory(spec, fileFormat, location, io, encryption, taskAttemptID.getTaskID().getId(),
             taskAttemptID.getId(), jc.get(HiveConf.ConfVars.HIVEQUERYID.varname) + "-" + taskAttemptID.getJobID());
     HiveIcebergRecordWriter writer = new HiveIcebergRecordWriter(schema, spec, fileFormat,
         new GenericAppenderFactory(schema), outputFileFactory, io, targetFileSize, taskAttemptID);
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
index 78f65d6..7200e3d 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
@@ -34,6 +34,7 @@ import org.apache.iceberg.PartitionSpecParser;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
@@ -535,7 +536,7 @@ public class TestHiveIcebergStorageHandlerWithEngine {
         SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "', " +
         "'" + InputFormatConfig.PARTITION_SPEC + "'='" +
         PartitionSpecParser.toJson(spec) + "', " +
-        "'" + InputFormatConfig.WRITE_FILE_FORMAT + "'='" + fileFormat + "')");
+        "'" + TableProperties.DEFAULT_FILE_FORMAT + "'='" + fileFormat + "')");
 
     List<Record> records = TestHelper.generateRandomRecords(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, 4, 0L);
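
The practical effect of passing fileFormat through instead of the hard-coded PARQUET is that data file names now carry the extension of the format actually written. A quick illustration, assuming Iceberg's FileFormat.addExtension helper behaves as its name suggests; the file name stem below is made up:

    import org.apache.iceberg.FileFormat;

    public class FileExtensionSketch {
      public static void main(String[] args) {
        String base = "00000-0-query-id-task";  // illustrative stem, not a real generated name

        // Before the fix, ORC and Avro writes still produced ".parquet" names because the
        // OutputFileFactory was always constructed with FileFormat.PARQUET.
        System.out.println(FileFormat.PARQUET.addExtension(base));
        System.out.println(FileFormat.ORC.addExtension(base));
        System.out.println(FileFormat.AVRO.addExtension(base));
      }
    }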
 

[iceberg] 09/18: Hive: Avoid drop table related exceptions in MetaHook (#2191)

Posted by ao...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch 0.11.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit cb0e5878da5f9429c91faf50bc76a89a8ee62614
Author: Marton Bod <ma...@gmail.com>
AuthorDate: Fri Feb 5 11:23:02 2021 +0100

    Hive: Avoid drop table related exceptions in MetaHook (#2191)
---
 .../iceberg/mr/hive/HiveIcebergMetaHook.java       | 20 ++++++++---
 .../hive/TestHiveIcebergStorageHandlerNoScan.java  | 42 ++++++++++++++++++++++
 .../org/apache/iceberg/mr/hive/TestTables.java     | 42 +++++++++++++---------
 3 files changed, 83 insertions(+), 21 deletions(-)

diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
index 586f6fc..dc160c8 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
@@ -167,11 +167,21 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
   @Override
   public void commitDropTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, boolean deleteData) {
     if (deleteData && deleteIcebergTable) {
-      if (!Catalogs.hiveCatalog(conf)) {
-        LOG.info("Dropping with purge all the data for table {}.{}", hmsTable.getDbName(), hmsTable.getTableName());
-        Catalogs.dropTable(conf, catalogProperties);
-      } else {
-        CatalogUtil.dropTableData(deleteIo, deleteMetadata);
+      try {
+        if (!Catalogs.hiveCatalog(conf)) {
+          LOG.info("Dropping with purge all the data for table {}.{}", hmsTable.getDbName(), hmsTable.getTableName());
+          Catalogs.dropTable(conf, catalogProperties);
+        } else {
+          // do nothing if metadata folder has been deleted already (Hive 4 behaviour for purge=TRUE)
+          if (deleteIo.newInputFile(deleteMetadata.location()).exists()) {
+            CatalogUtil.dropTableData(deleteIo, deleteMetadata);
+          }
+        }
+      } catch (Exception e) {
+        // we want to successfully complete the Hive DROP TABLE command despite catalog-related exceptions here
+        // e.g. we wish to successfully delete a Hive table even if the underlying Hadoop table has already been deleted
+        LOG.warn("Exception during commitDropTable operation for table {}.{}.",
+            hmsTable.getDbName(), hmsTable.getTableName(), e);
       }
     }
   }
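
A compact restatement of the guard commitDropTable now applies, using the FileIO, TableMetadata and CatalogUtil calls from the hunk above; the method name is illustrative, and a real implementation would log the swallowed exception as the hook does.

    import org.apache.iceberg.CatalogUtil;
    import org.apache.iceberg.TableMetadata;
    import org.apache.iceberg.io.FileIO;

    public class SafeDropSketch {
      static void dropTableDataIfPresent(FileIO io, TableMetadata metadata) {
        try {
          // Skip the purge when the metadata location is already gone
          // (e.g. Hive 4 behaviour for purge=TRUE).
          if (io.newInputFile(metadata.location()).exists()) {
            CatalogUtil.dropTableData(io, metadata);
          }
        } catch (Exception e) {
          // Swallow catalog-side failures so the Hive DROP TABLE itself still succeeds,
          // e.g. when the underlying Hadoop table was already deleted.
        }
      }
    }
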
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
index 78655dc..f2d0d3c 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
@@ -26,6 +26,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -45,6 +46,7 @@ import org.apache.iceberg.hadoop.Util;
 import org.apache.iceberg.hive.HiveSchemaUtil;
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.types.Type;
@@ -53,6 +55,7 @@ import org.apache.thrift.TException;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -562,4 +565,43 @@ public class TestHiveIcebergStorageHandlerNoScan {
           "from deserializer"}, rows.get(i));
     }
   }
+
+  @Test
+  public void testDropTableWithAppendedData() throws IOException {
+    TableIdentifier identifier = TableIdentifier.of("default", "customers");
+
+    testTables.createTable(shell, identifier.name(), HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, SPEC,
+        FileFormat.PARQUET, ImmutableList.of());
+
+    org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
+    testTables.appendIcebergTable(shell.getHiveConf(), icebergTable, FileFormat.PARQUET, null,
+        HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+
+    shell.executeStatement("DROP TABLE customers");
+  }
+
+  @Test
+  public void testDropHiveTableWithoutUnderlyingTable() throws IOException {
+    Assume.assumeFalse("Not relevant for HiveCatalog", Catalogs.hiveCatalog(shell.getHiveConf()));
+
+    TableIdentifier identifier = TableIdentifier.of("default", "customers");
+    // Create the Iceberg table in non-HiveCatalog
+    testTables.createIcebergTable(shell.getHiveConf(), identifier.name(),
+        HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, FileFormat.PARQUET,
+        HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+
+    // Create Hive table on top
+    String tableLocation = testTables.locationForCreateTableSQL(identifier);
+    shell.executeStatement(testTables.createHiveTableSQL(identifier,
+        ImmutableMap.of(InputFormatConfig.EXTERNAL_TABLE_PURGE, "TRUE")));
+
+    // Drop the Iceberg table
+    Properties properties = new Properties();
+    properties.put(Catalogs.NAME, identifier.toString());
+    properties.put(Catalogs.LOCATION, tableLocation);
+    Catalogs.dropTable(shell.getHiveConf(), properties);
+
+    // Finally drop the Hive table as well
+    shell.executeStatement("DROP TABLE " + identifier);
+  }
 }
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
index 375fff3..4f84556 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
@@ -106,13 +106,21 @@ abstract class TestTables {
    * string which we have to execute. Overridden for HiveCatalog where the Hive table is immediately created
    * during the Iceberg table creation so no extra sql execution is required.
    * @param identifier The table identifier (the namespace should be non-empty and single level)
+   * @param tableProps Optional map of table properties
    * @return The SQL string - which should be executed, null - if it is not needed.
    */
-  public String createHiveTableSQL(TableIdentifier identifier) {
+  public String createHiveTableSQL(TableIdentifier identifier, Map<String, String> tableProps) {
     Preconditions.checkArgument(!identifier.namespace().isEmpty(), "Namespace should not be empty");
     Preconditions.checkArgument(identifier.namespace().levels().length == 1, "Namespace should be single level");
-    return String.format("CREATE TABLE %s.%s STORED BY '%s' %s", identifier.namespace(), identifier.name(),
+    String sql = String.format("CREATE TABLE %s.%s STORED BY '%s' %s", identifier.namespace(), identifier.name(),
         HiveIcebergStorageHandler.class.getName(), locationForCreateTableSQL(identifier));
+    if (tableProps != null && !tableProps.isEmpty()) {
+      String props = tableProps.entrySet().stream()
+          .map(entry -> String.format("'%s'='%s'", entry.getKey(), entry.getValue()))
+          .collect(Collectors.joining(","));
+      sql += " TBLPROPERTIES (" + props + ")";
+    }
+    return sql;
   }
 
   /**
@@ -140,7 +148,7 @@ abstract class TestTables {
   public Table createTable(TestHiveShell shell, String tableName, Schema schema, FileFormat fileFormat,
       List<Record> records) throws IOException {
     Table table = createIcebergTable(shell.getHiveConf(), tableName, schema, fileFormat, records);
-    String createHiveSQL = createHiveTableSQL(TableIdentifier.of("default", tableName));
+    String createHiveSQL = createHiveTableSQL(TableIdentifier.of("default", tableName), ImmutableMap.of());
     if (createHiveSQL != null) {
       shell.executeStatement(createHiveSQL);
     }
@@ -172,18 +180,20 @@ abstract class TestTables {
         PartitionSpecParser.toJson(spec) + "', " +
         "'" + TableProperties.DEFAULT_FILE_FORMAT + "'='" + fileFormat + "')");
 
-    StringBuilder query = new StringBuilder().append("INSERT INTO " + identifier + " VALUES ");
-
-    records.forEach(record -> {
-      query.append("(");
-      query.append(record.struct().fields().stream()
-          .map(field -> getStringValueForInsert(record.getField(field.name()), field.type()))
-          .collect(Collectors.joining(",")));
-      query.append("),");
-    });
-    query.setLength(query.length() - 1);
-
-    shell.executeStatement(query.toString());
+    if (records != null && !records.isEmpty()) {
+      StringBuilder query = new StringBuilder().append("INSERT INTO " + identifier + " VALUES ");
+
+      records.forEach(record -> {
+        query.append("(");
+        query.append(record.struct().fields().stream()
+                .map(field -> getStringValueForInsert(record.getField(field.name()), field.type()))
+                .collect(Collectors.joining(",")));
+        query.append("),");
+      });
+      query.setLength(query.length() - 1);
+
+      shell.executeStatement(query.toString());
+    }
 
     return loadTable(identifier);
   }
@@ -386,7 +396,7 @@ abstract class TestTables {
     }
 
     @Override
-    public String createHiveTableSQL(TableIdentifier identifier) {
+    public String createHiveTableSQL(TableIdentifier identifier, Map<String, String> tblProps) {
       return null;
     }
   }

[iceberg] 02/18: Spark: Refresh relation cache in DELETE and MERGE (#2154)

Posted by ao...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch 0.11.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 9b49c2175aabc730243e07a4041fb158554fe756
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Tue Jan 26 12:42:56 2021 -0800

    Spark: Refresh relation cache in DELETE and MERGE (#2154)
---
 .../v2/ExtendedDataSourceV2Strategy.scala          |  9 ++++--
 .../execution/datasources/v2/ReplaceDataExec.scala |  9 ++++--
 .../SparkRowLevelOperationsTestBase.java           | 24 +++++++++++++--
 .../iceberg/spark/extensions/TestDelete.java       | 35 ++++++++++++++++++++++
 .../apache/iceberg/spark/extensions/TestMerge.java | 29 ++++++++++++++++++
 5 files changed, 100 insertions(+), 6 deletions(-)

diff --git a/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
index 3a8072e..ac2fc2e 100644
--- a/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
+++ b/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
@@ -25,6 +25,7 @@ import org.apache.iceberg.spark.SparkSessionCatalog
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.Strategy
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.NamedRelation
 import org.apache.spark.sql.catalyst.expressions.And
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
@@ -81,8 +82,8 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy {
       val batchExec = ExtendedBatchScanExec(output, scan)
       withProjectAndFilter(project, filters, batchExec, !batchExec.supportsColumnar) :: Nil
 
-    case ReplaceData(_, batchWrite, query) =>
-      ReplaceDataExec(batchWrite, planLater(query)) :: Nil
+    case ReplaceData(relation, batchWrite, query) =>
+      ReplaceDataExec(batchWrite, refreshCache(relation), planLater(query)) :: Nil
 
     case MergeInto(mergeIntoParams, output, child) =>
       MergeIntoExec(mergeIntoParams, output, planLater(child)) :: Nil
@@ -113,6 +114,10 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy {
     }
   }
 
+  private def refreshCache(r: NamedRelation)(): Unit = {
+    spark.sharedState.cacheManager.recacheByPlan(spark, r)
+  }
+
   private object IcebergCatalogAndIdentifier {
     def unapply(identifier: Seq[String]): Option[(TableCatalog, Identifier)] = {
       val catalogAndIdentifier = Spark3Util.catalogAndIdentifier(spark, identifier.asJava)
diff --git a/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceDataExec.scala b/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceDataExec.scala
index 2551a18..f26a8c7 100644
--- a/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceDataExec.scala
+++ b/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceDataExec.scala
@@ -23,11 +23,16 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.write.BatchWrite
 import org.apache.spark.sql.execution.SparkPlan
 
-case class ReplaceDataExec(batchWrite: BatchWrite, query: SparkPlan) extends V2TableWriteExec {
+case class ReplaceDataExec(
+    batchWrite: BatchWrite,
+    refreshCache: () => Unit,
+    query: SparkPlan) extends V2TableWriteExec {
 
   override protected def run(): Seq[InternalRow] = {
     // calling prepare() ensures we execute DynamicFileFilter if present
     prepare()
-    writeWithV2(batchWrite)
+    val writtenRows = writeWithV2(batchWrite)
+    refreshCache()
+    writtenRows
   }
 }
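
The Scala change above boils down to "run the write, then invalidate the cached plan, then return the write's result". A plain-Java sketch of that ordering; the names are illustrative stand-ins for writeWithV2 and recacheByPlan, not Spark APIs:

    import java.util.Arrays;
    import java.util.List;
    import java.util.function.Supplier;

    public class WriteThenRefreshSketch {
      // Same ordering as ReplaceDataExec.run(): write first, refresh second, keep the write's result.
      static <R> R writeThenRefresh(Supplier<R> write, Runnable refreshCache) {
        R writtenRows = write.get();
        refreshCache.run();
        return writtenRows;
      }

      public static void main(String[] args) {
        List<String> rows = writeThenRefresh(
            () -> Arrays.asList("row1", "row2"),       // stands in for writeWithV2(batchWrite)
            () -> System.out.println("recache"));      // stands in for refreshCache() / recacheByPlan
        System.out.println(rows);
      }
    }
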
diff --git a/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
index b54d24c..1d19a27 100644
--- a/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
+++ b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
@@ -30,6 +30,8 @@ import org.apache.iceberg.spark.SparkCatalog;
 import org.apache.iceberg.spark.SparkSessionCatalog;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.junit.Assert;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -110,8 +112,21 @@ public abstract class SparkRowLevelOperationsTestBase extends SparkExtensionsTes
   }
 
   protected void createAndInitTable(String schema) {
+    createAndInitTable(schema, null);
+  }
+
+  protected void createAndInitTable(String schema, String jsonData) {
     sql("CREATE TABLE %s (%s) USING iceberg", tableName, schema);
     initTable();
+
+    if (jsonData != null) {
+      try {
+        Dataset<Row> ds = toDS(schema, jsonData);
+        ds.writeTo(tableName).append();
+      } catch (NoSuchTableException e) {
+        throw new RuntimeException("Failed to write data", e);
+      }
+    }
   }
 
   protected void createOrReplaceView(String name, String jsonData) {
@@ -119,15 +134,20 @@ public abstract class SparkRowLevelOperationsTestBase extends SparkExtensionsTes
   }
 
   protected void createOrReplaceView(String name, String schema, String jsonData) {
+    Dataset<Row> ds = toDS(schema, jsonData);
+    ds.createOrReplaceTempView(name);
+  }
+
+  private Dataset<Row> toDS(String schema, String jsonData) {
     List<String> jsonRows = Arrays.stream(jsonData.split("\n"))
         .filter(str -> str.trim().length() > 0)
         .collect(Collectors.toList());
     Dataset<String> jsonDS = spark.createDataset(jsonRows, Encoders.STRING());
 
     if (schema != null) {
-      spark.read().schema(schema).json(jsonDS).createOrReplaceTempView(name);
+      return spark.read().schema(schema).json(jsonDS);
     } else {
-      spark.read().json(jsonDS).createOrReplaceTempView(name);
+      return spark.read().json(jsonDS);
     }
   }
 }
diff --git a/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
index 31986db..3f3b010 100644
--- a/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
+++ b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
@@ -657,6 +657,41 @@ public abstract class TestDelete extends SparkRowLevelOperationsTestBase {
     Assert.assertTrue("Timeout", executorService.awaitTermination(2, TimeUnit.MINUTES));
   }
 
+  @Test
+  public void testDeleteRefreshesRelationCache() throws NoSuchTableException {
+    createAndInitPartitionedTable();
+
+    append(new Employee(1, "hr"), new Employee(3, "hr"));
+    append(new Employee(1, "hardware"), new Employee(2, "hardware"));
+
+    Dataset<Row> query = spark.sql("SELECT * FROM " + tableName + " WHERE id = 1");
+    query.createOrReplaceTempView("tmp");
+
+    spark.sql("CACHE TABLE tmp");
+
+    assertEquals("View should have correct data",
+        ImmutableList.of(row(1, "hardware"), row(1, "hr")),
+        sql("SELECT * FROM tmp ORDER BY id, dep"));
+
+    sql("DELETE FROM %s WHERE id = 1", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertEquals("Should have 3 snapshots", 3, Iterables.size(table.snapshots()));
+
+    Snapshot currentSnapshot = table.currentSnapshot();
+    validateSnapshot(currentSnapshot, "overwrite", "2", "2", "2");
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(2, "hardware"), row(3, "hr")),
+        sql("SELECT * FROM %s ORDER BY id, dep", tableName));
+
+    assertEquals("Should refresh the relation cache",
+        ImmutableList.of(),
+        sql("SELECT * FROM tmp ORDER BY id, dep"));
+
+    spark.sql("UNCACHE TABLE tmp");
+  }
+
   // TODO: multiple stripes for ORC
 
   protected void validateSnapshot(Snapshot snapshot, String operation, String changedPartitionCount,
diff --git a/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
index c5dc5aa..4a004f5 100644
--- a/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
+++ b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
@@ -21,8 +21,11 @@ package org.apache.iceberg.spark.extensions;
 
 import java.util.Map;
 import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.junit.After;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -50,6 +53,32 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
   // TODO: tests for subqueries in conditions
 
   @Test
+  public void testMergeRefreshesRelationCache() {
+    createAndInitTable("id INT, name STRING", "{ \"id\": 1, \"name\": \"n1\" }");
+    createOrReplaceView("source", "{ \"id\": 1, \"name\": \"n2\" }");
+
+    Dataset<Row> query = spark.sql("SELECT name FROM " + tableName);
+    query.createOrReplaceTempView("tmp");
+
+    spark.sql("CACHE TABLE tmp");
+
+    assertEquals("View should have correct data",
+        ImmutableList.of(row("n1")),
+        sql("SELECT * FROM tmp"));
+
+    sql("MERGE INTO %s t USING source s " +
+        "ON t.id == s.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET t.name = s.name", tableName);
+
+    assertEquals("View should have correct data",
+        ImmutableList.of(row("n2")),
+        sql("SELECT * FROM tmp"));
+
+    spark.sql("UNCACHE TABLE tmp");
+  }
+
+  @Test
   public void testMergeWithNonExistingColumns() {
     createAndInitTable("id INT, c STRUCT<n1:INT,n2:STRUCT<dn1:INT,dn2:INT>>");
     createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");

[iceberg] 01/18: Hive: Fix writing of Date, Decimal, Time and UUID types. (#2126)

Posted by ao...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch 0.11.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit e12d468f59993f3ed5dbd00b0d2721c4851cbba6
Author: László Pintér <47...@users.noreply.github.com>
AuthorDate: Mon Jan 25 15:33:15 2021 +0100

    Hive: Fix writing of Date, Decimal, Time and UUID types. (#2126)
---
 .../IcebergDateObjectInspectorHive3.java           | 11 ++++-
 .../IcebergDecimalObjectInspector.java             |  9 ++++-
 .../IcebergTimeObjectInspector.java                |  5 ++-
 .../IcebergUUIDObjectInspector.java                |  5 ++-
 .../iceberg/mr/hive/HiveIcebergTestUtils.java      | 10 +++--
 .../TestHiveIcebergStorageHandlerWithEngine.java   | 47 ++++++++++++++++++++++
 .../TestIcebergTimeObjectInspector.java            |  5 ++-
 .../TestIcebergUUIDObjectInspector.java            | 11 ++---
 8 files changed, 86 insertions(+), 17 deletions(-)

diff --git a/hive3/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDateObjectInspectorHive3.java b/hive3/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDateObjectInspectorHive3.java
index 6ffa600..ab64b36 100644
--- a/hive3/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDateObjectInspectorHive3.java
+++ b/hive3/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDateObjectInspectorHive3.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.iceberg.util.DateTimeUtil;
 
 public final class IcebergDateObjectInspectorHive3 extends AbstractPrimitiveJavaObjectInspector
-    implements DateObjectInspector {
+    implements DateObjectInspector, WriteObjectInspector {
 
   private static final IcebergDateObjectInspectorHive3 INSTANCE = new IcebergDateObjectInspectorHive3();
 
@@ -69,4 +69,13 @@ public final class IcebergDateObjectInspectorHive3 extends AbstractPrimitiveJava
     }
   }
 
+  @Override
+  public LocalDate convert(Object o) {
+    if (o == null) {
+      return null;
+    }
+
+    Date date = (Date) o;
+    return LocalDate.of(date.getYear(), date.getMonth(), date.getDay());
+  }
 }
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDecimalObjectInspector.java b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDecimalObjectInspector.java
index 3a16833..b30a3fa 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDecimalObjectInspector.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDecimalObjectInspector.java
@@ -80,6 +80,13 @@ public final class IcebergDecimalObjectInspector extends AbstractPrimitiveJavaOb
 
   @Override
   public BigDecimal convert(Object o) {
-    return o == null ? null : ((HiveDecimal) o).bigDecimalValue();
+    if (o == null) {
+      return null;
+    }
+
+    BigDecimal result = ((HiveDecimal) o).bigDecimalValue();
+    // during the HiveDecimal to BigDecimal conversion the scale is lost, when the value is 0
+    result = result.setScale(scale());
+    return result;
   }
 }
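
As the new comment notes, HiveDecimal normalizes away trailing zeros, so a zero value comes back from bigDecimalValue() without the scale of the Iceberg decimal type. A short illustration, assuming Hive's HiveDecimal is on the classpath; the scale of 10 is just an example:

    import java.math.BigDecimal;
    import org.apache.hadoop.hive.common.type.HiveDecimal;

    public class DecimalScaleSketch {
      public static void main(String[] args) {
        HiveDecimal zero = HiveDecimal.create(new BigDecimal("0.0000000000"));

        BigDecimal raw = zero.bigDecimalValue();
        System.out.println(raw.scale());      // 0 -- the original scale was normalized away

        BigDecimal fixed = raw.setScale(10);  // what convert() now does via setScale(scale())
        System.out.println(fixed.scale());    // 10, matching the declared decimal scale again
      }
    }
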
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimeObjectInspector.java b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimeObjectInspector.java
index f5be1fe..f38ec5b 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimeObjectInspector.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimeObjectInspector.java
@@ -19,6 +19,7 @@
 
 package org.apache.iceberg.mr.hive.serde.objectinspector;
 
+import java.time.LocalTime;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveJavaObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
@@ -49,8 +50,8 @@ public class IcebergTimeObjectInspector extends AbstractPrimitiveJavaObjectInspe
   }
 
   @Override
-  public Object convert(Object o) {
-    return o == null ? null : o.toString();
+  public LocalTime convert(Object o) {
+    return o == null ? null : LocalTime.parse((String) o);
   }
 
   @Override
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergUUIDObjectInspector.java b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergUUIDObjectInspector.java
index 0cb8464..e5c44f3 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergUUIDObjectInspector.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergUUIDObjectInspector.java
@@ -19,6 +19,7 @@
 
 package org.apache.iceberg.mr.hive.serde.objectinspector;
 
+import java.util.UUID;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveJavaObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
@@ -49,8 +50,8 @@ public class IcebergUUIDObjectInspector extends AbstractPrimitiveJavaObjectInspe
   }
 
   @Override
-  public String convert(Object o) {
-    return o == null ? null : o.toString();
+  public UUID convert(Object o) {
+    return o == null ? null : UUID.fromString(o.toString());
   }
 
   @Override
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java b/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java
index 7f497cd..0bb7a3a 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java
@@ -29,12 +29,14 @@ import java.nio.file.Paths;
 import java.sql.Timestamp;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
+import java.time.LocalTime;
 import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
+import java.util.UUID;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
@@ -128,8 +130,8 @@ public class HiveIcebergTestUtils {
     record.set(9, new byte[]{0, 1, 2});
     record.set(10, ByteBuffer.wrap(new byte[]{0, 1, 2, 3}));
     record.set(11, new BigDecimal("0.0000000013"));
-    record.set(12, "11:33");
-    record.set(13, "73689599-d7fc-4dfb-b94e-106ff20284a5");
+    record.set(12, LocalTime.of(11, 33));
+    record.set(13, UUID.fromString("73689599-d7fc-4dfb-b94e-106ff20284a5"));
 
     return record;
   }
@@ -167,8 +169,8 @@ public class HiveIcebergTestUtils {
         new BytesWritable(record.get(9, byte[].class)),
         new BytesWritable(ByteBuffers.toByteArray(record.get(10, ByteBuffer.class))),
         new HiveDecimalWritable(HiveDecimal.create(record.get(11, BigDecimal.class))),
-        new Text(record.get(12, String.class)),
-        new Text(record.get(13, String.class))
+        new Text(record.get(12, LocalTime.class).toString()),
+        new Text(record.get(13, UUID.class).toString())
     );
   }
 
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
index 6a27d62..78f65d6 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
@@ -20,6 +20,9 @@
 package org.apache.iceberg.mr.hive;
 
 import java.io.IOException;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -310,6 +313,36 @@ public class TestHiveIcebergStorageHandlerWithEngine {
     HiveIcebergTestUtils.validateData(table, new ArrayList<>(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS), 0);
   }
 
+  @Test
+  public void testInsertSupportedTypes() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    for (int i = 0; i < SUPPORTED_TYPES.size(); i++) {
+      Type type = SUPPORTED_TYPES.get(i);
+      // TODO: remove this filter when issue #1881 is resolved
+      if (type == Types.UUIDType.get() && fileFormat == FileFormat.PARQUET) {
+        continue;
+      }
+      // TODO: remove this filter when we figure out how we could test binary types
+      if (type.equals(Types.BinaryType.get()) || type.equals(Types.FixedType.ofLength(5))) {
+        continue;
+      }
+      String tableName = type.typeId().toString().toLowerCase() + "_table_" + i;
+      String columnName = type.typeId().toString().toLowerCase() + "_column";
+
+      Schema schema = new Schema(required(1, "id", Types.LongType.get()), required(2, columnName, type));
+      List<Record> expected = TestHelper.generateRandomRecords(schema, 5, 0L);
+
+      Table table = testTables.createTable(shell, tableName, schema, fileFormat, ImmutableList.of());
+      StringBuilder query = new StringBuilder("INSERT INTO ").append(tableName).append(" VALUES")
+              .append(expected.stream()
+                      .map(r -> String.format("(%s,%s)", r.get(0),
+                              getStringValueForInsert(r.get(1), type)))
+                      .collect(Collectors.joining(",")));
+      shell.executeStatement(query.toString());
+      HiveIcebergTestUtils.validateData(table, expected, 0);
+    }
+  }
+
   /**
    * Testing map only inserts.
    * @throws IOException If there is an underlying IOException
@@ -579,4 +612,18 @@ public class TestHiveIcebergStorageHandlerWithEngine {
     }
     return query;
   }
+
+  private String getStringValueForInsert(Object value, Type type) {
+    String template = "\'%s\'";
+    if (type.equals(Types.TimestampType.withoutZone())) {
+      return String.format(template, Timestamp.valueOf((LocalDateTime) value).toString());
+    } else if (type.equals(Types.TimestampType.withZone())) {
+      return String.format(template, Timestamp.from(((OffsetDateTime) value).toInstant()).toString());
+    } else if (type.equals(Types.BooleanType.get())) {
+      // in hive2 boolean type values must not be surrounded in apostrophes. Otherwise the value is translated to true.
+      return value.toString();
+    } else {
+      return String.format(template, value.toString());
+    }
+  }
 }
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergTimeObjectInspector.java b/mr/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergTimeObjectInspector.java
index 5ca5fe6..c635920 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergTimeObjectInspector.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergTimeObjectInspector.java
@@ -48,12 +48,13 @@ public class TestIcebergTimeObjectInspector {
     Assert.assertNull(oi.getPrimitiveWritableObject(null));
     Assert.assertNull(oi.convert(null));
 
-    String time = LocalTime.now().toString();
+    LocalTime localTime = LocalTime.now();
+    String time = localTime.toString();
     Text text = new Text(time);
 
     Assert.assertEquals(time, oi.getPrimitiveJavaObject(text));
     Assert.assertEquals(text, oi.getPrimitiveWritableObject(time));
-    Assert.assertEquals(time, oi.convert(text));
+    Assert.assertEquals(localTime, oi.convert(time));
 
     Text copy = (Text) oi.copyObject(text);
 
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergUUIDObjectInspector.java b/mr/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergUUIDObjectInspector.java
index da13b32..cbf55e3 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergUUIDObjectInspector.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergUUIDObjectInspector.java
@@ -47,12 +47,13 @@ public class TestIcebergUUIDObjectInspector {
     Assert.assertNull(oi.getPrimitiveWritableObject(null));
     Assert.assertNull(oi.convert(null));
 
-    String uuid = UUID.randomUUID().toString();
-    Text text = new Text(uuid);
+    UUID uuid = UUID.randomUUID();
+    String uuidStr = uuid.toString();
+    Text text = new Text(uuidStr);
 
-    Assert.assertEquals(uuid, oi.getPrimitiveJavaObject(text));
-    Assert.assertEquals(text, oi.getPrimitiveWritableObject(uuid));
-    Assert.assertEquals(uuid, oi.convert(text));
+    Assert.assertEquals(uuidStr, oi.getPrimitiveJavaObject(text));
+    Assert.assertEquals(text, oi.getPrimitiveWritableObject(uuidStr));
+    Assert.assertEquals(uuid, oi.convert(uuidStr));
 
     Text copy = (Text) oi.copyObject(text);
 

[iceberg] 07/18: Core: Fix data loss in compact action (#2196)

Posted by ao...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch 0.11.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 4b2f3904e14ac1236c62f37c692c1904b04bfb9a
Author: Stephen-Robin <77...@users.noreply.github.com>
AuthorDate: Thu Feb 4 09:19:14 2021 +0800

    Core: Fix data loss in compact action (#2196)
    
    Fixes #2195.
---
 .../actions/BaseRewriteDataFilesAction.java        | 11 ++++-
 .../actions/TestRewriteDataFilesAction.java        | 54 ++++++++++++++++++++++
 2 files changed, 64 insertions(+), 1 deletion(-)

diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java b/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
index 7ce1357..2305388 100644
--- a/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
+++ b/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
@@ -226,7 +226,7 @@ public abstract class BaseRewriteDataFilesAction<ThisT>
           return TableScanUtil.planTasks(splitTasks, targetSizeInBytes, splitLookback, splitOpenFileCost);
         })
         .flatMap(Streams::stream)
-        .filter(task -> task.files().size() > 1)
+        .filter(task -> task.files().size() > 1 || isPartialFileScan(task))
         .collect(Collectors.toList());
 
     if (combinedScanTasks.isEmpty()) {
@@ -273,6 +273,15 @@ public abstract class BaseRewriteDataFilesAction<ThisT>
     }
   }
 
+  private boolean isPartialFileScan(CombinedScanTask task) {
+    if (task.files().size() == 1) {
+      FileScanTask fileScanTask = task.files().iterator().next();
+      return fileScanTask.file().fileSizeInBytes() != fileScanTask.length();
+    } else {
+      return false;
+    }
+  }
+
   protected abstract FileIO fileIO();
 
   protected abstract List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScanTask);
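
The data-loss scenario the new condition guards against: when a file larger than the target size is split, planning can emit a combined task whose single FileScanTask covers only part of that file, and filtering those partial tasks out could drop the corresponding rows when the file was replaced (#2195). A minimal restatement of the check with the same Iceberg scan-task API; only the class name and comments are added here:

    import org.apache.iceberg.CombinedScanTask;
    import org.apache.iceberg.FileScanTask;

    public class PartialScanCheckSketch {
      // True when a single-file task reads fewer bytes than the whole file, i.e. a split of a large file.
      static boolean isPartialFileScan(CombinedScanTask task) {
        if (task.files().size() != 1) {
          return false;
        }
        FileScanTask fileScanTask = task.files().iterator().next();
        return fileScanTask.length() != fileScanTask.file().fileSizeInBytes();
      }
    }
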
diff --git a/spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java b/spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
index 5c6ae59..5e72dd1 100644
--- a/spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
+++ b/spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
@@ -20,8 +20,11 @@
 package org.apache.iceberg.actions;
 
 import java.io.File;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.IntStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
@@ -37,6 +40,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.SparkTestBase;
 import org.apache.iceberg.spark.source.ThreeColumnRecord;
 import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
@@ -311,6 +315,56 @@ public abstract class TestRewriteDataFilesAction extends SparkTestBase {
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
 
+  @Test
+  public void testRewriteDataFilesForLargeFile() throws AnalysisException {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+    Assert.assertNull("Table must be empty", table.currentSnapshot());
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList();
+
+    IntStream.range(0, 2000).forEach(i -> records1.add(new ThreeColumnRecord(i, "foo" + i, "bar" + i)));
+    Dataset<Row> df = spark.createDataFrame(records1, ThreeColumnRecord.class).repartition(1);
+    writeDF(df);
+
+    List<ThreeColumnRecord> records2 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"),
+        new ThreeColumnRecord(1, "DDDDDDDDDD", "DDDD")
+    );
+    writeRecords(records2);
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    DataFile maxSizeFile = Collections.max(dataFiles, Comparator.comparingLong(DataFile::fileSizeInBytes));
+    Assert.assertEquals("Should have 3 files before rewrite", 3, dataFiles.size());
+
+    spark.read().format("iceberg").load(tableLocation).createTempView("origin");
+    long originalNumRecords = spark.read().format("iceberg").load(tableLocation).count();
+    List<Object[]> originalRecords = sql("SELECT * from origin sort by c2");
+
+    Actions actions = Actions.forTable(table);
+
+    long targetSizeInBytes = maxSizeFile.fileSizeInBytes() - 10;
+    RewriteDataFilesActionResult result = actions
+        .rewriteDataFiles()
+        .targetSizeInBytes(targetSizeInBytes)
+        .splitOpenFileCost(1)
+        .execute();
+
+    Assert.assertEquals("Action should delete 4 data files", 4, result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 2 data files", 2, result.addedDataFiles().size());
+
+    spark.read().format("iceberg").load(tableLocation).createTempView("postRewrite");
+    long postRewriteNumRecords = spark.read().format("iceberg").load(tableLocation).count();
+    List<Object[]> rewrittenRecords = sql("SELECT * from postRewrite sort by c2");
+
+    Assert.assertEquals(originalNumRecords, postRewriteNumRecords);
+    assertEquals("Rows should be unchanged", originalRecords, rewrittenRecords);
+  }
+
   private void writeRecords(List<ThreeColumnRecord> records) {
     Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class);
     writeDF(df);

[iceberg] 08/18: Spark: Fix _pos metadata column in SparkAvroReader (#2215)

Posted by ao...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch 0.11.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 756e9e61a22b708d11df1a14404560396118a370
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Thu Feb 4 16:53:26 2021 -0800

    Spark: Fix _pos metadata column in SparkAvroReader (#2215)
---
 .../java/org/apache/iceberg/spark/data/SparkAvroReader.java   | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java
index 46c594e..c693e2e 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java
@@ -22,12 +22,14 @@ package org.apache.iceberg.spark.data;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Supplier;
 import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
 import org.apache.iceberg.avro.AvroSchemaWithTypeVisitor;
+import org.apache.iceberg.avro.SupportsRowPosition;
 import org.apache.iceberg.avro.ValueReader;
 import org.apache.iceberg.avro.ValueReaders;
 import org.apache.iceberg.data.avro.DecoderResolver;
@@ -37,7 +39,7 @@ import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.catalyst.InternalRow;
 
 
-public class SparkAvroReader implements DatumReader<InternalRow> {
+public class SparkAvroReader implements DatumReader<InternalRow>, SupportsRowPosition {
 
   private final Schema readSchema;
   private final ValueReader<InternalRow> reader;
@@ -64,6 +66,13 @@ public class SparkAvroReader implements DatumReader<InternalRow> {
     return DecoderResolver.resolveAndRead(decoder, readSchema, fileSchema, reader, reuse);
   }
 
+  @Override
+  public void setRowPositionSupplier(Supplier<Long> posSupplier) {
+    if (reader instanceof SupportsRowPosition) {
+      ((SupportsRowPosition) reader).setRowPositionSupplier(posSupplier);
+    }
+  }
+
   private static class ReadBuilder extends AvroSchemaWithTypeVisitor<ValueReader<?>> {
     private final Map<Integer, ?> idToConstant;
 

[iceberg] 11/18: Parquet: Fix row group filters with promoted types (#2232)

Posted by ao...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch 0.11.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 2feeab5a47c91386a2ed26b744f1936157358521
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Fri Feb 12 20:38:57 2021 -0800

    Parquet: Fix row group filters with promoted types (#2232)
    
    This fixes Parquet row group filters when types have been promoted from int to long or from float to double.
    
    The filters are passed the file schema after ids are added, which is used to convert dictionary values or lower/upper bounds. That conversion currently uses the file's types to deserialize, but the filter expression is bound to the table types. If the types differ, then comparison in the evaluator fails.
    
    This updates the conversion to first deserialize the Parquet value and then promote it if the table's type has changed. Only int to long and float to double are needed because those are the only type promotions that use a different representation.
---
 .../iceberg/data/TestMetricsRowGroupFilter.java      |  9 +++++++++
 .../apache/iceberg/parquet/ParquetConversions.java   | 15 +++++++++++++++
 .../parquet/ParquetDictionaryRowGroupFilter.java     |  6 +++++-
 .../parquet/ParquetMetricsRowGroupFilter.java        | 20 ++++++++++++++++++--
 .../parquet/TestDictionaryRowGroupFilter.java        |  8 ++++++++
 5 files changed, 55 insertions(+), 3 deletions(-)

diff --git a/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java b/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java
index b82d6eb..f03544d 100644
--- a/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java
+++ b/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java
@@ -812,6 +812,15 @@ public class TestMetricsRowGroupFilter {
     Assert.assertTrue("Should read if IN is not evaluated", shouldRead);
   }
 
+  @Test
+  public void testParquetTypePromotion() {
+    Assume.assumeTrue("Only valid for Parquet", format == FileFormat.PARQUET);
+    Schema promotedSchema = new Schema(required(1, "id", Types.LongType.get()));
+    boolean shouldRead = new ParquetMetricsRowGroupFilter(promotedSchema, equal("id", INT_MIN_VALUE + 1), true)
+        .shouldRead(parquetSchema, rowGroupMetadata);
+    Assert.assertTrue("Should succeed with promoted schema", shouldRead);
+  }
+
   private boolean shouldRead(Expression expression) {
     return shouldRead(expression, true);
   }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java
index 431c636..78b3a31 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java
@@ -68,6 +68,21 @@ class ParquetConversions {
     }
   }
 
+  static Function<Object, Object> converterFromParquet(PrimitiveType parquetType, Type icebergType) {
+    Function<Object, Object> fromParquet = converterFromParquet(parquetType);
+    if (icebergType != null) {
+      if (icebergType.typeId() == Type.TypeID.LONG &&
+          parquetType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT32) {
+        return value -> ((Integer) fromParquet.apply(value)).longValue();
+      } else if (icebergType.typeId() == Type.TypeID.DOUBLE &&
+          parquetType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.FLOAT) {
+        return value -> ((Float) fromParquet.apply(value)).doubleValue();
+      }
+    }
+
+    return fromParquet;
+  }
+
   static Function<Object, Object> converterFromParquet(PrimitiveType type) {
     if (type.getOriginalType() != null) {
       switch (type.getOriginalType()) {
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
index d72cf49..37c7d6e 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
@@ -37,6 +37,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types.StructType;
 import org.apache.iceberg.util.NaNUtil;
 import org.apache.parquet.column.ColumnDescriptor;
@@ -49,6 +50,7 @@ import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
 
 public class ParquetDictionaryRowGroupFilter {
+  private final Schema schema;
   private final Expression expr;
 
   public ParquetDictionaryRowGroupFilter(Schema schema, Expression unbound) {
@@ -56,6 +58,7 @@ public class ParquetDictionaryRowGroupFilter {
   }
 
   public ParquetDictionaryRowGroupFilter(Schema schema, Expression unbound, boolean caseSensitive) {
+    this.schema = schema;
     StructType struct = schema.asStruct();
     this.expr = Binder.bind(struct, Expressions.rewriteNot(unbound), caseSensitive);
   }
@@ -96,8 +99,9 @@ public class ParquetDictionaryRowGroupFilter {
         PrimitiveType colType = fileSchema.getType(desc.getPath()).asPrimitiveType();
         if (colType.getId() != null) {
           int id = colType.getId().intValue();
+          Type icebergType = schema.findType(id);
           cols.put(id, desc);
-          conversions.put(id, ParquetConversions.converterFromParquet(colType));
+          conversions.put(id, ParquetConversions.converterFromParquet(colType, icebergType));
         }
       }
 
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
index ee026cc..f83d701 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
@@ -77,7 +77,7 @@ public class ParquetMetricsRowGroupFilter {
   private static final boolean ROWS_CANNOT_MATCH = false;
 
   private class MetricsEvalVisitor extends BoundExpressionVisitor<Boolean> {
-    private Map<Integer, Statistics> stats = null;
+    private Map<Integer, Statistics<?>> stats = null;
     private Map<Integer, Long> valueCounts = null;
     private Map<Integer, Function<Object, Object>> conversions = null;
 
@@ -93,9 +93,10 @@ public class ParquetMetricsRowGroupFilter {
         PrimitiveType colType = fileSchema.getType(col.getPath().toArray()).asPrimitiveType();
         if (colType.getId() != null) {
           int id = colType.getId().intValue();
+          Type icebergType = schema.findType(id);
           stats.put(id, col.getStatistics());
           valueCounts.put(id, col.getValueCount());
-          conversions.put(id, ParquetConversions.converterFromParquet(colType));
+          conversions.put(id, ParquetConversions.converterFromParquet(colType, icebergType));
         }
       }
 
@@ -502,4 +503,19 @@ public class ParquetMetricsRowGroupFilter {
     return statistics.getNumNulls() < valueCount &&
         (statistics.getMaxBytes() == null || statistics.getMinBytes() == null);
   }
+
+  private static Function<Object, Object> converterFor(PrimitiveType parquetType, Type icebergType) {
+    Function<Object, Object> fromParquet = ParquetConversions.converterFromParquet(parquetType);
+    if (icebergType != null) {
+      if (icebergType.typeId() == Type.TypeID.LONG &&
+          parquetType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT32) {
+        return value -> ((Integer) fromParquet.apply(value)).longValue();
+      } else if (icebergType.typeId() == Type.TypeID.DOUBLE &&
+          parquetType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.FLOAT) {
+        return value -> ((Float) fromParquet.apply(value)).doubleValue();
+      }
+    }
+
+    return fromParquet;
+  }
 }
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestDictionaryRowGroupFilter.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestDictionaryRowGroupFilter.java
index a5e7a35..c02bf27 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestDictionaryRowGroupFilter.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestDictionaryRowGroupFilter.java
@@ -884,4 +884,12 @@ public class TestDictionaryRowGroupFilter {
         .shouldRead(parquetSchema, rowGroupMetadata, dictionaryStore);
     Assert.assertFalse("Should not read: notIn on no nulls column (empty string is within the set)", shouldRead);
   }
+
+  @Test
+  public void testTypePromotion() {
+    Schema promotedSchema = new Schema(required(1, "id", LongType.get()));
+    boolean shouldRead = new ParquetDictionaryRowGroupFilter(promotedSchema, equal("id", INT_MIN_VALUE + 1), true)
+        .shouldRead(parquetSchema, rowGroupMetadata, dictionaryStore);
+    Assert.assertTrue("Should succeed with promoted schema", shouldRead);
+  }
 }

[iceberg] 18/18: Spark: Remove softValues for Spark 2 catalog cache (#2363)

Posted by ao...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch 0.11.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit aba388116ac84980ff6e6ec378c16dcd3b4bf4ca
Author: Sam Pringle <pr...@hey.com>
AuthorDate: Wed Mar 24 14:29:56 2021 -0400

    Spark: Remove softValues for Spark 2 catalog cache (#2363)
---
 .../src/main/java/org/apache/iceberg/spark/source/CustomCatalogs.java  | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/CustomCatalogs.java b/spark2/src/main/java/org/apache/iceberg/spark/source/CustomCatalogs.java
index 742cac5..54e1748 100644
--- a/spark2/src/main/java/org/apache/iceberg/spark/source/CustomCatalogs.java
+++ b/spark2/src/main/java/org/apache/iceberg/spark/source/CustomCatalogs.java
@@ -39,8 +39,7 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.sql.SparkSession;
 
 public final class CustomCatalogs {
-  private static final Cache<Pair<SparkSession, String>, Catalog> CATALOG_CACHE = Caffeine.newBuilder()
-      .softValues().build();
+  private static final Cache<Pair<SparkSession, String>, Catalog> CATALOG_CACHE = Caffeine.newBuilder().build();
 
   public static final String ICEBERG_DEFAULT_CATALOG = "default_catalog";
   public static final String ICEBERG_CATALOG_PREFIX = "spark.sql.catalog";

[iceberg] 14/18: Hive: Fix predicate pushdown for Date (#2254)

Posted by ao...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch 0.11.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 1f02595fb408b8ac12c83a758fa6fe9705f5ce8b
Author: pvary <pv...@cloudera.com>
AuthorDate: Fri Feb 26 12:51:10 2021 +0100

    Hive: Fix predicate pushdown for Date (#2254)
---
 .../iceberg/mr/hive/HiveIcebergFilterFactory.java  |  16 +-
 .../iceberg/mr/mapreduce/IcebergInputFormat.java   |   6 +-
 .../mr/hive/TestHiveIcebergFilterFactory.java      |   2 +-
 .../TestHiveIcebergStorageHandlerTimezone.java     | 172 +++++++++++++++++++++
 .../TestHiveIcebergStorageHandlerWithEngine.java   |   6 +-
 5 files changed, 191 insertions(+), 11 deletions(-)

diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java
index 33791c7..a645874 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java
@@ -22,7 +22,6 @@ package org.apache.iceberg.mr.hive;
 import java.math.BigDecimal;
 import java.sql.Date;
 import java.sql.Timestamp;
-import java.time.Instant;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree;
@@ -172,15 +171,24 @@ public class HiveIcebergFilterFactory {
     return hiveDecimalWritable.getHiveDecimal().bigDecimalValue().setScale(hiveDecimalWritable.scale());
   }
 
+  // Hive uses `java.sql.Date.valueOf(lit.toString())` to convert a literal to Date.
+  // That call goes through `java.util.Date()` internally, which relies on TimeZone.getDefaultRef().
+  // To get back the expected date we convert through LocalDate instead, which sidesteps the time zone
+  // entirely because it is built from the year/month/day fields.
   private static int daysFromDate(Date date) {
-    return DateTimeUtil.daysFromInstant(Instant.ofEpochMilli(date.getTime()));
+    return DateTimeUtil.daysFromDate(date.toLocalDate());
   }
 
+  // Hive uses `java.sql.Timestamp.valueOf(lit.toString())` to convert a literal to Timestamp.
+  // That call also goes through `java.util.Date()` internally, which relies on TimeZone.getDefaultRef().
+  // To get back the expected timestamp we convert through LocalDateTime instead, which sidesteps the time
+  // zone entirely because it is built from the year/month/day/hour/min/sec/nanos fields.
   private static int daysFromTimestamp(Timestamp timestamp) {
-    return DateTimeUtil.daysFromInstant(timestamp.toInstant());
+    return DateTimeUtil.daysFromDate(timestamp.toLocalDateTime().toLocalDate());
   }
 
+  // We have to use the LocalDateTime to get the micros. See the comment above.
   private static long microsFromTimestamp(Timestamp timestamp) {
-    return DateTimeUtil.microsFromInstant(timestamp.toInstant());
+    return DateTimeUtil.microsFromTimestamp(timestamp.toLocalDateTime());
   }
 }
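
The comment blocks above describe the time-zone pitfall this change works around. For illustration (not part of the commit; the zone choice is arbitrary), a self-contained JDK sketch of it:

    import java.sql.Date;
    import java.time.Instant;
    import java.time.LocalDate;
    import java.time.ZoneOffset;
    import java.util.TimeZone;

    public class DateLiteralSketch {
      public static void main(String[] args) {
        // Hive builds the literal with java.sql.Date.valueOf, which uses the JVM default time zone.
        TimeZone.setDefault(TimeZone.getTimeZone("Asia/Kolkata"));
        Date literal = Date.valueOf(LocalDate.of(2015, 11, 12));

        // Reading the epoch millis as an Instant and taking the day in UTC lands on the
        // previous day for zones ahead of UTC...
        LocalDate viaInstant = Instant.ofEpochMilli(literal.getTime())
            .atZone(ZoneOffset.UTC)
            .toLocalDate();

        // ...while toLocalDate() reads the year/month/day fields directly.
        LocalDate viaLocalDate = literal.toLocalDate();

        System.out.println(viaInstant);    // 2015-11-11
        System.out.println(viaLocalDate);  // 2015-11-12
      }
    }
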
diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index 033e507..a67c513 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -49,6 +49,7 @@ import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.data.DeleteFilter;
 import org.apache.iceberg.data.GenericDeleteFilter;
 import org.apache.iceberg.data.IdentityPartitionConverters;
+import org.apache.iceberg.data.InternalRecordWrapper;
 import org.apache.iceberg.data.avro.DataReader;
 import org.apache.iceberg.data.orc.GenericOrcReader;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
@@ -284,8 +285,11 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
       boolean applyResidual = !context.getConfiguration().getBoolean(InputFormatConfig.SKIP_RESIDUAL_FILTERING, false);
 
       if (applyResidual && residual != null && residual != Expressions.alwaysTrue()) {
+        // Date and timestamp values are not returned in the representation Evaluator expects.
+        // Wrap each record so those values are converted to the expected internal type.
+        InternalRecordWrapper wrapper = new InternalRecordWrapper(readSchema.asStruct());
         Evaluator filter = new Evaluator(readSchema.asStruct(), residual, caseSensitive);
-        return CloseableIterable.filter(iter, record -> filter.eval((StructLike) record));
+        return CloseableIterable.filter(iter, record -> filter.eval(wrapper.wrap((StructLike) record)));
       } else {
         return iter;
       }
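
The wrapping pattern above, extracted into a minimal standalone sketch (illustrative only; it assumes the read schema, the residual expression, and an iterable of generic Records are already in hand):

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.data.InternalRecordWrapper;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.expressions.Evaluator;
    import org.apache.iceberg.expressions.Expression;
    import org.apache.iceberg.io.CloseableIterable;

    class ResidualFilterSketch {
      // Wrap each generic Record so date/timestamp fields reach the Evaluator in
      // Iceberg's internal representation rather than as java.time objects.
      static CloseableIterable<Record> applyResidual(CloseableIterable<Record> records, Schema readSchema,
                                                     Expression residual, boolean caseSensitive) {
        InternalRecordWrapper wrapper = new InternalRecordWrapper(readSchema.asStruct());
        Evaluator evaluator = new Evaluator(readSchema.asStruct(), residual, caseSensitive);
        return CloseableIterable.filter(records, record -> evaluator.eval(wrapper.wrap(record)));
      }
    }
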
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergFilterFactory.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergFilterFactory.java
index 3d436dc..3044f04 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergFilterFactory.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergFilterFactory.java
@@ -218,7 +218,7 @@ public class TestHiveIcebergFilterFactory {
   @Test
   public void testDateType() {
     SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
-    Date gmtDate = new Date(LocalDate.of(2015, 11, 12).atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli());
+    Date gmtDate = Date.valueOf(LocalDate.of(2015, 11, 12));
     SearchArgument arg = builder.startAnd().equals("date", PredicateLeaf.Type.DATE, gmtDate).end().build();
 
     UnboundPredicate expected = Expressions.equal("date", Literal.of("2015-11-12").to(Types.DateType.get()).value());
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerTimezone.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerTimezone.java
new file mode 100644
index 0000000..ea6b4d1
--- /dev/null
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerTimezone.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.text.DateFormat;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.common.DynFields;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.TestHelper;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.types.Types;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.junit.runners.Parameterized.Parameter;
+import static org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class TestHiveIcebergStorageHandlerTimezone {
+  private static final Optional<ThreadLocal<DateFormat>> dateFormat =
+      Optional.ofNullable((ThreadLocal<DateFormat>) DynFields.builder()
+          .hiddenImpl(TimestampWritable.class, "threadLocalDateFormat")
+          .defaultAlwaysNull()
+          .buildStatic()
+          .get());
+
+  private static final Optional<ThreadLocal<TimeZone>> localTimeZone =
+      Optional.ofNullable((ThreadLocal<TimeZone>) DynFields.builder()
+          .hiddenImpl(DateWritable.class, "LOCAL_TIMEZONE")
+          .defaultAlwaysNull()
+          .buildStatic()
+          .get());
+
+  @Parameters(name = "timezone={0}")
+  public static Collection<Object[]> parameters() {
+    return ImmutableList.of(
+        new String[] {"America/New_York"},
+        new String[] {"Asia/Kolkata"},
+        new String[] {"UTC/Greenwich"}
+    );
+  }
+
+  private static TestHiveShell shell;
+
+  private TestTables testTables;
+
+  @Parameter(0)
+  public String timezoneString;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @BeforeClass
+  public static void beforeClass() {
+    shell = HiveIcebergStorageHandlerTestUtils.shell();
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    shell.stop();
+  }
+
+  @Before
+  public void before() throws IOException {
+    TimeZone.setDefault(TimeZone.getTimeZone(timezoneString));
+
+    // Clear Hive's cached date format and local time zone, because both store the default
+    // time zone inside the cached object.
+    dateFormat.ifPresent(ThreadLocal::remove);
+    localTimeZone.ifPresent(ThreadLocal::remove);
+
+    this.testTables = HiveIcebergStorageHandlerTestUtils.testTables(shell, TestTables.TestTableType.HIVE_CATALOG, temp);
+    // Use spark as the execution engine so we can detect if a query unintentionally tries to use an execution engine
+    HiveIcebergStorageHandlerTestUtils.init(shell, testTables, temp, "spark");
+  }
+
+  @After
+  public void after() throws Exception {
+    HiveIcebergStorageHandlerTestUtils.close(shell);
+  }
+
+  @Test
+  public void testDateQuery() throws IOException {
+    Schema dateSchema = new Schema(optional(1, "d_date", Types.DateType.get()));
+
+    List<Record> records = TestHelper.RecordsBuilder.newInstance(dateSchema)
+        .add(LocalDate.of(2020, 1, 21))
+        .add(LocalDate.of(2020, 1, 24))
+        .build();
+
+    testTables.createTable(shell, "date_test", dateSchema, FileFormat.PARQUET, records);
+
+    List<Object[]> result = shell.executeStatement("SELECT * from date_test WHERE d_date='2020-01-21'");
+    Assert.assertEquals(1, result.size());
+    Assert.assertEquals("2020-01-21", result.get(0)[0]);
+
+    result = shell.executeStatement("SELECT * from date_test WHERE d_date in ('2020-01-21', '2020-01-22')");
+    Assert.assertEquals(1, result.size());
+    Assert.assertEquals("2020-01-21", result.get(0)[0]);
+
+    result = shell.executeStatement("SELECT * from date_test WHERE d_date > '2020-01-21'");
+    Assert.assertEquals(1, result.size());
+    Assert.assertEquals("2020-01-24", result.get(0)[0]);
+
+    result = shell.executeStatement("SELECT * from date_test WHERE d_date='2020-01-20'");
+    Assert.assertEquals(0, result.size());
+  }
+
+  @Test
+  public void testTimestampQuery() throws IOException {
+    Schema timestampSchema = new Schema(optional(1, "d_ts", Types.TimestampType.withoutZone()));
+
+    List<Record> records = TestHelper.RecordsBuilder.newInstance(timestampSchema)
+        .add(LocalDateTime.of(2019, 1, 22, 9, 44, 54, 100000000))
+        .add(LocalDateTime.of(2019, 2, 22, 9, 44, 54, 200000000))
+        .build();
+
+    testTables.createTable(shell, "ts_test", timestampSchema, FileFormat.PARQUET, records);
+
+    List<Object[]> result = shell.executeStatement("SELECT d_ts FROM ts_test WHERE d_ts='2019-02-22 09:44:54.2'");
+    Assert.assertEquals(1, result.size());
+    Assert.assertEquals("2019-02-22 09:44:54.2", result.get(0)[0]);
+
+    result = shell.executeStatement(
+        "SELECT * FROM ts_test WHERE d_ts in ('2017-01-01 22:30:57.1', '2019-02-22 09:44:54.2')");
+    Assert.assertEquals(1, result.size());
+    Assert.assertEquals("2019-02-22 09:44:54.2", result.get(0)[0]);
+
+    result = shell.executeStatement("SELECT d_ts FROM ts_test WHERE d_ts < '2019-02-22 09:44:54.2'");
+    Assert.assertEquals(1, result.size());
+    Assert.assertEquals("2019-01-22 09:44:54.1", result.get(0)[0]);
+
+    result = shell.executeStatement("SELECT * FROM ts_test WHERE d_ts='2017-01-01 22:30:57.3'");
+    Assert.assertEquals(0, result.size());
+  }
+}
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
index ec79c12..9decb0c 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
@@ -150,11 +150,7 @@ public class TestHiveIcebergStorageHandlerWithEngine {
 
   @After
   public void after() throws Exception {
-    shell.closeSession();
-    shell.metastore().reset();
-    // HiveServer2 thread pools are using thread local Hive -> HMSClient objects. These are not cleaned up when the
-    // HiveServer2 is stopped. Only Finalizer closes the HMS connections.
-    System.gc();
+    HiveIcebergStorageHandlerTestUtils.close(shell);
   }
 
   @Test