You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/09/17 10:57:16 UTC

[kylin] 05/15: KYLIN-3517 Upadate coprocessor on HBase2.0 is avaliable.

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch 2.5.0-hadoop3.1
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 7dbd6281913c7c9701a76fe4e84a4da54d7f5ffe
Author: Lijun Cao <>
AuthorDate: Tue Sep 4 09:56:36 2018 +0800

    KYLIN-3517 Upadate coprocessor on HBase2.0 is avaliable.
    
    Signed-off-by: shaofengshi <sh...@apache.org>
---
 .../hbase/lookup/LookupTableToHFileJob.java        | 24 +++++------
 .../kylin/storage/hbase/steps/CubeHTableUtil.java  | 46 +++++++++++-----------
 .../storage/hbase/util/DeployCoprocessorCLI.java   | 46 +++++++++++-----------
 3 files changed, 60 insertions(+), 56 deletions(-)

diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJob.java
index 054e146..2789401 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJob.java
@@ -26,12 +26,12 @@ import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
 import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -207,24 +207,24 @@ public class LookupTableToHFileJob extends AbstractHadoopJob {
         String hTableName = genHTableName(kylinConfig, admin, sourceTableName);
 
         TableName tableName = TableName.valueOf(hTableName);
-        HTableDescriptor hTableDesc = new HTableDescriptor(tableName);
-        hTableDesc.setCompactionEnabled(false);
-        hTableDesc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName());
-        hTableDesc.setValue(IRealizationConstants.HTableTag, kylinConfig.getMetadataUrlPrefix());
-        hTableDesc.setValue(IRealizationConstants.HTableCreationTime, String.valueOf(System.currentTimeMillis()));
+        TableDescriptorBuilder descBuilder = TableDescriptorBuilder.newBuilder(tableName);
+        descBuilder.setCompactionEnabled(false);
+        descBuilder.setValue(TableDescriptorBuilder.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName());
+        descBuilder.setValue(IRealizationConstants.HTableTag, kylinConfig.getMetadataUrlPrefix());
+        descBuilder.setValue(IRealizationConstants.HTableCreationTime, String.valueOf(System.currentTimeMillis()));
         String commitInfo = KylinVersion.getGitCommitInfo();
         if (!StringUtils.isEmpty(commitInfo)) {
-            hTableDesc.setValue(IRealizationConstants.HTableGitTag, commitInfo);
+            descBuilder.setValue(IRealizationConstants.HTableGitTag, commitInfo);
         }
 
-        HColumnDescriptor cf = CubeHTableUtil.createColumnFamily(kylinConfig, HBaseLookupRowEncoder.CF_STRING, false);
-        hTableDesc.addFamily(cf);
+        ColumnFamilyDescriptor cf = CubeHTableUtil.createColumnFamily(kylinConfig, HBaseLookupRowEncoder.CF_STRING, false);
+        descBuilder.modifyColumnFamily(cf);
 
         try {
             if (shardNum > 1) {
-                admin.createTable(hTableDesc, getSplitsByShardNum(shardNum));
+                admin.createTable(descBuilder.build(), getSplitsByShardNum(shardNum));
             } else {
-                admin.createTable(hTableDesc);
+                admin.createTable(descBuilder.build());
             }
         } finally {
             IOUtils.closeQuietly(admin);
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
index f006adb..9e3703c 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
@@ -23,11 +23,12 @@ import java.io.IOException;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.regionserver.BloomType;
@@ -35,6 +36,7 @@ import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinVersion;
+import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.CubeDesc;
@@ -59,25 +61,25 @@ public class CubeHTableUtil {
         CubeDesc cubeDesc = cubeInstance.getDescriptor();
         KylinConfig kylinConfig = cubeDesc.getConfig();
 
-        HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(cubeSegment.getStorageLocationIdentifier()));
-        tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName());
-        tableDesc.setValue(IRealizationConstants.HTableTag, kylinConfig.getMetadataUrlPrefix());
-        tableDesc.setValue(IRealizationConstants.HTableCreationTime, String.valueOf(System.currentTimeMillis()));
+        TableDescriptorBuilder descBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(cubeSegment.getStorageLocationIdentifier()));
+        descBuilder.setValue(TableDescriptorBuilder.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName());
+        descBuilder.setValue(IRealizationConstants.HTableTag, kylinConfig.getMetadataUrlPrefix());
+        descBuilder.setValue(IRealizationConstants.HTableCreationTime, String.valueOf(System.currentTimeMillis()));
 
         if (!StringUtils.isEmpty(kylinConfig.getKylinOwner())) {
             //HTableOwner is the team that provides kylin service
-            tableDesc.setValue(IRealizationConstants.HTableOwner, kylinConfig.getKylinOwner());
+            descBuilder.setValue(IRealizationConstants.HTableOwner, kylinConfig.getKylinOwner());
         }
 
         String commitInfo = KylinVersion.getGitCommitInfo();
         if (!StringUtils.isEmpty(commitInfo)) {
-            tableDesc.setValue(IRealizationConstants.HTableGitTag, commitInfo);
+            descBuilder.setValue(IRealizationConstants.HTableGitTag, commitInfo);
         }
 
         //HTableUser is the cube owner, which will be the "user"
