You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/02/16 03:50:43 UTC

[1/7] kylin git commit: KYLIN-2394 Upgrade Calcite to 1.11 and Avatica 1.9 [Forced Update!]

Repository: kylin
Updated Branches:
  refs/heads/yang22-hbase1.x b8a491c0b -> 4853dcae3 (forced update)


http://git-wip-us.apache.org/repos/asf/kylin/blob/4b413a29/jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/jdbc/pom.xml b/jdbc/pom.xml
index 4f42913..988c4c6 100644
--- a/jdbc/pom.xml
+++ b/jdbc/pom.xml
@@ -68,10 +68,10 @@
                         <configuration>
                             <createDependencyReducedPom>false</createDependencyReducedPom>
                             <artifactSet>
-                                <!-- jackson is already packaged into calcite-avatica.jar. To avoid including jackson
-                                twice, we include calcite-avatica which has jackson and exclude jackson. -->
                                 <excludes>
-                                    <exclude>com.fasterxml.jackson.core:*</exclude>
+                                    <exclude>com.google.protobuf:*</exclude>
+                                    <exclude>commons-logging:*</exclude>
+                                    <exclude>commons-codec:*</exclude>
                                 </excludes>
                             </artifactSet>
                             <relocations>
@@ -87,10 +87,6 @@
                                     <pattern>org.apache.http</pattern>
                                     <shadedPattern>${shadeBase}.org.apache.http</shadedPattern>
                                 </relocation>
-                                <relocation>
-                                    <pattern>org.apache.commons</pattern>
-                                    <shadedPattern>${shadeBase}.org.apache.commons</shadedPattern>
-                                </relocation>
                             </relocations>
                             <filters>
                                 <filter>

http://git-wip-us.apache.org/repos/asf/kylin/blob/4b413a29/jdbc/src/main/java/org/apache/kylin/jdbc/KylinMeta.java
----------------------------------------------------------------------
diff --git a/jdbc/src/main/java/org/apache/kylin/jdbc/KylinMeta.java b/jdbc/src/main/java/org/apache/kylin/jdbc/KylinMeta.java
index a1b9aef..8e69e68 100644
--- a/jdbc/src/main/java/org/apache/kylin/jdbc/KylinMeta.java
+++ b/jdbc/src/main/java/org/apache/kylin/jdbc/KylinMeta.java
@@ -209,7 +209,7 @@ public class KylinMeta extends MetaImpl {
             } catch (NoSuchFieldException e) {
                 throw new RuntimeException(e);
             }
-            columns.add(columnMetaData(name, index, field.getType()));
+            columns.add(columnMetaData(name, index, field.getType(), true));
             fields.add(field);
             fieldNames.add(fieldName);
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4b413a29/kylin-it/pom.xml
----------------------------------------------------------------------
diff --git a/kylin-it/pom.xml b/kylin-it/pom.xml
index 86f2544..b901137 100644
--- a/kylin-it/pom.xml
+++ b/kylin-it/pom.xml
@@ -40,6 +40,12 @@
         <dependency>
             <groupId>org.apache.kylin</groupId>
             <artifactId>atopcalcite</artifactId>
+            <exclusions>
+                <exclusion>
+                    <artifactId>avatica-core</artifactId>
+                    <groupId>org.apache.calcite.avatica</groupId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.kylin</groupId>
@@ -71,14 +77,6 @@
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-query</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.apache.calcite</groupId>
-            <artifactId>calcite-linq4j</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>commons-cli</groupId>
-            <artifactId>commons-cli</artifactId>
-        </dependency>
 
         <!-- Env & Test -->
 
@@ -300,7 +298,9 @@
                                         <argument>-DuseSandbox=true</argument>
                                         <argument>-Dhdp.version=${hdp.version}</argument>
                                         <argument>-DfastBuildMode=${fastBuildMode}</argument>
-                                        <argument>-Dlog4j.configuration=file:${project.basedir}/..//build/conf/kylin-tools-log4j.properties</argument>
+                                        <argument>
+                                            -Dlog4j.configuration=file:${project.basedir}/..//build/conf/kylin-tools-log4j.properties
+                                        </argument>
                                         <argument>-classpath</argument>
                                         <classpath/>
                                         <argument>org.apache.kylin.provision.BuildCubeWithEngine</argument>
@@ -322,7 +322,9 @@
                                         <argument>-DuseSandbox=true</argument>
                                         <argument>-Dhdp.version=${hdp.version}</argument>
                                         <argument>-DfastBuildMode=${fastBuildMode}</argument>
-                                        <argument>-Dlog4j.configuration=file:${project.basedir}/..//build/conf/kylin-tools-log4j.properties</argument>
+                                        <argument>
+                                            -Dlog4j.configuration=file:${project.basedir}/..//build/conf/kylin-tools-log4j.properties
+                                        </argument>
                                         <argument>-classpath</argument>
                                         <classpath/>
                                         <argument>org.apache.kylin.provision.BuildCubeWithStream</argument>

http://git-wip-us.apache.org/repos/asf/kylin/blob/4b413a29/kylin-it/src/test/java/org/apache/kylin/jdbc/ITJDBCDriverTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/jdbc/ITJDBCDriverTest.java b/kylin-it/src/test/java/org/apache/kylin/jdbc/ITJDBCDriverTest.java
index 2f8991b..99b73a9 100644
--- a/kylin-it/src/test/java/org/apache/kylin/jdbc/ITJDBCDriverTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/jdbc/ITJDBCDriverTest.java
@@ -21,6 +21,7 @@ package org.apache.kylin.jdbc;
 import java.io.File;
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
+import java.sql.Driver;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.Statement;

http://git-wip-us.apache.org/repos/asf/kylin/blob/4b413a29/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 51479c8..8b2e0aa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -112,7 +112,8 @@
         <aspectj.version>1.8.9</aspectj.version>
 
         <!-- Calcite Version -->
-        <calcite.version>1.8.0</calcite.version>
+        <calcite.version>1.11.0</calcite.version>
+        <avatica.version>1.9.0</avatica.version>
 
         <!-- Sonar -->
         <sonar.java.coveragePlugin>jacoco</sonar.java.coveragePlugin>
@@ -456,8 +457,7 @@
             <dependency>
                 <groupId>org.apache.calcite.avatica</groupId>
                 <artifactId>avatica</artifactId>
-                <version>${calcite.version}</version>
-
+                <version>${avatica.version}</version>
             </dependency>
             <!-- Workaround for hive 0.14 avatica dependency -->
             <dependency>

http://git-wip-us.apache.org/repos/asf/kylin/blob/4b413a29/query/pom.xml
----------------------------------------------------------------------
diff --git a/query/pom.xml b/query/pom.xml
index c51812d..badf214 100644
--- a/query/pom.xml
+++ b/query/pom.xml
@@ -41,16 +41,18 @@
         <dependency>
             <groupId>org.apache.kylin</groupId>
             <artifactId>atopcalcite</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.calcite.avatica</groupId>
+                    <artifactId>avatica-core</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-core-storage</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.apache.calcite</groupId>
-            <artifactId>calcite-core</artifactId>
-        </dependency>
-        <dependency>
             <groupId>commons-cli</groupId>
             <artifactId>commons-cli</artifactId>
         </dependency>


[5/7] kylin git commit: minor, add back query level scan threshold

Posted by li...@apache.org.
minor, add back query level scan threshold


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/7564274c
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/7564274c
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/7564274c

Branch: refs/heads/yang22-hbase1.x
Commit: 7564274c369abb86860799ba64f4ace609d4140f
Parents: 344c707
Author: Hongbin Ma <ma...@apache.org>
Authored: Thu Feb 16 10:41:52 2017 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Thu Feb 16 10:41:52 2017 +0800

----------------------------------------------------------------------
 .../kylin/storage/gtrecord/SequentialCubeTupleIterator.java    | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/7564274c/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
index 49080d6..db56ce6 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
@@ -33,6 +33,7 @@ import org.apache.kylin.metadata.tuple.ITuple;
 import org.apache.kylin.metadata.tuple.ITupleIterator;
 import org.apache.kylin.metadata.tuple.TupleInfo;
 import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.exception.ScanOutOfLimitException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -136,6 +137,11 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
 
     @Override
     public boolean hasNext() {
+
+        if (scanCount >= context.getThreshold()) {
+            throw new ScanOutOfLimitException("Scan row count exceeded threshold at query level: " + context.getThreshold() + ", please add filter condition to narrow down backend scan range, like where clause.");
+        }
+        
         return tupleIterator.hasNext();
     }
 


[4/7] kylin git commit: KYLIN-1770 upgrade calcite to 1.10

Posted by li...@apache.org.
KYLIN-1770 upgrade calcite to 1.10


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/344c7076
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/344c7076
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/344c7076

Branch: refs/heads/yang22-hbase1.x
Commit: 344c7076838f73bed10510cb1b61909e29e2028b
Parents: 4b413a2
Author: Hongbin Ma <ma...@apache.org>
Authored: Thu Feb 16 10:37:36 2017 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Thu Feb 16 10:37:36 2017 +0800

----------------------------------------------------------------------
 .../main/java/org/apache/kylin/query/relnode/OLAPTableScan.java    | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/344c7076/query/src/main/java/org/apache/kylin/query/relnode/OLAPTableScan.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPTableScan.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPTableScan.java
index 14758c9..c743870 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPTableScan.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPTableScan.java
@@ -45,6 +45,7 @@ import org.apache.calcite.rel.rules.AggregateExpandDistinctAggregatesRule;
 import org.apache.calcite.rel.rules.AggregateJoinTransposeRule;
 import org.apache.calcite.rel.rules.AggregateProjectMergeRule;
 import org.apache.calcite.rel.rules.AggregateUnionTransposeRule;
+import org.apache.calcite.rel.rules.DateRangeRules;
 import org.apache.calcite.rel.rules.FilterJoinRule;
 import org.apache.calcite.rel.rules.FilterProjectTransposeRule;
 import org.apache.calcite.rel.rules.JoinCommuteRule;
