Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/07/17 23:37:08 UTC

[incubator-iceberg] branch master updated: Spark: Support loading Hive partitions that match a filter expression (#248)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new d105abc  Spark: Support loading Hive partitions that match a filter expression (#248)
d105abc is described below

commit d105abcb910a6de9a33b5ece61ac27f753044505
Author: Puneet Zaroo <pu...@gmail.com>
AuthorDate: Thu Jul 18 05:07:04 2019 +0530

    Spark: Support loading Hive partitions that match a filter expression (#248)
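
A minimal usage sketch (not part of this commit; the SparkSession `spark`, the
table name, and the filter string are placeholders) showing how the new helper
is intended to be called from Scala:

    import org.apache.iceberg.spark.SparkTableUtil

    // Returns a DataFrame with one row per Hive partition matching the filter,
    // with columns "partition" (spec map), "uri" (location), and "format" (serde).
    val parts = SparkTableUtil.partitionDFByFilter(spark, "db.sample_table", "data = 'a'")
    parts.show()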
---
 .../org/apache/iceberg/hive/HiveTableBaseTest.java |  11 +-
 .../java/org/apache/iceberg/spark/hacks/Hive.java  |  19 +++-
 .../org/apache/iceberg/spark/SparkTableUtil.scala  |  20 ++++
 .../iceberg/spark/source/TestSparkTableUtil.java   | 119 +++++++++++++++++++++
 4 files changed, 164 insertions(+), 5 deletions(-)

diff --git a/hive/src/test/java/org/apache/iceberg/hive/HiveTableBaseTest.java b/hive/src/test/java/org/apache/iceberg/hive/HiveTableBaseTest.java
index 13073ff..8502044 100644
--- a/hive/src/test/java/org/apache/iceberg/hive/HiveTableBaseTest.java
+++ b/hive/src/test/java/org/apache/iceberg/hive/HiveTableBaseTest.java
@@ -47,7 +47,7 @@ import static org.apache.iceberg.types.Types.NestedField.required;
 
 public class HiveTableBaseTest {
 
-  static final String DB_NAME = "hivedb";
+  protected static final String DB_NAME = "hivedb";
   static final String TABLE_NAME =  "tbl";
   static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(DB_NAME, TABLE_NAME);
 
@@ -60,11 +60,11 @@ public class HiveTableBaseTest {
 
   private static final PartitionSpec partitionSpec = builderFor(schema).identity("id").build();
 
-  private static HiveConf hiveConf;
   private static TestHiveMetastore metastore;
 
   protected static HiveMetaStoreClient metastoreClient;
   protected static HiveCatalog catalog;
+  protected static HiveConf hiveConf;
 
   @BeforeClass
   public static void startMetastore() throws Exception {
@@ -102,14 +102,17 @@ public class HiveTableBaseTest {
     tableLocation.getFileSystem(hiveConf).delete(tableLocation, true);
     catalog.dropTable(TABLE_IDENTIFIER);
   }
-
   private static String getTableBasePath(String tableName) {
     String databasePath = metastore.getDatabasePath(DB_NAME);
     return Paths.get(databasePath, tableName).toAbsolutePath().toString();
   }
 
+  protected static Path getTableLocationPath(String tableName) {
+    return new Path("file", null, Paths.get(getTableBasePath(tableName)).toString());
+  }
+
   protected static String getTableLocation(String tableName) {
-    return new Path("file", null, Paths.get(getTableBasePath(tableName)).toString()).toString();
+    return getTableLocationPath(tableName).toString();
   }
 
   private static String metadataLocation(String tableName) {
diff --git a/spark/src/main/java/org/apache/iceberg/spark/hacks/Hive.java b/spark/src/main/java/org/apache/iceberg/spark/hacks/Hive.java
index 1b8eda7..ebd9b35 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/hacks/Hive.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/hacks/Hive.java
@@ -21,9 +21,12 @@ package org.apache.iceberg.spark.hacks;
 
 import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition;
+import org.apache.spark.sql.catalyst.expressions.Expression;
 import org.apache.spark.sql.hive.HiveUtils$;
 import org.apache.spark.sql.hive.client.HiveClient;
 import scala.Option;
@@ -41,8 +44,22 @@ public class Hive {
     HiveClient client = HiveUtils$.MODULE$.newClientForMetadata(
         spark.sparkContext().conf(),
         spark.sparkContext().hadoopConfiguration());
-    client.getPartitions(db, table, Option.empty());
 
     return client.getPartitions(db, table, Option.empty());
   }
+
+  public static Seq<CatalogTablePartition> partitionsByFilter(SparkSession spark, String name, Expression expression) {
+    List<String> parts = Lists.newArrayList(Splitter.on('.').limit(2).split(name));
+    String db = parts.size() == 1 ? "default" : parts.get(0);
+    String table = parts.get(parts.size() == 1 ? 0 : 1);
+
+    HiveClient client = HiveUtils$.MODULE$.newClientForMetadata(
+            spark.sparkContext().conf(),
+            spark.sparkContext().hadoopConfiguration());
+    List<Expression> expressions = new ArrayList<Expression>(Arrays.asList(expression));
+    Seq<Expression> exprs =
+            scala.collection.JavaConverters.collectionAsScalaIterableConverter(expressions).asScala().toSeq();
+
+    return client.getPartitionsByFilter(client.getTable(db, table), exprs);
+  }
 }
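
For context, a short sketch (assuming a SparkSession `spark` and an existing Hive
table "db.tbl"; both are placeholders) of how a SQL filter string becomes the
Catalyst Expression that partitionsByFilter forwards to the metastore client:

    // Parse a SQL predicate into a Catalyst Expression, as SparkTableUtil does below.
    val expr = spark.sessionState.sqlParser.parseExpression("data = 'a'")

    // Only the partitions that satisfy the expression are requested from the metastore.
    val matching = org.apache.iceberg.spark.hacks.Hive.partitionsByFilter(spark, "db.tbl", expr)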
diff --git a/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala b/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
index 71f5625..3409807 100644
--- a/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
+++ b/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
@@ -57,6 +57,26 @@ object SparkTableUtil {
   }
 
   /**
+    * Returns a DataFrame with a row for each partition that matches the specified 'expression'.
+    *
+    * @param spark a Spark session.
+    * @param table name of the table.
+    * @param expression the filter expression used to select matching partitions.
+    * @return a DataFrame of the table partitions.
+    */
+  def partitionDFByFilter(spark: SparkSession, table: String, expression: String): DataFrame = {
+    import spark.implicits._
+
+    val expr = spark.sessionState.sqlParser.parseExpression(expression)
+    val partitions: Seq[(Map[String, String], Option[String], Option[String])] =
+      Hive.partitionsByFilter(spark, table, expr).map { p: CatalogTablePartition =>
+        (p.spec, p.storage.locationUri.map(String.valueOf(_)), p.storage.serde)
+      }
+
+    partitions.toDF("partition", "uri", "format")
+  }
+
+  /**
    * Returns the data files in a partition by listing the partition location.
    *
    * For Parquet partitions, this will read metrics from the file footer. For Avro partitions,
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java
new file mode 100644
index 0000000..6eb8d43
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.hive.HiveTableBaseTest;
+import org.apache.iceberg.spark.SparkTableUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestSparkTableUtil extends HiveTableBaseTest {
+  private static final Configuration CONF = HiveTableBaseTest.hiveConf;
+  private static final String tableName = "hive_table";
+  private static final String dbName = HiveTableBaseTest.DB_NAME;
+  private static final String qualifiedTableName = String.format("%s.%s", dbName, tableName);
+  private static final Path tableLocationPath = HiveTableBaseTest.getTableLocationPath(tableName);
+  private static final String tableLocationStr = tableLocationPath.toString();
+  private static SparkSession spark = null;
+
+  @BeforeClass
+  public static void startSpark() {
+    String metastoreURI = CONF.get(HiveConf.ConfVars.METASTOREURIS.varname);
+
+    // Create a spark session.
+    TestSparkTableUtil.spark = SparkSession.builder().master("local[2]")
+            .enableHiveSupport()
+            .config("spark.hadoop.hive.metastore.uris", metastoreURI)
+            .config("hive.exec.dynamic.partition", "true")
+            .config("hive.exec.dynamic.partition.mode", "nonstrict")
+            .getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestSparkTableUtil.spark;
+    // Stop the spark session.
+    TestSparkTableUtil.spark = null;
+    currentSpark.stop();
+  }
+
+  @Before
+  public void before() {
+
+    // Create a hive table.
+    SQLContext sc = new SQLContext(TestSparkTableUtil.spark);
+
+    sc.sql(String.format(
+                    "CREATE TABLE %s (\n" +
+                    "    id int COMMENT 'unique id'\n" +
+                    ")\n" +
+                    " PARTITIONED BY (data string)\n" +
+                    " LOCATION '%s'", qualifiedTableName, tableLocationStr)
+    );
+
+    List<SimpleRecord> expected = Lists.newArrayList(
+            new SimpleRecord(1, "a"),
+            new SimpleRecord(2, "b"),
+            new SimpleRecord(3, "c")
+    );
+
+    Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
+
+    df.select("id", "data").orderBy("data").write()
+            .mode("append")
+            .insertInto(qualifiedTableName);
+  }
+
+  @After
+  public void after() throws IOException {
+    // Drop the hive table.
+    SQLContext sc = new SQLContext(TestSparkTableUtil.spark);
+    sc.sql(String.format("DROP TABLE IF EXISTS %s", qualifiedTableName));
+
+    // Delete the data corresponding to the table.
+    tableLocationPath.getFileSystem(CONF).delete(tableLocationPath, true);
+  }
+
+  @Test
+  public void testPartitionScan() {
+    Dataset<Row> partitionDF = SparkTableUtil.partitionDF(spark, qualifiedTableName);
+    Assert.assertEquals("There should be 3 partitions", 3, partitionDF.count());
+  }
+
+  @Test
+  public void testPartitionScanByFilter() {
+    Dataset<Row> partitionDF = SparkTableUtil.partitionDFByFilter(spark, qualifiedTableName, "data = 'a'");
+    Assert.assertEquals("There should be 1 matching partition", 1, partitionDF.count());
+  }
+}