You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ar...@apache.org on 2018/07/25 19:28:18 UTC
[07/10] impala git commit: IMPALA-7257. Support Kudu tables in
LocalCatalog
http://git-wip-us.apache.org/repos/asf/impala/blob/c333b552/fe/src/main/java/org/apache/impala/planner/TableSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/TableSink.java b/fe/src/main/java/org/apache/impala/planner/TableSink.java
index a702206..677805e 100644
--- a/fe/src/main/java/org/apache/impala/planner/TableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/TableSink.java
@@ -20,10 +20,10 @@ package org.apache.impala.planner;
import java.util.List;
import org.apache.impala.analysis.Expr;
+import org.apache.impala.catalog.FeKuduTable;
import org.apache.impala.catalog.FeTable;
import org.apache.impala.catalog.HBaseTable;
import org.apache.impala.catalog.HdfsTable;
-import org.apache.impala.catalog.KuduTable;
import org.apache.impala.thrift.TSinkAction;
import com.google.common.base.Preconditions;
@@ -116,7 +116,7 @@ public abstract class TableSink extends DataSink {
Preconditions.checkState(sortColumns.isEmpty());
// Create the HBaseTableSink and return it.
return new HBaseTableSink(table);
- } else if (table instanceof KuduTable) {
+ } else if (table instanceof FeKuduTable) {
// Kudu doesn't have a way to perform INSERT OVERWRITE.
Preconditions.checkState(overwrite == false);
// Sort columns are not supported for Kudu tables.
http://git-wip-us.apache.org/repos/asf/impala/blob/c333b552/fe/src/main/java/org/apache/impala/service/Frontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index ac65171..1ff9d20 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -81,11 +81,11 @@ import org.apache.impala.catalog.FeDataSource;
import org.apache.impala.catalog.FeDataSourceTable;
import org.apache.impala.catalog.FeDb;
import org.apache.impala.catalog.FeFsTable;
+import org.apache.impala.catalog.FeKuduTable;
import org.apache.impala.catalog.FeTable;
import org.apache.impala.catalog.Function;
import org.apache.impala.catalog.HBaseTable;
import org.apache.impala.catalog.ImpaladCatalog;
-import org.apache.impala.catalog.KuduTable;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.FileSystemUtil;
@@ -330,7 +330,7 @@ public class Frontend {
new TColumn("name", Type.STRING.toThrift()),
new TColumn("type", Type.STRING.toThrift()),
new TColumn("comment", Type.STRING.toThrift()));
- if (descStmt.getTable() instanceof KuduTable
+ if (descStmt.getTable() instanceof FeKuduTable
&& descStmt.getOutputStyle() == TDescribeOutputStyle.MINIMAL) {
columns.add(new TColumn("primary_key", Type.STRING.toThrift()));
columns.add(new TColumn("nullable", Type.STRING.toThrift()));
@@ -750,11 +750,11 @@ public class Frontend {
return ((HBaseTable) table).getTableStats();
} else if (table instanceof FeDataSourceTable) {
return ((FeDataSourceTable) table).getTableStats();
- } else if (table instanceof KuduTable) {
+ } else if (table instanceof FeKuduTable) {
if (op == TShowStatsOp.RANGE_PARTITIONS) {
- return ((KuduTable) table).getRangePartitions();
+ return FeKuduTable.Utils.getRangePartitions((FeKuduTable) table);
} else {
- return ((KuduTable) table).getTableStats();
+ return FeKuduTable.Utils.getTableStats((FeKuduTable) table);
}
} else {
throw new InternalException("Invalid table class: " + table.getClass());
@@ -838,7 +838,7 @@ public class Frontend {
filteredColumns = table.getColumnsInHiveOrder();
}
if (outputStyle == TDescribeOutputStyle.MINIMAL) {
- if (!(table instanceof KuduTable)) {
+ if (!(table instanceof FeKuduTable)) {
return DescribeResultFactory.buildDescribeMinimalResult(
Column.columnsToStruct(filteredColumns));
}
http://git-wip-us.apache.org/repos/asf/impala/blob/c333b552/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
index 0ce0cf9..a5b575d 100644
--- a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.impala.catalog.FeKuduTable;
import org.apache.impala.catalog.KuduColumn;
import org.apache.impala.catalog.KuduTable;
import org.apache.impala.catalog.Table;
@@ -362,7 +363,8 @@ public class KuduCatalogOpExecutor {
private static List<Pair<PartialRow, RangePartitionBound>> getRangePartitionBounds(
TRangePartition rangePartition, KuduTable tbl) throws ImpalaRuntimeException {
- List<String> rangePartitioningColNames = tbl.getRangePartitioningColNames();
+ List<String> rangePartitioningColNames =
+ FeKuduTable.Utils.getRangePartitioningColNames(tbl);
List<String> rangePartitioningKuduColNames =
Lists.newArrayListWithCapacity(rangePartitioningColNames.size());
for (String colName : rangePartitioningColNames) {
http://git-wip-us.apache.org/repos/asf/impala/blob/c333b552/fe/src/main/java/org/apache/impala/util/KuduUtil.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/KuduUtil.java b/fe/src/main/java/org/apache/impala/util/KuduUtil.java
index 07378a9..0293de3 100644
--- a/fe/src/main/java/org/apache/impala/util/KuduUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/KuduUtil.java
@@ -31,7 +31,7 @@ import org.apache.impala.analysis.FunctionCallExpr;
import org.apache.impala.analysis.InsertStmt;
import org.apache.impala.analysis.KuduPartitionExpr;
import org.apache.impala.analysis.LiteralExpr;
-import org.apache.impala.catalog.KuduTable;
+import org.apache.impala.catalog.FeKuduTable;
import org.apache.impala.catalog.ScalarType;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.AnalysisException;
@@ -450,9 +450,9 @@ public class KuduUtil {
*/
public static Expr createPartitionExpr(InsertStmt insertStmt, Analyzer analyzer)
throws AnalysisException {
- Preconditions.checkState(insertStmt.getTargetTable() instanceof KuduTable);
+ Preconditions.checkState(insertStmt.getTargetTable() instanceof FeKuduTable);
Expr kuduPartitionExpr = new KuduPartitionExpr(DescriptorTable.TABLE_SINK_ID,
- (KuduTable) insertStmt.getTargetTable(),
+ (FeKuduTable) insertStmt.getTargetTable(),
Lists.newArrayList(insertStmt.getPartitionKeyExprs()),
insertStmt.getPartitionColPos());
kuduPartitionExpr.analyze(analyzer);
http://git-wip-us.apache.org/repos/asf/impala/blob/c333b552/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java b/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
index d85b8e3..30dfc5b 100644
--- a/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.*;
import java.util.List;
import java.util.Set;
+import org.apache.impala.analysis.ToSqlUtils;
import org.apache.impala.catalog.CatalogTest;
import org.apache.impala.catalog.ColumnStats;
import org.apache.impala.catalog.FeCatalogUtils;
@@ -37,9 +38,12 @@ import org.apache.impala.thrift.TCatalogObjectType;
import org.apache.impala.thrift.TResultSet;
import org.apache.impala.util.MetaStoreUtil;
import org.apache.impala.util.PatternMatcher;
+import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
@@ -50,7 +54,7 @@ public class LocalCatalogTest {
@Before
public void setupCatalog() {
FeSupport.loadLibrary();
- catalog_ = new LocalCatalog(new DirectMetaProvider());
+ catalog_ = LocalCatalog.create(/*defaultKuduMasterHosts=*/null);
}
@Test
@@ -176,4 +180,35 @@ public class LocalCatalogTest {
assertEquals(TCatalogObjectType.VIEW, v.getCatalogObjectType());
assertEquals("SELECT * FROM functional.alltypes", v.getQueryStmt().toSql());
}
+
+ @Test
+ public void testKuduTable() throws Exception {
+ LocalKuduTable t = (LocalKuduTable) catalog_.getTable("functional_kudu", "alltypes");
+ assertEquals("id,bool_col,tinyint_col,smallint_col,int_col," +
+ "bigint_col,float_col,double_col,date_string_col,string_col," +
+ "timestamp_col,year,month", Joiner.on(",").join(t.getColumnNames()));
+ // Assert on the generated SQL for the table, but not the table properties, since
+ // those might change based on whether this test runs before or after other
+ // tests which compute stats, etc.
+ Assert.assertThat(ToSqlUtils.getCreateTableSql(t), CoreMatchers.startsWith(
+ "CREATE TABLE functional_kudu.alltypes (\n" +
+ " id INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,\n" +
+ " bool_col BOOLEAN NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,\n" +
+ " tinyint_col TINYINT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,\n" +
+ " smallint_col SMALLINT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,\n" +
+ " int_col INT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,\n" +
+ " bigint_col BIGINT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,\n" +
+ " float_col FLOAT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,\n" +
+ " double_col DOUBLE NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,\n" +
+ " date_string_col STRING NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,\n" +
+ " string_col STRING NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,\n" +
+ " timestamp_col TIMESTAMP NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,\n" +
+ " year INT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,\n" +
+ " month INT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,\n" +
+ " PRIMARY KEY (id)\n" +
+ ")\n" +
+ "PARTITION BY HASH (id) PARTITIONS 3\n" +
+ "STORED AS KUDU\n" +
+ "TBLPROPERTIES"));
+ }
}