You are viewing a plain text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/07/17 23:37:08 UTC
[incubator-iceberg] branch master updated: Spark: Support loading Hive partitions that match a filter expression (#248)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new d105abc Spark: Support loading Hive partitions that match a filter expression (#248)
d105abc is described below
commit d105abcb910a6de9a33b5ece61ac27f753044505
Author: Puneet Zaroo <pu...@gmail.com>
AuthorDate: Thu Jul 18 05:07:04 2019 +0530
Spark: Support loading Hive partitions that match a filter expression (#248)
---
.../org/apache/iceberg/hive/HiveTableBaseTest.java | 11 +-
.../java/org/apache/iceberg/spark/hacks/Hive.java | 19 +++-
.../org/apache/iceberg/spark/SparkTableUtil.scala | 20 ++++
.../iceberg/spark/source/TestSparkTableUtil.java | 119 +++++++++++++++++++++
4 files changed, 164 insertions(+), 5 deletions(-)
diff --git a/hive/src/test/java/org/apache/iceberg/hive/HiveTableBaseTest.java b/hive/src/test/java/org/apache/iceberg/hive/HiveTableBaseTest.java
index 13073ff..8502044 100644
--- a/hive/src/test/java/org/apache/iceberg/hive/HiveTableBaseTest.java
+++ b/hive/src/test/java/org/apache/iceberg/hive/HiveTableBaseTest.java
@@ -47,7 +47,7 @@ import static org.apache.iceberg.types.Types.NestedField.required;
public class HiveTableBaseTest {
- static final String DB_NAME = "hivedb";
+ protected static final String DB_NAME = "hivedb";
static final String TABLE_NAME = "tbl";
static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(DB_NAME, TABLE_NAME);
@@ -60,11 +60,11 @@ public class HiveTableBaseTest {
private static final PartitionSpec partitionSpec = builderFor(schema).identity("id").build();
- private static HiveConf hiveConf;
private static TestHiveMetastore metastore;
protected static HiveMetaStoreClient metastoreClient;
protected static HiveCatalog catalog;
+ protected static HiveConf hiveConf;
@BeforeClass
public static void startMetastore() throws Exception {
@@ -102,14 +102,17 @@ public class HiveTableBaseTest {
tableLocation.getFileSystem(hiveConf).delete(tableLocation, true);
catalog.dropTable(TABLE_IDENTIFIER);
}
-
+  /**
+   * Returns the absolute path of {@code tableName}'s directory inside the test metastore's
+   * directory for the {@code DB_NAME} database.
+   */
private static String getTableBasePath(String tableName) {
String databasePath = metastore.getDatabasePath(DB_NAME);
return Paths.get(databasePath, tableName).toAbsolutePath().toString();
}
+  /**
+   * Returns the location of {@code tableName} as a Hadoop {@link Path} that has the {@code file}
+   * scheme, no authority, and the table's base path as its path component.
+   */
+  protected static Path getTableLocationPath(String tableName) {
+    String localPath = Paths.get(getTableBasePath(tableName)).toString();
+    return new Path("file", null, localPath);
+  }
+
+  /**
+   * Returns the table location as a string: the result of
+   * {@link #getTableLocationPath(String)} converted with {@code Path#toString()}.
+   */
protected static String getTableLocation(String tableName) {
- return new Path("file", null, Paths.get(getTableBasePath(tableName)).toString()).toString();
+ return getTableLocationPath(tableName).toString();
}
private static String metadataLocation(String tableName) {
diff --git a/spark/src/main/java/org/apache/iceberg/spark/hacks/Hive.java b/spark/src/main/java/org/apache/iceberg/spark/hacks/Hive.java
index 1b8eda7..ebd9b35 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/hacks/Hive.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/hacks/Hive.java
@@ -21,9 +21,12 @@ package org.apache.iceberg.spark.hacks;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition;
+import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.hive.HiveUtils$;
import org.apache.spark.sql.hive.client.HiveClient;
import scala.Option;
@@ -41,8 +44,22 @@ public class Hive {
HiveClient client = HiveUtils$.MODULE$.newClientForMetadata(
spark.sparkContext().conf(),
spark.sparkContext().hadoopConfiguration());
- client.getPartitions(db, table, Option.empty());
return client.getPartitions(db, table, Option.empty());
}
+
+ public static Seq<CatalogTablePartition> partitionsByFilter(SparkSession spark, String name, Expression expression) {
+ List<String> parts = Lists.newArrayList(Splitter.on('.').limit(2).split(name));
+ String db = parts.size() == 1 ? "default" : parts.get(0);
+ String table = parts.get(parts.size() == 1 ? 0 : 1);
+
+ HiveClient client = HiveUtils$.MODULE$.newClientForMetadata(
+ spark.sparkContext().conf(),
+ spark.sparkContext().hadoopConfiguration());
+ List<Expression> expressions = new ArrayList<Expression>(Arrays.asList(expression));
+ Seq<Expression> exprs =
+ scala.collection.JavaConverters.collectionAsScalaIterableConverter(expressions).asScala().toSeq();
+
+ return client.getPartitionsByFilter(client.getTable(db, table), exprs);
+ }
}
diff --git a/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala b/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
index 71f5625..3409807 100644
--- a/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
+++ b/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
@@ -57,6 +57,26 @@ object SparkTableUtil {
}
/**
+   * Returns a DataFrame with one row per Hive partition that the metastore reports as matching the
+   * SQL predicate in 'expression'.
+   *
+   * Columns: "partition" (the partition spec map), "uri" (the partition location, if set) and
+   * "format" (the partition's storage serde, if set).
+   *
+   * NOTE(review): the predicate is pushed to the metastore through Hive.partitionsByFilter. If it
+   * cannot be converted into a metastore filter, the result may include non-matching partitions.
+   * Confirm this before relying on an exact match.
+   *
+   * @param spark a Spark session.
+   * @param table name of the table, optionally qualified as db.table.
+   * @param expression a SQL predicate such as "data = 'a'", parsed with the session's SQL parser.
+   * @return a DataFrame of the matching table partitions.
+   */
+  def partitionDFByFilter(spark: SparkSession, table: String, expression: String): DataFrame = {
+    import spark.implicits._
+
+    val predicate = spark.sessionState.sqlParser.parseExpression(expression)
+    val matching = Hive.partitionsByFilter(spark, table, predicate)
+
+    val rows: Seq[(Map[String, String], Option[String], Option[String])] = matching.map {
+      partition: CatalogTablePartition =>
+        val location = partition.storage.locationUri.map(uri => String.valueOf(uri))
+        (partition.spec, location, partition.storage.serde)
+    }
+
+    rows.toDF("partition", "uri", "format")
+  }
+
+ /**
* Returns the data files in a partition by listing the partition location.
*
* For Parquet partitions, this will read metrics from the file footer. For Avro partitions,
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java
new file mode 100644
index 0000000..6eb8d43
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.hive.HiveTableBaseTest;
+import org.apache.iceberg.spark.SparkTableUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests {@link SparkTableUtil} partition listing for a partitioned Hive table registered in the
+ * metastore that {@link HiveTableBaseTest} starts.
+ */
+public class TestSparkTableUtil extends HiveTableBaseTest {
+  private static final String tableName = "hive_table";
+  private static final String dbName = HiveTableBaseTest.DB_NAME;
+  private static final String qualifiedTableName = String.format("%s.%s", dbName, tableName);
+
+  // These depend on state (hiveConf, the running metastore) that HiveTableBaseTest only sets up in
+  // its @BeforeClass method. They are assigned in startSpark(), which JUnit runs after superclass
+  // @BeforeClass methods. Static initializers would run whenever the JVM initializes this class,
+  // and could then see a null hiveConf or a metastore that has not been started yet.
+  private static Configuration conf = null;
+  private static Path tableLocationPath = null;
+  private static String tableLocationStr = null;
+  private static SparkSession spark = null;
+
+  @BeforeClass
+  public static void startSpark() {
+    conf = HiveTableBaseTest.hiveConf;
+    tableLocationPath = HiveTableBaseTest.getTableLocationPath(tableName);
+    tableLocationStr = tableLocationPath.toString();
+
+    String metastoreURI = conf.get(HiveConf.ConfVars.METASTOREURIS.varname);
+
+    // Create a Hive-enabled Spark session pointed at the test metastore.
+    TestSparkTableUtil.spark = SparkSession.builder().master("local[2]")
+        .enableHiveSupport()
+        .config("spark.hadoop.hive.metastore.uris", metastoreURI)
+        .config("hive.exec.dynamic.partition", "true")
+        .config("hive.exec.dynamic.partition.mode", "nonstrict")
+        .getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestSparkTableUtil.spark;
+    TestSparkTableUtil.spark = null;
+    // If startSpark() failed there is no session; don't mask that failure with an NPE here.
+    if (currentSpark != null) {
+      currentSpark.stop();
+    }
+  }
+
+  @Before
+  public void before() {
+    // Create a Hive table partitioned by "data". Use the session's own SQLContext rather than the
+    // SQLContext(SparkSession) constructor, which is Spark-internal (private[sql] in Scala).
+    SQLContext sc = spark.sqlContext();
+
+    sc.sql(String.format(
+        "CREATE TABLE %s (\n" +
+        " id int COMMENT 'unique id'\n" +
+        ")\n" +
+        " PARTITIONED BY (data string)\n" +
+        " LOCATION '%s'", qualifiedTableName, tableLocationStr)
+    );
+
+    // One row per distinct "data" value, so the dynamic-partition insert creates three partitions.
+    List<SimpleRecord> records = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+
+    Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+
+    df.select("id", "data").orderBy("data").write()
+        .mode("append")
+        .insertInto(qualifiedTableName);
+  }
+
+  @After
+  public void after() throws IOException {
+    // Drop the hive table.
+    SQLContext sc = spark.sqlContext();
+    sc.sql(String.format("DROP TABLE IF EXISTS %s", qualifiedTableName));
+
+    // Delete the data corresponding to the table.
+    tableLocationPath.getFileSystem(conf).delete(tableLocationPath, true);
+  }
+
+  @Test
+  public void testPartitionScan() {
+    Dataset<Row> partitionDF = SparkTableUtil.partitionDF(spark, qualifiedTableName);
+    Assert.assertEquals("There should be 3 partitions", 3, partitionDF.count());
+  }
+
+  @Test
+  public void testPartitionScanByFilter() {
+    Dataset<Row> partitionDF = SparkTableUtil.partitionDFByFilter(spark, qualifiedTableName, "data = 'a'");
+    Assert.assertEquals("There should be 1 matching partition", 1, partitionDF.count());
+
+    // Check that the filter selected the right partition, not merely one partition.
+    Row partition = partitionDF.first();
+    Assert.assertEquals("Partition spec should match the filter", "a", partition.getJavaMap(0).get("data"));
+  }
+}