@@ -172,6 +173,7 @@ public class OLAPTableScan extends TableScan implements OLAPRel, EnumerableRel {
         planner.removeRule(JoinUnionTransposeRule.LEFT_UNION);
         planner.removeRule(JoinUnionTransposeRule.RIGHT_UNION);
         planner.removeRule(AggregateUnionTransposeRule.INSTANCE);
+        planner.removeRule(DateRangeRules.FILTER_INSTANCE);
         // distinct count will be split into a separated query that is joined with the left query
         planner.removeRule(AggregateExpandDistinctAggregatesRule.INSTANCE);
 


[6/7] kylin git commit: KYLIN-1528 Create a branch for v1.5 with HBase 1.x API

Posted by li...@apache.org.
http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java
index 5b2441c..2f7e164 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java
@@ -24,11 +24,11 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
@@ -69,19 +69,20 @@ public class MergeGCStep extends AbstractExecutable {
         List<String> oldTables = getOldHTables();
         if (oldTables != null && oldTables.size() > 0) {
             String metadataUrlPrefix = KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
-            Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
-            HBaseAdmin admin = null;
+            Admin admin = null;
             try {
-                admin = new HBaseAdmin(conf);
+                Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
+                admin = conn.getAdmin();
+
                 for (String table : oldTables) {
-                    if (admin.tableExists(table)) {
-                        HTableDescriptor tableDescriptor = admin.getTableDescriptor(Bytes.toBytes(table));
+                    if (admin.tableExists(TableName.valueOf(table))) {
+                        HTableDescriptor tableDescriptor = admin.getTableDescriptor(TableName.valueOf((table)));
                         String host = tableDescriptor.getValue(IRealizationConstants.HTableTag);
                         if (metadataUrlPrefix.equalsIgnoreCase(host)) {
-                            if (admin.isTableEnabled(table)) {
-                                admin.disableTable(table);
+                            if (admin.isTableEnabled(TableName.valueOf(table))) {
+                                admin.disableTable(TableName.valueOf(table));
                             }
-                            admin.deleteTable(table);
+                            admin.deleteTable(TableName.valueOf(table));
                             logger.debug("Dropped htable: " + table);
                             output.append("HBase table " + table + " is dropped. \n");
                         } else {

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CleanHtableCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CleanHtableCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CleanHtableCLI.java
index a150607..56f867a 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CleanHtableCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CleanHtableCLI.java
@@ -21,9 +21,11 @@ package org.apache.kylin.storage.hbase.util;
 import java.io.IOException;
 
 import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.kylin.metadata.realization.IRealizationConstants;
@@ -38,8 +40,8 @@ public class CleanHtableCLI extends AbstractApplication {
     protected static final Logger logger = LoggerFactory.getLogger(CleanHtableCLI.class);
 
     private void clean() throws IOException {
-        Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
-        HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+        Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
+        Admin hbaseAdmin = conn.getAdmin();
 
         for (HTableDescriptor descriptor : hbaseAdmin.listTables()) {
             String name = descriptor.getNameAsString().toLowerCase();
@@ -50,7 +52,7 @@ public class CleanHtableCLI extends AbstractApplication {
                 System.out.println();
 
                 descriptor.setValue(IRealizationConstants.HTableOwner, "DL-eBay-Kylin@ebay.com");
-                hbaseAdmin.modifyTable(descriptor.getNameAsString(), descriptor);
+                hbaseAdmin.modifyTable(TableName.valueOf(descriptor.getNameAsString()), descriptor);
             }
         }
         hbaseAdmin.close();

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
index 68c0a39..581de38 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
@@ -26,19 +26,19 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.RawResource;
@@ -89,7 +89,7 @@ public class CubeMigrationCLI {
     private static ResourceStore srcStore;
     private static ResourceStore dstStore;
     private static FileSystem hdfsFS;
-    private static HBaseAdmin hbaseAdmin;
+    private static Admin hbaseAdmin;
 
     public static final String ACL_INFO_FAMILY = "i";
     private static final String ACL_TABLE_NAME = "_acl";
@@ -134,8 +134,8 @@ public class CubeMigrationCLI {
 
         checkAndGetHbaseUrl();
 
-        Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
-        hbaseAdmin = new HBaseAdmin(conf);
+        Connection conn = HBaseConnection.get(srcConfig.getStorageUrl());
+        hbaseAdmin = conn.getAdmin();
 
         hdfsFS = HadoopUtil.getWorkingFileSystem();
 
@@ -233,6 +233,7 @@ public class CubeMigrationCLI {
             operations.add(new Opt(OptType.COPY_DICT_OR_SNAPSHOT, new Object[] { item, cube.getName() }));
         }
     }
+
     private static void addCubeAndModelIntoProject(CubeInstance srcCube, String cubeName, String projectName) throws IOException {
         String projectResPath = ProjectInstance.concatResourcePath(projectName);
         if (!dstStore.exists(projectResPath))
@@ -326,8 +327,8 @@ public class CubeMigrationCLI {
 
         switch (opt.type) {
         case CHANGE_HTABLE_HOST: {
-            String tableName = (String) opt.params[0];
-            HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
+            TableName tableName = TableName.valueOf((String) opt.params[0]);
+            HTableDescriptor desc = hbaseAdmin.getTableDescriptor(tableName);
             hbaseAdmin.disableTable(tableName);
             desc.setValue(IRealizationConstants.HTableTag, dstConfig.getMetadataUrlPrefix());
             hbaseAdmin.modifyTable(tableName, desc);
@@ -449,11 +450,11 @@ public class CubeMigrationCLI {
             Serializer<ProjectInstance> projectSerializer = new JsonSerializer<ProjectInstance>(ProjectInstance.class);
             ProjectInstance project = dstStore.getResource(projectResPath, ProjectInstance.class, projectSerializer);
             String projUUID = project.getUuid();
-            HTableInterface srcAclHtable = null;
-            HTableInterface destAclHtable = null;
+            Table srcAclHtable = null;
+            Table destAclHtable = null;
             try {
-                srcAclHtable = HBaseConnection.get(srcConfig.getStorageUrl()).getTable(srcConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME);
-                destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()).getTable(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME);
+                srcAclHtable = HBaseConnection.get(srcConfig.getStorageUrl()).getTable(TableName.valueOf(srcConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME));
+                destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()).getTable(TableName.valueOf(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME));
 
                 // cube acl
                 Result result = srcAclHtable.get(new Get(Bytes.toBytes(cubeId)));
@@ -473,7 +474,6 @@ public class CubeMigrationCLI {
                         destAclHtable.put(put);
                     }
                 }
-                destAclHtable.flushCommits();
             } finally {
                 IOUtils.closeQuietly(srcAclHtable);
                 IOUtils.closeQuietly(destAclHtable);
@@ -504,8 +504,8 @@ public class CubeMigrationCLI {
 
         switch (opt.type) {
         case CHANGE_HTABLE_HOST: {
-            String tableName = (String) opt.params[0];
-            HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
+            TableName tableName = TableName.valueOf((String) opt.params[0]);
+            HTableDescriptor desc = hbaseAdmin.getTableDescriptor(tableName);
             hbaseAdmin.disableTable(tableName);
             desc.setValue(IRealizationConstants.HTableTag, srcConfig.getMetadataUrlPrefix());
             hbaseAdmin.modifyTable(tableName, desc);
@@ -539,13 +539,12 @@ public class CubeMigrationCLI {
         case COPY_ACL: {
             String cubeId = (String) opt.params[0];
             String modelId = (String) opt.params[1];
-            HTableInterface destAclHtable = null;
+            Table destAclHtable = null;
             try {
-                destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()).getTable(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME);
+                destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()).getTable(TableName.valueOf(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME));
 
                 destAclHtable.delete(new Delete(Bytes.toBytes(cubeId)));
                 destAclHtable.delete(new Delete(Bytes.toBytes(modelId)));
-                destAclHtable.flushCommits();
             } finally {
                 IOUtils.closeQuietly(destAclHtable);
             }
@@ -562,7 +561,7 @@ public class CubeMigrationCLI {
         }
     }
 
-    private static void updateMeta(KylinConfig config){
+    private static void updateMeta(KylinConfig config) {
         String[] nodes = config.getRestServers();
         for (String node : nodes) {
             RestClient restClient = new RestClient(node);

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCheckCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCheckCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCheckCLI.java
index 8bd4abf..20d0f7d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCheckCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCheckCLI.java
@@ -26,10 +26,10 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.kylin.cube.CubeInstance;
@@ -61,7 +61,7 @@ public class CubeMigrationCheckCLI {
     private static final Option OPTION_CUBE = OptionBuilder.withArgName("cube").hasArg().isRequired(false).withDescription("The name of cube migrated").create("cube");
 
     private KylinConfig dstCfg;
-    private HBaseAdmin hbaseAdmin;
+    private Admin hbaseAdmin;
 
     private List<String> issueExistHTables;
     private List<String> inconsistentHTables;
@@ -130,9 +130,8 @@ public class CubeMigrationCheckCLI {
         this.dstCfg = kylinConfig;
         this.ifFix = isFix;
 
-        Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
-        hbaseAdmin = new HBaseAdmin(conf);
-
+        Connection conn = HBaseConnection.get(kylinConfig.getStorageUrl());
+        hbaseAdmin = conn.getAdmin();
         issueExistHTables = Lists.newArrayList();
         inconsistentHTables = Lists.newArrayList();
     }
@@ -189,10 +188,10 @@ public class CubeMigrationCheckCLI {
                 String[] sepNameList = segFullName.split(",");
                 HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(sepNameList[0]));
                 logger.info("Change the host of htable " + sepNameList[0] + "belonging to cube " + sepNameList[1] + " from " + desc.getValue(IRealizationConstants.HTableTag) + " to " + dstCfg.getMetadataUrlPrefix());
-                hbaseAdmin.disableTable(sepNameList[0]);
+                hbaseAdmin.disableTable(TableName.valueOf(sepNameList[0]));
                 desc.setValue(IRealizationConstants.HTableTag, dstCfg.getMetadataUrlPrefix());
-                hbaseAdmin.modifyTable(sepNameList[0], desc);
-                hbaseAdmin.enableTable(sepNameList[0]);
+                hbaseAdmin.modifyTable(TableName.valueOf(sepNameList[0]), desc);
+                hbaseAdmin.enableTable(TableName.valueOf(sepNameList[0]));
             }
         } else {
             logger.info("------ Inconsistent HTables Needed To Be Fixed ------");

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
index 8f69c18..8f7430e 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
@@ -44,7 +44,8 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinVersion;
@@ -81,7 +82,8 @@ public class DeployCoprocessorCLI {
         KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
         FileSystem fileSystem = FileSystem.get(hconf);
-        HBaseAdmin hbaseAdmin = new HBaseAdmin(hconf);
+        Connection conn = HBaseConnection.get(kylinConfig.getStorageUrl());
+        Admin hbaseAdmin = conn.getAdmin();
 
         String localCoprocessorJar;
         if ("default".equals(args[0])) {
@@ -165,10 +167,10 @@ public class DeployCoprocessorCLI {
     public static void deployCoprocessor(HTableDescriptor tableDesc) {
         try {
             initHTableCoprocessor(tableDesc);
-            logger.info("hbase table " + tableDesc.getName() + " deployed with coprocessor.");
+            logger.info("hbase table " + tableDesc.getTableName() + " deployed with coprocessor.");
 
         } catch (Exception ex) {
-            logger.error("Error deploying coprocessor on " + tableDesc.getName(), ex);
+            logger.error("Error deploying coprocessor on " + tableDesc.getTableName(), ex);
             logger.error("Will try creating the table without coprocessor.");
         }
     }
@@ -190,7 +192,7 @@ public class DeployCoprocessorCLI {
         desc.addCoprocessor(CubeObserverClass, hdfsCoprocessorJar, 1002, null);
     }
 
-    public static boolean resetCoprocessor(String tableName, HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar) throws IOException {
+    public static boolean resetCoprocessor(String tableName, Admin hbaseAdmin, Path hdfsCoprocessorJar) throws IOException {
         KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
 
@@ -205,7 +207,7 @@ public class DeployCoprocessorCLI {
         logger.info("reset coprocessor on " + tableName);
 
         logger.info("Disable " + tableName);
-        hbaseAdmin.disableTable(tableName);
+        hbaseAdmin.disableTable(TableName.valueOf(tableName));
 
         while (desc.hasCoprocessor(CubeObserverClass)) {
             desc.removeCoprocessor(CubeObserverClass);
@@ -231,16 +233,15 @@ public class DeployCoprocessorCLI {
             desc.setValue(IRealizationConstants.HTableGitTag, commitInfo);
         }
 
-        hbaseAdmin.modifyTable(tableName, desc);
+        hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc);
 
         logger.info("Enable " + tableName);
-        hbaseAdmin.enableTable(tableName);
+        hbaseAdmin.enableTable(TableName.valueOf(tableName));
 
         return true;
     }
 
-
-    private static List<String> resetCoprocessorOnHTables(final HBaseAdmin hbaseAdmin, final Path hdfsCoprocessorJar, List<String> tableNames) throws IOException {
+    private static List<String> resetCoprocessorOnHTables(final Admin hbaseAdmin, final Path hdfsCoprocessorJar, List<String> tableNames) throws IOException {
         List<String> processedTables = Collections.synchronizedList(new ArrayList<String>());
         ExecutorService coprocessorPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
         CountDownLatch countDownLatch = new CountDownLatch(tableNames.size());
@@ -261,12 +262,12 @@ public class DeployCoprocessorCLI {
 
     private static class ResetCoprocessorWorker implements Runnable {
         private final CountDownLatch countDownLatch;
-        private final HBaseAdmin hbaseAdmin;
+        private final Admin hbaseAdmin;
         private final Path hdfsCoprocessorJar;
         private final String tableName;
         private final List<String> processedTables;
 
-        public ResetCoprocessorWorker(CountDownLatch countDownLatch, HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar, String tableName, List<String> processedTables) {
+        public ResetCoprocessorWorker(CountDownLatch countDownLatch, Admin hbaseAdmin, Path hdfsCoprocessorJar, String tableName, List<String> processedTables) {
             this.countDownLatch = countDownLatch;
             this.hbaseAdmin = hbaseAdmin;
             this.hdfsCoprocessorJar = hdfsCoprocessorJar;
@@ -387,7 +388,7 @@ public class DeployCoprocessorCLI {
         return coprocessorDir;
     }
 
-    private static Set<String> getCoprocessorJarPaths(HBaseAdmin hbaseAdmin, List<String> tableNames) throws IOException {
+    private static Set<String> getCoprocessorJarPaths(Admin hbaseAdmin, List<String> tableNames) throws IOException {
         HashSet<String> result = new HashSet<String>();
 
         for (String tableName : tableNames) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java
index 61c73d5..1cdb2f8 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java
@@ -25,10 +25,11 @@ import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
@@ -235,9 +236,9 @@ public class ExtendCubeToHybridCLI {
         Serializer<ProjectInstance> projectSerializer = new JsonSerializer<ProjectInstance>(ProjectInstance.class);
         ProjectInstance project = store.getResource(projectResPath, ProjectInstance.class, projectSerializer);
         String projUUID = project.getUuid();
-        HTableInterface aclHtable = null;
+        Table aclHtable = null;
         try {
-            aclHtable = HBaseConnection.get(kylinConfig.getStorageUrl()).getTable(kylinConfig.getMetadataUrlPrefix() + "_acl");
+            aclHtable = HBaseConnection.get(kylinConfig.getStorageUrl()).getTable(TableName.valueOf(kylinConfig.getMetadataUrlPrefix() + "_acl"));
 
             // cube acl
             Result result = aclHtable.get(new Get(Bytes.toBytes(origCubeId)));
@@ -257,7 +258,6 @@ public class ExtendCubeToHybridCLI {
                     aclHtable.put(put);
                 }
             }
-            aclHtable.flushCommits();
         } finally {
             IOUtils.closeQuietly(aclHtable);
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java
index 86ba22f..dd5f8fa 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java
@@ -28,13 +28,13 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.Pair;
@@ -75,7 +75,7 @@ public class GridTableHBaseBenchmark {
         System.out.println("Testing grid table scanning, hit ratio " + hitRatio + ", index ratio " + indexRatio);
         String hbaseUrl = "hbase"; // use hbase-site.xml on classpath
 
-        HConnection conn = HBaseConnection.get(hbaseUrl);
+        Connection conn = HBaseConnection.get(hbaseUrl);
         createHTableIfNeeded(conn, TEST_TABLE);
         prepareData(conn);
 
@@ -91,10 +91,10 @@ public class GridTableHBaseBenchmark {
 
     }
 
-    private static void testColumnScan(HConnection conn, List<Pair<Integer, Integer>> colScans) throws IOException {
+    private static void testColumnScan(Connection conn, List<Pair<Integer, Integer>> colScans) throws IOException {
         Stats stats = new Stats("COLUMN_SCAN");
 
-        HTableInterface table = conn.getTable(TEST_TABLE);
+        Table table = conn.getTable(TableName.valueOf(TEST_TABLE));
         try {
             stats.markStart();
 
@@ -122,20 +122,20 @@ public class GridTableHBaseBenchmark {
         }
     }
 
-    private static void testRowScanNoIndexFullScan(HConnection conn, boolean[] hits) throws IOException {
+    private static void testRowScanNoIndexFullScan(Connection conn, boolean[] hits) throws IOException {
         fullScan(conn, hits, new Stats("ROW_SCAN_NO_IDX_FULL"));
     }
 
-    private static void testRowScanNoIndexSkipScan(HConnection conn, boolean[] hits) throws IOException {
+    private static void testRowScanNoIndexSkipScan(Connection conn, boolean[] hits) throws IOException {
         jumpScan(conn, hits, new Stats("ROW_SCAN_NO_IDX_SKIP"));
     }
 
-    private static void testRowScanWithIndex(HConnection conn, boolean[] hits) throws IOException {
+    private static void testRowScanWithIndex(Connection conn, boolean[] hits) throws IOException {
         jumpScan(conn, hits, new Stats("ROW_SCAN_IDX"));
     }
 
-    private static void fullScan(HConnection conn, boolean[] hits, Stats stats) throws IOException {
-        HTableInterface table = conn.getTable(TEST_TABLE);
+    private static void fullScan(Connection conn, boolean[] hits, Stats stats) throws IOException {
+        Table table = conn.getTable(TableName.valueOf(TEST_TABLE));
         try {
             stats.markStart();
 
@@ -156,11 +156,11 @@ public class GridTableHBaseBenchmark {
         }
     }
 
-    private static void jumpScan(HConnection conn, boolean[] hits, Stats stats) throws IOException {
+    private static void jumpScan(Connection conn, boolean[] hits, Stats stats) throws IOException {
 
         final int jumpThreshold = 6; // compensate for Scan() overhead, totally by experience
 
-        HTableInterface table = conn.getTable(TEST_TABLE);
+        Table table = conn.getTable(TableName.valueOf(TEST_TABLE));
         try {
 
             stats.markStart();
@@ -204,8 +204,8 @@ public class GridTableHBaseBenchmark {
         }
     }
 
-    private static void prepareData(HConnection conn) throws IOException {
-        HTableInterface table = conn.getTable(TEST_TABLE);
+    private static void prepareData(Connection conn) throws IOException {
+        Table table = conn.getTable(TableName.valueOf(TEST_TABLE));
 
         try {
             // check how many rows existing
@@ -258,8 +258,8 @@ public class GridTableHBaseBenchmark {
         return bytes;
     }
 
-    private static void createHTableIfNeeded(HConnection conn, String tableName) throws IOException {
-        HBaseAdmin hbase = new HBaseAdmin(conn);
+    private static void createHTableIfNeeded(Connection conn, String tableName) throws IOException {
+        Admin hbase = conn.getAdmin();
 
         try {
             boolean tableExist = false;

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseClean.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseClean.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseClean.java
index 6749d6c..940d64a 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseClean.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseClean.java
@@ -24,9 +24,11 @@ import java.util.List;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.kylin.metadata.realization.IRealizationConstants;
@@ -55,8 +57,8 @@ public class HBaseClean extends AbstractApplication {
     private void cleanUp() {
         try {
             // get all kylin hbase tables
-            Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
-            HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+            Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
+            Admin hbaseAdmin = conn.getAdmin();
             String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix;
             HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*");
             List<String> allTablesNeedToBeDropped = Lists.newArrayList();
@@ -71,12 +73,12 @@ public class HBaseClean extends AbstractApplication {
                 // drop tables
                 for (String htableName : allTablesNeedToBeDropped) {
                     logger.info("Deleting HBase table " + htableName);
-                    if (hbaseAdmin.tableExists(htableName)) {
-                        if (hbaseAdmin.isTableEnabled(htableName)) {
-                            hbaseAdmin.disableTable(htableName);
+                    if (hbaseAdmin.tableExists(TableName.valueOf(htableName))) {
+                        if (hbaseAdmin.isTableEnabled(TableName.valueOf(htableName))) {
+                            hbaseAdmin.disableTable(TableName.valueOf(htableName));
                         }
 
-                        hbaseAdmin.deleteTable(htableName);
+                        hbaseAdmin.deleteTable(TableName.valueOf(htableName));
                         logger.info("Deleted HBase table " + htableName);
                     } else {
                         logger.info("HBase table" + htableName + " does not exist");

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java
index 937b65f..1daca0a 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
@@ -31,12 +32,15 @@ import java.util.TreeSet;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ClusterStatus;
-import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.RegionLoad;
 import org.apache.hadoop.hbase.ServerLoad;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.kylin.common.util.Pair;
 import org.slf4j.Logger;
@@ -58,30 +62,31 @@ public class HBaseRegionSizeCalculator {
     /**
      * Computes size of each region for table and given column families.
      * */
-    public HBaseRegionSizeCalculator(HTable table) throws IOException {
-        this(table, new HBaseAdmin(table.getConfiguration()));
-    }
-
-    /** Constructor for unit testing */
-    HBaseRegionSizeCalculator(HTable table, HBaseAdmin hBaseAdmin) throws IOException {
+    public HBaseRegionSizeCalculator(String tableName, Connection hbaseConnection) throws IOException {
 
+        Table table = null;
+        Admin admin = null;
         try {
+            table = hbaseConnection.getTable(TableName.valueOf(tableName));
+            admin = hbaseConnection.getAdmin();
+
             if (!enabled(table.getConfiguration())) {
                 logger.info("Region size calculation disabled.");
                 return;
             }
 
-            logger.info("Calculating region sizes for table \"" + new String(table.getTableName()) + "\".");
+            logger.info("Calculating region sizes for table \"" + table.getName() + "\".");
 
             // Get regions for table.
-            Set<HRegionInfo> tableRegionInfos = table.getRegionLocations().keySet();
+            RegionLocator regionLocator = hbaseConnection.getRegionLocator(table.getName());
+            List<HRegionLocation> regionLocationList = regionLocator.getAllRegionLocations();
             Set<byte[]> tableRegions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
 
-            for (HRegionInfo regionInfo : tableRegionInfos) {
-                tableRegions.add(regionInfo.getRegionName());
+            for (HRegionLocation hRegionLocation : regionLocationList) {
+                tableRegions.add(hRegionLocation.getRegionInfo().getRegionName());
             }
 
-            ClusterStatus clusterStatus = hBaseAdmin.getClusterStatus();
+            ClusterStatus clusterStatus = admin.getClusterStatus();
             Collection<ServerName> servers = clusterStatus.getServers();
             final long megaByte = 1024L * 1024L;
 
@@ -105,7 +110,7 @@ public class HBaseRegionSizeCalculator {
                 }
             }
         } finally {
-            IOUtils.closeQuietly(hBaseAdmin);
+            IOUtils.closeQuietly(admin);
         }
 
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseUsage.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseUsage.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseUsage.java
index 266f7e7..a2f60d4 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseUsage.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseUsage.java
@@ -23,9 +23,10 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.metadata.realization.IRealizationConstants;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 
@@ -42,8 +43,8 @@ public class HBaseUsage {
         Map<String, List<String>> envs = Maps.newHashMap();
 
         // get all kylin hbase tables
-        Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
-        HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+        Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
+        Admin hbaseAdmin = conn.getAdmin();
         String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix;
         HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*");
         for (HTableDescriptor desc : tableDescriptors) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HbaseStreamingInput.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HbaseStreamingInput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HbaseStreamingInput.java
index e26c8e8..da13fa4 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HbaseStreamingInput.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HbaseStreamingInput.java
@@ -32,15 +32,15 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
@@ -58,11 +58,11 @@ public class HbaseStreamingInput {
     private static final byte[] QN = "C".getBytes();
 
     public static void createTable(String tableName) throws IOException {
-        HConnection conn = getConnection();
-        HBaseAdmin hadmin = new HBaseAdmin(conn);
+        Connection conn = getConnection();
+        Admin hadmin = conn.getAdmin();
 
         try {
-            boolean tableExist = hadmin.tableExists(tableName);
+            boolean tableExist = hadmin.tableExists(TableName.valueOf(tableName));
             if (tableExist) {
                 logger.info("HTable '" + tableName + "' already exists");
                 return;
@@ -119,8 +119,8 @@ public class HbaseStreamingInput {
                 e.printStackTrace();
             }
 
-            HConnection conn = getConnection();
-            HTableInterface table = conn.getTable(tableName);
+            Connection conn = getConnection();
+            Table table = conn.getTable(TableName.valueOf(tableName));
 
             byte[] key = new byte[8 + 4];//time + id
 
@@ -135,7 +135,7 @@ public class HbaseStreamingInput {
                 Bytes.putInt(key, 8, i);
                 Put put = new Put(key);
                 byte[] cell = randomBytes(CELL_SIZE);
-                put.add(CF, QN, cell);
+                put.addColumn(CF, QN, cell);
                 buffer.add(put);
             }
             table.put(buffer);
@@ -170,8 +170,8 @@ public class HbaseStreamingInput {
             }
 
             Random r = new Random();
-            HConnection conn = getConnection();
-            HTableInterface table = conn.getTable(tableName);
+            Connection conn = getConnection();
+            Table table = conn.getTable(TableName.valueOf(tableName));
 
             long leftBound = getFirstKeyTime(table);
             long rightBound = System.currentTimeMillis();
@@ -206,7 +206,7 @@ public class HbaseStreamingInput {
         }
     }
 
-    private static long getFirstKeyTime(HTableInterface table) throws IOException {
+    private static long getFirstKeyTime(Table table) throws IOException {
         long startTime = 0;
 
         Scan scan = new Scan();
@@ -224,8 +224,8 @@ public class HbaseStreamingInput {
 
     }
 
-    private static HConnection getConnection() throws IOException {
-        return HConnectionManager.createConnection(HBaseConnection.getCurrentHBaseConfiguration());
+    private static Connection getConnection() throws IOException {
+        return HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
     }
 
     private static String formatTime(long time) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HtableAlterMetadataCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HtableAlterMetadataCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HtableAlterMetadataCLI.java
index ca1a060..ea05ab2 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HtableAlterMetadataCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HtableAlterMetadataCLI.java
@@ -23,10 +23,11 @@ import java.io.IOException;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.kylin.engine.mr.common.BatchConstants;
@@ -50,8 +51,8 @@ public class HtableAlterMetadataCLI extends AbstractApplication {
     String metadataValue;
 
     private void alter() throws IOException {
-        Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
-        HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+        Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
+        Admin hbaseAdmin = conn.getAdmin();
         HTableDescriptor table = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
 
         hbaseAdmin.disableTable(table.getTableName());

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/OrphanHBaseCleanJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/OrphanHBaseCleanJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/OrphanHBaseCleanJob.java
index 8ff5b0f..df4e912 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/OrphanHBaseCleanJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/OrphanHBaseCleanJob.java
@@ -30,10 +30,14 @@ import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.kylin.metadata.realization.IRealizationConstants;
+import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,9 +56,9 @@ public class OrphanHBaseCleanJob extends AbstractApplication {
     Set<String> metastoreWhitelistSet = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
 
     private void cleanUnusedHBaseTables(Configuration conf) throws IOException {
-
+        Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
         // get all kylin hbase tables
-        HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+        Admin hbaseAdmin = conn.getAdmin();
         String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix;
         HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*");
         List<String> allTablesNeedToBeDropped = new ArrayList<String>();
@@ -73,12 +77,13 @@ public class OrphanHBaseCleanJob extends AbstractApplication {
             // drop tables
             for (String htableName : allTablesNeedToBeDropped) {
                 logger.info("Deleting HBase table " + htableName);
-                if (hbaseAdmin.tableExists(htableName)) {
-                    if (hbaseAdmin.isTableEnabled(htableName)) {
-                        hbaseAdmin.disableTable(htableName);
+                TableName tableName = TableName.valueOf(htableName);
+                if (hbaseAdmin.tableExists(tableName)) {
+                    if (hbaseAdmin.isTableEnabled(tableName)) {
+                        hbaseAdmin.disableTable(tableName);
                     }
 
-                    hbaseAdmin.deleteTable(htableName);
+                    hbaseAdmin.deleteTable(tableName);
                     logger.info("Deleted HBase table " + htableName);
                 } else {
                     logger.info("HBase table" + htableName + " does not exist");

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java
index e219c5a..8a93160 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java
@@ -22,12 +22,13 @@ import java.io.IOException;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.token.TokenUtil;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -58,12 +59,12 @@ public class PingHBaseCLI {
         Scan scan = new Scan();
         int limit = 20;
 
-        HConnection conn = null;
-        HTableInterface table = null;
+        Connection conn = null;
+        Table table = null;
         ResultScanner scanner = null;
         try {
-            conn = HConnectionManager.createConnection(hconf);
-            table = conn.getTable(hbaseTable);
+            conn = ConnectionFactory.createConnection(hconf);
+            table = conn.getTable(TableName.valueOf(hbaseTable));
             scanner = table.getScanner(scan);
             int count = 0;
             for (Result r : scanner) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java
index 01edb1f..db516bb 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java
@@ -22,11 +22,12 @@ import java.io.IOException;
 import java.util.Iterator;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.storage.hbase.HBaseConnection;
@@ -70,8 +71,8 @@ public class RowCounterCLI {
 
         logger.info("My Scan " + scan.toString());
 
-        HConnection conn = HConnectionManager.createConnection(conf);
-        HTableInterface tableInterface = conn.getTable(htableName);
+        Connection conn = ConnectionFactory.createConnection(conf);
+        Table tableInterface = conn.getTable(TableName.valueOf(htableName));
 
         Iterator<Result> iterator = tableInterface.getScanner(scan).iterator();
         int counter = 0;

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
index 62af2c9..0784305 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
@@ -40,7 +40,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.CliCommandExecutor;
@@ -57,6 +59,7 @@ import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.metadata.realization.IRealizationConstants;
+import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -77,7 +80,8 @@ public class StorageCleanupJob extends AbstractApplication {
     private void cleanUnusedHBaseTables(Configuration conf) throws IOException {
         CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
         // get all kylin hbase tables
-        HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+        Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
+        Admin hbaseAdmin = conn.getAdmin();
         String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix;
         HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*");
         List<String> allTablesNeedToBeDropped = new ArrayList<String>();
@@ -153,22 +157,22 @@ public class StorageCleanupJob extends AbstractApplication {
     }
 
     class DeleteHTableRunnable implements Callable {
-        HBaseAdmin hbaseAdmin;
+        Admin hbaseAdmin;
         String htableName;
 
-        DeleteHTableRunnable(HBaseAdmin hbaseAdmin, String htableName) {
+        DeleteHTableRunnable(Admin hbaseAdmin, String htableName) {
             this.hbaseAdmin = hbaseAdmin;
             this.htableName = htableName;
         }
 
         public Object call() throws Exception {
             logger.info("Deleting HBase table " + htableName);
-            if (hbaseAdmin.tableExists(htableName)) {
-                if (hbaseAdmin.isTableEnabled(htableName)) {
-                    hbaseAdmin.disableTable(htableName);
+            if (hbaseAdmin.tableExists(TableName.valueOf(htableName))) {
+                if (hbaseAdmin.isTableEnabled(TableName.valueOf(htableName))) {
+                    hbaseAdmin.disableTable(TableName.valueOf(htableName));
                 }
 
-                hbaseAdmin.deleteTable(htableName);
+                hbaseAdmin.deleteTable(TableName.valueOf(htableName));
                 logger.info("Deleted HBase table " + htableName);
             } else {
                 logger.info("HBase table" + htableName + " does not exist");

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/UpdateHTableHostCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/UpdateHTableHostCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/UpdateHTableHostCLI.java
index e36f662..42a54c8 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/UpdateHTableHostCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/UpdateHTableHostCLI.java
@@ -24,16 +24,18 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.realization.IRealizationConstants;
-import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,14 +51,15 @@ public class UpdateHTableHostCLI {
     private List<String> errorMsgs = Lists.newArrayList();
 
     private List<String> htables;
-    private HBaseAdmin hbaseAdmin;
+    private Admin hbaseAdmin;
     private KylinConfig kylinConfig;
     private String oldHostValue;
 
     public UpdateHTableHostCLI(List<String> htables, String oldHostValue) throws IOException {
         this.htables = htables;
         this.oldHostValue = oldHostValue;
-        this.hbaseAdmin = new HBaseAdmin(HBaseConnection.getCurrentHBaseConfiguration());
+        Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
+        hbaseAdmin = conn.getAdmin();
         this.kylinConfig = KylinConfig.getInstanceFromEnv();
     }
 
@@ -166,9 +169,9 @@ public class UpdateHTableHostCLI {
         HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
         if (oldHostValue.equals(desc.getValue(IRealizationConstants.HTableTag))) {
             desc.setValue(IRealizationConstants.HTableTag, kylinConfig.getMetadataUrlPrefix());
-            hbaseAdmin.disableTable(tableName);
-            hbaseAdmin.modifyTable(tableName, desc);
-            hbaseAdmin.enableTable(tableName);
+            hbaseAdmin.disableTable(TableName.valueOf(tableName));
+            hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc);
+            hbaseAdmin.enableTable(TableName.valueOf(tableName));
 
             updatedResources.add(tableName);
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java
index c25b690..4695353 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.gridtable.StorageSideBehavior;
 import org.apache.kylin.metadata.datatype.LongMutable;
@@ -229,15 +230,8 @@ public class AggregateRegionObserverTest {
             return nextRaw(results);
         }
 
-        /*
-         * (non-Javadoc)
-         * 
-         * @see
-         * org.apache.hadoop.hbase.regionserver.InternalScanner#next(java.util
-         * .List, int)
-         */
         @Override
-        public boolean next(List<Cell> result, int limit) throws IOException {
+        public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
             return next(result);
         }
 
@@ -306,6 +300,11 @@ public class AggregateRegionObserverTest {
             return 0;
         }
 
+        @Override
+        public int getBatch() {
+            return 0;
+        }
+
         /*
          * (non-Javadoc)
          * 
@@ -322,16 +321,9 @@ public class AggregateRegionObserverTest {
             return i < input.size();
         }
 
-        /*
-         * (non-Javadoc)
-         * 
-         * @see
-         * org.apache.hadoop.hbase.regionserver.RegionScanner#nextRaw(java.util
-         * .List, int)
-         */
         @Override
-        public boolean nextRaw(List<Cell> result, int limit) throws IOException {
-            return nextRaw(result);
+        public boolean nextRaw(List<Cell> list, ScannerContext scannerContext) throws IOException {
+            return false;
         }
 
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/filter/TestFuzzyRowFilterV2EndToEnd.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/filter/TestFuzzyRowFilterV2EndToEnd.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/filter/TestFuzzyRowFilterV2EndToEnd.java
index 1d85922..04e2e8b 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/filter/TestFuzzyRowFilterV2EndToEnd.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/filter/TestFuzzyRowFilterV2EndToEnd.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.FilterList.Operator;
 import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -136,7 +137,7 @@ public class TestFuzzyRowFilterV2EndToEnd {
 
                         Put p = new Put(rk);
                         p.setDurability(Durability.SKIP_WAL);
-                        p.add(cf.getBytes(), cq, Bytes.toBytes(c));
+                        p.addColumn(cf.getBytes(), cq, Bytes.toBytes(c));
                         ht.put(p);
                     }
                 }
@@ -224,7 +225,7 @@ public class TestFuzzyRowFilterV2EndToEnd {
         scan.addFamily(cf.getBytes());
         scan.setFilter(filter);
         List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(table.getBytes());
-        HRegion first = regions.get(0);
+        Region first = regions.get(0);
         first.getScanner(scan);
         RegionScanner scanner = first.getScanner(scan);
         List<Cell> results = new ArrayList<Cell>();

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
index c8bff89..c0042f3 100644
--- a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
@@ -36,9 +36,9 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.RawResource;
@@ -231,6 +231,7 @@ public class CubeMigrationCLI {
             operations.add(new Opt(OptType.COPY_DICT_OR_SNAPSHOT, new Object[] { item, cube.getName() }));
         }
     }
+
     private static void addCubeAndModelIntoProject(CubeInstance srcCube, String cubeName, String projectName) throws IOException {
         String projectResPath = ProjectInstance.concatResourcePath(projectName);
         if (!dstStore.exists(projectResPath))
@@ -447,11 +448,11 @@ public class CubeMigrationCLI {
             Serializer<ProjectInstance> projectSerializer = new JsonSerializer<ProjectInstance>(ProjectInstance.class);
             ProjectInstance project = dstStore.getResource(projectResPath, ProjectInstance.class, projectSerializer);
             String projUUID = project.getUuid();
-            HTableInterface srcAclHtable = null;
-            HTableInterface destAclHtable = null;
+            Table srcAclHtable = null;
+            Table destAclHtable = null;
             try {
-                srcAclHtable = HBaseConnection.get(srcConfig.getStorageUrl()).getTable(srcConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME);
-                destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()).getTable(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME);
+                srcAclHtable = HBaseConnection.get(srcConfig.getStorageUrl()).getTable(TableName.valueOf(srcConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME));
+                destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()).getTable(TableName.valueOf(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME));
 
                 // cube acl
                 Result result = srcAclHtable.get(new Get(Bytes.toBytes(cubeId)));
@@ -471,7 +472,6 @@ public class CubeMigrationCLI {
                         destAclHtable.put(put);
                     }
                 }
-                destAclHtable.flushCommits();
             } finally {
                 IOUtils.closeQuietly(srcAclHtable);
                 IOUtils.closeQuietly(destAclHtable);
@@ -537,13 +537,12 @@ public class CubeMigrationCLI {
         case COPY_ACL: {
             String cubeId = (String) opt.params[0];
             String modelId = (String) opt.params[1];
-            HTableInterface destAclHtable = null;
+            Table destAclHtable = null;
             try {
-                destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()).getTable(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME);
+                destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()).getTable(TableName.valueOf(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME));
 
                 destAclHtable.delete(new Delete(Bytes.toBytes(cubeId)));
                 destAclHtable.delete(new Delete(Bytes.toBytes(modelId)));
-                destAclHtable.flushCommits();
             } finally {
                 IOUtils.closeQuietly(destAclHtable);
             }
@@ -560,7 +559,7 @@ public class CubeMigrationCLI {
         }
     }
 
-    private static void updateMeta(KylinConfig config){
+    private static void updateMeta(KylinConfig config) {
         String[] nodes = config.getRestServers();
         for (String node : nodes) {
             RestClient restClient = new RestClient(node);

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java b/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java
index 19e5db0..f52fc3e 100644
--- a/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java
@@ -25,10 +25,11 @@ import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
@@ -231,9 +232,9 @@ public class ExtendCubeToHybridCLI {
         Serializer<ProjectInstance> projectSerializer = new JsonSerializer<ProjectInstance>(ProjectInstance.class);
         ProjectInstance project = store.getResource(projectResPath, ProjectInstance.class, projectSerializer);
         String projUUID = project.getUuid();
-        HTableInterface aclHtable = null;
+        Table aclHtable = null;
         try {
-            aclHtable = HBaseConnection.get(kylinConfig.getStorageUrl()).getTable(kylinConfig.getMetadataUrlPrefix() + "_acl");
+            aclHtable = HBaseConnection.get(kylinConfig.getStorageUrl()).getTable(TableName.valueOf(kylinConfig.getMetadataUrlPrefix() + "_acl"));
 
             // cube acl
             Result result = aclHtable.get(new Get(Bytes.toBytes(origCubeId)));
@@ -253,7 +254,6 @@ public class ExtendCubeToHybridCLI {
                     aclHtable.put(put);
                 }
             }
-            aclHtable.flushCommits();
         } finally {
             IOUtils.closeQuietly(aclHtable);
         }


[7/7] kylin git commit: KYLIN-1528 Create a branch for v1.5 with HBase 1.x API

Posted by li...@apache.org.
KYLIN-1528 Create a branch for v1.5 with HBase 1.x API


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4853dcae
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4853dcae
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4853dcae

Branch: refs/heads/yang22-hbase1.x
Commit: 4853dcae3d764c3ef08cb2f76310794e06d1a5e3
Parents: 7564274
Author: shaofengshi <sh...@apache.org>
Authored: Wed Mar 23 17:07:05 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Thu Feb 16 11:49:16 2017 +0800

----------------------------------------------------------------------
 examples/test_case_data/sandbox/hbase-site.xml  | 19 +---
 .../kylin/provision/BuildCubeWithEngine.java    | 12 +--
 pom.xml                                         | 12 +--
 .../kylin/rest/security/AclHBaseStorage.java    |  4 +-
 .../rest/security/MockAclHBaseStorage.java      |  8 +-
 .../apache/kylin/rest/security/MockHTable.java  | 95 ++++----------------
 .../rest/security/RealAclHBaseStorage.java      |  9 +-
 .../apache/kylin/rest/service/AclService.java   | 25 +++---
 .../apache/kylin/rest/service/CubeService.java  | 35 +++-----
 .../apache/kylin/rest/service/QueryService.java | 24 +++--
 .../apache/kylin/rest/service/UserService.java  | 17 ++--
 .../kylin/storage/hbase/HBaseConnection.java    | 44 ++++-----
 .../kylin/storage/hbase/HBaseResourceStore.java | 31 +++----
 .../kylin/storage/hbase/HBaseStorage.java       |  3 +-
 .../storage/hbase/cube/SimpleHBaseStore.java    | 20 ++---
 .../hbase/cube/v1/CubeSegmentTupleIterator.java | 11 +--
 .../storage/hbase/cube/v1/CubeStorageQuery.java |  4 +-
 .../hbase/cube/v1/RegionScannerAdapter.java     | 10 ++-
 .../cube/v1/SerializedHBaseTupleIterator.java   |  4 +-
 .../observer/AggregateRegionObserver.java       |  4 +-
 .../observer/AggregationScanner.java            | 14 ++-
 .../observer/ObserverAggregationCache.java      | 10 ++-
 .../coprocessor/observer/ObserverEnabler.java   |  4 +-
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java     | 13 +--
 .../storage/hbase/cube/v2/CubeHBaseScanRPC.java |  9 +-
 .../coprocessor/endpoint/CubeVisitService.java  |  4 +-
 .../storage/hbase/steps/CubeHTableUtil.java     | 16 ++--
 .../storage/hbase/steps/DeprecatedGCStep.java   | 26 +++---
 .../storage/hbase/steps/HBaseCuboidWriter.java  |  7 +-
 .../kylin/storage/hbase/steps/MergeGCStep.java  | 23 ++---
 .../storage/hbase/util/CleanHtableCLI.java      | 12 +--
 .../storage/hbase/util/CubeMigrationCLI.java    | 37 ++++----
 .../hbase/util/CubeMigrationCheckCLI.java       | 17 ++--
 .../hbase/util/DeployCoprocessorCLI.java        | 27 +++---
 .../hbase/util/ExtendCubeToHybridCLI.java       |  8 +-
 .../hbase/util/GridTableHBaseBenchmark.java     | 34 +++----
 .../kylin/storage/hbase/util/HBaseClean.java    | 18 ++--
 .../hbase/util/HBaseRegionSizeCalculator.java   | 35 ++++----
 .../kylin/storage/hbase/util/HBaseUsage.java    |  9 +-
 .../storage/hbase/util/HbaseStreamingInput.java | 30 +++----
 .../hbase/util/HtableAlterMetadataCLI.java      |  9 +-
 .../storage/hbase/util/OrphanHBaseCleanJob.java | 19 ++--
 .../kylin/storage/hbase/util/PingHBaseCLI.java  | 15 ++--
 .../kylin/storage/hbase/util/RowCounterCLI.java | 11 +--
 .../storage/hbase/util/StorageCleanupJob.java   | 20 +++--
 .../storage/hbase/util/UpdateHTableHostCLI.java | 17 ++--
 .../observer/AggregateRegionObserverTest.java   | 26 ++----
 .../v1/filter/TestFuzzyRowFilterV2EndToEnd.java |  5 +-
 .../org/apache/kylin/tool/CubeMigrationCLI.java | 19 ++--
 .../kylin/tool/ExtendCubeToHybridCLI.java       |  8 +-
 50 files changed, 418 insertions(+), 475 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/examples/test_case_data/sandbox/hbase-site.xml
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/hbase-site.xml b/examples/test_case_data/sandbox/hbase-site.xml
index 46d5345..734908e 100644
--- a/examples/test_case_data/sandbox/hbase-site.xml
+++ b/examples/test_case_data/sandbox/hbase-site.xml
@@ -190,22 +190,5 @@
         <name>zookeeper.znode.parent</name>
         <value>/hbase-unsecure</value>
     </property>
-    <property>
-        <name>hbase.client.pause</name>
-        <value>100</value>
-        <description>General client pause value.  Used mostly as value to wait
-            before running a retry of a failed get, region lookup, etc.
-            See hbase.client.retries.number for description of how we backoff from
-            this initial pause amount and how this pause works w/ retries.</description>
-    </property>
-    <property>
-        <name>hbase.client.retries.number</name>
-        <value>5</value>
-        <description>Maximum retries.  Used as maximum for all retryable
-            operations such as the getting of a cell's value, starting a row update,
-            etc.  Retry interval is a rough function based on hbase.client.pause.  At
-            first we retry at this interval but then with backoff, we pretty quickly reach
-            retrying every ten seconds.  See HConstants#RETRY_BACKOFF for how the backup
-            ramps up.  Change this setting and hbase.client.pause to suit your workload.</description>
-    </property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index 7007039..ba9511c 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -35,8 +35,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
@@ -58,6 +57,7 @@ import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
 import org.apache.kylin.source.ISource;
 import org.apache.kylin.source.SourceFactory;
 import org.apache.kylin.source.SourcePartition;
+import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.kylin.storage.hbase.util.HBaseRegionSizeCalculator;
 import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
 import org.apache.kylin.tool.StorageCleanupJob;
@@ -430,10 +430,10 @@ public class BuildCubeWithEngine {
 
     @SuppressWarnings("unused")
     private void checkHFilesInHBase(CubeSegment segment) throws IOException {
-        Configuration conf = HBaseConfiguration.create(HadoopUtil.getCurrentConfiguration());
-        String tableName = segment.getStorageLocationIdentifier();
-        try (HTable table = new HTable(conf, tableName)) {
-            HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(table);
+        try (Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl())) {
+            String tableName = segment.getStorageLocationIdentifier();
+
+            HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(tableName, conn);
             Map<byte[], Long> sizeMap = cal.getRegionSizeMap();
             long totalSize = 0;
             for (Long size : sizeMap.values()) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 8b2e0aa..dc23a2f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -46,20 +46,20 @@
         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
 
         <!-- Hadoop versions -->
-        <hadoop2.version>2.6.0</hadoop2.version>
-        <yarn.version>2.6.0</yarn.version>
+        <hadoop2.version>2.7.1</hadoop2.version>
+        <yarn.version>2.7.1</yarn.version>
 
         <!-- Hive versions -->
-        <hive.version>0.14.0</hive.version>
-        <hive-hcatalog.version>0.14.0</hive-hcatalog.version>
+        <hive.version>1.2.1</hive.version>
+        <hive-hcatalog.version>1.2.1</hive-hcatalog.version>
 
         <!-- HBase versions -->
-        <hbase-hadoop2.version>0.98.8-hadoop2</hbase-hadoop2.version>
+        <hbase-hadoop2.version>1.1.1</hbase-hadoop2.version>
         <kafka.version>0.10.0.0</kafka.version>
 
         <!-- Hadoop deps, keep compatible with hadoop2.version -->
         <zookeeper.version>3.4.6</zookeeper.version>
-        <curator.version>2.6.0</curator.version>
+        <curator.version>2.7.1</curator.version>
         <jackson.version>2.2.4</jackson.version>
         <jsr305.version>3.0.1</jsr305.version>
         <guava.version>14.0</guava.version>

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/server-base/src/main/java/org/apache/kylin/rest/security/AclHBaseStorage.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/security/AclHBaseStorage.java b/server-base/src/main/java/org/apache/kylin/rest/security/AclHBaseStorage.java
index ea68855..8095bf8 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/security/AclHBaseStorage.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/security/AclHBaseStorage.java
@@ -20,7 +20,7 @@ package org.apache.kylin.rest.security;
 
 import java.io.IOException;
 
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Table;
 
 /**
  */
@@ -36,6 +36,6 @@ public interface AclHBaseStorage {
 
     String prepareHBaseTable(Class<?> clazz) throws IOException;
 
-    HTableInterface getTable(String tableName) throws IOException;
+    Table getTable(String tableName) throws IOException;
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/server-base/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java b/server-base/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java
index d9326f5..cc76b87 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java
@@ -21,7 +21,7 @@ package org.apache.kylin.rest.security;
 import java.io.IOException;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.rest.service.AclService;
 import org.apache.kylin.rest.service.QueryService;
@@ -34,8 +34,8 @@ public class MockAclHBaseStorage implements AclHBaseStorage {
     private static final String aclTableName = "MOCK-ACL-TABLE";
     private static final String userTableName = "MOCK-USER-TABLE";
 
-    private HTableInterface mockedAclTable;
-    private HTableInterface mockedUserTable;
+    private Table mockedAclTable;
+    private Table mockedUserTable;
     private RealAclHBaseStorage realAcl;
 
     public MockAclHBaseStorage() {
@@ -65,7 +65,7 @@ public class MockAclHBaseStorage implements AclHBaseStorage {
     }
 
     @Override
-    public HTableInterface getTable(String tableName) throws IOException {
+    public Table getTable(String tableName) throws IOException {
         if (realAcl != null) {
             return realAcl.getTable(tableName);
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/server-base/src/main/java/org/apache/kylin/rest/security/MockHTable.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/security/MockHTable.java b/server-base/src/main/java/org/apache/kylin/rest/security/MockHTable.java
index d0aa0ed..972eea9 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/security/MockHTable.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/security/MockHTable.java
@@ -51,7 +51,7 @@ import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
@@ -91,7 +91,7 @@ import com.google.protobuf.ServiceException;
  *     <li>remove some methods for loading data, checking values ...</li>
  * </ul>
  */
-public class MockHTable implements HTableInterface {
+public class MockHTable implements Table {
     private final String tableName;
     private final List<String> columnFamilies = new ArrayList<>();
 
@@ -114,14 +114,6 @@ public class MockHTable implements HTableInterface {
         this.columnFamilies.add(columnFamily);
     }
 
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public byte[] getTableName() {
-        return tableName.getBytes();
-    }
-
     @Override
     public TableName getName() {
         return null;
@@ -200,8 +192,8 @@ public class MockHTable implements HTableInterface {
     }
 
     @Override
-    public Boolean[] exists(List<Get> gets) throws IOException {
-        return new Boolean[0];
+    public boolean[] existsAll(List<Get> list) throws IOException {
+        return new boolean[0];
     }
 
     /**
@@ -306,15 +298,6 @@ public class MockHTable implements HTableInterface {
      * {@inheritDoc}
      */
     @Override
-    public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
-        // FIXME: implement
-        return null;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
     public ResultScanner getScanner(Scan scan) throws IOException {
         final List<Result> ret = new ArrayList<Result>();
         byte[] st = scan.getStartRow();
@@ -446,7 +429,7 @@ public class MockHTable implements HTableInterface {
              */
         }
         if (filter.hasFilterRow() && !filteredOnRowKey) {
-            filter.filterRow(nkvs);
+            filter.filterRow();
         }
         if (filter.filterRow() || filteredOnRowKey) {
             nkvs.clear();
@@ -535,6 +518,11 @@ public class MockHTable implements HTableInterface {
         return false;
     }
 
+    @Override
+    public boolean checkAndPut(byte[] bytes, byte[] bytes1, byte[] bytes2, CompareFilter.CompareOp compareOp, byte[] bytes3, Put put) throws IOException {
+        return false;
+    }
+
     /**
      * {@inheritDoc}
      */
@@ -555,7 +543,7 @@ public class MockHTable implements HTableInterface {
                 continue;
             }
             for (KeyValue kv : delete.getFamilyMap().get(family)) {
-                if (kv.isDeleteFamily()) {
+                if (kv.isDelete()) {
                     data.get(row).get(kv.getFamily()).clear();
                 } else {
                     data.get(row).get(kv.getFamily()).remove(kv.getQualifier());
@@ -592,6 +580,11 @@ public class MockHTable implements HTableInterface {
         return false;
     }
 
+    @Override
+    public boolean checkAndDelete(byte[] bytes, byte[] bytes1, byte[] bytes2, CompareFilter.CompareOp compareOp, byte[] bytes3, Delete delete) throws IOException {
+        return false;
+    }
+
     /**
      * {@inheritDoc}
      */
@@ -605,7 +598,7 @@ public class MockHTable implements HTableInterface {
      */
     @Override
     public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) throws IOException {
-        return incrementColumnValue(row, family, qualifier, amount, true);
+        return incrementColumnValue(row, family, qualifier, amount, null);
     }
 
     @Override
@@ -617,37 +610,6 @@ public class MockHTable implements HTableInterface {
      * {@inheritDoc}
      */
     @Override
-    public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL) throws IOException {
-        if (check(row, family, qualifier, null)) {
-            Put put = new Put(row);
-            put.add(family, qualifier, Bytes.toBytes(amount));
-            put(put);
-            return amount;
-        }
-        long newValue = Bytes.toLong(data.get(row).get(family).get(qualifier).lastEntry().getValue()) + amount;
-        data.get(row).get(family).get(qualifier).put(System.currentTimeMillis(), Bytes.toBytes(newValue));
-        return newValue;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public boolean isAutoFlush() {
-        return true;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void flushCommits() throws IOException {
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
     public void close() throws IOException {
     }
 
@@ -673,29 +635,6 @@ public class MockHTable implements HTableInterface {
      * {@inheritDoc}
      */
     @Override
-    public void setAutoFlush(boolean autoFlush) {
-        throw new NotImplementedException();
-
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
-        throw new NotImplementedException();
-
-    }
-
-    @Override
-    public void setAutoFlushTo(boolean autoFlush) {
-        throw new NotImplementedException();
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
     public long getWriteBufferSize() {
         throw new NotImplementedException();
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/server-base/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java b/server-base/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java
index 1d520c4..d1a1384 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java
@@ -21,7 +21,8 @@ package org.apache.kylin.rest.security;
 import java.io.IOException;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.rest.service.AclService;
 import org.apache.kylin.rest.service.QueryService;
@@ -58,11 +59,11 @@ public class RealAclHBaseStorage implements AclHBaseStorage {
     }
 
     @Override
-    public HTableInterface getTable(String tableName) throws IOException {
+    public Table getTable(String tableName) throws IOException {
         if (StringUtils.equals(tableName, aclTableName)) {
-            return HBaseConnection.get(hbaseUrl).getTable(aclTableName);
+            return HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(aclTableName));
         } else if (StringUtils.equals(tableName, userTableName)) {
-            return HBaseConnection.get(hbaseUrl).getTable(userTableName);
+            return HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
         } else {
             throw new IllegalStateException("getTable failed" + tableName);
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/server-base/src/main/java/org/apache/kylin/rest/service/AclService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/AclService.java b/server-base/src/main/java/org/apache/kylin/rest/service/AclService.java
index d693a67..3e3efec 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/AclService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/AclService.java
@@ -33,7 +33,7 @@ import javax.annotation.PostConstruct;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -124,7 +124,7 @@ public class AclService implements MutableAclService {
     @Override
     public List<ObjectIdentity> findChildren(ObjectIdentity parentIdentity) {
         List<ObjectIdentity> oids = new ArrayList<ObjectIdentity>();
-        HTableInterface htable = null;
+        Table htable = null;
         try {
             htable = aclHBaseStorage.getTable(aclTableName);
 
@@ -173,7 +173,7 @@ public class AclService implements MutableAclService {
     @Override
     public Map<ObjectIdentity, Acl> readAclsById(List<ObjectIdentity> oids, List<Sid> sids) throws NotFoundException {
         Map<ObjectIdentity, Acl> aclMaps = new HashMap<ObjectIdentity, Acl>();
-        HTableInterface htable = null;
+        Table htable = null;
         Result result = null;
         try {
             htable = aclHBaseStorage.getTable(aclTableName);
@@ -226,17 +226,16 @@ public class AclService implements MutableAclService {
         Authentication auth = SecurityContextHolder.getContext().getAuthentication();
         PrincipalSid sid = new PrincipalSid(auth);
 
-        HTableInterface htable = null;
+        Table htable = null;
         try {
             htable = aclHBaseStorage.getTable(aclTableName);
 
             Put put = new Put(Bytes.toBytes(String.valueOf(objectIdentity.getIdentifier())));
-            put.add(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_TYPE_COLUMN), Bytes.toBytes(objectIdentity.getType()));
-            put.add(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_OWNER_COLUMN), sidSerializer.serialize(new SidInfo(sid)));
-            put.add(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_ENTRY_INHERIT_COLUMN), Bytes.toBytes(true));
+            put.addColumn(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_TYPE_COLUMN), Bytes.toBytes(objectIdentity.getType()));
+            put.addColumn(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_OWNER_COLUMN), sidSerializer.serialize(new SidInfo(sid)));
+            put.addColumn(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_ENTRY_INHERIT_COLUMN), Bytes.toBytes(true));
 
             htable.put(put);
-            htable.flushCommits();
 
             logger.debug("ACL of " + objectIdentity + " created successfully.");
         } catch (IOException e) {
@@ -250,7 +249,7 @@ public class AclService implements MutableAclService {
 
     @Override
     public void deleteAcl(ObjectIdentity objectIdentity, boolean deleteChildren) throws ChildrenExistException {
-        HTableInterface htable = null;
+        Table htable = null;
         try {
             htable = aclHBaseStorage.getTable(aclTableName);
 
@@ -266,7 +265,6 @@ public class AclService implements MutableAclService {
             }
 
             htable.delete(delete);
-            htable.flushCommits();
 
             logger.debug("ACL of " + objectIdentity + " deleted successfully.");
         } catch (IOException e) {
@@ -284,7 +282,7 @@ public class AclService implements MutableAclService {
             throw e;
         }
 
-        HTableInterface htable = null;
+        Table htable = null;
         try {
             htable = aclHBaseStorage.getTable(aclTableName);
 
@@ -295,17 +293,16 @@ public class AclService implements MutableAclService {
             Put put = new Put(Bytes.toBytes(String.valueOf(acl.getObjectIdentity().getIdentifier())));
 
             if (null != acl.getParentAcl()) {
-                put.add(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_PARENT_COLUMN), domainObjSerializer.serialize(new DomainObjectInfo(acl.getParentAcl().getObjectIdentity())));
+                put.addColumn(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_PARENT_COLUMN), domainObjSerializer.serialize(new DomainObjectInfo(acl.getParentAcl().getObjectIdentity())));
             }
 
             for (AccessControlEntry ace : acl.getEntries()) {
                 AceInfo aceInfo = new AceInfo(ace);
-                put.add(Bytes.toBytes(AclHBaseStorage.ACL_ACES_FAMILY), Bytes.toBytes(aceInfo.getSidInfo().getSid()), aceSerializer.serialize(aceInfo));
+                put.addColumn(Bytes.toBytes(AclHBaseStorage.ACL_ACES_FAMILY), Bytes.toBytes(aceInfo.getSidInfo().getSid()), aceSerializer.serialize(aceInfo));
             }
 
             if (!put.isEmpty()) {
                 htable.put(put);
-                htable.flushCommits();
 
                 logger.debug("ACL of " + acl.getObjectIdentity() + " updated successfully.");
             }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index 4657cd2..9115dcd 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -28,9 +28,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.WeakHashMap;
 
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.Pair;
@@ -406,33 +404,24 @@ public class CubeService extends BasicService {
         if (htableInfoCache.containsKey(tableName)) {
             return htableInfoCache.get(tableName);
         }
-
-        Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
-        HTable table = null;
+        Connection conn = HBaseConnection.get(this.getConfig().getStorageUrl());
         HBaseResponse hr = null;
         long tableSize = 0;
         int regionCount = 0;
 
-        try {
-            table = new HTable(hconf, tableName);
-
-            HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(table);
-            Map<byte[], Long> sizeMap = cal.getRegionSizeMap();
+        HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(tableName, conn);
+        Map<byte[], Long> sizeMap = cal.getRegionSizeMap();
 
-            for (long s : sizeMap.values()) {
-                tableSize += s;
-            }
-
-            regionCount = sizeMap.size();
-
-            // Set response.
-            hr = new HBaseResponse();
-            hr.setTableSize(tableSize);
-            hr.setRegionCount(regionCount);
-        } finally {
-            IOUtils.closeQuietly(table);
+        for (long s : sizeMap.values()) {
+            tableSize += s;
         }
 
+        regionCount = sizeMap.size();
+
+        // Set response.
+        hr = new HBaseResponse();
+        hr.setTableSize(tableSize);
+        hr.setRegionCount(regionCount);
         htableInfoCache.put(tableName, hr);
 
         return hr;

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index bc644cc..b07e698 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -49,11 +49,11 @@ import javax.sql.DataSource;
 import org.apache.calcite.avatica.ColumnMetaData.Rep;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.debug.BackdoorToggles;
@@ -164,14 +164,13 @@ public class QueryService extends BasicService {
         Query[] queryArray = new Query[queries.size()];
 
         byte[] bytes = querySerializer.serialize(queries.toArray(queryArray));
-        HTableInterface htable = null;
+        Table htable = null;
         try {
-            htable = HBaseConnection.get(hbaseUrl).getTable(userTableName);
+            htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
             Put put = new Put(Bytes.toBytes(creator));
-            put.add(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes);
+            put.addColumn(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes);
 
             htable.put(put);
-            htable.flushCommits();
         } finally {
             IOUtils.closeQuietly(htable);
         }
@@ -197,14 +196,13 @@ public class QueryService extends BasicService {
 
         Query[] queryArray = new Query[queries.size()];
         byte[] bytes = querySerializer.serialize(queries.toArray(queryArray));
-        HTableInterface htable = null;
+        Table htable = null;
         try {
-            htable = HBaseConnection.get(hbaseUrl).getTable(userTableName);
+            htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
             Put put = new Put(Bytes.toBytes(creator));
-            put.add(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes);
+            put.addColumn(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes);
 
             htable.put(put);
-            htable.flushCommits();
         } finally {
             IOUtils.closeQuietly(htable);
         }
@@ -216,12 +214,12 @@ public class QueryService extends BasicService {
         }
 
         List<Query> queries = new ArrayList<Query>();
-        HTableInterface htable = null;
+        Table htable = null;
         try {
-            HConnection conn = HBaseConnection.get(hbaseUrl);
+            org.apache.hadoop.hbase.client.Connection conn = HBaseConnection.get(hbaseUrl);
             HBaseConnection.createHTableIfNeeded(conn, userTableName, USER_QUERY_FAMILY);
 
-            htable = conn.getTable(userTableName);
+            htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
             Get get = new Get(Bytes.toBytes(creator));
             get.addFamily(Bytes.toBytes(USER_QUERY_FAMILY));
             Result result = htable.get(get);

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/server-base/src/main/java/org/apache/kylin/rest/service/UserService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/UserService.java b/server-base/src/main/java/org/apache/kylin/rest/service/UserService.java
index 07c7c6f..ab54882 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/UserService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/UserService.java
@@ -30,11 +30,11 @@ import javax.annotation.PostConstruct;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.rest.security.AclHBaseStorage;
@@ -72,7 +72,7 @@ public class UserService implements UserDetailsManager {
 
     @Override
     public UserDetails loadUserByUsername(String username) throws UsernameNotFoundException {
-        HTableInterface htable = null;
+        Table htable = null;
         try {
             htable = aclHBaseStorage.getTable(userTableName);
 
@@ -144,16 +144,16 @@ public class UserService implements UserDetailsManager {
 
     @Override
     public void updateUser(UserDetails user) {
-        HTableInterface htable = null;
+        Table htable = null;
         try {
             htable = aclHBaseStorage.getTable(userTableName);
 
             Pair<byte[], byte[]> pair = userToHBaseRow(user);
             Put put = new Put(pair.getKey());
-            put.add(Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_FAMILY), Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_COLUMN), pair.getSecond());
+
+            put.addColumn(Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_FAMILY), Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_COLUMN), pair.getSecond());
 
             htable.put(put);
-            htable.flushCommits();
         } catch (IOException e) {
             throw new RuntimeException(e.getMessage(), e);
         } finally {
@@ -163,14 +163,13 @@ public class UserService implements UserDetailsManager {
 
     @Override
     public void deleteUser(String username) {
-        HTableInterface htable = null;
+        Table htable = null;
         try {
             htable = aclHBaseStorage.getTable(userTableName);
 
             Delete delete = new Delete(Bytes.toBytes(username));
 
             htable.delete(delete);
-            htable.flushCommits();
         } catch (IOException e) {
             throw new RuntimeException(e.getMessage(), e);
         } finally {
@@ -185,7 +184,7 @@ public class UserService implements UserDetailsManager {
 
     @Override
     public boolean userExists(String username) {
-        HTableInterface htable = null;
+        Table htable = null;
         try {
             htable = aclHBaseStorage.getTable(userTableName);
 
@@ -216,7 +215,7 @@ public class UserService implements UserDetailsManager {
         s.addColumn(Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_FAMILY), Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_COLUMN));
 
         List<UserDetails> all = new ArrayList<UserDetails>();
-        HTableInterface htable = null;
+        Table htable = null;
         ResultScanner scanner = null;
         try {
             htable = aclHBaseStorage.getTable(userTableName);

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
index 335bfe7..53c95cb 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
@@ -40,9 +40,9 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.kylin.common.KylinConfig;
@@ -64,7 +64,7 @@ public class HBaseConnection {
     private static final Logger logger = LoggerFactory.getLogger(HBaseConnection.class);
 
     private static final Map<String, Configuration> configCache = new ConcurrentHashMap<String, Configuration>();
-    private static final Map<String, HConnection> connPool = new ConcurrentHashMap<String, HConnection>();
+    private static final Map<String, Connection> connPool = new ConcurrentHashMap<String, Connection>();
     private static final ThreadLocal<Configuration> configThreadLocal = new ThreadLocal<>();
 
     private static ExecutorService coprocessorPool = null;
@@ -75,7 +75,7 @@ public class HBaseConnection {
             public void run() {
                 closeCoprocessorPool();
 
-                for (HConnection conn : connPool.values()) {
+                for (Connection conn : connPool.values()) {
                     try {
                         conn.close();
                     } catch (IOException e) {
@@ -144,7 +144,7 @@ public class HBaseConnection {
         // using a hbase:xxx URL is deprecated, instead hbase config is always loaded from hbase-site.xml in classpath
         if (!(StringUtils.isEmpty(url) || "hbase".equals(url)))
             throw new IllegalArgumentException("to use hbase storage, pls set 'kylin.storage.url=hbase' in kylin.properties");
-        
+
         Configuration conf = HBaseConfiguration.create(HadoopUtil.getCurrentConfiguration());
         addHBaseClusterNNHAConfiguration(conf);
 
@@ -213,9 +213,9 @@ public class HBaseConnection {
 
     // ============================================================================
 
-    // returned HConnection can be shared by multiple threads and does not require close()
+    // returned Connection can be shared by multiple threads and does not require close()
     @SuppressWarnings("resource")
-    public static HConnection get(String url) {
+    public static Connection get(String url) {
         // find configuration
         Configuration conf = configCache.get(url);
         if (conf == null) {
@@ -223,13 +223,13 @@ public class HBaseConnection {
             configCache.put(url, conf);
         }
 
-        HConnection connection = connPool.get(url);
+        Connection connection = connPool.get(url);
         try {
             while (true) {
                 // I don't use DCL since recreate a connection is not a big issue.
                 if (connection == null || connection.isClosed()) {
                     logger.info("connection is null or closed, creating a new one");
-                    connection = HConnectionManager.createConnection(conf);
+                    connection = ConnectionFactory.createConnection(conf);
                     connPool.put(url, connection);
                 }
 
@@ -248,8 +248,8 @@ public class HBaseConnection {
         return connection;
     }
 
-    public static boolean tableExists(HConnection conn, String tableName) throws IOException {
-        HBaseAdmin hbase = new HBaseAdmin(conn);
+    public static boolean tableExists(Connection conn, String tableName) throws IOException {
+        Admin hbase = conn.getAdmin();
         try {
             return hbase.tableExists(TableName.valueOf(tableName));
         } finally {
@@ -269,18 +269,18 @@ public class HBaseConnection {
         deleteTable(HBaseConnection.get(hbaseUrl), tableName);
     }
 
-    public static void createHTableIfNeeded(HConnection conn, String table, String... families) throws IOException {
-        HBaseAdmin hbase = new HBaseAdmin(conn);
-
+    public static void createHTableIfNeeded(Connection conn, String table, String... families) throws IOException {
+        Admin hbase = conn.getAdmin();
+        TableName tableName = TableName.valueOf(table);
         try {
             if (tableExists(conn, table)) {
                 logger.debug("HTable '" + table + "' already exists");
-                Set<String> existingFamilies = getFamilyNames(hbase.getTableDescriptor(TableName.valueOf(table)));
+                Set<String> existingFamilies = getFamilyNames(hbase.getTableDescriptor(tableName));
                 boolean wait = false;
                 for (String family : families) {
                     if (existingFamilies.contains(family) == false) {
                         logger.debug("Adding family '" + family + "' to HTable '" + table + "'");
-                        hbase.addColumn(table, newFamilyDescriptor(family));
+                        hbase.addColumn(tableName, newFamilyDescriptor(family));
                         // addColumn() is async, is there a way to wait it finish?
                         wait = true;
                     }
@@ -333,8 +333,8 @@ public class HBaseConnection {
         return fd;
     }
 
-    public static void deleteTable(HConnection conn, String tableName) throws IOException {
-        HBaseAdmin hbase = new HBaseAdmin(conn);
+    public static void deleteTable(Connection conn, String tableName) throws IOException {
+        Admin hbase = conn.getAdmin();
 
         try {
             if (!tableExists(conn, tableName)) {
@@ -344,10 +344,10 @@ public class HBaseConnection {
 
             logger.debug("delete HTable '" + tableName + "'");
 
-            if (hbase.isTableEnabled(tableName)) {
-                hbase.disableTable(tableName);
+            if (hbase.isTableEnabled(TableName.valueOf(tableName))) {
+                hbase.disableTable(TableName.valueOf(tableName));
             }
-            hbase.deleteTable(tableName);
+            hbase.deleteTable(TableName.valueOf(tableName));
 
             logger.debug("HTable '" + tableName + "' deleted");
         } finally {

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index 6217350..1c45967 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -30,14 +30,15 @@ import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.filter.CompareFilter;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterList;
@@ -69,7 +70,7 @@ public class HBaseResourceStore extends ResourceStore {
     final String tableNameBase;
     final String hbaseUrl;
 
-    HConnection getConnection() throws IOException {
+    Connection getConnection() throws IOException {
         return HBaseConnection.get(hbaseUrl);
     }
 
@@ -120,7 +121,7 @@ public class HBaseResourceStore extends ResourceStore {
         byte[] endRow = Bytes.toBytes(lookForPrefix);
         endRow[endRow.length - 1]++;
 
-        HTableInterface table = getConnection().getTable(getAllInOneTableName());
+        Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
         Scan scan = new Scan(startRow, endRow);
         if ((filter != null && filter instanceof KeyOnlyFilter) == false) {
             scan.addColumn(B_FAMILY, B_COLUMN_TS);
@@ -237,13 +238,12 @@ public class HBaseResourceStore extends ResourceStore {
         IOUtils.copy(content, bout);
         bout.close();
 
-        HTableInterface table = getConnection().getTable(getAllInOneTableName());
+        Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
         try {
             byte[] row = Bytes.toBytes(resPath);
             Put put = buildPut(resPath, ts, row, bout.toByteArray(), table);
 
             table.put(put);
-            table.flushCommits();
         } finally {
             IOUtils.closeQuietly(table);
         }
@@ -251,7 +251,7 @@ public class HBaseResourceStore extends ResourceStore {
 
     @Override
     protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) throws IOException, IllegalStateException {
-        HTableInterface table = getConnection().getTable(getAllInOneTableName());
+        Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
         try {
             byte[] row = Bytes.toBytes(resPath);
             byte[] bOldTS = oldTS == 0 ? null : Bytes.toBytes(oldTS);
@@ -264,8 +264,6 @@ public class HBaseResourceStore extends ResourceStore {
                 throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + oldTS + ", but it is " + real);
             }
 
-            table.flushCommits();
-
             return newTS;
         } finally {
             IOUtils.closeQuietly(table);
@@ -274,7 +272,7 @@ public class HBaseResourceStore extends ResourceStore {
 
     @Override
     protected void deleteResourceImpl(String resPath) throws IOException {
-        HTableInterface table = getConnection().getTable(getAllInOneTableName());
+        Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
         try {
             boolean hdfsResourceExist = false;
             Result result = internalGetFromHTable(table, resPath, true, false);
@@ -287,7 +285,6 @@ public class HBaseResourceStore extends ResourceStore {
 
             Delete del = new Delete(Bytes.toBytes(resPath));
             table.delete(del);
-            table.flushCommits();
 
             if (hdfsResourceExist) { // remove hdfs cell value
                 Path redirectPath = bigCellHDFSPath(resPath);
@@ -308,7 +305,7 @@ public class HBaseResourceStore extends ResourceStore {
     }
 
     private Result getFromHTable(String path, boolean fetchContent, boolean fetchTimestamp) throws IOException {
-        HTableInterface table = getConnection().getTable(getAllInOneTableName());
+        Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
         try {
             return internalGetFromHTable(table, path, fetchContent, fetchTimestamp);
         } finally {
@@ -316,7 +313,7 @@ public class HBaseResourceStore extends ResourceStore {
         }
     }
 
-    private Result internalGetFromHTable(HTableInterface table, String path, boolean fetchContent, boolean fetchTimestamp) throws IOException {
+    private Result internalGetFromHTable(Table table, String path, boolean fetchContent, boolean fetchTimestamp) throws IOException {
         byte[] rowkey = Bytes.toBytes(path);
 
         Get get = new Get(rowkey);
@@ -335,7 +332,7 @@ public class HBaseResourceStore extends ResourceStore {
         return exists ? result : null;
     }
 
-    private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, HTableInterface table) throws IOException {
+    private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, Table table) throws IOException {
         Path redirectPath = bigCellHDFSPath(resPath);
         FileSystem fileSystem = HadoopUtil.getWorkingFileSystem();
 
@@ -360,7 +357,7 @@ public class HBaseResourceStore extends ResourceStore {
         return redirectPath;
     }
 
-    private Put buildPut(String resPath, long ts, byte[] row, byte[] content, HTableInterface table) throws IOException {
+    private Put buildPut(String resPath, long ts, byte[] row, byte[] content, Table table) throws IOException {
         int kvSizeLimit = Integer.parseInt(getConnection().getConfiguration().get("hbase.client.keyvalue.maxsize", "10485760"));
         if (content.length > kvSizeLimit) {
             writeLargeCellToHdfs(resPath, content, table);
@@ -368,8 +365,8 @@ public class HBaseResourceStore extends ResourceStore {
         }
 
         Put put = new Put(row);
-        put.add(B_FAMILY, B_COLUMN, content);
-        put.add(B_FAMILY, B_COLUMN_TS, Bytes.toBytes(ts));
+        put.addColumn(B_FAMILY, B_COLUMN, content);
+        put.addColumn(B_FAMILY, B_COLUMN_TS, Bytes.toBytes(ts));
 
         return put;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
index 43b65cb..d36d722 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
@@ -18,7 +18,6 @@
 
 package org.apache.kylin.storage.hbase;
 
-import com.google.common.base.Preconditions;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.cube.CubeInstance;
@@ -36,6 +35,8 @@ import org.apache.kylin.storage.IStorageQuery;
 import org.apache.kylin.storage.hbase.steps.HBaseMROutput;
 import org.apache.kylin.storage.hbase.steps.HBaseMROutput2Transition;
 
+import com.google.common.base.Preconditions;
+
 @SuppressWarnings("unused")
 //used by reflection
 public class HBaseStorage implements IStorage {

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/SimpleHBaseStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/SimpleHBaseStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/SimpleHBaseStore.java
index b141190..f63d9c2 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/SimpleHBaseStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/SimpleHBaseStore.java
@@ -26,12 +26,13 @@ import java.util.NoSuchElementException;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.kv.RowConstants;
@@ -86,14 +87,13 @@ public class SimpleHBaseStore implements IGTStore {
     }
 
     private class Writer implements IGTWriter {
-        final HTableInterface table;
+        final BufferedMutator table;
         final ByteBuffer rowkey = ByteBuffer.allocate(50);
         final ByteBuffer value = ByteBuffer.allocate(50);
 
         Writer() throws IOException {
-            HConnection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
-            table = conn.getTable(htableName);
-            table.setAutoFlush(false, true);
+            Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
+            table = conn.getBufferedMutator(htableName);
         }
 
         @Override
@@ -113,24 +113,24 @@ public class SimpleHBaseStore implements IGTStore {
 
             Put put = new Put(rowkey);
             put.addImmutable(CF_B, ByteBuffer.wrap(COL_B), HConstants.LATEST_TIMESTAMP, value);
-            table.put(put);
+            table.mutate(put);
         }
 
         @Override
         public void close() throws IOException {
-            table.flushCommits();
+            table.flush();
             table.close();
         }
     }
 
     class Reader implements IGTScanner {
-        final HTableInterface table;
+        final Table table;
         final ResultScanner scanner;
 
         int count = 0;
 
         Reader() throws IOException {
-            HConnection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
+            Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
             table = conn.getTable(htableName);
 
             Scan scan = new Scan();

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
index 8ac3832..982a044 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
@@ -25,11 +25,12 @@ import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Set;
 
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FuzzyRowFilter;
@@ -70,7 +71,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
     protected final List<RowValueDecoder> rowValueDecoders;
     private final StorageContext context;
     private final String tableName;
-    private final HTableInterface table;
+    private final Table table;
 
     protected CubeTupleConverter tupleConverter;
     protected final Iterator<HBaseKeyRange> rangeIterator;
@@ -88,7 +89,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
     private int advMeasureRowsRemaining;
     private int advMeasureRowIndex;
 
-    public CubeSegmentTupleIterator(CubeSegment cubeSeg, List<HBaseKeyRange> keyRanges, HConnection conn, //
+    public CubeSegmentTupleIterator(CubeSegment cubeSeg, List<HBaseKeyRange> keyRanges, Connection conn, //
             Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, //
             List<RowValueDecoder> rowValueDecoders, StorageContext context, TupleInfo returnTupleInfo) {
         this.cubeSeg = cubeSeg;
@@ -108,7 +109,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
         this.rangeIterator = keyRanges.iterator();
 
         try {
-            this.table = conn.getTable(tableName);
+            this.table = conn.getTable(TableName.valueOf(tableName));
         } catch (Throwable t) {
             throw new StorageException("Error when open connection to table " + tableName, t);
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
index 1b08880..13c9c47 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
@@ -33,7 +33,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 
-import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.Dictionary;
@@ -156,7 +156,7 @@ public class CubeStorageQuery implements IStorageQuery {
         setCoprocessor(groupsCopD, valueDecoders, context); // enable coprocessor if beneficial
         setLimit(filter, context);
 
-        HConnection conn = HBaseConnection.get(context.getConnUrl());
+        Connection conn = HBaseConnection.get(context.getConnUrl());
 
         // notice we're passing filterD down to storage instead of flatFilter
         return new SerializedHBaseTupleIterator(conn, scans, cubeInstance, dimensionsD, filterD, groupsCopD, valueDecoders, context, returnTupleInfo);

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/RegionScannerAdapter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/RegionScannerAdapter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/RegionScannerAdapter.java
index 8a20c65..3d30767 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/RegionScannerAdapter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/RegionScannerAdapter.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 
 /**
  * @author yangli9
@@ -50,7 +51,7 @@ public class RegionScannerAdapter implements RegionScanner {
     }
 
     @Override
-    public boolean next(List<Cell> result, int limit) throws IOException {
+    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
         return next(result);
     }
 
@@ -60,7 +61,7 @@ public class RegionScannerAdapter implements RegionScanner {
     }
 
     @Override
-    public boolean nextRaw(List<Cell> result, int limit) throws IOException {
+    public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
         return next(result);
     }
 
@@ -94,4 +95,9 @@ public class RegionScannerAdapter implements RegionScanner {
         return Long.MAX_VALUE;
     }
 
+    @Override
+    public int getBatch() {
+        return -1;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
index c4f7367..0f2e2fb 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
@@ -25,7 +25,7 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
 
-import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.metadata.filter.TupleFilter;
@@ -57,7 +57,7 @@ public class SerializedHBaseTupleIterator implements ITupleIterator {
     private int scanCount;
     private ITuple next;
 
-    public SerializedHBaseTupleIterator(HConnection conn, List<HBaseKeyRange> segmentKeyRanges, CubeInstance cube, //
+    public SerializedHBaseTupleIterator(Connection conn, List<HBaseKeyRange> segmentKeyRanges, CubeInstance cube, //
             Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, List<RowValueDecoder> rowValueDecoders, //
             StorageContext context, TupleInfo returnTupleInfo) {
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java
index 7139ca7..7e25e4c 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.kylin.gridtable.StorageSideBehavior;
@@ -99,7 +99,7 @@ public class AggregateRegionObserver extends BaseRegionObserver {
         // start/end region operation & sync on scanner is suggested by the
         // javadoc of RegionScanner.nextRaw()
         // FIXME: will the lock still work when a iterator is returned? is it safe? Is readonly attribute helping here? by mhb
-        HRegion region = ctxt.getEnvironment().getRegion();
+        Region region = ctxt.getEnvironment().getRegion();
         region.startRegionOperation();
         try {
             synchronized (innerScanner) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java
index a900ea1..d64f48f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.kylin.gridtable.StorageSideBehavior;
 import org.apache.kylin.measure.MeasureAggregator;
 import org.apache.kylin.storage.hbase.common.coprocessor.AggrKey;
@@ -116,8 +117,8 @@ public class AggregationScanner implements RegionScanner {
     }
 
     @Override
-    public boolean next(List<Cell> result, int limit) throws IOException {
-        return outerScanner.next(result, limit);
+    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
+        return outerScanner.next(result, scannerContext);
     }
 
     @Override
@@ -126,8 +127,8 @@ public class AggregationScanner implements RegionScanner {
     }
 
     @Override
-    public boolean nextRaw(List<Cell> result, int limit) throws IOException {
-        return outerScanner.nextRaw(result, limit);
+    public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
+        return outerScanner.nextRaw(result, scannerContext);
     }
 
     @Override
@@ -160,6 +161,11 @@ public class AggregationScanner implements RegionScanner {
         return outerScanner.getMvccReadPoint();
     }
 
+    @Override
+    public int getBatch() {
+        return outerScanner.getBatch();
+    }
+
     private static class Stats {
         long inputRows = 0;
         long inputBytes = 0;

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregationCache.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregationCache.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregationCache.java
index 8404262..331e34d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregationCache.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregationCache.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.kylin.measure.MeasureAggregator;
 import org.apache.kylin.storage.hbase.common.coprocessor.AggrKey;
 import org.apache.kylin.storage.hbase.common.coprocessor.AggregationCache;
@@ -112,7 +113,7 @@ public class ObserverAggregationCache extends AggregationCache {
         }
 
         @Override
-        public boolean next(List<Cell> result, int limit) throws IOException {
+        public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
             return next(result);
         }
 
@@ -122,7 +123,7 @@ public class ObserverAggregationCache extends AggregationCache {
         }
 
         @Override
-        public boolean nextRaw(List<Cell> result, int limit) throws IOException {
+        public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
             return next(result);
         }
 
@@ -161,6 +162,11 @@ public class ObserverAggregationCache extends AggregationCache {
             // AggregateRegionObserver.LOG.info("Kylin Scanner getMvccReadPoint()");
             return Long.MAX_VALUE;
         }
+
+        @Override
+        public int getBatch() {
+            return innerScanner.getBatch();
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java
index 394b3e2..9fd33f5 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java
@@ -23,9 +23,9 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.debug.BackdoorToggles;
@@ -60,7 +60,7 @@ public class ObserverEnabler {
     static final Map<String, Boolean> CUBE_OVERRIDES = Maps.newConcurrentMap();
 
     public static ResultScanner scanWithCoprocessorIfBeneficial(CubeSegment segment, Cuboid cuboid, TupleFilter tupleFiler, //
-            Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context, HTableInterface table, Scan scan) throws IOException {
+            Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context, Table table, Scan scan) throws IOException {
 
         if (context.isCoprocessorEnabled() == false) {
             return table.getScanner(scan);

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index df1817e..cad5a3f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -26,8 +26,9 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.zip.DataFormatException;
 
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
@@ -51,10 +52,10 @@ import org.apache.kylin.storage.gtrecord.StorageResponseGTScatter;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos;
 import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest;
-import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitResponse;
-import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitService;
 import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList;
+import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitResponse;
 import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitResponse.Stats;
+import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -117,7 +118,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         final ImmutableBitSet selectedColBlocks = scanRequest.getSelectedColBlocks().set(0);
 
         // globally shared connection, does not require close
-        final HConnection conn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl());
+        final Connection conn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl());
 
         final List<IntList> hbaseColumnsToGTIntList = Lists.newArrayList();
         List<List<Integer>> hbaseColumnsToGT = getHBaseColumnsGTMapping(selectedColBlocks);
@@ -172,7 +173,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                     final boolean[] abnormalFinish = new boolean[1];
 
                     try {
-                        HTableInterface table = conn.getTable(cubeSeg.getStorageLocationIdentifier(), HBaseConnection.getCoprocessorPool());
+                        Table table = conn.getTable(TableName.valueOf(cubeSeg.getStorageLocationIdentifier()), HBaseConnection.getCoprocessorPool());
 
                         final CubeVisitRequest request = builder.build();
                         final byte[] startKey = epRange.getFirst();

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
index 3cefc5f..a52af90 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
@@ -24,11 +24,12 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.ShardingHash;
@@ -154,8 +155,8 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC {
         // primary key (also the 0th column block) is always selected
         final ImmutableBitSet selectedColBlocks = scanRequest.getSelectedColBlocks().set(0);
         // globally shared connection, does not require close
-        HConnection hbaseConn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl());
-        final HTableInterface hbaseTable = hbaseConn.getTable(cubeSeg.getStorageLocationIdentifier());
+        Connection hbaseConn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl());
+        final Table hbaseTable = hbaseConn.getTable(TableName.valueOf(cubeSeg.getStorageLocationIdentifier()));
 
         List<RawScan> rawScans = preparedHBaseScans(scanRequest.getGTScanRanges(), selectedColBlocks);
         List<List<Integer>> hbaseColumnsToGT = getHBaseColumnsGTMapping(selectedColBlocks);

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index de53d0d..85812f8 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -142,7 +142,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
         if (shardLength == 0) {
             return;
         }
-        byte[] regionStartKey = ArrayUtils.isEmpty(region.getStartKey()) ? new byte[shardLength] : region.getStartKey();
+        byte[] regionStartKey = ArrayUtils.isEmpty(region.getRegionInfo().getStartKey()) ? new byte[shardLength] : region.getRegionInfo().getStartKey();
         Bytes.putBytes(rawScan.startKey, 0, regionStartKey, 0, shardLength);
         Bytes.putBytes(rawScan.endKey, 0, regionStartKey, 0, shardLength);
     }
@@ -179,7 +179,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
         try (SetThreadName ignored = new SetThreadName("Query %s", queryId)) {
             this.serviceStartTime = System.currentTimeMillis();
 
-            region = env.getRegion();
+            region = (HRegion)env.getRegion();
             region.startRegionOperation();
 
             // if user change kylin.properties on kylin server, need to manually redeploy coprocessor jar to update KylinConfig of Env.

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
index 2814ad6..feb4842 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
@@ -26,7 +26,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.regionserver.BloomType;
@@ -79,7 +80,8 @@ public class CubeHTableUtil {
         tableDesc.setValue(IRealizationConstants.HTableSegmentTag, cubeSegment.toString());
 
         Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
-        HBaseAdmin admin = new HBaseAdmin(conf);
+        Connection conn = HBaseConnection.get(kylinConfig.getStorageUrl());
+        Admin admin = conn.getAdmin();
 
         try {
             if (User.isHBaseSecurityEnabled(conf)) {
@@ -92,7 +94,7 @@ public class CubeHTableUtil {
                 tableDesc.addFamily(cf);
             }
 
-            if (admin.tableExists(tableName)) {
+            if (admin.tableExists(TableName.valueOf(tableName))) {
                 // admin.disableTable(tableName);
                 // admin.deleteTable(tableName);
                 throw new RuntimeException("HBase table " + tableName + " exists!");
@@ -101,7 +103,7 @@ public class CubeHTableUtil {
             DeployCoprocessorCLI.deployCoprocessor(tableDesc);
 
             admin.createTable(tableDesc, splitKeys);
-            Preconditions.checkArgument(admin.isTableAvailable(tableName), "table " + tableName + " created, but is not available due to some reasons");
+            Preconditions.checkArgument(admin.isTableAvailable(TableName.valueOf(tableName)), "table " + tableName + " created, but is not available due to some reasons");
             logger.info("create hbase table " + tableName + " done.");
         } finally {
             IOUtils.closeQuietly(admin);
@@ -110,8 +112,7 @@ public class CubeHTableUtil {
     }
 
     public static void deleteHTable(TableName tableName) throws IOException {
-        Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
-        HBaseAdmin admin = new HBaseAdmin(conf);
+        Admin admin = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getAdmin();
         try {
             if (admin.tableExists(tableName)) {
                 logger.info("disabling hbase table " + tableName);
@@ -126,8 +127,7 @@ public class CubeHTableUtil {
 
     /** create a HTable that has the same performance settings as normal cube table, for benchmark purpose */
     public static void createBenchmarkHTable(TableName tableName, String cfName) throws IOException {
-        Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
-        HBaseAdmin admin = new HBaseAdmin(conf);
+        Admin admin = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getAdmin();
         try {
             if (admin.tableExists(tableName)) {
                 logger.info("disabling hbase table " + tableName);

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java
index eacff9f..d029b7a 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java
@@ -25,13 +25,13 @@ import java.util.List;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.HiveCmdBuilder;
 import org.apache.kylin.job.exception.ExecuteException;
@@ -100,19 +100,21 @@ public class DeprecatedGCStep extends AbstractExecutable {
         List<String> oldTables = getOldHTables();
         if (oldTables != null && oldTables.size() > 0) {
             String metadataUrlPrefix = KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
-            Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
-            HBaseAdmin admin = null;
+            Admin admin = null;
             try {
-                admin = new HBaseAdmin(conf);
+
+                Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
+                admin = conn.getAdmin();
+
                 for (String table : oldTables) {
-                    if (admin.tableExists(table)) {
-                        HTableDescriptor tableDescriptor = admin.getTableDescriptor(Bytes.toBytes(table));
+                    if (admin.tableExists(TableName.valueOf(table))) {
+                        HTableDescriptor tableDescriptor = admin.getTableDescriptor(TableName.valueOf(table));
                         String host = tableDescriptor.getValue(IRealizationConstants.HTableTag);
                         if (metadataUrlPrefix.equalsIgnoreCase(host)) {
-                            if (admin.isTableEnabled(table)) {
-                                admin.disableTable(table);
+                            if (admin.isTableEnabled(TableName.valueOf(table))) {
+                                admin.disableTable(TableName.valueOf(table));
                             }
-                            admin.deleteTable(table);
+                            admin.deleteTable(TableName.valueOf(table));
                             logger.debug("Dropped HBase table " + table);
                             output.append("Dropped HBase table " + table + " \n");
                         } else {
@@ -193,4 +195,4 @@ public class DeprecatedGCStep extends AbstractExecutable {
         return getParam(OLD_HIVE_TABLE);
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/4853dcae/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
index d5b36df..6587d4e 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
@@ -23,8 +23,8 @@ import java.util.List;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
@@ -49,7 +49,7 @@ public class HBaseCuboidWriter implements ICuboidWriter {
 
     private final List<KeyValueCreator> keyValueCreators;
     private final int nColumns;
-    private final HTableInterface hTable;
+    private final Table hTable;
     private final CubeDesc cubeDesc;
     private final CubeSegment cubeSegment;
     private final Object[] measureValues;
@@ -58,7 +58,7 @@ public class HBaseCuboidWriter implements ICuboidWriter {
     private AbstractRowKeyEncoder rowKeyEncoder;
     private byte[] keybuf;
 
-    public HBaseCuboidWriter(CubeSegment segment, HTableInterface hTable) {
+    public HBaseCuboidWriter(CubeSegment segment, Table hTable) {
         this.keyValueCreators = Lists.newArrayList();
         this.cubeSegment = segment;
         this.cubeDesc = cubeSegment.getCubeDesc();
@@ -117,7 +117,6 @@ public class HBaseCuboidWriter implements ICuboidWriter {
             long t = System.currentTimeMillis();
             if (hTable != null) {
                 hTable.put(puts);
-                hTable.flushCommits();
             }
             logger.info("commit total " + puts.size() + " puts, totally cost:" + (System.currentTimeMillis() - t) + "ms");
             puts.clear();


[3/7] kylin git commit: KYLIN-2394 Upgrade Calcite to 1.11 and Avatica 1.9

Posted by li...@apache.org.
KYLIN-2394 Upgrade Calcite to 1.11 and Avatica 1.9


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4b413a29
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4b413a29
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4b413a29

Branch: refs/heads/yang22-hbase1.x
Commit: 4b413a29ef3d5373485426d3ea2e00ee5e38b97c
Parents: 5ef2480
Author: Billy Liu <bi...@apache.org>
Authored: Wed Feb 15 22:45:04 2017 +0800
Committer: Billy Liu <bi...@apache.org>
Committed: Wed Feb 15 22:45:04 2017 +0800

----------------------------------------------------------------------
 atopcalcite/pom.xml                             |   11 +-
 .../calcite/sql2rel/SqlToRelConverter.java      | 3212 ++++++++++++------
 jdbc/pom.xml                                    |   10 +-
 .../java/org/apache/kylin/jdbc/KylinMeta.java   |    2 +-
 kylin-it/pom.xml                                |   22 +-
 .../org/apache/kylin/jdbc/ITJDBCDriverTest.java |    1 +
 pom.xml                                         |    6 +-
 query/pom.xml                                   |   10 +-
 8 files changed, 2127 insertions(+), 1147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/4b413a29/atopcalcite/pom.xml
----------------------------------------------------------------------
diff --git a/atopcalcite/pom.xml b/atopcalcite/pom.xml
index c1dba66..3ad6705 100644
--- a/atopcalcite/pom.xml
+++ b/atopcalcite/pom.xml
@@ -38,7 +38,16 @@
         <dependency>
             <groupId>org.apache.calcite</groupId>
             <artifactId>calcite-core</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.calcite.avatica</groupId>
+                    <artifactId>avatica-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.calcite.avatica</groupId>
+            <artifactId>avatica</artifactId>
         </dependency>
     </dependencies>
-
 </project>


[2/7] kylin git commit: KYLIN-2394 Upgrade Calcite to 1.11 and Avatica 1.9

Posted by li...@apache.org.
http://git-wip-us.apache.org/repos/asf/kylin/blob/4b413a29/atopcalcite/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
----------------------------------------------------------------------
diff --git a/atopcalcite/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java b/atopcalcite/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
index ae8194c..c70cd20 100644
--- a/atopcalcite/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
+++ b/atopcalcite/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
@@ -67,7 +67,6 @@ import org.apache.calcite.rel.stream.Delta;
 import org.apache.calcite.rel.stream.LogicalDelta;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexCall;
@@ -98,6 +97,7 @@ import org.apache.calcite.sql.SqlCallBinding;
 import org.apache.calcite.sql.SqlDataTypeSpec;
 import org.apache.calcite.sql.SqlDelete;
 import org.apache.calcite.sql.SqlDynamicParam;
+import org.apache.calcite.sql.SqlExplainFormat;
 import org.apache.calcite.sql.SqlExplainLevel;
 import org.apache.calcite.sql.SqlFunction;
 import org.apache.calcite.sql.SqlIdentifier;
@@ -163,12 +163,12 @@ import org.apache.calcite.util.trace.CalciteTrace;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableList.Builder;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 
 import org.slf4j.Logger;
 
@@ -182,6 +182,7 @@ import java.util.Collections;
 import java.util.Deque;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -191,6 +192,7 @@ import static org.apache.calcite.sql.SqlUtil.stripAs;
 import static org.apache.calcite.util.Static.RESOURCE;
 
 /*
+ * This code has been synced with Calcite. Hopefully, one day we can remove the hard-coded override points.
  * OVERRIDE POINT:
  * - getInSubqueryThreshold(), was `20`, now `Integer.MAX_VALUE`
  * - isTrimUnusedFields(), override to false
@@ -209,13 +211,18 @@ import static org.apache.calcite.util.Static.RESOURCE;
 public class SqlToRelConverter {
     //~ Static fields/initializers ---------------------------------------------
 
-    protected static final Logger SQL2REL_LOGGER = CalciteTrace.getSqlToRelTracer();
+    protected static final Logger SQL2REL_LOGGER =
+            CalciteTrace.getSqlToRelTracer();
 
     private static final BigDecimal TWO = BigDecimal.valueOf(2L);
 
     /** Size of the smallest IN list that will be converted to a semijoin to a
      * static table. */
-    public static final int IN_SUBQUERY_THRESHOLD = 20;
+    public static final int DEFAULT_IN_SUB_QUERY_THRESHOLD = 20;
+
+    @Deprecated // to be removed before 2.0
+    public static final int DEFAULT_IN_SUBQUERY_THRESHOLD =
+            DEFAULT_IN_SUB_QUERY_THRESHOLD;
 
     //~ Instance fields --------------------------------------------------------
 
@@ -224,23 +231,20 @@ public class SqlToRelConverter {
     protected final Prepare.CatalogReader catalogReader;
     protected final RelOptCluster cluster;
     private DefaultValueFactory defaultValueFactory;
-    private SubqueryConverter subqueryConverter;
+    private SubQueryConverter subQueryConverter;
     protected final List<RelNode> leaves = new ArrayList<>();
     private final List<SqlDynamicParam> dynamicParamSqlNodes = new ArrayList<>();
     private final SqlOperatorTable opTab;
-    private boolean shouldConvertTableAccess;
     protected final RelDataTypeFactory typeFactory;
     private final SqlNodeToRexConverter exprConverter;
-    private boolean decorrelationEnabled;
-    private boolean trimUnusedFields;
-    private boolean shouldCreateValuesRel;
-    private boolean isExplain;
-    private int nDynamicParamsInExplain;
+    private int explainParamCount;
+    public final SqlToRelConverter.Config config;
 
     /**
-     * Fields used in name resolution for correlated subqueries.
+     * Fields used in name resolution for correlated sub-queries.
      */
-    private final Map<CorrelationId, DeferredLookup> mapCorrelToDeferred = new HashMap<>();
+    private final Map<CorrelationId, DeferredLookup> mapCorrelToDeferred =
+            new HashMap<>();
 
     /**
      * Stack of names of datasets requested by the <code>
@@ -249,18 +253,15 @@ public class SqlToRelConverter {
     private final Deque<String> datasetStack = new ArrayDeque<>();
 
     /**
-     * Mapping of non-correlated subqueries that have been converted to their
-     * equivalent constants. Used to avoid re-evaluating the subquery if it's
+     * Mapping of non-correlated sub-queries that have been converted to their
+     * equivalent constants. Used to avoid re-evaluating the sub-query if it's
      * already been evaluated.
      */
-    private final Map<SqlNode, RexNode> mapConvertedNonCorrSubqs = new HashMap<>();
+    private final Map<SqlNode, RexNode> mapConvertedNonCorrSubqs =
+            new HashMap<>();
 
     public final RelOptTable.ViewExpander viewExpander;
 
-    /** Whether to expand sub-queries. If false, each sub-query becomes a
-     * {@link org.apache.calcite.rex.RexSubQuery}. */
-    private boolean expand = true;
-
     //~ Constructors -----------------------------------------------------------
     /**
      * Creates a converter.
@@ -272,29 +273,53 @@ public class SqlToRelConverter {
      * @param rexBuilder      Rex builder
      * @param convertletTable Expression converter
      */
-    @Deprecated // will be removed before 2.0
-    public SqlToRelConverter(RelOptTable.ViewExpander viewExpander, SqlValidator validator, Prepare.CatalogReader catalogReader, RelOptPlanner planner, RexBuilder rexBuilder, SqlRexConvertletTable convertletTable) {
-        this(viewExpander, validator, catalogReader, RelOptCluster.create(planner, rexBuilder), convertletTable);
+    @Deprecated // to be removed before 2.0
+    public SqlToRelConverter(
+            RelOptTable.ViewExpander viewExpander,
+            SqlValidator validator,
+            Prepare.CatalogReader catalogReader,
+            RelOptPlanner planner,
+            RexBuilder rexBuilder,
+            SqlRexConvertletTable convertletTable) {
+        this(viewExpander, validator, catalogReader,
+                RelOptCluster.create(planner, rexBuilder), convertletTable,
+                Config.DEFAULT);
+    }
+
+    @Deprecated // to be removed before 2.0
+    public SqlToRelConverter(
+            RelOptTable.ViewExpander viewExpander,
+            SqlValidator validator,
+            Prepare.CatalogReader catalogReader,
+            RelOptCluster cluster,
+            SqlRexConvertletTable convertletTable) {
+        this(viewExpander, validator, catalogReader, cluster, convertletTable,
+                Config.DEFAULT);
     }
 
     /* Creates a converter. */
-    public SqlToRelConverter(RelOptTable.ViewExpander viewExpander, SqlValidator validator, Prepare.CatalogReader catalogReader, RelOptCluster cluster, SqlRexConvertletTable convertletTable) {
+    public SqlToRelConverter(
+            RelOptTable.ViewExpander viewExpander,
+            SqlValidator validator,
+            Prepare.CatalogReader catalogReader,
+            RelOptCluster cluster,
+            SqlRexConvertletTable convertletTable,
+            Config config) {
         this.viewExpander = viewExpander;
-        this.opTab = (validator == null) ? SqlStdOperatorTable.instance() : validator.getOperatorTable();
+        this.opTab =
+                (validator
+                        == null) ? SqlStdOperatorTable.instance()
+                        : validator.getOperatorTable();
         this.validator = validator;
         this.catalogReader = catalogReader;
         this.defaultValueFactory = new NullDefaultValueFactory();
-        this.subqueryConverter = new NoOpSubqueryConverter();
+        this.subQueryConverter = new NoOpSubQueryConverter();
         this.rexBuilder = cluster.getRexBuilder();
         this.typeFactory = rexBuilder.getTypeFactory();
         this.cluster = Preconditions.checkNotNull(cluster);
-        this.shouldConvertTableAccess = true;
         this.exprConverter = new SqlNodeToRexConverterImpl(convertletTable);
-        decorrelationEnabled = true;
-        trimUnusedFields = false;
-        shouldCreateValuesRel = true;
-        isExplain = false;
-        nDynamicParamsInExplain = 0;
+        this.explainParamCount = 0;
+        this.config = new ConfigBuilder().withConfig(config).build();
     }
 
     //~ Methods ----------------------------------------------------------------
@@ -345,15 +370,15 @@ public class SqlToRelConverter {
      * @return the current count before the optional increment
      */
     public int getDynamicParamCountInExplain(boolean increment) {
-        int retVal = nDynamicParamsInExplain;
+        int retVal = explainParamCount;
         if (increment) {
-            ++nDynamicParamsInExplain;
+            ++explainParamCount;
         }
         return retVal;
     }
 
     /**
-     * @return mapping of non-correlated subqueries that have been converted to
+     * @return mapping of non-correlated sub-queries that have been converted to
      * the constants that they evaluate to
      */
     public Map<SqlNode, RexNode> getMapConvertedNonCorrSubqs() {
@@ -361,13 +386,14 @@ public class SqlToRelConverter {
     }
 
     /**
-     * Adds to the current map of non-correlated converted subqueries the
-     * elements from another map that contains non-correlated subqueries that
+     * Adds to the current map of non-correlated converted sub-queries the
+     * elements from another map that contains non-correlated sub-queries that
      * have been converted by another SqlToRelConverter.
      *
      * @param alreadyConvertedNonCorrSubqs the other map
      */
-    public void addConvertedNonCorrSubqs(Map<SqlNode, RexNode> alreadyConvertedNonCorrSubqs) {
+    public void addConvertedNonCorrSubqs(
+            Map<SqlNode, RexNode> alreadyConvertedNonCorrSubqs) {
         mapConvertedNonCorrSubqs.putAll(alreadyConvertedNonCorrSubqs);
     }
 
@@ -382,51 +408,24 @@ public class SqlToRelConverter {
     }
 
     /**
-     * Sets a new SubqueryConverter. To have any effect, this must be called
+     * Sets a new SubQueryConverter. To have any effect, this must be called
      * before any convert method.
      *
-     * @param converter new SubqueryConverter
+     * @param converter new SubQueryConverter
      */
-    public void setSubqueryConverter(SubqueryConverter converter) {
-        subqueryConverter = converter;
+    public void setSubQueryConverter(SubQueryConverter converter) {
+        subQueryConverter = converter;
     }
 
     /**
-     * Indicates that the current statement is part of an EXPLAIN PLAN statement
+     * Sets the number of dynamic parameters in the current EXPLAIN PLAN
+     * statement.
      *
-     * @param nDynamicParams number of dynamic parameters in the statement
+     * @param explainParamCount number of dynamic parameters in the statement
      */
-    public void setIsExplain(int nDynamicParams) {
-        isExplain = true;
-        nDynamicParamsInExplain = nDynamicParams;
-    }
-
-    /**
-     * Controls whether table access references are converted to physical rels
-     * immediately. The optimizer doesn't like leaf rels to have
-     * {@link Convention#NONE}. However, if we are doing further conversion
-     * passes (e.g. {@link RelStructuredTypeFlattener}), then we may need to
-     * defer conversion. To have any effect, this must be called before any
-     * convert method.
-     *
-     * @param enabled true for immediate conversion (the default); false to
-     *                generate logical LogicalTableScan instances
-     */
-    public void enableTableAccessConversion(boolean enabled) {
-        shouldConvertTableAccess = enabled;
-    }
-
-    /**
-     * Controls whether instances of
-     * {@link org.apache.calcite.rel.logical.LogicalValues} are generated. These
-     * may not be supported by all physical implementations. To have any effect,
-     * this must be called before any convert method.
-     *
-     * @param enabled true to allow LogicalValues to be generated (the default);
-     *                false to force substitution of Project+OneRow instead
-     */
-    public void enableValuesRelCreation(boolean enabled) {
-        shouldCreateValuesRel = enabled;
+    public void setDynamicParamCountInExplain(int explainParamCount) {
+        assert config.isExplain();
+        this.explainParamCount = explainParamCount;
     }
 
     private void checkConvertedType(SqlNode query, RelNode result) {
@@ -438,24 +437,42 @@ public class SqlToRelConverter {
         // SQL statement is something like an INSERT which has no
         // validator type information associated with its result,
         // hence the namespace check above.)
-        final List<RelDataTypeField> validatedFields = validator.getValidatedNodeType(query).getFieldList();
-        final RelDataType validatedRowType = validator.getTypeFactory().createStructType(Pair.right(validatedFields), SqlValidatorUtil.uniquify(Pair.left(validatedFields)));
-
-        final List<RelDataTypeField> convertedFields = result.getRowType().getFieldList().subList(0, validatedFields.size());
-        final RelDataType convertedRowType = validator.getTypeFactory().createStructType(convertedFields);
-
-        if (!RelOptUtil.equal("validated row type", validatedRowType, "converted row type", convertedRowType, Litmus.IGNORE)) {
-            throw new AssertionError("Conversion to relational algebra failed to " + "preserve datatypes:\n" + "validated type:\n" + validatedRowType.getFullTypeString() + "\nconverted type:\n" + convertedRowType.getFullTypeString() + "\nrel:\n" + RelOptUtil.toString(result));
+        final List<RelDataTypeField> validatedFields =
+                validator.getValidatedNodeType(query).getFieldList();
+        final RelDataType validatedRowType =
+                validator.getTypeFactory().createStructType(
+                        Pair.right(validatedFields),
+                        SqlValidatorUtil.uniquify(Pair.left(validatedFields),
+                                catalogReader.isCaseSensitive()));
+
+        final List<RelDataTypeField> convertedFields =
+                result.getRowType().getFieldList().subList(0, validatedFields.size());
+        final RelDataType convertedRowType =
+                validator.getTypeFactory().createStructType(convertedFields);
+
+        if (!RelOptUtil.equal("validated row type", validatedRowType,
+                "converted row type", convertedRowType, Litmus.IGNORE)) {
+            throw new AssertionError("Conversion to relational algebra failed to "
+                    + "preserve datatypes:\n"
+                    + "validated type:\n"
+                    + validatedRowType.getFullTypeString()
+                    + "\nconverted type:\n"
+                    + convertedRowType.getFullTypeString()
+                    + "\nrel:\n"
+                    + RelOptUtil.toString(result));
         }
     }
 
-    public RelNode flattenTypes(RelNode rootRel, boolean restructure) {
-        RelStructuredTypeFlattener typeFlattener = new RelStructuredTypeFlattener(rexBuilder, createToRelContext());
-        return typeFlattener.rewrite(rootRel, restructure);
+    public RelNode flattenTypes(
+            RelNode rootRel,
+            boolean restructure) {
+        RelStructuredTypeFlattener typeFlattener =
+                new RelStructuredTypeFlattener(rexBuilder, createToRelContext(), restructure);
+        return typeFlattener.rewrite(rootRel);
     }
 
     /**
-     * If subquery is correlated and decorrelation is enabled, performs
+     * If sub-query is correlated and decorrelation is enabled, performs
      * decorrelation.
      *
      * @param query   Query
@@ -496,14 +513,21 @@ public class SqlToRelConverter {
         // Trim fields that are not used by their consumer.
         if (isTrimUnusedFields()) {
             final RelFieldTrimmer trimmer = newFieldTrimmer();
-            final List<RelCollation> collations = rootRel.getTraitSet().getTraits(RelCollationTraitDef.INSTANCE);
+            final List<RelCollation> collations =
+                    rootRel.getTraitSet().getTraits(RelCollationTraitDef.INSTANCE);
             rootRel = trimmer.trim(rootRel);
-            if (!ordered && collations != null && !collations.isEmpty() && !collations.equals(ImmutableList.of(RelCollations.EMPTY))) {
-                final RelTraitSet traitSet = rootRel.getTraitSet().replace(RelCollationTraitDef.INSTANCE, collations);
+            if (!ordered
+                    && collations != null
+                    && !collations.isEmpty()
+                    && !collations.equals(ImmutableList.of(RelCollations.EMPTY))) {
+                final RelTraitSet traitSet = rootRel.getTraitSet()
+                        .replace(RelCollationTraitDef.INSTANCE, collations);
                 rootRel = rootRel.copy(traitSet, rootRel.getInputs());
             }
             if (SQL2REL_LOGGER.isDebugEnabled()) {
-                SQL2REL_LOGGER.debug(RelOptUtil.dumpPlan("Plan after trimming unused fields", rootRel, false, SqlExplainLevel.EXPPLAN_ATTRIBUTES));
+                SQL2REL_LOGGER.debug(
+                        RelOptUtil.dumpPlan("Plan after trimming unused fields", rootRel,
+                                SqlExplainFormat.TEXT, SqlExplainLevel.EXPPLAN_ATTRIBUTES));
             }
         }
         return rootRel;
@@ -515,7 +539,8 @@ public class SqlToRelConverter {
      * @return Field trimmer
      */
     protected RelFieldTrimmer newFieldTrimmer() {
-        final RelBuilder relBuilder = RelFactories.LOGICAL_BUILDER.create(cluster, null);
+        final RelBuilder relBuilder =
+                RelFactories.LOGICAL_BUILDER.create(cluster, null);
         return new RelFieldTrimmer(validator, relBuilder);
     }
 
@@ -530,14 +555,18 @@ public class SqlToRelConverter {
      *                        will become a JDBC result set; <code>false</code> if
      *                        the query will be part of a view.
      */
-    public RelRoot convertQuery(SqlNode query, final boolean needsValidation, final boolean top) {
+    public RelRoot convertQuery(
+            SqlNode query,
+            final boolean needsValidation,
+            final boolean top) {
         SqlNode origQuery = query; /* OVERRIDE POINT */
-        
+
         if (needsValidation) {
             query = validator.validate(query);
         }
 
-        RelMetadataQuery.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(cluster.getMetadataProvider()));
+        RelMetadataQuery.THREAD_PROVIDERS.set(
+                JaninoRelMetadataProvider.of(cluster.getMetadataProvider()));
         RelNode result = convertQueryRecursive(query, top, null).rel;
         if (top) {
             if (isStream(query)) {
@@ -553,18 +582,23 @@ public class SqlToRelConverter {
         checkConvertedType(query, result);
 
         if (SQL2REL_LOGGER.isDebugEnabled()) {
-            SQL2REL_LOGGER.debug(RelOptUtil.dumpPlan("Plan after converting SqlNode to RelNode", result, false, SqlExplainLevel.EXPPLAN_ATTRIBUTES));
+            SQL2REL_LOGGER.debug(
+                    RelOptUtil.dumpPlan("Plan after converting SqlNode to RelNode",
+                            result, SqlExplainFormat.TEXT,
+                            SqlExplainLevel.EXPPLAN_ATTRIBUTES));
         }
 
         final RelDataType validatedRowType = validator.getValidatedNodeType(query);
-        return hackSelectStar(origQuery, RelRoot.of(result, validatedRowType, query.getKind()).withCollation(collation));
+        RelRoot origResult =  RelRoot.of(result, validatedRowType, query.getKind())
+                .withCollation(collation);
+        return hackSelectStar(origQuery, origResult);
     }
 
     /* OVERRIDE POINT */
     private RelRoot hackSelectStar(SqlNode query, RelRoot root) {
         /*
          * Rel tree is like:
-         * 
+         *
          *   LogicalSort (optional)
          *    |- LogicalProject
          *        |- LogicalFilter (optional)
@@ -580,23 +614,23 @@ public class SqlToRelConverter {
         } else {
             return root;
         }
-        
+
         RelNode input = rootPrj.getInput();
         if (!(//
                 input.getClass().getSimpleName().equals("OLAPTableScan")//
-                || (input.getClass().getSimpleName().equals("LogicalFilter") && input.getInput(0).getClass().getSimpleName().equals("OLAPTableScan"))//
-             ))
+                        || (input.getClass().getSimpleName().equals("LogicalFilter") && input.getInput(0).getClass().getSimpleName().equals("OLAPTableScan"))//
+        ))
             return root;
 
         if (rootPrj.getRowType().getFieldCount() < input.getRowType().getFieldCount())
             return root;
-        
+
         RelDataType inType = rootPrj.getRowType();
         List<String> inFields = inType.getFieldNames();
         List<RexNode> projExp = new ArrayList<>();
         List<Pair<Integer, String>> projFields = new ArrayList<>();
-        FieldInfoBuilder projTypeBuilder = getCluster().getTypeFactory().builder();
-        FieldInfoBuilder validTypeBuilder = getCluster().getTypeFactory().builder();
+        RelDataTypeFactory.FieldInfoBuilder projTypeBuilder = getCluster().getTypeFactory().builder();
+        RelDataTypeFactory.FieldInfoBuilder validTypeBuilder = getCluster().getTypeFactory().builder();
         for (int i = 0; i < inFields.size(); i++) {
             if (!inFields.get(i).startsWith("_KY_")) {
                 projExp.add(rootPrj.getProjects().get(i));
@@ -611,29 +645,31 @@ public class SqlToRelConverter {
         if (rootSort != null) {
             rootSort = (LogicalSort) rootSort.copy(rootSort.getTraitSet(), rootPrj, rootSort.collation, rootSort.offset, rootSort.fetch);
         }
-        
+
         RelDataType validRowType = getCluster().getTypeFactory().createStructType(validTypeBuilder);
         root = new RelRoot(rootSort == null ? rootPrj : rootSort, validRowType, root.kind, projFields, root.collation);
-        
+
         validator.setValidatedNodeType(query, validRowType);
-        
+
         return root;
     }
 
     private static boolean isStream(SqlNode query) {
-        return query instanceof SqlSelect && ((SqlSelect) query).isKeywordPresent(SqlSelectKeyword.STREAM);
+        return query instanceof SqlSelect
+                && ((SqlSelect) query).isKeywordPresent(SqlSelectKeyword.STREAM);
     }
 
     public static boolean isOrdered(SqlNode query) {
         switch (query.getKind()) {
-        case SELECT:
-            return ((SqlSelect) query).getOrderList() != null && ((SqlSelect) query).getOrderList().size() > 0;
-        case WITH:
-            return isOrdered(((SqlWith) query).body);
-        case ORDER_BY:
-            return ((SqlOrderBy) query).orderList.size() > 0;
-        default:
-            return false;
+            case SELECT:
+                return ((SqlSelect) query).getOrderList() != null
+                        && ((SqlSelect) query).getOrderList().size() > 0;
+            case WITH:
+                return isOrdered(((SqlWith) query).body);
+            case ORDER_BY:
+                return ((SqlOrderBy) query).orderList.size() > 0;
+            default:
+                return false;
         }
     }
 
@@ -663,7 +699,8 @@ public class SqlToRelConverter {
     /**
      * Factory method for creating translation workspace.
      */
-    protected Blackboard createBlackboard(SqlValidatorScope scope, Map<String, RexNode> nameToNodeMap, boolean top) {
+    protected Blackboard createBlackboard(SqlValidatorScope scope,
+                                          Map<String, RexNode> nameToNodeMap, boolean top) {
         return new Blackboard(scope, nameToNodeMap, top);
     }
 
@@ -671,25 +708,45 @@ public class SqlToRelConverter {
      * Implementation of {@link #convertSelect(SqlSelect, boolean)};
      * derived class may override.
      */
-    protected void convertSelectImpl(final Blackboard bb, SqlSelect select) {
-        convertFrom(bb, select.getFrom());
-        convertWhere(bb, select.getWhere());
+    protected void convertSelectImpl(
+            final Blackboard bb,
+            SqlSelect select) {
+        convertFrom(
+                bb,
+                select.getFrom());
+        convertWhere(
+                bb,
+                select.getWhere());
 
         final List<SqlNode> orderExprList = new ArrayList<>();
         final List<RelFieldCollation> collationList = new ArrayList<>();
-        gatherOrderExprs(bb, select, select.getOrderList(), orderExprList, collationList);
-        final RelCollation collation = cluster.traitSet().canonize(RelCollations.of(collationList));
+        gatherOrderExprs(
+                bb,
+                select,
+                select.getOrderList(),
+                orderExprList,
+                collationList);
+        final RelCollation collation =
+                cluster.traitSet().canonize(RelCollations.of(collationList));
 
         if (validator.isAggregate(select)) {
-            convertAgg(bb, select, orderExprList);
+            convertAgg(
+                    bb,
+                    select,
+                    orderExprList);
         } else {
-            convertSelectList(bb, select, orderExprList);
+            convertSelectList(
+                    bb,
+                    select,
+                    orderExprList);
         }
 
         if (select.isDistinct()) {
             distinctify(bb, true);
         }
-        convertOrder(select, bb, collation, orderExprList, select.getOffset(), select.getFetch());
+        convertOrder(
+                select, bb, collation, orderExprList, select.getOffset(),
+                select.getFetch());
         bb.setRoot(bb.root, true);
     }
 
@@ -705,7 +762,9 @@ public class SqlToRelConverter {
      * @param bb               Blackboard
      * @param checkForDupExprs Check for duplicate expressions
      */
-    private void distinctify(Blackboard bb, boolean checkForDupExprs) {
+    private void distinctify(
+            Blackboard bb,
+            boolean checkForDupExprs) {
         // Look for duplicate expressions in the project.
         // Say we have 'select x, y, x, z'.
         // Then dups will be {[2, 0]}
@@ -739,7 +798,9 @@ public class SqlToRelConverter {
                     newProjects.add(RexInputRef.of2(i, fields));
                 }
             }
-            rel = LogicalProject.create(rel, Pair.left(newProjects), Pair.right(newProjects));
+            rel =
+                    LogicalProject.create(rel, Pair.left(newProjects),
+                            Pair.right(newProjects));
             bb.root = rel;
             distinctify(bb, false);
             rel = bb.root;
@@ -750,21 +811,34 @@ public class SqlToRelConverter {
             for (int i = 0; i < fields.size(); i++) {
                 final int origin = origins.get(i);
                 RelDataTypeField field = fields.get(i);
-                undoProjects.add(Pair.of((RexNode) new RexInputRef(squished.get(origin), field.getType()), field.getName()));
+                undoProjects.add(
+                        Pair.of(
+                                (RexNode) new RexInputRef(
+                                        squished.get(origin), field.getType()),
+                                field.getName()));
             }
 
-            rel = LogicalProject.create(rel, Pair.left(undoProjects), Pair.right(undoProjects));
-            bb.setRoot(rel, false);
+            rel =
+                    LogicalProject.create(rel, Pair.left(undoProjects),
+                            Pair.right(undoProjects));
+            bb.setRoot(
+                    rel,
+                    false);
 
             return;
         }
 
         // Usual case: all of the expressions in the SELECT clause are
         // different.
-        final ImmutableBitSet groupSet = ImmutableBitSet.range(rel.getRowType().getFieldCount());
-        rel = createAggregate(bb, false, groupSet, ImmutableList.of(groupSet), ImmutableList.<AggregateCall> of());
-
-        bb.setRoot(rel, false);
+        final ImmutableBitSet groupSet =
+                ImmutableBitSet.range(rel.getRowType().getFieldCount());
+        rel =
+                createAggregate(bb, false, groupSet, ImmutableList.of(groupSet),
+                        ImmutableList.<AggregateCall>of());
+
+        bb.setRoot(
+                rel,
+                false);
     }
 
     private int findExpr(RexNode seek, List<RexNode> exprs, int count) {
@@ -789,16 +863,29 @@ public class SqlToRelConverter {
      *                      returning first row
      * @param fetch         Expression for number of rows to fetch
      */
-    protected void convertOrder(SqlSelect select, Blackboard bb, RelCollation collation, List<SqlNode> orderExprList, SqlNode offset, SqlNode fetch) {
-        if (select.getOrderList() == null || select.getOrderList().getList().isEmpty()) {
+    protected void convertOrder(
+            SqlSelect select,
+            Blackboard bb,
+            RelCollation collation,
+            List<SqlNode> orderExprList,
+            SqlNode offset,
+            SqlNode fetch) {
+        if (select.getOrderList() == null
+                || select.getOrderList().getList().isEmpty()) {
             assert collation.getFieldCollations().isEmpty();
-            if ((offset == null || ((SqlLiteral) offset).bigDecimalValue().equals(BigDecimal.ZERO)) && fetch == null) {
+            if ((offset == null
+                    || ((SqlLiteral) offset).bigDecimalValue().equals(BigDecimal.ZERO))
+                    && fetch == null) {
                 return;
             }
         }
 
         // Create a sorter using the previously constructed collations.
-        bb.setRoot(LogicalSort.create(bb.root, collation, offset == null ? null : convertExpression(offset), fetch == null ? null : convertExpression(fetch)), false);
+        bb.setRoot(
+                LogicalSort.create(bb.root, collation,
+                        offset == null ? null : convertExpression(offset),
+                        fetch == null ? null : convertExpression(fetch)),
+                false);
 
         // If extra expressions were added to the project list for sorting,
         // add another project to remove them. But make the collation empty, because
@@ -808,11 +895,15 @@ public class SqlToRelConverter {
         if (orderExprList.size() > 0 && !bb.top) {
             final List<RexNode> exprs = new ArrayList<>();
             final RelDataType rowType = bb.root.getRowType();
-            final int fieldCount = rowType.getFieldCount() - orderExprList.size();
+            final int fieldCount =
+                    rowType.getFieldCount() - orderExprList.size();
             for (int i = 0; i < fieldCount; i++) {
                 exprs.add(rexBuilder.makeInputRef(bb.root, i));
             }
-            bb.setRoot(LogicalProject.create(bb.root, exprs, rowType.getFieldNames().subList(0, fieldCount)), false);
+            bb.setRoot(
+                    LogicalProject.create(bb.root, exprs,
+                            rowType.getFieldNames().subList(0, fieldCount)),
+                    false);
         }
     }
 
@@ -821,16 +912,18 @@ public class SqlToRelConverter {
      *
      * @param node a RexNode tree
      */
-    private static boolean containsInOperator(SqlNode node) {
+    private static boolean containsInOperator(
+            SqlNode node) {
         try {
-            SqlVisitor<Void> visitor = new SqlBasicVisitor<Void>() {
-                public Void visit(SqlCall call) {
-                    if (call.getOperator() instanceof SqlInOperator) {
-                        throw new Util.FoundOne(call);
-                    }
-                    return super.visit(call);
-                }
-            };
+            SqlVisitor<Void> visitor =
+                    new SqlBasicVisitor<Void>() {
+                        public Void visit(SqlCall call) {
+                            if (call.getOperator() instanceof SqlInOperator) {
+                                throw new Util.FoundOne(call);
+                            }
+                            return super.visit(call);
+                        }
+                    };
             node.accept(visitor);
             return false;
         } catch (Util.FoundOne e) {
@@ -842,18 +935,21 @@ public class SqlToRelConverter {
     /**
      * Push down all the NOT logical operators into any IN/NOT IN operators.
      *
+     * @param scope Scope where {@code sqlNode} occurs
      * @param sqlNode the root node from which to look for NOT operators
      * @return the transformed SqlNode representation with NOT pushed down.
      */
-    private static SqlNode pushDownNotForIn(SqlNode sqlNode) {
+    private static SqlNode pushDownNotForIn(SqlValidatorScope scope,
+                                            SqlNode sqlNode) {
         if ((sqlNode instanceof SqlCall) && containsInOperator(sqlNode)) {
             SqlCall sqlCall = (SqlCall) sqlNode;
-            if ((sqlCall.getOperator() == SqlStdOperatorTable.AND) || (sqlCall.getOperator() == SqlStdOperatorTable.OR)) {
+            if ((sqlCall.getOperator() == SqlStdOperatorTable.AND)
+                    || (sqlCall.getOperator() == SqlStdOperatorTable.OR)) {
                 SqlNode[] sqlOperands = ((SqlBasicCall) sqlCall).operands;
                 for (int i = 0; i < sqlOperands.length; i++) {
-                    sqlOperands[i] = pushDownNotForIn(sqlOperands[i]);
+                    sqlOperands[i] = pushDownNotForIn(scope, sqlOperands[i]);
                 }
-                return sqlNode;
+                return reg(scope, sqlNode);
             } else if (sqlCall.getOperator() == SqlStdOperatorTable.NOT) {
                 SqlNode childNode = sqlCall.operand(0);
                 assert childNode instanceof SqlCall;
@@ -862,33 +958,34 @@ public class SqlToRelConverter {
                     SqlNode[] andOperands = childSqlCall.getOperands();
                     SqlNode[] orOperands = new SqlNode[andOperands.length];
                     for (int i = 0; i < orOperands.length; i++) {
-                        orOperands[i] = SqlStdOperatorTable.NOT.createCall(SqlParserPos.ZERO, andOperands[i]);
+                        orOperands[i] = reg(scope, SqlStdOperatorTable.NOT.createCall(SqlParserPos.ZERO, andOperands[i]));
                     }
                     for (int i = 0; i < orOperands.length; i++) {
-                        orOperands[i] = pushDownNotForIn(orOperands[i]);
+                        orOperands[i] = pushDownNotForIn(scope, orOperands[i]);
                     }
-                    return SqlStdOperatorTable.OR.createCall(SqlParserPos.ZERO, orOperands[0], orOperands[1]);
+                    return reg(scope, SqlStdOperatorTable.OR.createCall(SqlParserPos.ZERO, orOperands[0], orOperands[1]));
                 } else if (childSqlCall.getOperator() == SqlStdOperatorTable.OR) {
                     SqlNode[] orOperands = childSqlCall.getOperands();
                     SqlNode[] andOperands = new SqlNode[orOperands.length];
                     for (int i = 0; i < andOperands.length; i++) {
-                        andOperands[i] = SqlStdOperatorTable.NOT.createCall(SqlParserPos.ZERO, orOperands[i]);
+                        andOperands[i] = reg(scope, SqlStdOperatorTable.NOT.createCall(SqlParserPos.ZERO, orOperands[i]));
                     }
                     for (int i = 0; i < andOperands.length; i++) {
-                        andOperands[i] = pushDownNotForIn(andOperands[i]);
+                        andOperands[i] = pushDownNotForIn(scope, andOperands[i]);
                     }
-                    return SqlStdOperatorTable.AND.createCall(SqlParserPos.ZERO, andOperands[0], andOperands[1]);
+                    return reg(scope, SqlStdOperatorTable.AND.createCall(SqlParserPos.ZERO, andOperands[0], andOperands[1]));
                 } else if (childSqlCall.getOperator() == SqlStdOperatorTable.NOT) {
                     SqlNode[] notOperands = childSqlCall.getOperands();
                     assert notOperands.length == 1;
-                    return pushDownNotForIn(notOperands[0]);
+                    return pushDownNotForIn(scope, notOperands[0]);
                 } else if (childSqlCall.getOperator() instanceof SqlInOperator) {
                     SqlNode[] inOperands = childSqlCall.getOperands();
-                    SqlInOperator inOp = (SqlInOperator) childSqlCall.getOperator();
+                    SqlInOperator inOp =
+                            (SqlInOperator) childSqlCall.getOperator();
                     if (inOp.isNotIn()) {
-                        return SqlStdOperatorTable.IN.createCall(SqlParserPos.ZERO, inOperands[0], inOperands[1]);
+                        return reg(scope, SqlStdOperatorTable.IN.createCall(SqlParserPos.ZERO, inOperands[0], inOperands[1]));
                     } else {
-                        return SqlStdOperatorTable.NOT_IN.createCall(SqlParserPos.ZERO, inOperands[0], inOperands[1]);
+                        return reg(scope, SqlStdOperatorTable.NOT_IN.createCall(SqlParserPos.ZERO, inOperands[0], inOperands[1]));
                     }
                 } else {
                     // childSqlCall is "leaf" node in a logical expression tree
@@ -906,18 +1003,27 @@ public class SqlToRelConverter {
         }
     }
 
+    /** Registers with the validator a {@link SqlNode} that has been created
+     * during the Sql-to-Rel process. */
+    private static SqlNode reg(SqlValidatorScope scope, SqlNode e) {
+        scope.getValidator().deriveType(scope, e);
+        return e;
+    }
+
     /**
      * Converts a WHERE clause.
      *
      * @param bb    Blackboard
      * @param where WHERE clause, may be null
      */
-    private void convertWhere(final Blackboard bb, final SqlNode where) {
+    private void convertWhere(
+            final Blackboard bb,
+            final SqlNode where) {
         if (where == null) {
             return;
         }
-        SqlNode newWhere = pushDownNotForIn(where);
-        replaceSubqueries(bb, newWhere, RelOptUtil.Logic.UNKNOWN_AS_FALSE);
+        SqlNode newWhere = pushDownNotForIn(bb.scope, where);
+        replaceSubQueries(bb, newWhere, RelOptUtil.Logic.UNKNOWN_AS_FALSE);
         final RexNode convertedWhere = bb.convertExpression(newWhere);
 
         // only allocate filter if the condition is not TRUE
@@ -925,13 +1031,16 @@ public class SqlToRelConverter {
             return;
         }
 
-        final RelNode filter = RelOptUtil.createFilter(bb.root, convertedWhere);
+        final RelFactories.FilterFactory factory =
+                RelFactories.DEFAULT_FILTER_FACTORY;
+        final RelNode filter = factory.createFilter(bb.root, convertedWhere);
         final RelNode r;
         final CorrelationUse p = getCorrelationUse(bb, filter);
         if (p != null) {
             assert p.r instanceof Filter;
             Filter f = (Filter) p.r;
-            r = LogicalFilter.create(f.getInput(), f.getCondition(), ImmutableSet.of(p.id));
+            r = LogicalFilter.create(f.getInput(), f.getCondition(),
+                    ImmutableSet.of(p.id));
         } else {
             r = filter;
         }
@@ -939,14 +1048,17 @@ public class SqlToRelConverter {
         bb.setRoot(r, false);
     }
 
-    private void replaceSubqueries(final Blackboard bb, final SqlNode expr, RelOptUtil.Logic logic) {
-        findSubqueries(bb, expr, logic, false);
-        for (SubQuery node : bb.subqueryList) {
-            substituteSubquery(bb, node);
+    private void replaceSubQueries(
+            final Blackboard bb,
+            final SqlNode expr,
+            RelOptUtil.Logic logic) {
+        findSubQueries(bb, expr, logic, false);
+        for (SubQuery node : bb.subQueryList) {
+            substituteSubQuery(bb, node);
         }
     }
 
-    private void substituteSubquery(Blackboard bb, SubQuery subQuery) {
+    private void substituteSubQuery(Blackboard bb, SubQuery subQuery) {
         final RexNode expr = subQuery.expr;
         if (expr != null) {
             // Already done.
@@ -956,220 +1068,270 @@ public class SqlToRelConverter {
         final SqlBasicCall call;
         final RelNode rel;
         final SqlNode query;
-        final Pair<RelNode, Boolean> converted;
+        final RelOptUtil.Exists converted;
         switch (subQuery.node.getKind()) {
-        case CURSOR:
-            convertCursor(bb, subQuery);
-            return;
-
-        case MULTISET_QUERY_CONSTRUCTOR:
-        case MULTISET_VALUE_CONSTRUCTOR:
-        case ARRAY_QUERY_CONSTRUCTOR:
-            rel = convertMultisets(ImmutableList.of(subQuery.node), bb);
-            subQuery.expr = bb.register(rel, JoinRelType.INNER);
-            return;
+            case CURSOR:
+                convertCursor(bb, subQuery);
+                return;
 
-        case IN:
-            call = (SqlBasicCall) subQuery.node;
-            query = call.operand(1);
-            if (!expand && !(query instanceof SqlNodeList)) {
+            case MULTISET_QUERY_CONSTRUCTOR:
+            case MULTISET_VALUE_CONSTRUCTOR:
+            case ARRAY_QUERY_CONSTRUCTOR:
+                rel = convertMultisets(ImmutableList.of(subQuery.node), bb);
+                subQuery.expr = bb.register(rel, JoinRelType.INNER);
                 return;
-            }
-            final SqlNode leftKeyNode = call.operand(0);
-
-            final List<RexNode> leftKeys;
-            switch (leftKeyNode.getKind()) {
-            case ROW:
-                leftKeys = Lists.newArrayList();
-                for (SqlNode sqlExpr : ((SqlBasicCall) leftKeyNode).getOperandList()) {
-                    leftKeys.add(bb.convertExpression(sqlExpr));
-                }
-                break;
-            default:
-                leftKeys = ImmutableList.of(bb.convertExpression(leftKeyNode));
-            }
 
-            final boolean isNotIn = ((SqlInOperator) call.getOperator()).isNotIn();
-            if (query instanceof SqlNodeList) {
-                SqlNodeList valueList = (SqlNodeList) query;
-                if (!containsNullLiteral(valueList) && valueList.size() < getInSubqueryThreshold()) {
-                    // We're under the threshold, so convert to OR.
-                    subQuery.expr = convertInToOr(bb, leftKeys, valueList, isNotIn);
+            case IN:
+                call = (SqlBasicCall) subQuery.node;
+                query = call.operand(1);
+                if (!config.isExpand() && !(query instanceof SqlNodeList)) {
                     return;
                 }
+                final SqlNode leftKeyNode = call.operand(0);
+
+                final List<RexNode> leftKeys;
+                switch (leftKeyNode.getKind()) {
+                    case ROW:
+                        leftKeys = Lists.newArrayList();
+                        for (SqlNode sqlExpr : ((SqlBasicCall) leftKeyNode).getOperandList()) {
+                            leftKeys.add(bb.convertExpression(sqlExpr));
+                        }
+                        break;
+                    default:
+                        leftKeys = ImmutableList.of(bb.convertExpression(leftKeyNode));
+                }
 
-                // Otherwise, let convertExists translate
-                // values list into an inline table for the
-                // reference to Q below.
-            }
-
-            // Project out the search columns from the left side
-
-            //  Q1:
-            // "select from emp where emp.deptno in (select col1 from T)"
-            //
-            // is converted to
-            //
-            // "select from
-            //   emp inner join (select distinct col1 from T)) q
-            //   on emp.deptno = q.col1
-            //
-            // Q2:
-            // "select from emp where emp.deptno not in (Q)"
-            //
-            // is converted to
-            //
-            // "select from
-            //   emp left outer join (select distinct col1, TRUE from T) q
-            //   on emp.deptno = q.col1
-            //   where emp.deptno <> null
-            //         and q.indicator <> TRUE"
-            //
-            final boolean outerJoin = bb.subqueryNeedsOuterJoin || isNotIn || subQuery.logic == RelOptUtil.Logic.TRUE_FALSE_UNKNOWN;
-            final RelDataType targetRowType = SqlTypeUtil.promoteToRowType(typeFactory, validator.getValidatedNodeType(leftKeyNode), null);
-            converted = convertExists(query, RelOptUtil.SubqueryType.IN, subQuery.logic, outerJoin, targetRowType);
-            if (converted.right) {
-                // Generate
-                //    emp CROSS JOIN (SELECT COUNT(*) AS c,
-                //                       COUNT(deptno) AS ck FROM dept)
-                final RelDataType longType = typeFactory.createSqlType(SqlTypeName.BIGINT);
-                final RelNode seek = converted.left.getInput(0); // fragile
-                final int keyCount = leftKeys.size();
-                final List<Integer> args = ImmutableIntList.range(0, keyCount);
-                LogicalAggregate aggregate = LogicalAggregate.create(seek, false, ImmutableBitSet.of(), null, ImmutableList.of(AggregateCall.create(SqlStdOperatorTable.COUNT, false, ImmutableList.<Integer> of(), -1, longType, null), AggregateCall.create(SqlStdOperatorTable.COUNT, false, args, -1, longType, null)));
-                LogicalJoin join = LogicalJoin.create(bb.root, aggregate, rexBuilder.makeLiteral(true), ImmutableSet.<CorrelationId> of(), JoinRelType.INNER);
-                bb.setRoot(join, false);
-            }
-            RexNode rex = bb.register(converted.left, outerJoin ? JoinRelType.LEFT : JoinRelType.INNER, leftKeys);
+                final boolean notIn = ((SqlInOperator) call.getOperator()).isNotIn();
+                if (query instanceof SqlNodeList) {
+                    SqlNodeList valueList = (SqlNodeList) query;
+                    if (!containsNullLiteral(valueList)
+                            && valueList.size() < config.getInSubQueryThreshold()) {
+                        // We're under the threshold, so convert to OR.
+                        subQuery.expr =
+                                convertInToOr(
+                                        bb,
+                                        leftKeys,
+                                        valueList,
+                                        notIn);
+                        return;
+                    }
 
-            subQuery.expr = translateIn(subQuery, bb.root, rex);
-            if (isNotIn) {
-                subQuery.expr = rexBuilder.makeCall(SqlStdOperatorTable.NOT, subQuery.expr);
-            }
-            return;
+                    // Otherwise, let convertExists translate
+                    // values list into an inline table for the
+                    // reference to Q below.
+                }
 
-        case EXISTS:
-            // "select from emp where exists (select a from T)"
-            //
-            // is converted to the following if the subquery is correlated:
-            //
-            // "select from emp left outer join (select AGG_TRUE() as indicator
-            // from T group by corr_var) q where q.indicator is true"
-            //
-            // If there is no correlation, the expression is replaced with a
-            // boolean indicating whether the subquery returned 0 or >= 1 row.
-            call = (SqlBasicCall) subQuery.node;
-            query = call.operand(0);
-            if (!expand) {
-                return;
-            }
-            converted = convertExists(query, RelOptUtil.SubqueryType.EXISTS, subQuery.logic, true, null);
-            assert !converted.right;
-            if (convertNonCorrelatedSubQuery(subQuery, bb, converted.left, true)) {
+                // Project out the search columns from the left side
+
+                // Q1:
+                // "select from emp where emp.deptno in (select col1 from T)"
+                //
+                // is converted to
+                //
+                // "select from
+                //   emp inner join (select distinct col1 from T) q
+                //   on emp.deptno = q.col1
+                //
+                // Q2:
+                // "select from emp where emp.deptno not in (Q)"
+                //
+                // is converted to
+                //
+                // "select from
+                //   emp left outer join (select distinct col1, TRUE from T) q
+                //   on emp.deptno = q.col1
+                //   where emp.deptno is not null
+                //         and q.indicator <> TRUE"
+                //
+                final RelDataType targetRowType =
+                        SqlTypeUtil.promoteToRowType(typeFactory,
+                                validator.getValidatedNodeType(leftKeyNode), null);
+                converted =
+                        convertExists(query, RelOptUtil.SubQueryType.IN, subQuery.logic,
+                                notIn, targetRowType);
+                if (converted.indicator) {
+                    // Generate
+                    //    emp CROSS JOIN (SELECT COUNT(*) AS c,
+                    //                       COUNT(deptno) AS ck FROM dept)
+                    final RelDataType longType =
+                            typeFactory.createSqlType(SqlTypeName.BIGINT);
+                    final RelNode seek = converted.r.getInput(0); // fragile
+                    final int keyCount = leftKeys.size();
+                    final List<Integer> args = ImmutableIntList.range(0, keyCount);
+                    LogicalAggregate aggregate =
+                            LogicalAggregate.create(seek, false, ImmutableBitSet.of(), null,
+                                    ImmutableList.of(
+                                            AggregateCall.create(SqlStdOperatorTable.COUNT, false,
+                                                    ImmutableList.<Integer>of(), -1, longType, null),
+                                            AggregateCall.create(SqlStdOperatorTable.COUNT, false,
+                                                    args, -1, longType, null)));
+                    LogicalJoin join =
+                            LogicalJoin.create(bb.root, aggregate, rexBuilder.makeLiteral(true),
+                                    ImmutableSet.<CorrelationId>of(), JoinRelType.INNER);
+                    bb.setRoot(join, false);
+                }
+                final RexNode rex =
+                        bb.register(converted.r,
+                                converted.outerJoin ? JoinRelType.LEFT : JoinRelType.INNER,
+                                leftKeys);
+
+                RelOptUtil.Logic logic = subQuery.logic;
+                switch (logic) {
+                    case TRUE_FALSE_UNKNOWN:
+                    case UNKNOWN_AS_TRUE:
+                        if (!converted.indicator) {
+                            logic = RelOptUtil.Logic.TRUE_FALSE;
+                        }
+                }
+                subQuery.expr = translateIn(logic, bb.root, rex);
+                if (notIn) {
+                    subQuery.expr =
+                            rexBuilder.makeCall(SqlStdOperatorTable.NOT, subQuery.expr);
+                }
                 return;
-            }
-            subQuery.expr = bb.register(converted.left, JoinRelType.LEFT);
-            return;
 
-        case SCALAR_QUERY:
-            // Convert the subquery.  If it's non-correlated, convert it
-            // to a constant expression.
-            if (!expand) {
+            case EXISTS:
+                // "select from emp where exists (select a from T)"
+                //
+                // is converted to the following if the sub-query is correlated:
+                //
+                // "select from emp left outer join (select AGG_TRUE() as indicator
+                // from T group by corr_var) q where q.indicator is true"
+                //
+                // If there is no correlation, the expression is replaced with a
+                // boolean indicating whether the sub-query returned 0 or >= 1 row.
+                call = (SqlBasicCall) subQuery.node;
+                query = call.operand(0);
+                if (!config.isExpand()) {
+                    return;
+                }
+                converted = convertExists(query, RelOptUtil.SubQueryType.EXISTS,
+                        subQuery.logic, true, null);
+                assert !converted.indicator;
+                if (convertNonCorrelatedSubQuery(subQuery, bb, converted.r, true)) {
+                    return;
+                }
+                subQuery.expr = bb.register(converted.r, JoinRelType.LEFT);
                 return;
-            }
-            call = (SqlBasicCall) subQuery.node;
-            query = call.operand(0);
-            converted = convertExists(query, RelOptUtil.SubqueryType.SCALAR, subQuery.logic, true, null);
-            assert !converted.right;
-            if (convertNonCorrelatedSubQuery(subQuery, bb, converted.left, false)) {
+
+            case SCALAR_QUERY:
+                // Convert the sub-query.  If it's non-correlated, convert it
+                // to a constant expression.
+                if (!config.isExpand()) {
+                    return;
+                }
+                call = (SqlBasicCall) subQuery.node;
+                query = call.operand(0);
+                converted = convertExists(query, RelOptUtil.SubQueryType.SCALAR,
+                        subQuery.logic, true, null);
+                assert !converted.indicator;
+                if (convertNonCorrelatedSubQuery(subQuery, bb, converted.r, false)) {
+                    return;
+                }
+                rel = convertToSingleValueSubq(query, converted.r);
+                subQuery.expr = bb.register(rel, JoinRelType.LEFT);
                 return;
-            }
-            rel = convertToSingleValueSubq(query, converted.left);
-            subQuery.expr = bb.register(rel, JoinRelType.LEFT);
-            return;
 
-        case SELECT:
-            // This is used when converting multiset queries:
-            //
-            // select * from unnest(select multiset[deptno] from emps);
-            //
-            converted = convertExists(subQuery.node, RelOptUtil.SubqueryType.SCALAR, subQuery.logic, true, null);
-            assert !converted.right;
-            subQuery.expr = bb.register(converted.left, JoinRelType.LEFT);
-            return;
+            case SELECT:
+                // This is used when converting multiset queries:
+                //
+                // select * from unnest(select multiset[deptno] from emps);
+                //
+                converted = convertExists(subQuery.node, RelOptUtil.SubQueryType.SCALAR,
+                        subQuery.logic, true, null);
+                assert !converted.indicator;
+                subQuery.expr = bb.register(converted.r, JoinRelType.LEFT);
+                return;
 
-        default:
-            throw Util.newInternal("unexpected kind of subquery :" + subQuery.node);
+            default:
+                throw Util.newInternal("unexpected kind of sub-query :" + subQuery.node);
         }
     }
 
-    private RexNode translateIn(SubQuery subQuery, RelNode root, final RexNode rex) {
-        switch (subQuery.logic) {
-        case TRUE:
-            return rexBuilder.makeLiteral(true);
-
-        case UNKNOWN_AS_FALSE:
-            assert rex instanceof RexRangeRef;
-            final int fieldCount = rex.getType().getFieldCount();
-            RexNode rexNode = rexBuilder.makeFieldAccess(rex, fieldCount - 1);
-            rexNode = rexBuilder.makeCall(SqlStdOperatorTable.IS_TRUE, rexNode);
-
-            // Then append the IS NOT NULL(leftKeysForIn).
-            //
-            // RexRangeRef contains the following fields:
-            //   leftKeysForIn,
-            //   rightKeysForIn (the original subquery select list),
-            //   nullIndicator
-            //
-            // The first two lists contain the same number of fields.
-            final int k = (fieldCount - 1) / 2;
-            for (int i = 0; i < k; i++) {
-                rexNode = rexBuilder.makeCall(SqlStdOperatorTable.AND, rexNode, rexBuilder.makeCall(SqlStdOperatorTable.IS_NOT_NULL, rexBuilder.makeFieldAccess(rex, i)));
-            }
-            return rexNode;
-
-        case TRUE_FALSE_UNKNOWN:
-        case UNKNOWN_AS_TRUE:
-            // select e.deptno,
-            //   case
-            //   when ct.c = 0 then false
-            //   when dt.i is not null then true
-            //   when e.deptno is null then null
-            //   when ct.ck < ct.c then null
-            //   else false
-            //   end
-            // from e
-            // cross join (select count(*) as c, count(deptno) as ck from v) as ct
-            // left join (select distinct deptno, true as i from v) as dt
-            //   on e.deptno = dt.deptno
-            final Join join = (Join) root;
-            final Project left = (Project) join.getLeft();
-            final RelNode leftLeft = ((Join) left.getInput()).getLeft();
-            final int leftLeftCount = leftLeft.getRowType().getFieldCount();
-            final RelDataType nullableBooleanType = typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.BOOLEAN), true);
-            final RelDataType longType = typeFactory.createSqlType(SqlTypeName.BIGINT);
-            final RexNode cRef = rexBuilder.makeInputRef(root, leftLeftCount);
-            final RexNode ckRef = rexBuilder.makeInputRef(root, leftLeftCount + 1);
-            final RexNode iRef = rexBuilder.makeInputRef(root, root.getRowType().getFieldCount() - 1);
-
-            final RexLiteral zero = rexBuilder.makeExactLiteral(BigDecimal.ZERO, longType);
-            final RexLiteral trueLiteral = rexBuilder.makeLiteral(true);
-            final RexLiteral falseLiteral = rexBuilder.makeLiteral(false);
-            final RexNode unknownLiteral = rexBuilder.makeNullLiteral(SqlTypeName.BOOLEAN);
-
-            final ImmutableList.Builder<RexNode> args = ImmutableList.builder();
-            args.add(rexBuilder.makeCall(SqlStdOperatorTable.EQUALS, cRef, zero), falseLiteral, rexBuilder.makeCall(SqlStdOperatorTable.IS_NOT_NULL, iRef), trueLiteral);
-            final JoinInfo joinInfo = join.analyzeCondition();
-            for (int leftKey : joinInfo.leftKeys) {
-                final RexNode kRef = rexBuilder.makeInputRef(root, leftKey);
-                args.add(rexBuilder.makeCall(SqlStdOperatorTable.IS_NULL, kRef), unknownLiteral);
-            }
-            args.add(rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN, ckRef, cRef), unknownLiteral, falseLiteral);
+    private RexNode translateIn(RelOptUtil.Logic logic, RelNode root,
+                                final RexNode rex) {
+        switch (logic) {
+            case TRUE:
+                return rexBuilder.makeLiteral(true);
+
+            case TRUE_FALSE:
+            case UNKNOWN_AS_FALSE:
+                assert rex instanceof RexRangeRef;
+                final int fieldCount = rex.getType().getFieldCount();
+                RexNode rexNode = rexBuilder.makeFieldAccess(rex, fieldCount - 1);
+                rexNode = rexBuilder.makeCall(SqlStdOperatorTable.IS_TRUE, rexNode);
+
+                // Then append the IS NOT NULL(leftKeysForIn).
+                //
+                // RexRangeRef contains the following fields:
+                //   leftKeysForIn,
+                //   rightKeysForIn (the original sub-query select list),
+                //   nullIndicator
+                //
+                // The first two lists contain the same number of fields.
+                final int k = (fieldCount - 1) / 2;
+                for (int i = 0; i < k; i++) {
+                    rexNode =
+                            rexBuilder.makeCall(
+                                    SqlStdOperatorTable.AND,
+                                    rexNode,
+                                    rexBuilder.makeCall(
+                                            SqlStdOperatorTable.IS_NOT_NULL,
+                                            rexBuilder.makeFieldAccess(rex, i)));
+                }
+                return rexNode;
+
+            case TRUE_FALSE_UNKNOWN:
+            case UNKNOWN_AS_TRUE:
+                // select e.deptno,
+                //   case
+                //   when ct.c = 0 then false
+                //   when dt.i is not null then true
+                //   when e.deptno is null then null
+                //   when ct.ck < ct.c then null
+                //   else false
+                //   end
+                // from e
+                // cross join (select count(*) as c, count(deptno) as ck from v) as ct
+                // left join (select distinct deptno, true as i from v) as dt
+                //   on e.deptno = dt.deptno
+                final Join join = (Join) root;
+                final Project left = (Project) join.getLeft();
+                final RelNode leftLeft = ((Join) left.getInput()).getLeft();
+                final int leftLeftCount = leftLeft.getRowType().getFieldCount();
+                final RelDataType longType =
+                        typeFactory.createSqlType(SqlTypeName.BIGINT);
+                final RexNode cRef = rexBuilder.makeInputRef(root, leftLeftCount);
+                final RexNode ckRef = rexBuilder.makeInputRef(root, leftLeftCount + 1);
+                final RexNode iRef =
+                        rexBuilder.makeInputRef(root, root.getRowType().getFieldCount() - 1);
+
+                final RexLiteral zero =
+                        rexBuilder.makeExactLiteral(BigDecimal.ZERO, longType);
+                final RexLiteral trueLiteral = rexBuilder.makeLiteral(true);
+                final RexLiteral falseLiteral = rexBuilder.makeLiteral(false);
+                final RexNode unknownLiteral =
+                        rexBuilder.makeNullLiteral(SqlTypeName.BOOLEAN);
+
+                final ImmutableList.Builder<RexNode> args = ImmutableList.builder();
+                args.add(rexBuilder.makeCall(SqlStdOperatorTable.EQUALS, cRef, zero),
+                        falseLiteral,
+                        rexBuilder.makeCall(SqlStdOperatorTable.IS_NOT_NULL, iRef),
+                        trueLiteral);
+                final JoinInfo joinInfo = join.analyzeCondition();
+                for (int leftKey : joinInfo.leftKeys) {
+                    final RexNode kRef = rexBuilder.makeInputRef(root, leftKey);
+                    args.add(rexBuilder.makeCall(SqlStdOperatorTable.IS_NULL, kRef),
+                            unknownLiteral);
+                }
+                args.add(rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN, ckRef, cRef),
+                        unknownLiteral,
+                        falseLiteral);
 
-            return rexBuilder.makeCall(nullableBooleanType, SqlStdOperatorTable.CASE, args.build());
+                return rexBuilder.makeCall(SqlStdOperatorTable.CASE, args.build());
 
-        default:
-            throw new AssertionError(subQuery.logic);
+            default:
+                throw new AssertionError(logic);
         }
     }
 
@@ -1186,24 +1348,34 @@ public class SqlToRelConverter {
     }
 
     /**
-     * Determines if a subquery is non-correlated and if so, converts it to a
+     * Determines if a sub-query is non-correlated and if so, converts it to a
      * constant.
      *
-     * @param subQuery  the call that references the subquery
-     * @param bb        blackboard used to convert the subquery
-     * @param converted RelNode tree corresponding to the subquery
-     * @param isExists  true if the subquery is part of an EXISTS expression
-     * @return if the subquery can be converted to a constant
+     * @param subQuery  the call that references the sub-query
+     * @param bb        blackboard used to convert the sub-query
+     * @param converted RelNode tree corresponding to the sub-query
+     * @param isExists  true if the sub-query is part of an EXISTS expression
+     * @return Whether the sub-query can be converted to a constant
      */
-    private boolean convertNonCorrelatedSubQuery(SubQuery subQuery, Blackboard bb, RelNode converted, boolean isExists) {
+    private boolean convertNonCorrelatedSubQuery(
+            SubQuery subQuery,
+            Blackboard bb,
+            RelNode converted,
+            boolean isExists) {
         SqlCall call = (SqlBasicCall) subQuery.node;
-        if (subqueryConverter.canConvertSubquery() && isSubQueryNonCorrelated(converted, bb)) {
-            // First check if the subquery has already been converted
-            // because it's a nested subquery.  If so, don't re-evaluate
+        if (subQueryConverter.canConvertSubQuery()
+                && isSubQueryNonCorrelated(converted, bb)) {
+            // First check if the sub-query has already been converted
+            // because it's a nested sub-query.  If so, don't re-evaluate
             // it again.
             RexNode constExpr = mapConvertedNonCorrSubqs.get(call);
             if (constExpr == null) {
-                constExpr = subqueryConverter.convertSubquery(call, this, isExists, isExplain);
+                constExpr =
+                        subQueryConverter.convertSubQuery(
+                                call,
+                                this,
+                                isExists,
+                                config.isExplain());
             }
             if (constExpr != null) {
                 subQuery.expr = constExpr;
@@ -1222,14 +1394,17 @@ public class SqlToRelConverter {
      * @param plan   the original RelNode tree corresponding to the statement
      * @return the converted RelNode tree
      */
-    public RelNode convertToSingleValueSubq(SqlNode query, RelNode plan) {
+    public RelNode convertToSingleValueSubq(
+            SqlNode query,
+            RelNode plan) {
         // Check whether query is guaranteed to produce a single value.
         if (query instanceof SqlSelect) {
             SqlSelect select = (SqlSelect) query;
             SqlNodeList selectList = select.getSelectList();
             SqlNodeList groupList = select.getGroup();
 
-            if ((selectList.size() == 1) && ((groupList == null) || (groupList.size() == 0))) {
+            if ((selectList.size() == 1)
+                    && ((groupList == null) || (groupList.size() == 0))) {
                 SqlNode selectExpr = selectList.get(0);
                 if (selectExpr instanceof SqlCall) {
                     SqlCall selectExprCall = (SqlCall) selectExpr;
@@ -1240,7 +1415,8 @@ public class SqlToRelConverter {
 
                 // If there is a limit with 0 or 1,
                 // it is ensured to produce a single value
-                if (select.getFetch() != null && select.getFetch() instanceof SqlNumericLiteral) {
+                if (select.getFetch() != null
+                        && select.getFetch() instanceof SqlNumericLiteral) {
                     SqlNumericLiteral limitNum = (SqlNumericLiteral) select.getFetch();
                     if (((BigDecimal) limitNum.getValue()).intValue() < 2) {
                         return plan;
@@ -1252,13 +1428,17 @@ public class SqlToRelConverter {
             // it is necessary to look into the operands to determine
             // whether SingleValueAgg is necessary
             SqlCall exprCall = (SqlCall) query;
-            if (exprCall.getOperator() instanceof SqlValuesOperator && Util.isSingleValue(exprCall)) {
+            if (exprCall.getOperator()
+                    instanceof SqlValuesOperator
+                    && Util.isSingleValue(exprCall)) {
                 return plan;
             }
         }
 
         // If not, project SingleValueAgg
-        return RelOptUtil.createSingleValueAggRel(cluster, plan);
+        return RelOptUtil.createSingleValueAggRel(
+                cluster,
+                plan);
     }
 
     /**
@@ -1269,35 +1449,69 @@ public class SqlToRelConverter {
      * @param isNotIn    is this a NOT IN operator
      * @return converted expression
      */
-    private RexNode convertInToOr(final Blackboard bb, final List<RexNode> leftKeys, SqlNodeList valuesList, boolean isNotIn) {
+    private RexNode convertInToOr(
+            final Blackboard bb,
+            final List<RexNode> leftKeys,
+            SqlNodeList valuesList,
+            boolean isNotIn) {
         final List<RexNode> comparisons = new ArrayList<>();
         for (SqlNode rightVals : valuesList) {
             RexNode rexComparison;
             if (leftKeys.size() == 1) {
-                rexComparison = rexBuilder.makeCall(SqlStdOperatorTable.EQUALS, leftKeys.get(0), rexBuilder.ensureType(leftKeys.get(0).getType(), bb.convertExpression(rightVals), true));
+                rexComparison =
+                        rexBuilder.makeCall(SqlStdOperatorTable.EQUALS,
+                                leftKeys.get(0),
+                                ensureSqlType(leftKeys.get(0).getType(),
+                                        bb.convertExpression(rightVals)));
             } else {
                 assert rightVals instanceof SqlCall;
                 final SqlBasicCall call = (SqlBasicCall) rightVals;
-                assert (call.getOperator() instanceof SqlRowOperator) && call.operandCount() == leftKeys.size();
-                rexComparison = RexUtil.composeConjunction(rexBuilder, Iterables.transform(Pair.zip(leftKeys, call.getOperandList()), new Function<Pair<RexNode, SqlNode>, RexNode>() {
-                    public RexNode apply(Pair<RexNode, SqlNode> pair) {
-                        return rexBuilder.makeCall(SqlStdOperatorTable.EQUALS, pair.left, rexBuilder.ensureType(pair.left.getType(), bb.convertExpression(pair.right), true));
-                    }
-                }), false);
+                assert (call.getOperator() instanceof SqlRowOperator)
+                        && call.operandCount() == leftKeys.size();
+                rexComparison =
+                        RexUtil.composeConjunction(
+                                rexBuilder,
+                                Iterables.transform(
+                                        Pair.zip(leftKeys, call.getOperandList()),
+                                        new Function<Pair<RexNode, SqlNode>, RexNode>() {
+                                            public RexNode apply(Pair<RexNode, SqlNode> pair) {
+                                                return rexBuilder.makeCall(SqlStdOperatorTable.EQUALS,
+                                                        pair.left,
+                                                        ensureSqlType(pair.left.getType(),
+                                                                bb.convertExpression(pair.right)));
+                                            }
+                                        }),
+                                false);
             }
             comparisons.add(rexComparison);
         }
 
-        RexNode result = RexUtil.composeDisjunction(rexBuilder, comparisons, true);
+        RexNode result =
+                RexUtil.composeDisjunction(rexBuilder, comparisons, true);
         assert result != null;
 
         if (isNotIn) {
-            result = rexBuilder.makeCall(SqlStdOperatorTable.NOT, result);
+            result =
+                    rexBuilder.makeCall(
+                            SqlStdOperatorTable.NOT,
+                            result);
         }
 
         return result;
     }
 
+    /** Ensures that an expression has a given {@link SqlTypeName}, applying a
+     * cast if necessary. Returns the expression unchanged if it already has
+     * that type name, or if it is a CHAR where a VARCHAR is required. */
+    private RexNode ensureSqlType(RelDataType type, RexNode node) {
+        if (type.getSqlTypeName() == node.getType().getSqlTypeName()
+                || (type.getSqlTypeName() == SqlTypeName.VARCHAR
+                && node.getType().getSqlTypeName() == SqlTypeName.CHAR)) {
+            return node;
+        }
+        return rexBuilder.ensureType(type, node, true);
+    }
+
     /**
      * Gets the list size threshold under which {@link #convertInToOr} is used.
      * Lists of this size or greater will instead be converted to use a join
@@ -1306,15 +1520,17 @@ public class SqlToRelConverter {
      * predicate. A threshold of 0 forces usage of an inline table in all cases; a
      * threshold of Integer.MAX_VALUE forces usage of OR in all cases
      *
-     * @return threshold, default {@link #IN_SUBQUERY_THRESHOLD}
+     * @return threshold; always {@code Integer.MAX_VALUE} here, not {@link #DEFAULT_IN_SUB_QUERY_THRESHOLD}
      */
+    @Deprecated // to be removed before 2.0
     protected int getInSubqueryThreshold() {
-        /* OVERRIDE POINT */
+        // OVERRIDE POINT: returns Integer.MAX_VALUE rather than
+        // config.getInSubQueryThreshold(), which forces usage of OR in all cases.
         return Integer.MAX_VALUE;
     }
 
     /**
-     * Converts an EXISTS or IN predicate into a join. For EXISTS, the subquery
+     * Converts an EXISTS or IN predicate into a join. For EXISTS, the sub-query
      * produces an indicator variable, and the result is a relational expression
      * which outer joins that indicator to the original query. After performing
      * the outer join, the condition will be TRUE if the EXISTS condition holds,
@@ -1322,23 +1538,34 @@ public class SqlToRelConverter {
      *
      * @param seek           A query, for example 'select * from emp' or
      *                       'values (1,2,3)' or '('Foo', 34)'.
-     * @param subqueryType   Whether sub-query is IN, EXISTS or scalar
+     * @param subQueryType   Whether sub-query is IN, EXISTS or scalar
      * @param logic Whether the answer needs to be in full 3-valued logic (TRUE,
      *     FALSE, UNKNOWN) will be required, or whether we can accept an
      *     approximation (say representing UNKNOWN as FALSE)
-     * @param needsOuterJoin Whether an outer join is needed
+     * @param notIn Whether the operation is NOT IN
      * @return join expression
      * @pre extraExpr == null || extraName != null
      */
-    private Pair<RelNode, Boolean> convertExists(SqlNode seek, RelOptUtil.SubqueryType subqueryType, RelOptUtil.Logic logic, boolean needsOuterJoin, RelDataType targetDataType) {
-        final SqlValidatorScope seekScope = (seek instanceof SqlSelect) ? validator.getSelectScope((SqlSelect) seek) : null;
+    private RelOptUtil.Exists convertExists(
+            SqlNode seek,
+            RelOptUtil.SubQueryType subQueryType,
+            RelOptUtil.Logic logic,
+            boolean notIn,
+            RelDataType targetDataType) {
+        final SqlValidatorScope seekScope =
+                (seek instanceof SqlSelect)
+                        ? validator.getSelectScope((SqlSelect) seek)
+                        : null;
         final Blackboard seekBb = createBlackboard(seekScope, null, false);
         RelNode seekRel = convertQueryOrInList(seekBb, seek, targetDataType);
 
-        return RelOptUtil.createExistsPlan(seekRel, subqueryType, logic, needsOuterJoin);
+        return RelOptUtil.createExistsPlan(seekRel, subQueryType, logic, notIn);
     }
 
-    private RelNode convertQueryOrInList(Blackboard bb, SqlNode seek, RelDataType targetRowType) {
+    private RelNode convertQueryOrInList(
+            Blackboard bb,
+            SqlNode seek,
+            RelDataType targetRowType) {
         // NOTE: Once we start accepting single-row queries as row constructors,
         // there will be an ambiguity here for a case like X IN ((SELECT Y FROM
         // Z)).  The SQL standard resolves the ambiguity by saying that a lone
@@ -1346,25 +1573,40 @@ public class SqlToRelConverter {
         // expression.  The semantic difference is that a table expression can
         // return multiple rows.
         if (seek instanceof SqlNodeList) {
-            return convertRowValues(bb, seek, ((SqlNodeList) seek).getList(), false, targetRowType);
+            return convertRowValues(
+                    bb,
+                    seek,
+                    ((SqlNodeList) seek).getList(),
+                    false,
+                    targetRowType);
         } else {
             return convertQueryRecursive(seek, false, null).project();
         }
     }
 
-    private RelNode convertRowValues(Blackboard bb, SqlNode rowList, Collection<SqlNode> rows, boolean allowLiteralsOnly, RelDataType targetRowType) {
+    private RelNode convertRowValues(
+            Blackboard bb,
+            SqlNode rowList,
+            Collection<SqlNode> rows,
+            boolean allowLiteralsOnly,
+            RelDataType targetRowType) {
         // NOTE jvs 30-Apr-2006: We combine all rows consisting entirely of
         // literals into a single LogicalValues; this gives the optimizer a smaller
         // input tree.  For everything else (computed expressions, row
-        // subqueries), we union each row in as a projection on top of a
+        // sub-queries), we union each row in as a projection on top of a
         // LogicalOneRow.
 
-        final ImmutableList.Builder<ImmutableList<RexLiteral>> tupleList = ImmutableList.builder();
+        final ImmutableList.Builder<ImmutableList<RexLiteral>> tupleList =
+                ImmutableList.builder();
         final RelDataType rowType;
         if (targetRowType != null) {
             rowType = targetRowType;
         } else {
-            rowType = SqlTypeUtil.promoteToRowType(typeFactory, validator.getValidatedNodeType(rowList), null);
+            rowType =
+                    SqlTypeUtil.promoteToRowType(
+                            typeFactory,
+                            validator.getValidatedNodeType(rowList),
+                            null);
         }
 
         final List<RelNode> unionInputs = new ArrayList<>();
@@ -1374,11 +1616,16 @@ public class SqlToRelConverter {
                 call = (SqlBasicCall) node;
                 ImmutableList.Builder<RexLiteral> tuple = ImmutableList.builder();
                 for (Ord<SqlNode> operand : Ord.zip(call.operands)) {
-                    RexLiteral rexLiteral = convertLiteralInValuesList(operand.e, bb, rowType, operand.i);
+                    RexLiteral rexLiteral =
+                            convertLiteralInValuesList(
+                                    operand.e,
+                                    bb,
+                                    rowType,
+                                    operand.i);
                     if ((rexLiteral == null) && allowLiteralsOnly) {
                         return null;
                     }
-                    if ((rexLiteral == null) || !shouldCreateValuesRel) {
+                    if ((rexLiteral == null) || !config.isCreateValuesRel()) {
                         // fallback to convertRowConstructor
                         tuple = null;
                         break;
@@ -1390,8 +1637,13 @@ public class SqlToRelConverter {
                     continue;
                 }
             } else {
-                RexLiteral rexLiteral = convertLiteralInValuesList(node, bb, rowType, 0);
-                if ((rexLiteral != null) && shouldCreateValuesRel) {
+                RexLiteral rexLiteral =
+                        convertLiteralInValuesList(
+                                node,
+                                bb,
+                                rowType,
+                                0);
+                if ((rexLiteral != null) && config.isCreateValuesRel()) {
                     tupleList.add(ImmutableList.of(rexLiteral));
                     continue;
                 } else {
@@ -1401,11 +1653,15 @@ public class SqlToRelConverter {
                 }
 
                 // convert "1" to "row(1)"
-                call = (SqlBasicCall) SqlStdOperatorTable.ROW.createCall(SqlParserPos.ZERO, node);
+                call =
+                        (SqlBasicCall) SqlStdOperatorTable.ROW.createCall(
+                                SqlParserPos.ZERO,
+                                node);
             }
             unionInputs.add(convertRowConstructor(bb, call));
         }
-        LogicalValues values = LogicalValues.create(cluster, rowType, tupleList.build());
+        LogicalValues values =
+                LogicalValues.create(cluster, rowType, tupleList.build());
         RelNode resultRel;
         if (unionInputs.isEmpty()) {
             resultRel = values;
@@ -1419,7 +1675,11 @@ public class SqlToRelConverter {
         return resultRel;
     }
 
-    private RexLiteral convertLiteralInValuesList(SqlNode sqlNode, Blackboard bb, RelDataType rowType, int iField) {
+    private RexLiteral convertLiteralInValuesList(
+            SqlNode sqlNode,
+            Blackboard bb,
+            RelDataType rowType,
+            int iField) {
         if (!(sqlNode instanceof SqlLiteral)) {
             return null;
         }
@@ -1432,7 +1692,10 @@ public class SqlToRelConverter {
             return null;
         }
 
-        RexNode literalExpr = exprConverter.convertLiteral(bb, (SqlLiteral) sqlNode);
+        RexNode literalExpr =
+                exprConverter.convertLiteral(
+                        bb,
+                        (SqlLiteral) sqlNode);
 
         if (!(literalExpr instanceof RexLiteral)) {
             assert literalExpr.isA(SqlKind.CAST);
@@ -1449,14 +1712,24 @@ public class SqlToRelConverter {
         Comparable value = literal.getValue();
 
         if (SqlTypeUtil.isExactNumeric(type) && SqlTypeUtil.hasScale(type)) {
-            BigDecimal roundedValue = NumberUtil.rescaleBigDecimal((BigDecimal) value, type.getScale());
-            return rexBuilder.makeExactLiteral(roundedValue, type);
+            BigDecimal roundedValue =
+                    NumberUtil.rescaleBigDecimal(
+                            (BigDecimal) value,
+                            type.getScale());
+            return rexBuilder.makeExactLiteral(
+                    roundedValue,
+                    type);
         }
 
-        if ((value instanceof NlsString) && (type.getSqlTypeName() == SqlTypeName.CHAR)) {
+        if ((value instanceof NlsString)
+                && (type.getSqlTypeName() == SqlTypeName.CHAR)) {
             // pad fixed character type
             NlsString unpadded = (NlsString) value;
-            return rexBuilder.makeCharLiteral(new NlsString(Spaces.padRight(unpadded.getValue(), type.getPrecision()), unpadded.getCharsetName(), unpadded.getCollation()));
+            return rexBuilder.makeCharLiteral(
+                    new NlsString(
+                            Spaces.padRight(unpadded.getValue(), type.getPrecision()),
+                            unpadded.getCharsetName(),
+                            unpadded.getCollation()));
         }
         return literal;
     }
@@ -1478,67 +1751,78 @@ public class SqlToRelConverter {
      * @param logic Whether the answer needs to be in full 3-valued logic (TRUE,
      *              FALSE, UNKNOWN) will be required, or whether we can accept
      *              an approximation (say representing UNKNOWN as FALSE)
-     * @param registerOnlyScalarSubqueries if set to true and the parse tree
+     * @param registerOnlyScalarSubQueries if set to true and the parse tree
      *                                     corresponds to a variation of a select
      *                                     node, only register it if it's a scalar
-     *                                     subquery
+     *                                     sub-query
      */
-    private void findSubqueries(Blackboard bb, SqlNode node, RelOptUtil.Logic logic, boolean registerOnlyScalarSubqueries) {
+    private void findSubQueries(
+            Blackboard bb,
+            SqlNode node,
+            RelOptUtil.Logic logic,
+            boolean registerOnlyScalarSubQueries) {
         final SqlKind kind = node.getKind();
         switch (kind) {
-        case EXISTS:
-        case SELECT:
-        case MULTISET_QUERY_CONSTRUCTOR:
-        case MULTISET_VALUE_CONSTRUCTOR:
-        case ARRAY_QUERY_CONSTRUCTOR:
-        case CURSOR:
-        case SCALAR_QUERY:
-            if (!registerOnlyScalarSubqueries || (kind == SqlKind.SCALAR_QUERY)) {
-                bb.registerSubquery(node, RelOptUtil.Logic.TRUE_FALSE);
-            }
-            return;
-        case IN:
-            if (((SqlCall) node).getOperator() == SqlStdOperatorTable.NOT_IN) {
+            case EXISTS:
+            case SELECT:
+            case MULTISET_QUERY_CONSTRUCTOR:
+            case MULTISET_VALUE_CONSTRUCTOR:
+            case ARRAY_QUERY_CONSTRUCTOR:
+            case CURSOR:
+            case SCALAR_QUERY:
+                if (!registerOnlyScalarSubQueries
+                        || (kind == SqlKind.SCALAR_QUERY)) {
+                    bb.registerSubQuery(node, RelOptUtil.Logic.TRUE_FALSE);
+                }
+                return;
+            case IN:
+                if (((SqlCall) node).getOperator() == SqlStdOperatorTable.NOT_IN) {
+                    logic = logic.negate();
+                }
+                break;
+            case NOT:
                 logic = logic.negate();
-            }
-            break;
-        case NOT:
-            logic = logic.negate();
-            break;
+                break;
         }
         if (node instanceof SqlCall) {
-            if (kind == SqlKind.OR || kind == SqlKind.NOT) {
-                // It's always correct to outer join subquery with
-                // containing query; however, when predicates involve Or
-                // or NOT, outer join might be necessary.
-                bb.subqueryNeedsOuterJoin = true;
-            }
             for (SqlNode operand : ((SqlCall) node).getOperandList()) {
                 if (operand != null) {
                     // In the case of an IN expression, locate scalar
-                    // subqueries so we can convert them to constants
-                    findSubqueries(bb, operand, logic, kind == SqlKind.IN || registerOnlyScalarSubqueries);
+                    // sub-queries so we can convert them to constants
+                    findSubQueries(
+                            bb,
+                            operand,
+                            logic,
+                            kind == SqlKind.IN || registerOnlyScalarSubQueries);
                 }
             }
         } else if (node instanceof SqlNodeList) {
             for (SqlNode child : (SqlNodeList) node) {
-                findSubqueries(bb, child, logic, kind == SqlKind.IN || registerOnlyScalarSubqueries);
+                findSubQueries(
+                        bb,
+                        child,
+                        logic,
+                        kind == SqlKind.IN || registerOnlyScalarSubQueries);
             }
         }
 
-        // Now that we've located any scalar subqueries inside the IN
+        // Now that we've located any scalar sub-queries inside the IN
         // expression, register the IN expression itself.  We need to
-        // register the scalar subqueries first so they can be converted
+        // register the scalar sub-queries first so they can be converted
         // before the IN expression is converted.
         if (kind == SqlKind.IN) {
-            if (logic == RelOptUtil.Logic.TRUE_FALSE_UNKNOWN && !validator.getValidatedNodeType(node).isNullable()) {
-                logic = RelOptUtil.Logic.UNKNOWN_AS_FALSE;
-            }
-            // TODO: This conversion is only valid in the WHERE clause
-            if (logic == RelOptUtil.Logic.UNKNOWN_AS_FALSE && !bb.subqueryNeedsOuterJoin) {
-                logic = RelOptUtil.Logic.TRUE;
+            switch (logic) {
+                case TRUE_FALSE_UNKNOWN:
+                    if (validator.getValidatedNodeType(node).isNullable()) {
+                        break;
+                    } else if (true) {
+                        break;
+                    }
+                    // fall through
+                case UNKNOWN_AS_FALSE:
+                    logic = RelOptUtil.Logic.TRUE;
             }
-            bb.registerSubquery(node, logic);
+            bb.registerSubQuery(node, logic);
         }
     }
 
@@ -1548,9 +1832,11 @@ public class SqlToRelConverter {
      * @param node Expression to translate
      * @return Converted expression
      */
-    public RexNode convertEx

<TRUNCATED>