You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ar...@apache.org on 2018/07/25 19:28:18 UTC

[07/10] impala git commit: IMPALA-7257. Support Kudu tables in LocalCatalog

http://git-wip-us.apache.org/repos/asf/impala/blob/c333b552/fe/src/main/java/org/apache/impala/planner/TableSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/TableSink.java b/fe/src/main/java/org/apache/impala/planner/TableSink.java
index a702206..677805e 100644
--- a/fe/src/main/java/org/apache/impala/planner/TableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/TableSink.java
@@ -20,10 +20,10 @@ package org.apache.impala.planner;
 import java.util.List;
 
 import org.apache.impala.analysis.Expr;
+import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.HBaseTable;
 import org.apache.impala.catalog.HdfsTable;
-import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.thrift.TSinkAction;
 
 import com.google.common.base.Preconditions;
@@ -116,7 +116,7 @@ public abstract class TableSink extends DataSink {
       Preconditions.checkState(sortColumns.isEmpty());
       // Create the HBaseTableSink and return it.
       return new HBaseTableSink(table);
-    } else if (table instanceof KuduTable) {
+    } else if (table instanceof FeKuduTable) {
       // Kudu doesn't have a way to perform INSERT OVERWRITE.
       Preconditions.checkState(overwrite == false);
       // Sort columns are not supported for Kudu tables.

http://git-wip-us.apache.org/repos/asf/impala/blob/c333b552/fe/src/main/java/org/apache/impala/service/Frontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index ac65171..1ff9d20 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -81,11 +81,11 @@ import org.apache.impala.catalog.FeDataSource;
 import org.apache.impala.catalog.FeDataSourceTable;
 import org.apache.impala.catalog.FeDb;
 import org.apache.impala.catalog.FeFsTable;
+import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.Function;
 import org.apache.impala.catalog.HBaseTable;
 import org.apache.impala.catalog.ImpaladCatalog;
-import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FileSystemUtil;
@@ -330,7 +330,7 @@ public class Frontend {
           new TColumn("name", Type.STRING.toThrift()),
           new TColumn("type", Type.STRING.toThrift()),
           new TColumn("comment", Type.STRING.toThrift()));
-      if (descStmt.getTable() instanceof KuduTable
+      if (descStmt.getTable() instanceof FeKuduTable
           && descStmt.getOutputStyle() == TDescribeOutputStyle.MINIMAL) {
         columns.add(new TColumn("primary_key", Type.STRING.toThrift()));
         columns.add(new TColumn("nullable", Type.STRING.toThrift()));
@@ -750,11 +750,11 @@ public class Frontend {
       return ((HBaseTable) table).getTableStats();
     } else if (table instanceof FeDataSourceTable) {
       return ((FeDataSourceTable) table).getTableStats();
-    } else if (table instanceof KuduTable) {
+    } else if (table instanceof FeKuduTable) {
       if (op == TShowStatsOp.RANGE_PARTITIONS) {
-        return ((KuduTable) table).getRangePartitions();
+        return FeKuduTable.Utils.getRangePartitions((FeKuduTable) table);
       } else {
-        return ((KuduTable) table).getTableStats();
+        return FeKuduTable.Utils.getTableStats((FeKuduTable) table);
       }
     } else {
       throw new InternalException("Invalid table class: " + table.getClass());
@@ -838,7 +838,7 @@ public class Frontend {
       filteredColumns = table.getColumnsInHiveOrder();
     }
     if (outputStyle == TDescribeOutputStyle.MINIMAL) {
-      if (!(table instanceof KuduTable)) {
+      if (!(table instanceof FeKuduTable)) {
         return DescribeResultFactory.buildDescribeMinimalResult(
             Column.columnsToStruct(filteredColumns));
       }

http://git-wip-us.apache.org/repos/asf/impala/blob/c333b552/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
index 0ce0cf9..a5b575d 100644
--- a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.KuduColumn;
 import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.Table;
@@ -362,7 +363,8 @@ public class KuduCatalogOpExecutor {
 
   private static List<Pair<PartialRow, RangePartitionBound>> getRangePartitionBounds(
       TRangePartition rangePartition, KuduTable tbl) throws ImpalaRuntimeException {
-    List<String> rangePartitioningColNames = tbl.getRangePartitioningColNames();
+    List<String> rangePartitioningColNames =
+        FeKuduTable.Utils.getRangePartitioningColNames(tbl);
     List<String> rangePartitioningKuduColNames =
       Lists.newArrayListWithCapacity(rangePartitioningColNames.size());
     for (String colName : rangePartitioningColNames) {

http://git-wip-us.apache.org/repos/asf/impala/blob/c333b552/fe/src/main/java/org/apache/impala/util/KuduUtil.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/KuduUtil.java b/fe/src/main/java/org/apache/impala/util/KuduUtil.java
index 07378a9..0293de3 100644
--- a/fe/src/main/java/org/apache/impala/util/KuduUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/KuduUtil.java
@@ -31,7 +31,7 @@ import org.apache.impala.analysis.FunctionCallExpr;
 import org.apache.impala.analysis.InsertStmt;
 import org.apache.impala.analysis.KuduPartitionExpr;
 import org.apache.impala.analysis.LiteralExpr;
-import org.apache.impala.catalog.KuduTable;
+import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.ScalarType;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
@@ -450,9 +450,9 @@ public class KuduUtil {
    */
   public static Expr createPartitionExpr(InsertStmt insertStmt, Analyzer analyzer)
       throws AnalysisException {
-    Preconditions.checkState(insertStmt.getTargetTable() instanceof KuduTable);
+    Preconditions.checkState(insertStmt.getTargetTable() instanceof FeKuduTable);
     Expr kuduPartitionExpr = new KuduPartitionExpr(DescriptorTable.TABLE_SINK_ID,
-        (KuduTable) insertStmt.getTargetTable(),
+        (FeKuduTable) insertStmt.getTargetTable(),
         Lists.newArrayList(insertStmt.getPartitionKeyExprs()),
         insertStmt.getPartitionColPos());
     kuduPartitionExpr.analyze(analyzer);

http://git-wip-us.apache.org/repos/asf/impala/blob/c333b552/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java b/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
index d85b8e3..30dfc5b 100644
--- a/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.*;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.impala.analysis.ToSqlUtils;
 import org.apache.impala.catalog.CatalogTest;
 import org.apache.impala.catalog.ColumnStats;
 import org.apache.impala.catalog.FeCatalogUtils;
@@ -37,9 +38,12 @@ import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TResultSet;
 import org.apache.impala.util.MetaStoreUtil;
 import org.apache.impala.util.PatternMatcher;
+import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 
@@ -50,7 +54,7 @@ public class LocalCatalogTest {
   @Before
   public void setupCatalog() {
     FeSupport.loadLibrary();
-    catalog_ = new LocalCatalog(new DirectMetaProvider());
+    catalog_ = LocalCatalog.create(/*defaultKuduMasterHosts=*/null);
   }
 
   @Test
@@ -176,4 +180,35 @@ public class LocalCatalogTest {
     assertEquals(TCatalogObjectType.VIEW, v.getCatalogObjectType());
     assertEquals("SELECT * FROM functional.alltypes", v.getQueryStmt().toSql());
   }
+
+  @Test
+  public void testKuduTable() throws Exception {
+    LocalKuduTable t = (LocalKuduTable) catalog_.getTable("functional_kudu",  "alltypes");
+    assertEquals("id,bool_col,tinyint_col,smallint_col,int_col," +
+        "bigint_col,float_col,double_col,date_string_col,string_col," +
+        "timestamp_col,year,month", Joiner.on(",").join(t.getColumnNames()));
+    // Compare only the prefix of the generated CREATE TABLE SQL (via startsWith), up to
+    // the TBLPROPERTIES keyword: the properties themselves might change based on whether
+    // this test runs before or after other tests which compute stats, etc.
+    Assert.assertThat(ToSqlUtils.getCreateTableSql(t), CoreMatchers.startsWith(
+        "CREATE TABLE functional_kudu.alltypes (\n" +
+        "  id INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,\n" +
+        "  bool_col BOOLEAN NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,\n" +
+        "  tinyint_col TINYINT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,\n" +
+        "  smallint_col SMALLINT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,\n" +
+        "  int_col INT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,\n" +
+        "  bigint_col BIGINT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,\n" +
+        "  float_col FLOAT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,\n" +
+        "  double_col DOUBLE NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,\n" +
+        "  date_string_col STRING NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,\n" +
+        "  string_col STRING NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,\n" +
+        "  timestamp_col TIMESTAMP NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,\n" +
+        "  year INT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,\n" +
+        "  month INT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,\n" +
+        "  PRIMARY KEY (id)\n" +
+        ")\n" +
+        "PARTITION BY HASH (id) PARTITIONS 3\n" +
+        "STORED AS KUDU\n" +
+        "TBLPROPERTIES"));
+  }
 }