Posted to commits@impala.apache.org by ta...@apache.org on 2018/08/07 00:56:09 UTC

[2/5] impala git commit: IMPALA-7258: Support querying HBase tables in LocalCatalog

IMPALA-7258: Support querying HBase tables in LocalCatalog

This is a straightforward port; no behavior change is expected. All of
the HBase E2E tests pass with this patch.

Change-Id: I8cc94bc38861443de5a375b7e63d29215e0ca899
Reviewed-on: http://gerrit.cloudera.org:8080/11079
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/e6bf4dc2
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/e6bf4dc2
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/e6bf4dc2

Branch: refs/heads/master
Commit: e6bf4dc2a1c6205b9ae5d921702340ac0089370e
Parents: a0673c0
Author: Tianyi Wang <tw...@cloudera.com>
Authored: Wed Jul 18 18:28:04 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Mon Aug 6 22:05:15 2018 +0000

----------------------------------------------------------------------
 .../analysis/AlterTableAddReplaceColsStmt.java  |   4 +-
 .../impala/analysis/AlterTableAlterColStmt.java |   4 +-
 .../impala/analysis/AlterTableDropColStmt.java  |   4 +-
 .../impala/analysis/AlterTableSetStmt.java      |   4 +-
 .../analysis/AlterTableSetTblProperties.java    |   4 +-
 .../impala/analysis/AlterTableSortByStmt.java   |   4 +-
 .../org/apache/impala/analysis/Analyzer.java    |   4 +-
 .../impala/analysis/ComputeStatsStmt.java       |   5 +-
 .../org/apache/impala/analysis/InsertStmt.java  |  15 +-
 .../org/apache/impala/analysis/ToSqlUtils.java  |   4 +-
 .../org/apache/impala/catalog/FeHBaseTable.java | 676 ++++++++++++++++
 .../org/apache/impala/catalog/HBaseTable.java   | 762 ++-----------------
 .../impala/catalog/local/LocalHbaseTable.java   | 115 +++
 .../apache/impala/catalog/local/LocalTable.java |   3 +-
 .../apache/impala/planner/HBaseScanNode.java    |  17 +-
 .../java/org/apache/impala/planner/Planner.java |   4 +-
 .../impala/planner/SingleNodePlanner.java       |   4 +-
 .../org/apache/impala/planner/TableSink.java    |   5 +-
 .../org/apache/impala/service/Frontend.java     |   7 +-
 .../impala/catalog/local/LocalCatalogTest.java  |  28 +
 20 files changed, 923 insertions(+), 750 deletions(-)
----------------------------------------------------------------------

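The bulk of this patch is mechanical: frontend code that previously tested for the
concrete catalogd-backed class HBaseTable now tests for the new FeHBaseTable
interface, so the LocalCatalog implementation (LocalHbaseTable) passes the same
analysis checks. A minimal sketch of the pattern, using simplified stand-in types
rather than the real Impala classes:

    // Simplified stand-ins for the real Impala catalog types (illustrative only).
    interface FeTable {}
    interface FeHBaseTable extends FeTable {}        // frontend-facing interface
    class HBaseTable implements FeHBaseTable {}      // catalogd-backed implementation
    class LocalHbaseTable implements FeHBaseTable {} // LocalCatalog implementation

    class AnalysisSketch {
      // Analysis code now checks the interface, so both implementations of an
      // HBase table are accepted or rejected uniformly.
      static void checkColumnDdlAllowed(FeTable t) {
        if (t instanceof FeHBaseTable) {
          throw new UnsupportedOperationException(
              "ALTER TABLE ADD|REPLACE COLUMNS not currently supported on HBase tables.");
        }
      }
    }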

http://git-wip-us.apache.org/repos/asf/impala/blob/e6bf4dc2/fe/src/main/java/org/apache/impala/analysis/AlterTableAddReplaceColsStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableAddReplaceColsStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableAddReplaceColsStmt.java
index 6abac52..0b110ae 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableAddReplaceColsStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableAddReplaceColsStmt.java
@@ -23,9 +23,9 @@ import java.util.Set;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 
 import org.apache.impala.catalog.Column;
+import org.apache.impala.catalog.FeHBaseTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
-import org.apache.impala.catalog.HBaseTable;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.thrift.TAlterTableAddReplaceColsParams;
 import org.apache.impala.thrift.TAlterTableParams;
@@ -75,7 +75,7 @@ public class AlterTableAddReplaceColsStmt extends AlterTableStmt {
     FeTable t = getTargetTable();
     // TODO: Support column-level DDL on HBase tables. Requires updating the column
     // mappings along with the table columns.
-    if (t instanceof HBaseTable) {
+    if (t instanceof FeHBaseTable) {
       throw new AnalysisException("ALTER TABLE ADD|REPLACE COLUMNS not currently " +
           "supported on HBase tables.");
     }

http://git-wip-us.apache.org/repos/asf/impala/blob/e6bf4dc2/fe/src/main/java/org/apache/impala/analysis/AlterTableAlterColStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableAlterColStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableAlterColStmt.java
index f7d8ce8..71577ce 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableAlterColStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableAlterColStmt.java
@@ -20,9 +20,9 @@ package org.apache.impala.analysis;
 import java.util.Map;
 
 import org.apache.impala.catalog.Column;
+import org.apache.impala.catalog.FeHBaseTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
-import org.apache.impala.catalog.HBaseTable;
 import org.apache.impala.catalog.KuduColumn;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.thrift.TAlterTableAlterColParams;
@@ -101,7 +101,7 @@ public class AlterTableAlterColStmt extends AlterTableStmt {
   public void analyze(Analyzer analyzer) throws AnalysisException {
     super.analyze(analyzer);
     FeTable t = getTargetTable();
-    if (t instanceof HBaseTable) {
+    if (t instanceof FeHBaseTable) {
       throw new AnalysisException(
           "ALTER TABLE CHANGE/ALTER COLUMN not currently supported on HBase tables.");
     }

http://git-wip-us.apache.org/repos/asf/impala/blob/e6bf4dc2/fe/src/main/java/org/apache/impala/analysis/AlterTableDropColStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableDropColStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableDropColStmt.java
index 94fbc8d..8b9e875 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableDropColStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableDropColStmt.java
@@ -18,8 +18,8 @@
 package org.apache.impala.analysis;
 
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.impala.catalog.FeHBaseTable;
 import org.apache.impala.catalog.FeTable;
-import org.apache.impala.catalog.HBaseTable;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.thrift.TAlterTableDropColParams;
 import org.apache.impala.thrift.TAlterTableParams;
@@ -57,7 +57,7 @@ public class AlterTableDropColStmt extends AlterTableStmt {
     FeTable t = getTargetTable();
     // TODO: Support column-level DDL on HBase tables. Requires updating the column
     // mappings along with the table columns.
-    if (t instanceof HBaseTable) {
+    if (t instanceof FeHBaseTable) {
       throw new AnalysisException("ALTER TABLE DROP COLUMN not currently supported " +
           "on HBase tables.");
     }

http://git-wip-us.apache.org/repos/asf/impala/blob/e6bf4dc2/fe/src/main/java/org/apache/impala/analysis/AlterTableSetStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableSetStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableSetStmt.java
index 948a7aa..4d6a10a 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableSetStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableSetStmt.java
@@ -18,8 +18,8 @@
 package org.apache.impala.analysis;
 
 import org.apache.impala.authorization.Privilege;
+import org.apache.impala.catalog.FeHBaseTable;
 import org.apache.impala.catalog.FeTable;
-import org.apache.impala.catalog.HBaseTable;
 import org.apache.impala.common.AnalysisException;
 
 /**
@@ -43,7 +43,7 @@ public class AlterTableSetStmt extends AlterTableStmt {
     // TODO: Support ALTER TABLE SET on HBase tables. Requires validating changes
     // to the SERDEPROPERTIES and TBLPROPERTIES to ensure the table metadata does not
     // become invalid.
-    if (t instanceof HBaseTable) {
+    if (t instanceof FeHBaseTable) {
       throw new AnalysisException("ALTER TABLE SET not currently supported on " +
           "HBase tables.");
     }

http://git-wip-us.apache.org/repos/asf/impala/blob/e6bf4dc2/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java
index 1f02390..8995772 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java
@@ -27,9 +27,9 @@ import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
 import org.apache.impala.authorization.PrivilegeRequestBuilder;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.FeFsTable;
+import org.apache.impala.catalog.FeHBaseTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
-import org.apache.impala.catalog.HBaseTable;
 import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.Table;
@@ -210,7 +210,7 @@ public class AlterTableSetTblProperties extends AlterTableSetStmt {
 
     // ALTER TABLE SET is not supported on HBase tables at all, see
     // AlterTableSetStmt::analyze().
-    Preconditions.checkState(!(table instanceof HBaseTable));
+    Preconditions.checkState(!(table instanceof FeHBaseTable));
 
     if (table instanceof FeKuduTable) {
       throw new AnalysisException(String.format("'%s' table property is not supported " +

http://git-wip-us.apache.org/repos/asf/impala/blob/e6bf4dc2/fe/src/main/java/org/apache/impala/analysis/AlterTableSortByStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableSortByStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableSortByStmt.java
index 0f99c9b..8d45f55 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableSortByStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableSortByStmt.java
@@ -20,9 +20,9 @@ package org.apache.impala.analysis;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.impala.catalog.FeHBaseTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
-import org.apache.impala.catalog.HBaseTable;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.thrift.TAlterTableParams;
 import org.apache.impala.thrift.TAlterTableSetTblPropertiesParams;
@@ -68,7 +68,7 @@ public class AlterTableSortByStmt extends AlterTableStmt {
 
     // Disallow setting sort columns on HBase and Kudu tables.
     FeTable targetTable = getTargetTable();
-    if (targetTable instanceof HBaseTable) {
+    if (targetTable instanceof FeHBaseTable) {
       throw new AnalysisException("ALTER TABLE SORT BY not supported on HBase tables.");
     }
     if (targetTable instanceof FeKuduTable) {

http://git-wip-us.apache.org/repos/asf/impala/blob/e6bf4dc2/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index e902383..6468513 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -43,10 +43,10 @@ import org.apache.impala.catalog.FeCatalog;
 import org.apache.impala.catalog.FeDataSourceTable;
 import org.apache.impala.catalog.FeDb;
 import org.apache.impala.catalog.FeFsTable;
+import org.apache.impala.catalog.FeHBaseTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.FeView;
-import org.apache.impala.catalog.HBaseTable;
 import org.apache.impala.catalog.IncompleteTable;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.catalog.Type;
@@ -589,7 +589,7 @@ public class Analyzer {
       // The table must be a base table.
       Preconditions.checkState(table instanceof FeFsTable ||
           table instanceof FeKuduTable ||
-          table instanceof HBaseTable ||
+          table instanceof FeHBaseTable ||
           table instanceof FeDataSourceTable);
       return new BaseTableRef(tableRef, resolvedPath);
     } else {

http://git-wip-us.apache.org/repos/asf/impala/blob/e6bf4dc2/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
index 9ce7b0d..52c7a15 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
@@ -30,8 +30,8 @@ import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.FeCatalogUtils;
 import org.apache.impala.catalog.FeFsPartition;
 import org.apache.impala.catalog.FeFsTable;
+import org.apache.impala.catalog.FeHBaseTable;
 import org.apache.impala.catalog.FeTable;
-import org.apache.impala.catalog.HBaseTable;
 import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.HdfsTable;
@@ -217,7 +217,8 @@ public class ComputeStatsStmt extends StatementBase {
     // For Hdfs tables, exclude partition columns from stats gathering because Hive
     // cannot store them as part of the non-partition column stats. For HBase tables,
     // include the single clustering column (the row key).
-    int startColIdx = (table_ instanceof HBaseTable) ? 0 : table_.getNumClusteringCols();
+    int startColIdx = (table_ instanceof FeHBaseTable) ? 0 :
+        table_.getNumClusteringCols();
 
     for (int i = startColIdx; i < table_.getColumns().size(); ++i) {
       Column c = table_.getColumns().get(i);

http://git-wip-us.apache.org/repos/asf/impala/blob/e6bf4dc2/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
index 55da237..9998b0a 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
@@ -27,10 +27,10 @@ import org.apache.impala.authorization.Privilege;
 import org.apache.impala.authorization.PrivilegeRequestBuilder;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.FeFsTable;
+import org.apache.impala.catalog.FeHBaseTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.FeView;
-import org.apache.impala.catalog.HBaseTable;
 import org.apache.impala.catalog.KuduColumn;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.catalog.View;
@@ -283,7 +283,7 @@ public class InsertStmt extends StatementBase {
     // Also checks if the target table is missing.
     analyzeTargetTable(analyzer);
 
-    boolean isHBaseTable = (table_ instanceof HBaseTable);
+    boolean isHBaseTable = (table_ instanceof FeHBaseTable);
     int numClusteringCols = isHBaseTable ? 0 : table_.getNumClusteringCols();
 
     // Analysis of the INSERT statement from this point is basically the act of matching
@@ -459,7 +459,7 @@ public class InsertStmt extends StatementBase {
    * - Overwrite is invalid for HBase and Kudu tables
    */
   private void analyzeTableForInsert(Analyzer analyzer) throws AnalysisException {
-    boolean isHBaseTable = (table_ instanceof HBaseTable);
+    boolean isHBaseTable = (table_ instanceof FeHBaseTable);
     int numClusteringCols = isHBaseTable ? 0 : table_.getNumClusteringCols();
 
     if (partitionKeyValues_ != null && numClusteringCols == 0) {
@@ -539,7 +539,7 @@ public class InsertStmt extends StatementBase {
       // exists. So all columns aren't mentioned in the query.
       if (table_ instanceof FeKuduTable) {
         checkRequiredKuduColumns(mentionedColumnNames);
-      } else if (table_ instanceof HBaseTable) {
+      } else if (table_ instanceof FeHBaseTable) {
         checkRequiredHBaseColumns(mentionedColumnNames);
       } else if (table_.getNumClusteringCols() > 0) {
         checkRequiredPartitionedColumns(mentionedColumnNames);
@@ -605,7 +605,7 @@ public class InsertStmt extends StatementBase {
    */
   private void checkRequiredHBaseColumns(Set<String> mentionedColumnNames)
       throws AnalysisException {
-    Preconditions.checkState(table_ instanceof HBaseTable);
+    Preconditions.checkState(table_ instanceof FeHBaseTable);
     Column column = table_.getColumns().get(0);
     if (!mentionedColumnNames.contains(column.getName())) {
       throw new AnalysisException("Row-key column '" + column.getName() +
@@ -660,7 +660,8 @@ public class InsertStmt extends StatementBase {
     List<Expr> tmpPartitionKeyExprs = new ArrayList<Expr>();
     List<String> tmpPartitionKeyNames = new ArrayList<String>();
 
-    int numClusteringCols = (tbl instanceof HBaseTable) ? 0 : tbl.getNumClusteringCols();
+    int numClusteringCols = (tbl instanceof FeHBaseTable) ? 0
+        : tbl.getNumClusteringCols();
     boolean isKuduTable = table_ instanceof FeKuduTable;
     Set<String> kuduPartitionColumnNames = null;
     if (isKuduTable) {
@@ -809,7 +810,7 @@ public class InsertStmt extends StatementBase {
 
   private void analyzePlanHints(Analyzer analyzer) throws AnalysisException {
     if (planHints_.isEmpty()) return;
-    if (table_ instanceof HBaseTable) {
+    if (table_ instanceof FeHBaseTable) {
       throw new AnalysisException(String.format("INSERT hints are only supported for " +
           "inserting into Hdfs and Kudu tables: %s", getTargetTableName()));
     }

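For INSERT analysis the HBase-specific rules are unchanged: an FeHBaseTable is
treated as having no clustering columns, and the row key, which is always the first
column, must be mentioned explicitly. A rough sketch of that validation under those
assumptions, with simplified types standing in for Impala's Column and
AnalysisException:

    import java.util.List;
    import java.util.Set;

    class HBaseInsertCheckSketch {
      // Mirrors checkRequiredHBaseColumns(): the row key is column 0 and must be
      // mentioned in every INSERT into an HBase table.
      static void checkRowKeyMentioned(List<String> tableColumns,
          Set<String> mentionedColumnNames) {
        String rowKey = tableColumns.get(0);
        if (!mentionedColumnNames.contains(rowKey)) {
          throw new IllegalArgumentException("Row-key column '" + rowKey
              + "' must be explicitly mentioned.");
        }
      }
    }
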
http://git-wip-us.apache.org/repos/asf/impala/blob/e6bf4dc2/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
index dca6b94..6e5ad6a 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
@@ -42,11 +42,11 @@ import org.apache.hadoop.hive.ql.parse.HiveLexer;
 import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.FeFsTable;
+import org.apache.impala.catalog.FeHBaseTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.FeView;
 import org.apache.impala.catalog.Function;
-import org.apache.impala.catalog.HBaseTable;
 import org.apache.impala.catalog.HdfsCompression;
 import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.KuduColumn;
@@ -240,7 +240,7 @@ public class ToSqlUtils {
     removeHiddenTableProperties(properties, Maps.<String, String>newHashMap());
     ArrayList<String> colsSql = Lists.newArrayList();
     ArrayList<String> partitionColsSql = Lists.newArrayList();
-    boolean isHbaseTable = table instanceof HBaseTable;
+    boolean isHbaseTable = table instanceof FeHBaseTable;
     for (int i = 0; i < table.getColumns().size(); i++) {
       if (!isHbaseTable && i < table.getNumClusteringCols()) {
         partitionColsSql.add(columnToSql(table.getColumns().get(i)));

http://git-wip-us.apache.org/repos/asf/impala/blob/e6bf4dc2/fe/src/main/java/org/apache/impala/catalog/FeHBaseTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeHBaseTable.java b/fe/src/main/java/org/apache/impala/catalog/FeHBaseTable.java
new file mode 100644
index 0000000..2b26dd5
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/FeHBaseTable.java
@@ -0,0 +1,676 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.RegionLoad;
+import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hive.hbase.HBaseSerDe;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.impala.common.Pair;
+import org.apache.impala.thrift.TColumn;
+import org.apache.impala.thrift.THBaseTable;
+import org.apache.impala.thrift.TResultSet;
+import org.apache.impala.thrift.TResultSetMetadata;
+import org.apache.impala.util.StatsHelper;
+import org.apache.impala.util.TResultRowBuilder;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public interface FeHBaseTable extends FeTable {
+  /**
+   * @see Util#getEstimatedRowStats(FeHBaseTable, byte[], byte[])
+   */
+  Pair<Long, Long> getEstimatedRowStats(byte[] startRowKey, byte[] endRowKey);
+
+  /**
+   * @see Util#getHBaseTableName(Table)
+   */
+  String getHBaseTableName();
+
+  /**
+   * @see Util#getTableStats(FeHBaseTable)
+   */
+  TResultSet getTableStats();
+
+  /**
+   * Implementations may want to cache column families. This getter is for static
+   * functions in {@link Util} to access those potentially cached data.
+   */
+  HColumnDescriptor[] getColumnFamilies() throws IOException;
+
+  /**
+   * Utility functions for acting on FeHBaseTable.
+   * When we fully move to Java 8, these can become default methods of the interface.
+   */
+  abstract class Util {
+    // Storage handler class for HBase tables read by Hive.
+    public static final String HBASE_STORAGE_HANDLER =
+        "org.apache.hadoop.hive.hbase.HBaseStorageHandler";
+    // Column family of HBase row key
+    static final String ROW_KEY_COLUMN_FAMILY = ":key";
+    // Copied from Hive's HBaseStorageHandler.java.
+    static final String DEFAULT_PREFIX = "default.";
+    // Number of rows fetched during the row count estimation per region
+    static final int ROW_COUNT_ESTIMATE_BATCH_SIZE = 10;
+    // Keep the conf around
+    static final Configuration HBASE_CONF = HBaseConfiguration.create();
+    private static final Logger LOG = Logger.getLogger(FeHBaseTable.class);
+    // Maximum deviation from the average to stop querying more regions
+    // to estimate the row count
+    private static final double DELTA_FROM_AVERAGE = 0.15;
+    // Minimum number of regions that are checked to estimate the row count
+    private static final int MIN_NUM_REGIONS_TO_CHECK = 5;
+
+    /**
+     * Table client objects are thread-unsafe and cheap to create. The HBase docs
+     * recommend creating a new one for each task and then closing when done.
+     */
+    public static org.apache.hadoop.hbase.client.Table getHBaseTable(
+        String hbaseTableName) throws IOException {
+      return ConnectionHolder.getConnection().getTable(TableName.valueOf(hbaseTableName));
+    }
+
+    static org.apache.hadoop.hbase.client.Table getHBaseTable(FeHBaseTable tbl)
+        throws IOException {
+      return getHBaseTable(tbl.getHBaseTableName());
+    }
+
+    /**
+     * Load columns from msTable in Hive order. No IO is involved.
+     */
+    public static List<Column> loadColumns(
+        org.apache.hadoop.hive.metastore.api.Table msTable)
+        throws MetaException, SerDeException {
+      Map<String, String> serdeParams = msTable.getSd().getSerdeInfo().getParameters();
+      String hbaseColumnsMapping = serdeParams.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
+      if (hbaseColumnsMapping == null) {
+        throw new MetaException("No hbase.columns.mapping defined in Serde.");
+      }
+
+      String hbaseTableDefaultStorageType =
+          msTable.getParameters().get(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE);
+      boolean tableDefaultStorageIsBinary = false;
+      if (hbaseTableDefaultStorageType != null &&
+          !hbaseTableDefaultStorageType.isEmpty()) {
+        if (hbaseTableDefaultStorageType.equalsIgnoreCase("binary")) {
+          tableDefaultStorageIsBinary = true;
+        } else if (!hbaseTableDefaultStorageType.equalsIgnoreCase("string")) {
+          throw new SerDeException(
+              "Error: " + HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE +
+                  " parameter must be specified as" + " 'string' or 'binary'; '" +
+                  hbaseTableDefaultStorageType +
+                  "' is not a valid specification for this table/serde property.");
+        }
+      }
+
+      // Parse HBase column-mapping string.
+      List<FieldSchema> fieldSchemas = msTable.getSd().getCols();
+      List<String> hbaseColumnFamilies = new ArrayList<>();
+      List<String> hbaseColumnQualifiers = new ArrayList<>();
+      List<Boolean> hbaseColumnBinaryEncodings = new ArrayList<>();
+      parseColumnMapping(tableDefaultStorageIsBinary, hbaseColumnsMapping,
+          msTable.getTableName(), fieldSchemas, hbaseColumnFamilies,
+          hbaseColumnQualifiers, hbaseColumnBinaryEncodings);
+      Preconditions
+          .checkState(hbaseColumnFamilies.size() == hbaseColumnQualifiers.size());
+      Preconditions.checkState(fieldSchemas.size() == hbaseColumnFamilies.size());
+
+      // Populate tmp cols in the order they appear in the Hive metastore.
+      // We will reorder the cols below.
+      List<HBaseColumn> tmpCols = Lists.newArrayList();
+      // Store the key column separately.
+      // TODO: Change this to an ArrayList once we support composite row keys.
+      HBaseColumn keyCol = null;
+      for (int i = 0; i < fieldSchemas.size(); ++i) {
+        FieldSchema s = fieldSchemas.get(i);
+        Type t = Type.INVALID;
+        try {
+          t = FeCatalogUtils.parseColumnType(s, msTable.getTableName());
+        } catch (TableLoadingException e) {
+          // Ignore hbase types we don't support yet. We can load the metadata
+          // but won't be able to select from it.
+        }
+        HBaseColumn col = new HBaseColumn(s.getName(), hbaseColumnFamilies.get(i),
+            hbaseColumnQualifiers.get(i), hbaseColumnBinaryEncodings.get(i), t,
+            s.getComment(), -1);
+        if (col.getColumnFamily().equals(ROW_KEY_COLUMN_FAMILY)) {
+          // Store the row key column separately from the rest
+          keyCol = col;
+        } else {
+          tmpCols.add(col);
+        }
+      }
+      Preconditions.checkState(keyCol != null);
+      // The backend assumes that the row key column is always first and
+      // that the remaining HBase columns are ordered by columnFamily,columnQualifier,
+      // so the final position depends on the other mapped HBase columns.
+      // Sort columns and update positions.
+      Collections.sort(tmpCols);
+      keyCol.setPosition(0);
+      List<Column> cols = new ArrayList<>();
+      cols.add(keyCol);
+      // Update the positions of the remaining columns
+      for (int i = 0; i < tmpCols.size(); ++i) {
+        HBaseColumn col = tmpCols.get(i);
+        col.setPosition(i + 1);
+        cols.add(col);
+      }
+      return cols;
+    }
+
+    /**
+     * Parse the column description string into the column families and column
+     * qualifiers. This is a copy of HBaseSerDe.parseColumnMapping and
+     * parseColumnStorageTypes with parts we don't use removed. The Hive functions
+     * are not public.
+     * tableDefaultStorageIsBinary - true if table is default to binary encoding
+     * columnsMappingSpec - input string format describing the table
+     * fieldSchemas - input field schema from metastore table
+     * columnFamilies/columnQualifiers/columnBinaryEncodings - out parameters that will be
+     * filled with the column family, column qualifier and encoding for each column.
+     */
+    static void parseColumnMapping(boolean tableDefaultStorageIsBinary,
+        String columnsMappingSpec, String tblName, List<FieldSchema> fieldSchemas,
+        List<String> columnFamilies, List<String> columnQualifiers,
+        List<Boolean> colIsBinaryEncoded) throws SerDeException {
+      if (columnsMappingSpec == null) {
+        throw new SerDeException(
+            "Error: hbase.columns.mapping missing for this HBase table.");
+      }
+
+      if (columnsMappingSpec.equals("") ||
+          columnsMappingSpec.equals(HBaseSerDe.HBASE_KEY_COL)) {
+        throw new SerDeException("Error: hbase.columns.mapping specifies only " +
+            "the HBase table row key. A valid Hive-HBase table must specify at " +
+            "least one additional column.");
+      }
+
+      int rowKeyIndex = -1;
+      String[] columnSpecs = columnsMappingSpec.split(",");
+      // If there was an implicit key column mapping, the number of columns (fieldSchemas)
+      // will be one more than the number of column mapping specs.
+      int fsStartIdxOffset = fieldSchemas.size() - columnSpecs.length;
+      if (fsStartIdxOffset != 0 && fsStartIdxOffset != 1) {
+        // This should never happen - Hive blocks creating a mismatched table and both
+        // Hive and Impala currently block all column-level DDL on HBase tables.
+        throw new SerDeException(String.format("Number of entries in " +
+                "'hbase.columns.mapping' does not match the number of columns in the " +
+                "table: %d != %d (counting the key if implicit)", columnSpecs.length,
+            fieldSchemas.size()));
+      }
+
+      for (int i = 0; i < columnSpecs.length; ++i) {
+        String mappingSpec = columnSpecs[i];
+        String[] mapInfo = mappingSpec.split("#");
+        // Trim column info so that serdeproperties with new lines still parse correctly.
+        String colInfo = mapInfo[0].trim();
+
+        int idxFirst = colInfo.indexOf(":");
+        int idxLast = colInfo.lastIndexOf(":");
+
+        if (idxFirst < 0 || !(idxFirst == idxLast)) {
+          throw new SerDeException("Error: the HBase columns mapping contains a " +
+              "badly formed column family, column qualifier specification.");
+        }
+
+        if (colInfo.equals(HBaseSerDe.HBASE_KEY_COL)) {
+          Preconditions.checkState(fsStartIdxOffset == 0);
+          rowKeyIndex = i;
+          columnFamilies.add(colInfo);
+          columnQualifiers.add(null);
+        } else {
+          String[] parts = colInfo.split(":");
+          Preconditions.checkState(parts.length > 0 && parts.length <= 2);
+          columnFamilies.add(parts[0]);
+          if (parts.length == 2) {
+            columnQualifiers.add(parts[1]);
+          } else {
+            columnQualifiers.add(null);
+          }
+        }
+
+        // Set column binary encoding
+        FieldSchema fieldSchema = fieldSchemas.get(i + fsStartIdxOffset);
+        boolean supportsBinaryEncoding = supportsBinaryEncoding(fieldSchema, tblName);
+        if (mapInfo.length == 1) {
+          // There is no column level storage specification. Use the table storage spec.
+          colIsBinaryEncoded.add(tableDefaultStorageIsBinary && supportsBinaryEncoding);
+        } else if (mapInfo.length == 2) {
+          // There is a storage specification for the column
+          String storageOption = mapInfo[1];
+
+          if (!(storageOption.equals("-") || "string".startsWith(storageOption) ||
+              "binary".startsWith(storageOption))) {
+            throw new SerDeException("Error: A column storage specification is one of" +
+                " the following: '-', a prefix of 'string', or a prefix of 'binary'. " +
+                storageOption + " is not a valid storage option specification for " +
+                fieldSchema.getName());
+          }
+
+          boolean isBinaryEncoded = false;
+          if ("-".equals(storageOption)) {
+            isBinaryEncoded = tableDefaultStorageIsBinary;
+          } else if ("binary".startsWith(storageOption)) {
+            isBinaryEncoded = true;
+          }
+          if (isBinaryEncoded && !supportsBinaryEncoding) {
+            // Use string encoding and log a warning if the column spec is binary but the
+            // column type does not support it.
+            // TODO: Hive/HBase does not raise an exception, but should we?
+            LOG.warn("Column storage specification for column " + fieldSchema.getName() +
+                " is binary" + " but the column type " + fieldSchema.getType() +
+                " does not support binary encoding. Fallback to string format.");
+            isBinaryEncoded = false;
+          }
+          colIsBinaryEncoded.add(isBinaryEncoded);
+        } else {
+          // error in storage specification
+          throw new SerDeException(
+              "Error: " + HBaseSerDe.HBASE_COLUMNS_MAPPING + " storage specification " +
+                  mappingSpec + " is not valid for column: " + fieldSchema.getName());
+        }
+      }
+
+      if (rowKeyIndex == -1) {
+        columnFamilies.add(0, HBaseSerDe.HBASE_KEY_COL);
+        columnQualifiers.add(0, null);
+        colIsBinaryEncoded.add(0, supportsBinaryEncoding(fieldSchemas.get(0), tblName) &&
+            tableDefaultStorageIsBinary);
+      }
+    }
+
+
+    /**
+     * Get an estimate of the number of rows and bytes per row in regions between
+     * startRowKey and endRowKey.
+     * <p>
+     * This number is calculated by incrementally checking as many region servers as
+     * necessary until we observe a relatively constant row size per region on average.
+     * Depending on the skew of data in the regions this can either mean that we need
+     * to check only a minimal number of regions or that we will scan all regions.
+     * <p>
+     * The HBase region servers periodically update the master with their metrics,
+     * including storefile size. We get the size of the storefiles for all regions in
+     * the cluster with a single call to getClusterStatus from the master.
+     * <p>
+     * The accuracy of this number is determined by the number of rows that are written
+     * and kept in the memstore and have not been flushed until now. A large number
+     * of key-value pairs in the memstore will lead to bad estimates as this number
+     * is not reflected in the storefile size that is used to estimate this number.
+     * <p>
+     * Currently, the algorithm does not consider the case that the key range used as a
+     * parameter might be generally of different size than the rest of the region.
+     * <p>
+     * The values computed here should be cached so that in high qps workloads
+     * the nn is not overwhelmed. Could be done in load(); Synchronized to make
+     * sure that only one thread at a time is using the htable.
+     *
+     * @param startRowKey First row key in the range
+     * @param endRowKey   Last row key in the range
+     * @return The estimated number of rows in the regions between the row keys (first)
+     * and the estimated row size in bytes (second).
+     */
+    public static Pair<Long, Long> getEstimatedRowStats(FeHBaseTable tbl,
+        byte[] startRowKey, byte[] endRowKey) {
+      Preconditions.checkNotNull(startRowKey);
+      Preconditions.checkNotNull(endRowKey);
+
+      boolean isCompressed = false;
+      long rowCount;
+      long rowSize;
+      try {
+        ClusterStatus clusterStatus = getClusterStatus();
+        // Check to see if things are compressed.
+        // If they are we'll estimate a compression factor.
+        HColumnDescriptor[] columnFamilies = tbl.getColumnFamilies();
+        Preconditions.checkNotNull(columnFamilies);
+        for (HColumnDescriptor desc : columnFamilies) {
+          isCompressed |= desc.getCompression() != Compression.Algorithm.NONE;
+        }
+
+        // Fetch all regions for the key range
+        List<HRegionLocation> locations = getRegionsInRange(tbl, startRowKey, endRowKey);
+        Collections.shuffle(locations);
+        // The following variables track the number and size of 'rows' in
+        // HBase and allow incremental calculation of the average and standard
+        // deviation.
+        StatsHelper<Long> statsSize = new StatsHelper<>();
+        long totalEstimatedRows = 0;
+
+        // Collects stats samples from at least MIN_NUM_REGIONS_TO_CHECK
+        // and at most all regions until the delta is small enough.
+        while ((statsSize.count() < MIN_NUM_REGIONS_TO_CHECK ||
+            statsSize.stddev() > statsSize.mean() * DELTA_FROM_AVERAGE) &&
+            statsSize.count() < locations.size()) {
+          HRegionLocation currentLocation = locations.get((int) statsSize.count());
+          Pair<Long, Long> tmp =
+              getEstimatedRowStatsForRegion(tbl, currentLocation, isCompressed,
+                  clusterStatus);
+          totalEstimatedRows += tmp.first;
+          statsSize.addSample(tmp.second);
+        }
+
+        // Sum up the total size for all regions in range.
+        long totalSize = 0;
+        for (final HRegionLocation location : locations) {
+          totalSize += getRegionSize(location, clusterStatus);
+        }
+        if (totalSize == 0) {
+          rowCount = totalEstimatedRows;
+        } else {
+          rowCount = (long) (totalSize / statsSize.mean());
+        }
+        rowSize = (long) statsSize.mean();
+        return new Pair<>(rowCount, rowSize);
+      } catch (IOException ioe) {
+        // Print the stack trace, but we'll ignore it
+        // as this is just an estimate.
+        // TODO: Put this into the per query log.
+        LOG.error("Error computing HBase row count estimate", ioe);
+        return new Pair<>(-1L, -1L);
+      }
+    }
+
+    /**
+     * Returns statistics on this table as a tabular result set. Used for the
+     * SHOW TABLE STATS statement. The schema of the returned TResultSet is set
+     * inside this method.
+     */
+    public static TResultSet getTableStats(FeHBaseTable tbl) {
+      TResultSet result = new TResultSet();
+      TResultSetMetadata resultSchema = new TResultSetMetadata();
+      result.setSchema(resultSchema);
+      resultSchema.addToColumns(new TColumn("Region Location", Type.STRING.toThrift()));
+      resultSchema.addToColumns(new TColumn("Start RowKey", Type.STRING.toThrift()));
+      resultSchema.addToColumns(new TColumn("Est. #Rows", Type.BIGINT.toThrift()));
+      resultSchema.addToColumns(new TColumn("Size", Type.STRING.toThrift()));
+
+      // TODO: Consider fancier stats maintenance techniques for speeding up this process.
+      // Currently, we list all regions and perform a mini-scan of each of them to
+      // estimate the number of rows, the data size, etc., which is rather expensive.
+      try {
+        ClusterStatus clusterStatus = getClusterStatus();
+        long totalNumRows = 0;
+        long totalSize = 0;
+        List<HRegionLocation> regions =
+            getRegionsInRange(tbl, HConstants.EMPTY_END_ROW, HConstants.EMPTY_START_ROW);
+        for (HRegionLocation region : regions) {
+          TResultRowBuilder rowBuilder = new TResultRowBuilder();
+          HRegionInfo regionInfo = region.getRegionInfo();
+          Pair<Long, Long> estRowStats =
+              getEstimatedRowStatsForRegion(tbl, region, false, clusterStatus);
+
+          long numRows = estRowStats.first;
+          long regionSize = getRegionSize(region, clusterStatus);
+          totalNumRows += numRows;
+          totalSize += regionSize;
+
+          // Add the region location, start rowkey, number of rows and raw size.
+          rowBuilder.add(String.valueOf(region.getHostname()))
+              .add(Bytes.toString(regionInfo.getStartKey())).add(numRows)
+              .addBytes(regionSize);
+          result.addToRows(rowBuilder.get());
+        }
+
+        // Total num rows and raw region size.
+        if (regions.size() > 1) {
+          TResultRowBuilder rowBuilder = new TResultRowBuilder();
+          rowBuilder.add("Total").add("").add(totalNumRows).addBytes(totalSize);
+          result.addToRows(rowBuilder.get());
+        }
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      return result;
+    }
+
+
+    /**
+     * This method is completely copied from Hive's HBaseStorageHandler.java.
+     */
+    public static String getHBaseTableName(
+        org.apache.hadoop.hive.metastore.api.Table tbl) {
+      // Give preference to TBLPROPERTIES over SERDEPROPERTIES
+      // (really we should only use TBLPROPERTIES, so this is just
+      // for backwards compatibility with the original specs).
+      String tableName = tbl.getParameters().get(HBaseSerDe.HBASE_TABLE_NAME);
+      if (tableName == null) {
+        tableName =
+            tbl.getSd().getSerdeInfo().getParameters().get(HBaseSerDe.HBASE_TABLE_NAME);
+      }
+      if (tableName == null) {
+        tableName = tbl.getDbName() + "." + tbl.getTableName();
+        if (tableName.startsWith(DEFAULT_PREFIX)) {
+          tableName = tableName.substring(DEFAULT_PREFIX.length());
+        }
+      }
+      return tableName;
+    }
+
+    /**
+     * Estimates the number of rows for a single region and returns a pair with
+     * the estimated row count and the estimated size in bytes per row.
+     */
+    private static Pair<Long, Long> getEstimatedRowStatsForRegion(FeHBaseTable tbl,
+        HRegionLocation location, boolean isCompressed, ClusterStatus clusterStatus)
+        throws IOException {
+      HRegionInfo info = location.getRegionInfo();
+
+      Scan s = new Scan(info.getStartKey());
+      // Get a small sample of rows
+      s.setBatch(ROW_COUNT_ESTIMATE_BATCH_SIZE);
+      // Try and get every version so the row's size can be used to estimate.
+      s.setMaxVersions(Short.MAX_VALUE);
+      // Don't cache the blocks as we don't think these are
+      // necessarily important blocks.
+      s.setCacheBlocks(false);
+      // Try and get deletes too so their size can be counted.
+      s.setRaw(false);
+
+      long currentRowSize = 0;
+      long currentRowCount = 0;
+
+      try (org.apache.hadoop.hbase.client.Table table = getHBaseTable(tbl);
+          ResultScanner rs = table.getScanner(s)) {
+        // Get the ROW_COUNT_ESTIMATE_BATCH_SIZE fetched rows
+        // for a representative sample
+        for (int i = 0; i < ROW_COUNT_ESTIMATE_BATCH_SIZE; ++i) {
+          Result r = rs.next();
+          if (r == null) break;
+          // Check for empty rows, see IMPALA-1451
+          if (r.isEmpty()) continue;
+          ++currentRowCount;
+          // To estimate the number of rows we simply use the amount of bytes
+          // returned from the underlying buffer. Since HBase internally works
+          // with these structures as well this gives us ok estimates.
+          Cell[] cells = r.rawCells();
+          for (Cell c : cells) {
+            if (c instanceof KeyValue) {
+              currentRowSize += KeyValue
+                  .getKeyValueDataStructureSize(c.getRowLength(), c.getFamilyLength(),
+                      c.getQualifierLength(), c.getValueLength(), c.getTagsLength());
+            } else {
+              throw new IllegalStateException(
+                  "Celltype " + c.getClass().getName() + " not supported.");
+            }
+          }
+        }
+      }
+
+      // If there are no rows then no need to estimate.
+      if (currentRowCount == 0) return new Pair<>(0L, 0L);
+      // Get the size.
+      long currentSize = getRegionSize(location, clusterStatus);
+      // estimate the number of rows.
+      double bytesPerRow = currentRowSize / (double) currentRowCount;
+      if (currentSize == 0) {
+        return new Pair<>(currentRowCount, (long) bytesPerRow);
+      }
+
+      // Compression factor two is only a best effort guess
+      long estimatedRowCount =
+          (long) ((isCompressed ? 2 : 1) * (currentSize / bytesPerRow));
+
+      return new Pair<>(estimatedRowCount, (long) bytesPerRow);
+    }
+
+
+    /**
+     * Returns the size of the given region in bytes. Simply returns the storefile size
+     * for this region from the ClusterStatus. Returns 0 in case of an error.
+     */
+    private static long getRegionSize(HRegionLocation location,
+        ClusterStatus clusterStatus) {
+      HRegionInfo info = location.getRegionInfo();
+      ServerLoad serverLoad = clusterStatus.getLoad(location.getServerName());
+
+      // If the serverLoad is null, the master doesn't have information for this region's
+      // server. This shouldn't normally happen.
+      if (serverLoad == null) {
+        LOG.error("Unable to find server load for server: " + location.getServerName() +
+            " for location " + info.getRegionNameAsString());
+        return 0;
+      }
+      RegionLoad regionLoad = serverLoad.getRegionsLoad().get(info.getRegionName());
+      if (regionLoad == null) {
+        LOG.error("Unable to find regions load for server: " + location.getServerName() +
+            " for location " + info.getRegionNameAsString());
+        return 0;
+      }
+      final long megaByte = 1024L * 1024L;
+      return regionLoad.getStorefileSizeMB() * megaByte;
+    }
+
+    public static THBaseTable getTHBaseTable(FeHBaseTable table) {
+      THBaseTable tHbaseTable = new THBaseTable();
+      tHbaseTable.setTableName(table.getHBaseTableName());
+      for (Column c : table.getColumns()) {
+        HBaseColumn hbaseCol = (HBaseColumn) c;
+        tHbaseTable.addToFamilies(hbaseCol.getColumnFamily());
+        if (hbaseCol.getColumnQualifier() != null) {
+          tHbaseTable.addToQualifiers(hbaseCol.getColumnQualifier());
+        } else {
+          tHbaseTable.addToQualifiers("");
+        }
+        tHbaseTable.addToBinary_encoded(hbaseCol.isBinaryEncoded());
+      }
+      return tHbaseTable;
+    }
+
+    /**
+     * Get the corresponding regions for an arbitrary range of keys.
+     * This is copied from org.apache.hadoop.hbase.client.HTable in HBase 0.95. The
+     * difference is that it does not use cache when calling getRegionLocation.
+     *
+     * @param tbl      An FeHBaseTable in the catalog
+     * @param startKey Starting key in range, inclusive
+     * @param endKey   Ending key in range, exclusive
+     * @return A list of HRegionLocations corresponding to the regions that
+     * contain the specified range
+     * @throws IOException if a remote or network exception occurs
+     */
+    public static List<HRegionLocation> getRegionsInRange(FeHBaseTable tbl,
+        final byte[] startKey, final byte[] endKey) throws IOException {
+      try (org.apache.hadoop.hbase.client.Table hbaseTbl = getHBaseTable(tbl)) {
+        final boolean endKeyIsEndOfTable = Bytes.equals(endKey, HConstants.EMPTY_END_ROW);
+        if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
+          throw new IllegalArgumentException(
+              "Invalid range: " + Bytes.toStringBinary(startKey) + " > " +
+                  Bytes.toStringBinary(endKey));
+        }
+        final List<HRegionLocation> regionList = new ArrayList<>();
+        byte[] currentKey = startKey;
+        Connection connection = ConnectionHolder.getConnection();
+        RegionLocator locator = connection.getRegionLocator(hbaseTbl.getName());
+        do {
+          // always reload region location info.
+          HRegionLocation regionLocation = locator.getRegionLocation(currentKey, true);
+          regionList.add(regionLocation);
+          currentKey = regionLocation.getRegionInfo().getEndKey();
+        } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW) &&
+            (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0));
+        return regionList;
+      }
+    }
+
+    /**
+     * Get the cluster status, making sure we close the admin client afterwards.
+     */
+    static ClusterStatus getClusterStatus() throws IOException {
+      try (Admin admin = ConnectionHolder.getConnection().getAdmin()) {
+        return admin.getClusterStatus();
+      }
+    }
+
+    private static boolean supportsBinaryEncoding(FieldSchema fs, String tblName) {
+      try {
+        Type colType = FeCatalogUtils.parseColumnType(fs, tblName);
+        // Only boolean, integer and floating point types can use binary storage.
+        return colType.isBoolean() || colType.isIntegerType() ||
+            colType.isFloatingPointType();
+      } catch (TableLoadingException e) {
+        return false;
+      }
+    }
+
+    /**
+     * Connection instances are expensive to create. The HBase documentation recommends
+     * creating one and then sharing it among threads. All operations on a connection are
+     * thread-safe.
+     */
+    static class ConnectionHolder {
+      private static Connection connection_ = null;
+
+      static synchronized Connection getConnection() throws IOException {
+        if (connection_ == null || connection_.isClosed()) {
+          connection_ = ConnectionFactory.createConnection(Util.HBASE_CONF);
+        }
+        return connection_;
+      }
+    }
+  }
+}

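The new Util class keeps the connection-sharing pattern from the old HBaseTable: one
process-wide HBase Connection is created lazily and shared across threads
(connections are expensive and thread-safe), while Table handles are created per
task and closed when done (cheap and not thread-safe). A hedged usage sketch against
the standard HBase client API; the table name is a placeholder:

    import java.io.IOException;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Table;

    class ConnectionSharingSketch {
      private static Connection connection;  // single shared, thread-safe connection

      static synchronized Connection getConnection() throws IOException {
        if (connection == null || connection.isClosed()) {
          connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
        }
        return connection;
      }

      static void useTable() throws IOException {
        // Table handles are created per task and closed when done.
        try (Table table =
            getConnection().getTable(TableName.valueOf("placeholder_tbl"))) {
          // ... issue Scans/Gets against 'table' here ...
        }
      }
    }
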
http://git-wip-us.apache.org/repos/asf/impala/blob/e6bf4dc2/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java b/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java
index ceb6e70..5aad812 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java
@@ -17,57 +17,22 @@
 
 package org.apache.impala.catalog;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.ClusterStatus;
-import org.apache.hadoop.hbase.HBaseConfiguration;
+import com.codahale.metrics.Timer;
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.ServerLoad;
-import org.apache.hadoop.hbase.RegionLoad;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hive.hbase.HBaseSerDe;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.log4j.Logger;
-
 import org.apache.impala.common.Pair;
 import org.apache.impala.thrift.TCatalogObjectType;
-import org.apache.impala.thrift.TColumn;
-import org.apache.impala.thrift.THBaseTable;
 import org.apache.impala.thrift.TResultSet;
-import org.apache.impala.thrift.TResultSetMetadata;
 import org.apache.impala.thrift.TTable;
 import org.apache.impala.thrift.TTableDescriptor;
 import org.apache.impala.thrift.TTableType;
-import org.apache.impala.util.StatsHelper;
-import org.apache.impala.util.TResultRowBuilder;
 
-import com.codahale.metrics.Timer;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
 
 /**
  * Impala representation of HBase table metadata,
@@ -80,28 +45,8 @@ import com.google.common.collect.Lists;
  * This implies that a "select *"-query on an HBase table
  * will not have the columns ordered as they were declared in the DDL.
  * They will be ordered by family/qualifier.
- *
  */
-public class HBaseTable extends Table {
-  // Maximum deviation from the average to stop querying more regions
-  // to estimate the row count
-  private static final double DELTA_FROM_AVERAGE = 0.15;
-
-  private static final Logger LOG = Logger.getLogger(HBaseTable.class);
-
-  // Copied from Hive's HBaseStorageHandler.java.
-  public static final String DEFAULT_PREFIX = "default.";
-
-  // Number of rows fetched during the row count estimation per region
-  public static final int ROW_COUNT_ESTIMATE_BATCH_SIZE = 10;
-
-  // Minimum number of regions that are checked to estimate the row count
-  private static final int MIN_NUM_REGIONS_TO_CHECK = 5;
-
-  // Name of table in HBase.
-  // 'this.name' is the alias of the HBase table in Hive.
-  protected String hbaseTableName_;
-
+public class HBaseTable extends Table implements FeHBaseTable {
   // Input format class for HBase tables read by Hive.
   private static final String HBASE_INPUT_FORMAT =
       "org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat";
@@ -110,316 +55,69 @@ public class HBaseTable extends Table {
   private static final String HBASE_SERIALIZATION_LIB =
       "org.apache.hadoop.hive.hbase.HBaseSerDe";
 
-  // Storage handler class for HBase tables read by Hive.
-  private static final String HBASE_STORAGE_HANDLER =
-      "org.apache.hadoop.hive.hbase.HBaseStorageHandler";
 
-  // Column family of HBase row key
-  private static final String ROW_KEY_COLUMN_FAMILY = ":key";
+  // Name of table in HBase.
+  // 'this.name' is the alias of the HBase table in Hive.
+  private String hbaseTableName_;
 
-  // Keep the conf around
-  private final static Configuration hbaseConf_ = HBaseConfiguration.create();
 
   // Cached column families. Used primarily for speeding up row stats estimation
   // (see IMPALA-4211).
   private HColumnDescriptor[] columnFamilies_ = null;
 
-  protected HBaseTable(org.apache.hadoop.hive.metastore.api.Table msTbl,
-      Db db, String name, String owner) {
+  protected HBaseTable(org.apache.hadoop.hive.metastore.api.Table msTbl, Db db,
+      String name, String owner) {
     super(msTbl, db, name, owner);
   }
 
   /**
-   * Connection instances are expensive to create. The HBase documentation recommends
-   * one and then sharing it among threads. All operations on a connection are
-   * thread-safe.
-   */
-  private static class ConnectionHolder {
-    private static Connection connection_ = null;
-
-    public static synchronized Connection getConnection(Configuration conf)
-        throws IOException {
-      if (connection_ == null || connection_.isClosed()) {
-        connection_ = ConnectionFactory.createConnection(conf);
-      }
-      return connection_;
-    }
-  }
-
-  /**
-   * Table client objects are thread-unsafe and cheap to create. The HBase docs
-   * recommend creating a new one for each task and closing it when done.
-   */
-  public org.apache.hadoop.hbase.client.Table getHBaseTable() throws IOException {
-    return ConnectionHolder.getConnection(hbaseConf_)
-        .getTable(TableName.valueOf(hbaseTableName_));
-  }
-
-  private void closeHBaseTable(org.apache.hadoop.hbase.client.Table table) {
-    try {
-      table.close();
-    } catch (IOException e) {
-      LOG.error("Error closing HBase table: " + hbaseTableName_, e);
-    }
-  }
-
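For context, the two comments above describe HBase's recommended client pattern: one shared Connection, plus a short-lived Table handle per task. A minimal hedged sketch of that pattern follows; the class and member names are invented for illustration, and only the HBase client calls are real API:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Table;

    public class SharedConnectionSketch {
      // Connections are expensive: create one lazily and share it across threads.
      private static Connection conn_;

      static synchronized Connection getConnection(Configuration conf)
          throws IOException {
        if (conn_ == null || conn_.isClosed()) {
          conn_ = ConnectionFactory.createConnection(conf);
        }
        return conn_;
      }

      // Table handles are cheap and thread-unsafe: one per task, closed when done.
      static void runTask(String tableName) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        try (Table table =
            getConnection(conf).getTable(TableName.valueOf(tableName))) {
          // ... issue Gets/Scans against 'table' here ...
        }
      }
    }
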
-  /**
-   * Get the cluster status, making sure we close the admin client afterwards.
-   */
-  public ClusterStatus getClusterStatus() throws IOException {
-    Admin admin = null;
-    ClusterStatus clusterStatus = null;
-    try {
-      Connection connection = ConnectionHolder.getConnection(hbaseConf_);
-      admin = connection.getAdmin();
-      clusterStatus = admin.getClusterStatus();
-    } finally {
-      if (admin != null) admin.close();
-    }
-    return clusterStatus;
-  }
-
-  /**
-   * Parse the column description string into the column families and column
-   * qualifiers. This is a copy of HBaseSerDe.parseColumnMapping and
-   * parseColumnStorageTypes with the parts we don't use removed. The Hive
-   * functions are not public.
-   *
-   * tableDefaultStorageIsBinary - true if the table defaults to binary encoding
-   * columnsMappingSpec - input string describing the table's column mapping
-   * fieldSchemas - input field schemas from the metastore table
-   * columnFamilies/columnQualifiers/columnBinaryEncodings - out parameters that
-   * will be filled with the column family, column qualifier and encoding for
-   * each column.
+   * Returns true if the given Metastore Table represents an HBase table.
+   * Versions of Hive/HBase are inconsistent about which HBase-related fields are
+   * set (e.g., HIVE-6548 changed the input format to null). For maximum
+   * compatibility, consider all known fields that indicate an HBase table.
    */
-  private void parseColumnMapping(boolean tableDefaultStorageIsBinary,
-      String columnsMappingSpec, List<FieldSchema> fieldSchemas,
-      List<String> columnFamilies, List<String> columnQualifiers,
-      List<Boolean> colIsBinaryEncoded) throws SerDeException {
-    if (columnsMappingSpec == null) {
-      throw new SerDeException(
-          "Error: hbase.columns.mapping missing for this HBase table.");
-    }
-
-    if (columnsMappingSpec.equals("") ||
-        columnsMappingSpec.equals(HBaseSerDe.HBASE_KEY_COL)) {
-      throw new SerDeException("Error: hbase.columns.mapping specifies only "
-          + "the HBase table row key. A valid Hive-HBase table must specify at "
-          + "least one additional column.");
-    }
-
-    int rowKeyIndex = -1;
-    String[] columnSpecs = columnsMappingSpec.split(",");
-    // If there was an implicit key column mapping, the number of columns (fieldSchemas)
-    // will be one more than the number of column mapping specs.
-    int fsStartIdxOffset = fieldSchemas.size() - columnSpecs.length;
-    if (fsStartIdxOffset != 0 && fsStartIdxOffset != 1) {
-      // This should never happen - Hive blocks creating a mismatched table and both Hive
-      // and Impala currently block all column-level DDL on HBase tables.
-      throw new SerDeException(String.format("Number of entries in " +
-          "'hbase.columns.mapping' does not match the number of columns in the " +
-          "table: %d != %d (counting the key if implicit)",
-          columnSpecs.length, fieldSchemas.size()));
-    }
-
-    for (int i = 0; i < columnSpecs.length; ++i) {
-      String mappingSpec = columnSpecs[i];
-      String[] mapInfo = mappingSpec.split("#");
-      // Trim column info so that serdeproperties with new lines still parse correctly.
-      String colInfo = mapInfo[0].trim();
-
-      int idxFirst = colInfo.indexOf(":");
-      int idxLast = colInfo.lastIndexOf(":");
-
-      if (idxFirst < 0 || idxFirst != idxLast) {
-        throw new SerDeException("Error: the HBase columns mapping contains a "
-            + "badly formed column family, column qualifier specification.");
-      }
-
-      if (colInfo.equals(HBaseSerDe.HBASE_KEY_COL)) {
-        Preconditions.checkState(fsStartIdxOffset == 0);
-        rowKeyIndex = i;
-        columnFamilies.add(colInfo);
-        columnQualifiers.add(null);
-      } else {
-        String[] parts = colInfo.split(":");
-        Preconditions.checkState(parts.length > 0 && parts.length <= 2);
-        columnFamilies.add(parts[0]);
-        if (parts.length == 2) {
-          columnQualifiers.add(parts[1]);
-        } else {
-          columnQualifiers.add(null);
-        }
-      }
-
-      // Set column binary encoding
-      FieldSchema fieldSchema = fieldSchemas.get(i + fsStartIdxOffset);
-      boolean supportsBinaryEncoding = supportsBinaryEncoding(fieldSchema);
-      if (mapInfo.length == 1) {
-        // There is no column level storage specification. Use the table storage spec.
-        colIsBinaryEncoded.add(
-            new Boolean(tableDefaultStorageIsBinary && supportsBinaryEncoding));
-      } else if (mapInfo.length == 2) {
-        // There is a storage specification for the column
-        String storageOption = mapInfo[1];
-
-        if (!(storageOption.equals("-") || "string".startsWith(storageOption) || "binary"
-            .startsWith(storageOption))) {
-          throw new SerDeException("Error: A column storage specification is one of"
-              + " the following: '-', a prefix of 'string', or a prefix of 'binary'. "
-              + storageOption + " is not a valid storage option specification for "
-              + fieldSchema.getName());
-        }
-
-        boolean isBinaryEncoded = false;
-        if ("-".equals(storageOption)) {
-          isBinaryEncoded = tableDefaultStorageIsBinary;
-        } else if ("binary".startsWith(storageOption)) {
-          isBinaryEncoded = true;
-        }
-        if (isBinaryEncoded && !supportsBinaryEncoding) {
-          // Use string encoding and log a warning if the column spec is binary but the
-          // column type does not support it.
-          // TODO: Hive/HBase does not raise an exception, but should we?
-          LOG.warn("Column storage specification for column " + fieldSchema.getName()
-              + " is binary" + " but the column type " + fieldSchema.getType() +
-              " does not support binary encoding. Fallback to string format.");
-          isBinaryEncoded = false;
-        }
-        colIsBinaryEncoded.add(isBinaryEncoded);
-      } else {
-        // error in storage specification
-        throw new SerDeException("Error: " + HBaseSerDe.HBASE_COLUMNS_MAPPING
-            + " storage specification " + mappingSpec + " is not valid for column: "
-            + fieldSchema.getName());
-      }
-    }
-
-    if (rowKeyIndex == -1) {
-      columnFamilies.add(0, HBaseSerDe.HBASE_KEY_COL);
-      columnQualifiers.add(0, null);
-      colIsBinaryEncoded.add(0,
-          supportsBinaryEncoding(fieldSchemas.get(0)) && tableDefaultStorageIsBinary);
-    }
-  }
-
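To illustrate the format parsed above, here is a hedged, self-contained sketch of how one 'hbase.columns.mapping' spec decomposes into families, qualifiers and encodings. The class name is invented, and the real code additionally consults the column type and the table default before accepting binary encoding:

    public class ColumnMappingSketch {
      public static void main(String[] args) {
        // Row key, then two columns in family "d"; the "#b" suffix forces binary.
        String spec = ":key,d:a,d:b#b";
        for (String entry : spec.split(",")) {
          String[] mapInfo = entry.split("#");
          String colInfo = mapInfo[0].trim();
          boolean binary = mapInfo.length == 2 && "binary".startsWith(mapInfo[1]);
          String family;
          String qualifier = null;
          if (colInfo.equals(":key")) {  // HBaseSerDe.HBASE_KEY_COL
            family = colInfo;
          } else {
            String[] parts = colInfo.split(":");
            family = parts[0];
            if (parts.length == 2) qualifier = parts[1];
          }
          System.out.printf("family=%s qualifier=%s binary=%b%n",
              family, qualifier, binary);
        }
        // Prints:
        // family=:key qualifier=null binary=false
        // family=d qualifier=a binary=false
        // family=d qualifier=b binary=true
      }
    }
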
-  private boolean supportsBinaryEncoding(FieldSchema fs) {
-    try {
-      Type colType = parseColumnType(fs);
-      // Only boolean, integer and floating point types can use binary storage.
-      return colType.isBoolean() || colType.isIntegerType()
-          || colType.isFloatingPointType();
-    } catch (TableLoadingException e) {
-      return false;
+  public static boolean isHBaseTable(org.apache.hadoop.hive.metastore.api.Table msTbl) {
+    if (msTbl.getParameters() != null &&
+        msTbl.getParameters().containsKey(Util.HBASE_STORAGE_HANDLER)) {
+      return true;
     }
+    StorageDescriptor sd = msTbl.getSd();
+    if (sd == null) return false;
+    if (sd.getInputFormat() != null && sd.getInputFormat().equals(HBASE_INPUT_FORMAT)) {
+      return true;
+    }
+    return sd.getSerdeInfo() != null &&
+        sd.getSerdeInfo().getSerializationLib() != null &&
+        sd.getSerdeInfo().getSerializationLib().equals(HBASE_SERIALIZATION_LIB);
   }
 
-  @Override
   /**
    * For HBase tables, we can support tables with columns we don't understand at
    * all (e.g. map) as long as the user does not select those. This is in contrast
    * to HDFS tables, since we typically need to understand all columns to make
    * sense of the file at all.
    */
+  @Override
   public void load(boolean reuseMetadata, IMetaStoreClient client,
       org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
     Preconditions.checkNotNull(getMetaStoreTable());
-    final Timer.Context context =
-        getMetrics().getTimer(Table.LOAD_DURATION_METRIC).time();
-    try {
+    try (Timer.Context timer = getMetrics().getTimer(Table.LOAD_DURATION_METRIC).time()) {
       msTable_ = msTbl;
-      hbaseTableName_ = getHBaseTableName(getMetaStoreTable());
+      hbaseTableName_ = Util.getHBaseTableName(getMetaStoreTable());
       // Warm up the connection and verify the table exists.
-      getHBaseTable().close();
+      Util.getHBaseTable(hbaseTableName_).close();
       columnFamilies_ = null;
-      Map<String, String> serdeParams =
-          getMetaStoreTable().getSd().getSerdeInfo().getParameters();
-      String hbaseColumnsMapping = serdeParams.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
-      if (hbaseColumnsMapping == null) {
-        throw new MetaException("No hbase.columns.mapping defined in Serde.");
-      }
-
-      String hbaseTableDefaultStorageType = getMetaStoreTable().getParameters().get(
-          HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE);
-      boolean tableDefaultStorageIsBinary = false;
-      if (hbaseTableDefaultStorageType != null &&
-          !hbaseTableDefaultStorageType.isEmpty()) {
-        if (hbaseTableDefaultStorageType.equalsIgnoreCase("binary")) {
-          tableDefaultStorageIsBinary = true;
-        } else if (!hbaseTableDefaultStorageType.equalsIgnoreCase("string")) {
-          throw new SerDeException("Error: " +
-              HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE +
-              " parameter must be specified as" +
-              " 'string' or 'binary'; '" + hbaseTableDefaultStorageType +
-              "' is not a valid specification for this table/serde property.");
-        }
-      }
-
-      // Parse HBase column-mapping string.
-      List<FieldSchema> fieldSchemas = getMetaStoreTable().getSd().getCols();
-      List<String> hbaseColumnFamilies = new ArrayList<String>();
-      List<String> hbaseColumnQualifiers = new ArrayList<String>();
-      List<Boolean> hbaseColumnBinaryEncodings = new ArrayList<Boolean>();
-      parseColumnMapping(tableDefaultStorageIsBinary, hbaseColumnsMapping, fieldSchemas,
-          hbaseColumnFamilies, hbaseColumnQualifiers, hbaseColumnBinaryEncodings);
-      Preconditions.checkState(
-          hbaseColumnFamilies.size() == hbaseColumnQualifiers.size());
-      Preconditions.checkState(fieldSchemas.size() == hbaseColumnFamilies.size());
-
-      // Populate tmp cols in the order they appear in the Hive metastore.
-      // We will reorder the cols below.
-      List<HBaseColumn> tmpCols = Lists.newArrayList();
-      // Store the key column separately.
-      // TODO: Change this to an ArrayList once we support composite row keys.
-      HBaseColumn keyCol = null;
-      for (int i = 0; i < fieldSchemas.size(); ++i) {
-        FieldSchema s = fieldSchemas.get(i);
-        Type t = Type.INVALID;
-        try {
-          t = parseColumnType(s);
-        } catch (TableLoadingException e) {
-          // Ignore hbase types we don't support yet. We can load the metadata
-          // but won't be able to select from it.
-        }
-        HBaseColumn col = new HBaseColumn(s.getName(), hbaseColumnFamilies.get(i),
-            hbaseColumnQualifiers.get(i), hbaseColumnBinaryEncodings.get(i),
-            t, s.getComment(), -1);
-        if (col.getColumnFamily().equals(ROW_KEY_COLUMN_FAMILY)) {
-          // Store the row key column separately from the rest
-          keyCol = col;
-        } else {
-          tmpCols.add(col);
-        }
-      }
-      Preconditions.checkState(keyCol != null);
-
-      // The backend assumes that the row key column is always first and
-      // that the remaining HBase columns are ordered by columnFamily,columnQualifier,
-      // so the final position depends on the other mapped HBase columns.
-      // Sort columns and update positions.
-      Collections.sort(tmpCols);
+      List<Column> cols = Util.loadColumns(msTable_);
       clearColumns();
-
-      keyCol.setPosition(0);
-      addColumn(keyCol);
-      // Update the positions of the remaining columns
-      for (int i = 0; i < tmpCols.size(); ++i) {
-        HBaseColumn col = tmpCols.get(i);
-        col.setPosition(i + 1);
-        addColumn(col);
-      }
-
+      for (Column col : cols) addColumn(col);
       // Set table stats.
       setTableStats(msTable_);
-
       // since we don't support composite hbase rowkeys yet, all hbase tables have a
       // single clustering col
       numClusteringCols_ = 1;
       loadAllColumnStats(client);
     } catch (Exception e) {
-      throw new TableLoadingException("Failed to load metadata for HBase table: " +
-          name_, e);
-    } finally {
-      context.stop();
+      throw new TableLoadingException("Failed to load metadata for HBase table: " + name_,
+          e);
     }
   }
 
@@ -427,233 +125,19 @@ public class HBaseTable extends Table {
   protected void loadFromThrift(TTable table) throws TableLoadingException {
     super.loadFromThrift(table);
     try {
-      hbaseTableName_ = getHBaseTableName(getMetaStoreTable());
+      hbaseTableName_ = Util.getHBaseTableName(getMetaStoreTable());
       // Warm up the connection and verify the table exists.
-      getHBaseTable().close();
+      Util.getHBaseTable(hbaseTableName_).close();
       columnFamilies_ = null;
     } catch (Exception e) {
-      throw new TableLoadingException("Failed to load metadata for HBase table from " +
-          "thrift table: " + name_, e);
-    }
-  }
-
-  /**
-   * This method is completely copied from Hive's HBaseStorageHandler.java.
-   */
-  private String getHBaseTableName(org.apache.hadoop.hive.metastore.api.Table tbl) {
-    // Give preference to TBLPROPERTIES over SERDEPROPERTIES
-    // (really we should only use TBLPROPERTIES, so this is just
-    // for backwards compatibility with the original specs).
-    String tableName = tbl.getParameters().get(HBaseSerDe.HBASE_TABLE_NAME);
-    if (tableName == null) {
-      tableName = tbl.getSd().getSerdeInfo().getParameters().get(
-          HBaseSerDe.HBASE_TABLE_NAME);
-    }
-    if (tableName == null) {
-      tableName = tbl.getDbName() + "." + tbl.getTableName();
-      if (tableName.startsWith(DEFAULT_PREFIX)) {
-        tableName = tableName.substring(DEFAULT_PREFIX.length());
-      }
+      throw new TableLoadingException(
+          "Failed to load metadata for HBase table from thrift table: " + name_, e);
     }
-    return tableName;
   }
 
-  /**
-   * Estimates the number of rows for a single region and returns a pair with
-   * the estimated row count and the estimated size in bytes per row.
-   */
-  private Pair<Long, Long> getEstimatedRowStatsForRegion(HRegionLocation location,
-      boolean isCompressed, ClusterStatus clusterStatus) throws IOException {
-    HRegionInfo info = location.getRegionInfo();
-
-    Scan s = new Scan(info.getStartKey());
-    // Get a small sample of rows
-    s.setBatch(ROW_COUNT_ESTIMATE_BATCH_SIZE);
-    // Try and get every version so the row's size can be used to estimate.
-    s.setMaxVersions(Short.MAX_VALUE);
-    // Don't cache the blocks as we don't think these are
-    // necessarily important blocks.
-    s.setCacheBlocks(false);
-    // Try and get deletes too so their size can be counted.
-    s.setRaw(false);
-
-    org.apache.hadoop.hbase.client.Table table = getHBaseTable();
-    ResultScanner rs = table.getScanner(s);
-
-    long currentRowSize = 0;
-    long currentRowCount = 0;
-
-    try {
-      // Get the ROW_COUNT_ESTIMATE_BATCH_SIZE fetched rows
-      // for a representative sample.
-      for (int i = 0; i < ROW_COUNT_ESTIMATE_BATCH_SIZE; ++i) {
-        Result r = rs.next();
-        if (r == null)
-          break;
-        // Check for empty rows, see IMPALA-1451
-        if (r.isEmpty())
-          continue;
-        ++currentRowCount;
-        // To estimate the number of rows we simply use the number of bytes
-        // returned from the underlying buffer. Since HBase internally works
-        // with these structures as well, this gives us OK estimates.
-        Cell[] cells = r.rawCells();
-        for (Cell c : cells) {
-          if (c instanceof KeyValue) {
-            currentRowSize += KeyValue.getKeyValueDataStructureSize(c.getRowLength(),
-                c.getFamilyLength(), c.getQualifierLength(), c.getValueLength(),
-                c.getTagsLength());
-          } else {
-            throw new IllegalStateException("Celltype " + c.getClass().getName() +
-                " not supported.");
-          }
-        }
-      }
-    } finally {
-      rs.close();
-      closeHBaseTable(table);
-    }
-
-    // If there are no rows then no need to estimate.
-    if (currentRowCount == 0) return new Pair<Long, Long>(0L, 0L);
-    // Get the size.
-    long currentSize = getRegionSize(location, clusterStatus);
-    // estimate the number of rows.
-    double bytesPerRow = currentRowSize / (double) currentRowCount;
-    if (currentSize == 0) {
-      return new Pair<Long, Long>(currentRowCount, (long) bytesPerRow);
-    }
-
-    // Compression factor two is only a best effort guess
-    long estimatedRowCount =
-        (long) ((isCompressed ? 2 : 1) * (currentSize / bytesPerRow));
-
-    return new Pair<Long, Long>(estimatedRowCount, (long) bytesPerRow);
-  }
-
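To make the estimate above concrete, a worked example with invented numbers (none of these values come from a real cluster):

    public class RegionEstimateSketch {
      public static void main(String[] args) {
        long sampledBytes = 5000;              // bytes across the sampled cells
        long sampledRows = 10;                 // ROW_COUNT_ESTIMATE_BATCH_SIZE rows
        long regionSize = 100L * 1024 * 1024;  // 100 MB of storefiles
        boolean isCompressed = true;           // some column family is compressed

        double bytesPerRow = sampledBytes / (double) sampledRows;  // 500.0
        // Compression factor two is the same best-effort guess used above.
        long estimatedRowCount =
            (long) ((isCompressed ? 2 : 1) * (regionSize / bytesPerRow));
        System.out.println(estimatedRowCount);  // 419430, roughly 420K rows
      }
    }
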
-  /**
-   * Get an estimate of the number of rows and bytes per row in regions between
-   * startRowKey and endRowKey.
-   *
-   * This number is calculated by incrementally checking as many region servers as
-   * necessary until we observe a relatively constant row size per region on average.
-   * Depending on the skew of data in the regions this can either mean that we need
-   * to check only a minimal number of regions or that we will scan all regions.
-   *
-   * The HBase region servers periodically update the master with their metrics,
-   * including storefile size. We get the size of the storefiles for all regions in
-   * the cluster with a single call to getClusterStatus from the master.
-   *
-   * The accuracy of this estimate is determined by the number of rows that are
-   * written and kept in the memstore but not yet flushed. A large number of
-   * key-value pairs in the memstore will lead to bad estimates, as those rows
-   * are not reflected in the storefile size used for the estimate.
-   *
-   * Currently, the algorithm does not consider the case that the key range used as a
-   * parameter might be generally of different size than the rest of the region.
-   *
-   * The values computed here should be cached so that in high-qps workloads
-   * the NN is not overwhelmed; this could be done in load(). Synchronized to
-   * make sure that only one thread at a time is using the htable.
-   *
-   * @param startRowKey
-   *          First row key in the range
-   * @param endRowKey
-   *          Last row key in the range
-   * @return The estimated number of rows in the regions between the row keys (first) and
-   *         the estimated row size in bytes (second).
-   */
   public synchronized Pair<Long, Long> getEstimatedRowStats(byte[] startRowKey,
       byte[] endRowKey) {
-    Preconditions.checkNotNull(startRowKey);
-    Preconditions.checkNotNull(endRowKey);
-
-    boolean isCompressed = false;
-    long rowCount = 0;
-    long rowSize = 0;
-
-    org.apache.hadoop.hbase.client.Table table = null;
-    try {
-      table = getHBaseTable();
-      ClusterStatus clusterStatus = getClusterStatus();
-
-      // Check to see if things are compressed.
-      // If they are we'll estimate a compression factor.
-      if (columnFamilies_ == null) {
-        columnFamilies_ = table.getTableDescriptor().getColumnFamilies();
-      }
-      Preconditions.checkNotNull(columnFamilies_);
-      for (HColumnDescriptor desc : columnFamilies_) {
-        isCompressed |= desc.getCompression() != Compression.Algorithm.NONE;
-      }
-
-      // Fetch all regions for the key range
-      List<HRegionLocation> locations = getRegionsInRange(table, startRowKey, endRowKey);
-      Collections.shuffle(locations);
-      // The following variables track the number and size of 'rows' in
-      // HBase and allow incremental calculation of the average and standard
-      // deviation.
-      StatsHelper<Long> statsSize = new StatsHelper<Long>();
-      long totalEstimatedRows = 0;
-
-      // Collects stats samples from at least MIN_NUM_REGIONS_TO_CHECK
-      // and at most all regions until the delta is small enough.
-      while ((statsSize.count() < MIN_NUM_REGIONS_TO_CHECK ||
-          statsSize.stddev() > statsSize.mean() * DELTA_FROM_AVERAGE) &&
-          statsSize.count() < locations.size()) {
-        HRegionLocation currentLocation = locations.get((int) statsSize.count());
-        Pair<Long, Long> tmp = getEstimatedRowStatsForRegion(currentLocation,
-            isCompressed, clusterStatus);
-        totalEstimatedRows += tmp.first;
-        statsSize.addSample(tmp.second);
-      }
-
-      // Sum up the total size for all regions in range.
-      long totalSize = 0;
-      for (final HRegionLocation location : locations) {
-        totalSize += getRegionSize(location, clusterStatus);
-      }
-      if (totalSize == 0) {
-        rowCount = totalEstimatedRows;
-      } else {
-        rowCount = (long) (totalSize / statsSize.mean());
-      }
-      rowSize = (long) statsSize.mean();
-    } catch (IOException ioe) {
-      // Print the stack trace, but we'll ignore it
-      // as this is just an estimate.
-      // TODO: Put this into the per query log.
-      LOG.error("Error computing HBase row count estimate", ioe);
-      return new Pair<Long, Long>(-1L, -1L);
-    } finally {
-      if (table != null) closeHBaseTable(table);
-    }
-    return new Pair<Long, Long>(rowCount, rowSize);
-  }
-
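The sampling loop above stops once the per-region row-size samples look stable. Below is a hedged sketch of that stopping rule, with the incremental statistics reimplemented via Welford's online algorithm; the real code uses StatsHelper, whose internals may differ:

    public class SamplingStatsSketch {
      private static final int MIN_NUM_REGIONS_TO_CHECK = 5;
      private static final double DELTA_FROM_AVERAGE = 0.15;

      private long n_ = 0;
      private double mean_ = 0;
      private double m2_ = 0;

      // Welford's online update: single pass, numerically stable.
      void addSample(double x) {
        ++n_;
        double delta = x - mean_;
        mean_ += delta / n_;
        m2_ += delta * (x - mean_);
      }

      double mean() { return mean_; }
      double stddev() { return n_ > 1 ? Math.sqrt(m2_ / n_) : 0; }

      // Mirror of the loop condition: sample at least five regions, keep going
      // while the spread is more than 15% of the mean, never exceed all regions.
      boolean keepSampling(int totalRegions) {
        return (n_ < MIN_NUM_REGIONS_TO_CHECK ||
            stddev() > mean() * DELTA_FROM_AVERAGE) && n_ < totalRegions;
      }
    }
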
-  /**
-   * Returns the size of the given region in bytes. Simply returns the storefile size
-   * for this region from the ClusterStatus. Returns 0 in case of an error.
-   */
-  public long getRegionSize(HRegionLocation location, ClusterStatus clusterStatus) {
-    HRegionInfo info = location.getRegionInfo();
-    ServerLoad serverLoad = clusterStatus.getLoad(location.getServerName());
-
-    // If the serverLoad is null, the master doesn't have information for this region's
-    // server. This shouldn't normally happen.
-    if (serverLoad == null) {
-      LOG.error("Unable to find server load for server: " + location.getServerName() +
-          " for location " + info.getRegionNameAsString());
-      return 0;
-    }
-    RegionLoad regionLoad = serverLoad.getRegionsLoad().get(info.getRegionName());
-    if (regionLoad == null) {
-      LOG.error("Unable to find regions load for server: " + location.getServerName() +
-          " for location " + info.getRegionNameAsString());
-      return 0;
-    }
-    final long megaByte = 1024L * 1024L;
-    return regionLoad.getStorefileSizeMB() * megaByte;
+    return Util.getEstimatedRowStats(this, startRowKey, endRowKey);
   }
 
   /**
@@ -665,11 +149,12 @@ public class HBaseTable extends Table {
   }
 
   @Override
-  public TTableDescriptor toThriftDescriptor(int tableId, Set<Long> referencedPartitions) {
+  public TTableDescriptor toThriftDescriptor(int tableId,
+      Set<Long> referencedPartitions) {
     TTableDescriptor tableDescriptor =
-        new TTableDescriptor(tableId, TTableType.HBASE_TABLE,
-            getTColumnDescriptors(), numClusteringCols_, hbaseTableName_, db_.getName());
-    tableDescriptor.setHbaseTable(getTHBaseTable());
+        new TTableDescriptor(tableId, TTableType.HBASE_TABLE, getTColumnDescriptors(),
+            numClusteringCols_, hbaseTableName_, db_.getName());
+    tableDescriptor.setHbaseTable(Util.getTHBaseTable(this));
     return tableDescriptor;
   }
 
@@ -678,6 +163,11 @@ public class HBaseTable extends Table {
   }
 
   @Override
+  public TResultSet getTableStats() {
+    return Util.getTableStats(this);
+  }
+
+  @Override
   public TCatalogObjectType getCatalogObjectType() {
     return TCatalogObjectType.TABLE;
   }
@@ -686,161 +176,29 @@ public class HBaseTable extends Table {
   public TTable toThrift() {
     TTable table = super.toThrift();
     table.setTable_type(TTableType.HBASE_TABLE);
-    table.setHbase_table(getTHBaseTable());
+    table.setHbase_table(Util.getTHBaseTable(this));
     return table;
   }
 
-  private THBaseTable getTHBaseTable() {
-    THBaseTable tHbaseTable = new THBaseTable();
-    tHbaseTable.setTableName(hbaseTableName_);
-    for (Column c : getColumns()) {
-      HBaseColumn hbaseCol = (HBaseColumn) c;
-      tHbaseTable.addToFamilies(hbaseCol.getColumnFamily());
-      if (hbaseCol.getColumnQualifier() != null) {
-        tHbaseTable.addToQualifiers(hbaseCol.getColumnQualifier());
-      } else {
-        tHbaseTable.addToQualifiers("");
-      }
-      tHbaseTable.addToBinary_encoded(hbaseCol.isBinaryEncoded());
-    }
-    return tHbaseTable;
-  }
-
-  /**
-   * Get the corresponding regions for an arbitrary range of keys.
-   * This is copied from org.apache.hadoop.hbase.client.HTable in HBase 0.95. The
-   * differences are:
-   * 1. It does not use the cache when calling getRegionLocation.
-   * 2. It is synchronized on hbaseTbl.
-   *
-   * @param startKey
-   *          Starting key in range, inclusive
-   * @param endKey
-   *          Ending key in range, exclusive
-   * @return A list of HRegionLocations corresponding to the regions that
-   *         contain the specified range
-   * @throws IOException
-   *           if a remote or network exception occurs
-   */
-  public static List<HRegionLocation> getRegionsInRange(
-      org.apache.hadoop.hbase.client.Table hbaseTbl,
-      final byte[] startKey, final byte[] endKey) throws IOException {
-    final boolean endKeyIsEndOfTable = Bytes.equals(endKey, HConstants.EMPTY_END_ROW);
-    if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
-      throw new IllegalArgumentException("Invalid range: " +
-          Bytes.toStringBinary(startKey) + " > " + Bytes.toStringBinary(endKey));
-    }
-    final List<HRegionLocation> regionList = new ArrayList<HRegionLocation>();
-    byte[] currentKey = startKey;
-    Connection connection = ConnectionHolder.getConnection(hbaseConf_);
-    // Make sure only one thread is accessing the hbaseTbl.
-    synchronized (hbaseTbl) {
-      RegionLocator locator = connection.getRegionLocator(hbaseTbl.getName());
-      do {
-        // always reload region location info.
-        HRegionLocation regionLocation = locator.getRegionLocation(currentKey, true);
-        regionList.add(regionLocation);
-        currentKey = regionLocation.getRegionInfo().getEndKey();
-      } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW) &&
-          (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0));
-    }
-    return regionList;
-  }
-
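A hedged usage sketch for the helper above: passing the empty start/end row markers enumerates every region of the table, which is how the pre-patch getTableStats() below drives it. The wrapper class is invented for illustration:

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.HRegionLocation;
    import org.apache.hadoop.hbase.client.Table;

    import org.apache.impala.catalog.HBaseTable;

    public class RegionListingSketch {
      // Prints every region hosting any part of the given HBase table.
      static void printRegions(Table hbaseTbl) throws IOException {
        List<HRegionLocation> regions = HBaseTable.getRegionsInRange(
            hbaseTbl, HConstants.EMPTY_END_ROW, HConstants.EMPTY_START_ROW);
        for (HRegionLocation loc : regions) {
          System.out.println(loc.getHostname() + ": " +
              loc.getRegionInfo().getRegionNameAsString());
        }
      }
    }
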
   /**
    * Returns the storage handler class for HBase tables read by Hive.
    */
   @Override
   public String getStorageHandlerClassName() {
-    return HBASE_STORAGE_HANDLER;
+    return Util.HBASE_STORAGE_HANDLER;
   }
 
   /**
-   * Returns statistics on this table as a tabular result set. Used for the
-   * SHOW TABLE STATS statement. The schema of the returned TResultSet is set
-   * inside this method.
+   * Returns the cached column families, fetching them from HBase on first use.
    */
-  public TResultSet getTableStats() {
-    TResultSet result = new TResultSet();
-    TResultSetMetadata resultSchema = new TResultSetMetadata();
-    result.setSchema(resultSchema);
-    resultSchema.addToColumns(
-        new TColumn("Region Location", Type.STRING.toThrift()));
-    resultSchema.addToColumns(new TColumn("Start RowKey",
-        Type.STRING.toThrift()));
-    resultSchema.addToColumns(new TColumn("Est. #Rows", Type.BIGINT.toThrift()));
-    resultSchema.addToColumns(new TColumn("Size", Type.STRING.toThrift()));
-
-    org.apache.hadoop.hbase.client.Table table;
-    try {
-      table = getHBaseTable();
-    } catch (IOException e) {
-      LOG.error("Error getting HBase table " + hbaseTableName_, e);
-      throw new RuntimeException(e);
-    }
-
-    // TODO: Consider fancier stats maintenance techniques for speeding up this process.
-    // Currently, we list all regions and perform a mini-scan of each of them to
-    // estimate the number of rows, the data size, etc., which is rather expensive.
-    try {
-      ClusterStatus clusterStatus = getClusterStatus();
-      long totalNumRows = 0;
-      long totalSize = 0;
-      List<HRegionLocation> regions = HBaseTable.getRegionsInRange(table,
-          HConstants.EMPTY_END_ROW, HConstants.EMPTY_START_ROW);
-      for (HRegionLocation region : regions) {
-        TResultRowBuilder rowBuilder = new TResultRowBuilder();
-        HRegionInfo regionInfo = region.getRegionInfo();
-        Pair<Long, Long> estRowStats =
-            getEstimatedRowStatsForRegion(region, false, clusterStatus);
-
-        long numRows = estRowStats.first.longValue();
-        long regionSize = getRegionSize(region, clusterStatus);
-        totalNumRows += numRows;
-        totalSize += regionSize;
-
-        // Add the region location, start rowkey, number of rows and raw size.
-        rowBuilder.add(String.valueOf(region.getHostname()))
-            .add(Bytes.toString(regionInfo.getStartKey())).add(numRows)
-            .addBytes(regionSize);
-        result.addToRows(rowBuilder.get());
-      }
-
-      // Total num rows and raw region size.
-      if (regions.size() > 1) {
-        TResultRowBuilder rowBuilder = new TResultRowBuilder();
-        rowBuilder.add("Total").add("").add(totalNumRows).addBytes(totalSize);
-        result.addToRows(rowBuilder.get());
+  @Override
+  public HColumnDescriptor[] getColumnFamilies() throws IOException {
+    if (columnFamilies_ == null) {
+      try (org.apache.hadoop.hbase.client.Table hBaseTable = Util
+          .getHBaseTable(getHBaseTableName())) {
+        columnFamilies_ = hBaseTable.getTableDescriptor().getColumnFamilies();
       }
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    } finally {
-      closeHBaseTable(table);
-    }
-    return result;
-  }
-
-  /**
-   * Returns true if the given Metastore Table represents an HBase table.
-   * Versions of Hive/HBase are inconsistent about which HBase-related fields are
-   * set (e.g., HIVE-6548 changed the input format to null). For maximum
-   * compatibility, consider all known fields that indicate an HBase table.
-   */
-  public static boolean isHBaseTable(
-      org.apache.hadoop.hive.metastore.api.Table msTbl) {
-    if (msTbl.getParameters() != null &&
-        msTbl.getParameters().containsKey(HBASE_STORAGE_HANDLER)) {
-      return true;
-    }
-    StorageDescriptor sd = msTbl.getSd();
-    if (sd == null) return false;
-    if (sd.getInputFormat() != null && sd.getInputFormat().equals(HBASE_INPUT_FORMAT)) {
-      return true;
-    } else if (sd.getSerdeInfo() != null &&
-        sd.getSerdeInfo().getSerializationLib() != null &&
-        sd.getSerdeInfo().getSerializationLib().equals(HBASE_SERIALIZATION_LIB)) {
-      return true;
     }
-    return false;
+    return columnFamilies_;
   }
 }