You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/01/15 17:27:34 UTC
[iceberg] branch master updated: Hive: Fix join issues when CBO is
enabled (#2052)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new d50f540 Hive: Fix join issues when CBO is enabled (#2052)
d50f540 is described below
commit d50f54051e72d20d56aac3e725351dc6919199e9
Author: qphien <cl...@126.com>
AuthorDate: Sat Jan 16 01:27:25 2021 +0800
Hive: Fix join issues when CBO is enabled (#2052)
Co-authored-by: 罗冲 <lu...@corp.netease.com>
---
.../apache/iceberg/mr/hive/HiveIcebergSerDe.java | 17 +++--
.../TestHiveIcebergStorageHandlerWithEngine.java | 77 ++++++++++++++++++----
.../org/apache/iceberg/mr/hive/TestHiveShell.java | 4 ++
3 files changed, 82 insertions(+), 16 deletions(-)
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
index 994d79b..452b1e5 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
@@ -20,6 +20,7 @@
package org.apache.iceberg.mr.hive;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
@@ -66,9 +67,7 @@ public class HiveIcebergSerDe extends AbstractSerDe {
assertNotVectorizedTez(configuration);
Schema tableSchema;
- if (configuration.get(InputFormatConfig.TABLE_SCHEMA) != null) {
- tableSchema = SchemaParser.fromJson(configuration.get(InputFormatConfig.TABLE_SCHEMA));
- } else if (serDeProperties.get(InputFormatConfig.TABLE_SCHEMA) != null) {
+ if (serDeProperties.get(InputFormatConfig.TABLE_SCHEMA) != null) {
tableSchema = SchemaParser.fromJson((String) serDeProperties.get(InputFormatConfig.TABLE_SCHEMA));
} else {
try {
@@ -82,7 +81,17 @@ public class HiveIcebergSerDe extends AbstractSerDe {
}
String[] selectedColumns = ColumnProjectionUtils.getReadColumnNames(configuration);
- Schema projectedSchema = selectedColumns.length > 0 ? tableSchema.select(selectedColumns) : tableSchema;
+ // When the same table is joined multiple times, some selected columns may be duplicated;
+ // in that case a wrong recordStructField position leads to wrong values or an ArrayIndexOutOfBoundsException
+ String[] distinctSelectedColumns = Arrays.stream(selectedColumns).distinct().toArray(String[]::new);
+ Schema projectedSchema = distinctSelectedColumns.length > 0 ?
+ tableSchema.select(distinctSelectedColumns) : tableSchema;
+ // When the input split the mapper handles does not belong to this table,
+ // it is necessary to ensure projectedSchema is equal to tableSchema;
+ // otherwise we cannot find the selectOperator's columns in the inspector
+ if (projectedSchema.columns().size() != distinctSelectedColumns.length) {
+ projectedSchema = tableSchema;
+ }
try {
this.inspector = IcebergObjectInspector.create(projectedSchema);
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
index c545a19..7df3418 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
@@ -42,6 +42,7 @@ import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.junit.runners.Parameterized.Parameter;
import static org.junit.runners.Parameterized.Parameters;
@@ -54,12 +55,26 @@ public class TestHiveIcebergStorageHandlerWithEngine {
private static final Schema ORDER_SCHEMA = new Schema(
required(1, "order_id", Types.LongType.get()),
required(2, "customer_id", Types.LongType.get()),
- required(3, "total", Types.DoubleType.get()));
+ required(3, "total", Types.DoubleType.get()),
+ required(4, "product_id", Types.LongType.get()) // new column; the orders/products join test joins on it
+ );
private static final List<Record> ORDER_RECORDS = TestHelper.RecordsBuilder.newInstance(ORDER_SCHEMA)
- .add(100L, 0L, 11.11d)
- .add(101L, 0L, 22.22d)
- .add(102L, 1L, 33.33d)
+ .add(100L, 0L, 11.11d, 1L) // presumably values follow ORDER_SCHEMA order: order_id, customer_id, total, product_id
+ .add(101L, 0L, 22.22d, 2L)
+ .add(102L, 1L, 33.33d, 3L)
+ .build();
+
+ private static final Schema PRODUCT_SCHEMA = new Schema( // products fixture schema; all three columns are optional
+ optional(1, "id", Types.LongType.get()),
+ optional(2, "name", Types.StringType.get()),
+ optional(3, "price", Types.DoubleType.get())
+ );
+
+ private static final List<Record> PRODUCT_RECORDS = TestHelper.RecordsBuilder.newInstance(PRODUCT_SCHEMA)
+ .add(1L, "skirt", 11.11d) // first values 1..3 line up with the last value of each ORDER_RECORDS row
+ .add(2L, "tee", 22.22d)
+ .add(3L, "watch", 33.33d)
.build();
private static final List<Type> SUPPORTED_TYPES =
@@ -151,20 +166,58 @@ public class TestHiveIcebergStorageHandlerWithEngine {
}
@Test
- public void testJoinTables() throws IOException {
+ public void testCBOWithSelectedColumnsNonOverlapJoin() throws IOException { // joins orders to products with hive.cbo.enable=true
+ shell.setHiveSessionValue("hive.cbo.enable", true); // set before the tables are created and the query runs
+
+ testTables.createTable(shell, "products", PRODUCT_SCHEMA, fileFormat, PRODUCT_RECORDS);
+ testTables.createTable(shell, "orders", ORDER_SCHEMA, fileFormat, ORDER_RECORDS);
+
+ List<Object[]> rows = shell.executeStatement(
+ "SELECT o.order_id, o.customer_id, o.total, p.name " + // the join keys (product_id, id) are not in the select list
+ "FROM default.orders o JOIN default.products p ON o.product_id = p.id ORDER BY o.order_id"
+ );
+
+ Assert.assertEquals(3, rows.size()); // one joined row expected per ORDER_RECORDS entry
+ Assert.assertArrayEquals(new Object[] {100L, 0L, 11.11d, "skirt"}, rows.get(0)); // rows checked in ascending order_id order
+ Assert.assertArrayEquals(new Object[] {101L, 0L, 22.22d, "tee"}, rows.get(1));
+ Assert.assertArrayEquals(new Object[] {102L, 1L, 33.33d, "watch"}, rows.get(2));
+ }
+
+ @Test
+ public void testCBOWithSelectedColumnsOverlapJoin() throws IOException { // joins orders to customers with hive.cbo.enable=true
+ shell.setHiveSessionValue("hive.cbo.enable", true); // set before the tables are created and the query runs
+
testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, fileFormat,
- HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+ HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+ testTables.createTable(shell, "orders", ORDER_SCHEMA, fileFormat, ORDER_RECORDS);
+
+ List<Object[]> rows = shell.executeStatement(
+ "SELECT c.first_name, o.order_id " + // one selected column from each side of the join
+ "FROM default.orders o JOIN default.customers c ON o.customer_id = c.customer_id " + // both tables name the join key customer_id
+ "ORDER BY o.order_id DESC"
+ );
+
+ Assert.assertEquals(3, rows.size()); // one joined row expected per ORDER_RECORDS entry
+ Assert.assertArrayEquals(new Object[] {"Bob", 102L}, rows.get(0)); // descending order_id, so order 102 comes first
+ Assert.assertArrayEquals(new Object[] {"Alice", 101L}, rows.get(1));
+ Assert.assertArrayEquals(new Object[] {"Alice", 100L}, rows.get(2));
+ }
+
+ @Test
+ public void testCBOWithSelfJoin() throws IOException { // joins the orders table to itself with hive.cbo.enable=true
+ shell.setHiveSessionValue("hive.cbo.enable", true); // set before the table is created and the query runs
+
testTables.createTable(shell, "orders", ORDER_SCHEMA, fileFormat, ORDER_RECORDS);
List<Object[]> rows = shell.executeStatement(
- "SELECT c.customer_id, c.first_name, o.order_id, o.total " +
- "FROM default.customers c JOIN default.orders o ON c.customer_id = o.customer_id " +
- "ORDER BY c.customer_id, o.order_id"
+ "SELECT o1.order_id, o1.customer_id, o1.total " + // only columns from the o1 alias are selected
+ "FROM default.orders o1 JOIN default.orders o2 ON o1.order_id = o2.order_id ORDER BY o1.order_id" // order_id is referenced by both aliases
);
- Assert.assertArrayEquals(new Object[] {0L, "Alice", 100L, 11.11d}, rows.get(0));
- Assert.assertArrayEquals(new Object[] {0L, "Alice", 101L, 22.22d}, rows.get(1));
- Assert.assertArrayEquals(new Object[] {1L, "Bob", 102L, 33.33d}, rows.get(2));
+ Assert.assertEquals(3, rows.size()); // one joined row expected per ORDER_RECORDS entry
+ Assert.assertArrayEquals(new Object[] {100L, 0L, 11.11d}, rows.get(0)); // rows checked in ascending order_id order
+ Assert.assertArrayEquals(new Object[] {101L, 0L, 22.22d}, rows.get(1));
+ Assert.assertArrayEquals(new Object[] {102L, 1L, 33.33d}, rows.get(2));
}
@Test
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java
index 3329cab..e474b65 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java
@@ -73,6 +73,10 @@ public class TestHiveShell {
}
}
+ public void setHiveSessionValue(String key, boolean value) { // boolean convenience overload for setting a session value
+ setHiveSessionValue(key, Boolean.toString(value)); // passes the "true"/"false" string form on to the overload accepting a string
+ }
+
public void start() {
metastore.start();
hs2Conf.setVar(HiveConf.ConfVars.METASTOREURIS, metastore.hiveConf().getVar(HiveConf.ConfVars.METASTOREURIS));