-        tableDesc.setValue(IRealizationConstants.HTableUser, cubeInstance.getOwner());
+        descBuilder.setValue(IRealizationConstants.HTableUser, cubeInstance.getOwner());
 
-        tableDesc.setValue(IRealizationConstants.HTableSegmentTag, cubeSegment.toString());
+        descBuilder.setValue(IRealizationConstants.HTableSegmentTag, cubeSegment.toString());
 
         Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
         Connection conn = HBaseConnection.get(kylinConfig.getStorageUrl());
@@ -86,12 +88,12 @@ public class CubeHTableUtil {
         try {
             if (User.isHBaseSecurityEnabled(conf)) {
                 // add coprocessor for bulk load
-                tableDesc.addCoprocessor("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
+                descBuilder.addCoprocessor("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
             }
 
             for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHbaseMapping().getColumnFamily()) {
-                HColumnDescriptor cf = createColumnFamily(kylinConfig, cfDesc.getName(), cfDesc.isMemoryHungry());
-                tableDesc.addFamily(cf);
+                ColumnFamilyDescriptor cf = createColumnFamily(kylinConfig, cfDesc.getName(), cfDesc.isMemoryHungry());
+                descBuilder.setColumnFamily(cf);
             }
 
             if (admin.tableExists(TableName.valueOf(tableName))) {
@@ -100,9 +102,9 @@ public class CubeHTableUtil {
                 throw new RuntimeException("HBase table " + tableName + " exists!");
             }
 
-            DeployCoprocessorCLI.deployCoprocessor(tableDesc);
+            DeployCoprocessorCLI.deployCoprocessor(descBuilder);
 
-            admin.createTable(tableDesc, splitKeys);
+            admin.createTable(descBuilder.build(), splitKeys);
             Preconditions.checkArgument(admin.isTableAvailable(TableName.valueOf(tableName)), "table " + tableName + " created, but is not available due to some reasons");
             logger.info("create hbase table " + tableName + " done.");
         } finally {
@@ -136,14 +138,14 @@ public class CubeHTableUtil {
                 admin.deleteTable(tableName);
             }
 
-            HTableDescriptor tableDesc = new HTableDescriptor(tableName);
-            tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName());
+            TableDescriptorBuilder descBuilder = TableDescriptorBuilder.newBuilder(tableName);
+            descBuilder.setValue(TableDescriptorBuilder.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName());
 
             KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-            tableDesc.addFamily(createColumnFamily(kylinConfig, cfName, false));
+            descBuilder.modifyColumnFamily(createColumnFamily(kylinConfig, cfName, false));
 
             logger.info("creating hbase table " + tableName);
-            admin.createTable(tableDesc, null);
+            admin.createTable(descBuilder.build(), null);
             Preconditions.checkArgument(admin.isTableAvailable(tableName), "table " + tableName + " created, but is not available due to some reasons");
             logger.info("create hbase table " + tableName + " done.");
         } finally {
@@ -151,8 +153,8 @@ public class CubeHTableUtil {
         }
     }
 
-    public static HColumnDescriptor createColumnFamily(KylinConfig kylinConfig, String cfName, boolean isMemoryHungry) {
-        HColumnDescriptor cf = new HColumnDescriptor(cfName);
+    public static ColumnFamilyDescriptor createColumnFamily(KylinConfig kylinConfig, String cfName, boolean isMemoryHungry) {
+        ColumnFamilyDescriptorBuilder cf = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cfName));
         cf.setMaxVersions(1);
 
         if (isMemoryHungry) {
@@ -203,7 +205,7 @@ public class CubeHTableUtil {
         cf.setInMemory(false);
         cf.setBloomFilterType(BloomType.NONE);
         cf.setScope(kylinConfig.getHBaseReplicationScope());
-        return cf;
+        return cf.build();
     }
 
 }
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
index 46363b2..362a105 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
@@ -41,11 +41,12 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinVersion;
 import org.apache.kylin.common.util.Bytes;
@@ -178,7 +179,7 @@ public class DeployCoprocessorCLI {
         }
         logger.info("Commit Information: " + commitInfo);
         for (String tableName : tableNames) {
-            HTableDescriptor tableDesc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
+            TableDescriptor tableDesc = hbaseAdmin.getDescriptor(TableName.valueOf(tableName));
             String gitTag = tableDesc.getValue(IRealizationConstants.HTableGitTag);
             if (commitInfo.equals(gitTag)) {
                 filteredList.add(tableName);
@@ -249,18 +250,18 @@ public class DeployCoprocessorCLI {
         return result;
     }
 
-    public static void deployCoprocessor(HTableDescriptor tableDesc) {
+    public static void deployCoprocessor(TableDescriptorBuilder desBuilder) {
         try {
-            initHTableCoprocessor(tableDesc);
-            logger.info("hbase table " + tableDesc.getTableName() + " deployed with coprocessor.");
+            initHTableCoprocessor(desBuilder);
+            logger.info("hbase table " + desBuilder.build().getTableName() + " deployed with coprocessor.");
 
         } catch (Exception ex) {
-            logger.error("Error deploying coprocessor on " + tableDesc.getTableName(), ex);
+            logger.error("Error deploying coprocessor on " + desBuilder.build().getTableName(), ex);
             logger.error("Will try creating the table without coprocessor.");
         }
     }
 
-    private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException {
+    private static void initHTableCoprocessor(TableDescriptorBuilder descBuilder) throws IOException {
         KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
         FileSystem fileSystem = FileSystem.get(hconf);
@@ -268,17 +269,18 @@ public class DeployCoprocessorCLI {
         String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar();
         Path hdfsCoprocessorJar = DeployCoprocessorCLI.uploadCoprocessorJar(localCoprocessorJar, fileSystem, null);
 
-        DeployCoprocessorCLI.addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
+        DeployCoprocessorCLI.addCoprocessorOnHTable(descBuilder, hdfsCoprocessorJar);
     }
 
-    public static void addCoprocessorOnHTable(HTableDescriptor desc, Path hdfsCoprocessorJar) throws IOException {
-        logger.info("Add coprocessor on " + desc.getNameAsString());
-        desc.addCoprocessor(CubeEndpointClass, hdfsCoprocessorJar, 1001, null);
+    public static void addCoprocessorOnHTable(TableDescriptorBuilder descBuilder, Path hdfsCoprocessorJar) throws IOException {
+        logger.info("Add coprocessor on " + descBuilder.build().getTableName().toString());
+        descBuilder.addCoprocessor(CubeEndpointClass, hdfsCoprocessorJar, 1001, null);
     }
 
     public static boolean resetCoprocessor(String tableName, Admin hbaseAdmin, Path hdfsCoprocessorJar) throws IOException {
         KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
+        TableDescriptor desc = hbaseAdmin.getDescriptor(TableName.valueOf(tableName));
+        TableDescriptorBuilder descBuilder = TableDescriptorBuilder.newBuilder(desc);
 
         //when the table has migrated from dev env to test(prod) env, the dev server
         //should not reset the coprocessor of the table.
@@ -294,30 +296,30 @@ public class DeployCoprocessorCLI {
         hbaseAdmin.disableTable(TableName.valueOf(tableName));
 
         while (desc.hasCoprocessor(CubeObserverClassOld2)) {
-            desc.removeCoprocessor(CubeObserverClassOld2);
+            desc = descBuilder.removeCoprocessor(CubeObserverClassOld2).build();
         }
         while (desc.hasCoprocessor(CubeEndpointClass)) {
-            desc.removeCoprocessor(CubeEndpointClass);
+            desc = descBuilder.removeCoprocessor(CubeEndpointClass).build();
         }
         while (desc.hasCoprocessor(IIEndpointClass)) {
-            desc.removeCoprocessor(IIEndpointClass);
+            desc = descBuilder.removeCoprocessor(IIEndpointClass).build();
         }
         // remove legacy coprocessor from v1.x
         while (desc.hasCoprocessor(CubeObserverClassOld)) {
-            desc.removeCoprocessor(CubeObserverClassOld);
+            desc = descBuilder.removeCoprocessor(CubeObserverClassOld).build();
         }
         while (desc.hasCoprocessor(IIEndpointClassOld)) {
-            desc.removeCoprocessor(IIEndpointClassOld);
+            desc = descBuilder.removeCoprocessor(IIEndpointClassOld).build();
         }
-        addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
+        addCoprocessorOnHTable(descBuilder, hdfsCoprocessorJar);
 
         // update commit tags
         String commitInfo = KylinVersion.getGitCommitInfo();
         if (!StringUtils.isEmpty(commitInfo)) {
-            desc.setValue(IRealizationConstants.HTableGitTag, commitInfo);
+            descBuilder.setValue(IRealizationConstants.HTableGitTag, commitInfo);
         }
 
-        hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc);
+        hbaseAdmin.modifyTable(descBuilder.build());
 
         logger.info("Enable " + tableName);
         hbaseAdmin.enableTable(TableName.valueOf(tableName));
@@ -490,9 +492,9 @@ public class DeployCoprocessorCLI {
         HashSet<String> result = new HashSet<String>();
 
         for (String tableName : tableNames) {
-            HTableDescriptor tableDescriptor = null;
+            TableDescriptor tableDescriptor = null;
             try {
-                tableDescriptor = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
+                tableDescriptor = hbaseAdmin.getDescriptor(TableName.valueOf(tableName));
             } catch (TableNotFoundException e) {
                 logger.warn("Table not found " + tableName, e);
                 continue;