You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/01/15 17:27:34 UTC

[iceberg] branch master updated: Hive: Fix join issues when CBO is enabled (#2052)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new d50f540  Hive: Fix join issues when CBO is enabled (#2052)
d50f540 is described below

commit d50f54051e72d20d56aac3e725351dc6919199e9
Author: qphien <cl...@126.com>
AuthorDate: Sat Jan 16 01:27:25 2021 +0800

    Hive: Fix join issues when CBO is enabled (#2052)
    
    Co-authored-by: 罗冲 <lu...@corp.netease.com>
---
 .../apache/iceberg/mr/hive/HiveIcebergSerDe.java   | 17 +++--
 .../TestHiveIcebergStorageHandlerWithEngine.java   | 77 ++++++++++++++++++----
 .../org/apache/iceberg/mr/hive/TestHiveShell.java  |  4 ++
 3 files changed, 82 insertions(+), 16 deletions(-)

diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
index 994d79b..452b1e5 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.mr.hive;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
@@ -66,9 +67,7 @@ public class HiveIcebergSerDe extends AbstractSerDe {
     assertNotVectorizedTez(configuration);
 
     Schema tableSchema;
-    if (configuration.get(InputFormatConfig.TABLE_SCHEMA) != null) {
-      tableSchema = SchemaParser.fromJson(configuration.get(InputFormatConfig.TABLE_SCHEMA));
-    } else if (serDeProperties.get(InputFormatConfig.TABLE_SCHEMA) != null) {
+    if (serDeProperties.get(InputFormatConfig.TABLE_SCHEMA) != null) {
       tableSchema = SchemaParser.fromJson((String) serDeProperties.get(InputFormatConfig.TABLE_SCHEMA));
     } else {
       try {
@@ -82,7 +81,17 @@ public class HiveIcebergSerDe extends AbstractSerDe {
     }
 
     String[] selectedColumns = ColumnProjectionUtils.getReadColumnNames(configuration);
-    Schema projectedSchema = selectedColumns.length > 0 ? tableSchema.select(selectedColumns) : tableSchema;
+    // When the same table is joined multiple times, some selected columns may be duplicated;
+    // in that case a wrong recordStructField position leads to wrong values or an ArrayIndexOutOfBoundsException
+    String[] distinctSelectedColumns = Arrays.stream(selectedColumns).distinct().toArray(String[]::new);
+    Schema projectedSchema = distinctSelectedColumns.length > 0 ?
+            tableSchema.select(distinctSelectedColumns) : tableSchema;
+    // When the input split that the mapper handles does not belong to this table,
+    // projectedSchema must be equal to tableSchema,
+    // otherwise we cannot find the selectOperator's columns from the inspector
+    if (projectedSchema.columns().size() != distinctSelectedColumns.length) {
+      projectedSchema = tableSchema;
+    }
 
     try {
       this.inspector = IcebergObjectInspector.create(projectedSchema);
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
index c545a19..7df3418 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
@@ -42,6 +42,7 @@ import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 import static org.junit.runners.Parameterized.Parameter;
 import static org.junit.runners.Parameterized.Parameters;
@@ -54,12 +55,26 @@ public class TestHiveIcebergStorageHandlerWithEngine {
   private static final Schema ORDER_SCHEMA = new Schema(
           required(1, "order_id", Types.LongType.get()),
           required(2, "customer_id", Types.LongType.get()),
-          required(3, "total", Types.DoubleType.get()));
+          required(3, "total", Types.DoubleType.get()),
+          required(4, "product_id", Types.LongType.get())
+  );
 
   private static final List<Record> ORDER_RECORDS = TestHelper.RecordsBuilder.newInstance(ORDER_SCHEMA)
-          .add(100L, 0L, 11.11d)
-          .add(101L, 0L, 22.22d)
-          .add(102L, 1L, 33.33d)
+          .add(100L, 0L, 11.11d, 1L)
+          .add(101L, 0L, 22.22d, 2L)
+          .add(102L, 1L, 33.33d, 3L)
+          .build();
+
+  private static final Schema PRODUCT_SCHEMA = new Schema(
+          optional(1, "id", Types.LongType.get()),
+          optional(2, "name", Types.StringType.get()),
+          optional(3, "price", Types.DoubleType.get())
+  );
+
+  private static final List<Record> PRODUCT_RECORDS = TestHelper.RecordsBuilder.newInstance(PRODUCT_SCHEMA)
+          .add(1L, "skirt", 11.11d)
+          .add(2L, "tee", 22.22d)
+          .add(3L, "watch", 33.33d)
           .build();
 
   private static final List<Type> SUPPORTED_TYPES =
@@ -151,20 +166,58 @@ public class TestHiveIcebergStorageHandlerWithEngine {
   }
 
   @Test
-  public void testJoinTables() throws IOException {
+  public void testCBOWithSelectedColumnsNonOverlapJoin() throws IOException {
+    shell.setHiveSessionValue("hive.cbo.enable", true);
+
+    testTables.createTable(shell, "products", PRODUCT_SCHEMA, fileFormat, PRODUCT_RECORDS);
+    testTables.createTable(shell, "orders", ORDER_SCHEMA, fileFormat, ORDER_RECORDS);
+
+    List<Object[]> rows = shell.executeStatement(
+            "SELECT o.order_id, o.customer_id, o.total, p.name " +
+                    "FROM default.orders o JOIN default.products p ON o.product_id = p.id ORDER BY o.order_id"
+    );
+
+    Assert.assertEquals(3, rows.size());
+    Assert.assertArrayEquals(new Object[] {100L, 0L, 11.11d, "skirt"}, rows.get(0));
+    Assert.assertArrayEquals(new Object[] {101L, 0L, 22.22d, "tee"}, rows.get(1));
+    Assert.assertArrayEquals(new Object[] {102L, 1L, 33.33d, "watch"}, rows.get(2));
+  }
+
+  @Test
+  public void testCBOWithSelectedColumnsOverlapJoin() throws IOException {
+    shell.setHiveSessionValue("hive.cbo.enable", true);
+
     testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, fileFormat,
-        HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+            HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    testTables.createTable(shell, "orders", ORDER_SCHEMA, fileFormat, ORDER_RECORDS);
+
+    List<Object[]> rows = shell.executeStatement(
+            "SELECT c.first_name, o.order_id " +
+                    "FROM default.orders o JOIN default.customers c ON o.customer_id = c.customer_id " +
+                    "ORDER BY o.order_id DESC"
+    );
+
+    Assert.assertEquals(3, rows.size());
+    Assert.assertArrayEquals(new Object[] {"Bob", 102L}, rows.get(0));
+    Assert.assertArrayEquals(new Object[] {"Alice", 101L}, rows.get(1));
+    Assert.assertArrayEquals(new Object[] {"Alice", 100L}, rows.get(2));
+  }
+
+  @Test
+  public void testCBOWithSelfJoin() throws IOException {
+    shell.setHiveSessionValue("hive.cbo.enable", true);
+
     testTables.createTable(shell, "orders", ORDER_SCHEMA, fileFormat, ORDER_RECORDS);
 
     List<Object[]> rows = shell.executeStatement(
-            "SELECT c.customer_id, c.first_name, o.order_id, o.total " +
-                    "FROM default.customers c JOIN default.orders o ON c.customer_id = o.customer_id " +
-                    "ORDER BY c.customer_id, o.order_id"
+            "SELECT o1.order_id, o1.customer_id, o1.total " +
+                    "FROM default.orders o1 JOIN default.orders o2 ON o1.order_id = o2.order_id ORDER BY o1.order_id"
     );
 
-    Assert.assertArrayEquals(new Object[] {0L, "Alice", 100L, 11.11d}, rows.get(0));
-    Assert.assertArrayEquals(new Object[] {0L, "Alice", 101L, 22.22d}, rows.get(1));
-    Assert.assertArrayEquals(new Object[] {1L, "Bob", 102L, 33.33d}, rows.get(2));
+    Assert.assertEquals(3, rows.size());
+    Assert.assertArrayEquals(new Object[] {100L, 0L, 11.11d}, rows.get(0));
+    Assert.assertArrayEquals(new Object[] {101L, 0L, 22.22d}, rows.get(1));
+    Assert.assertArrayEquals(new Object[] {102L, 1L, 33.33d}, rows.get(2));
   }
 
   @Test
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java
index 3329cab..e474b65 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java
@@ -73,6 +73,10 @@ public class TestHiveShell {
     }
   }
 
+  public void setHiveSessionValue(String key, boolean value) {
+    setHiveSessionValue(key, Boolean.toString(value));
+  }
+
   public void start() {
     metastore.start();
     hs2Conf.setVar(HiveConf.ConfVars.METASTOREURIS, metastore.hiveConf().getVar(HiveConf.ConfVars.METASTOREURIS));