Posted to commits@kylin.apache.org by sh...@apache.org on 2018/09/11 01:15:17 UTC
[kylin] 01/01: KYLIN-3517 Make coprocessor update available on HBase 2.0.
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master-hadoop3.1-2.5.0
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 96026886774aff16f03e6d5424e9c0ccdfca1ca0
Author: Lijun Cao <>
AuthorDate: Tue Sep 4 09:56:36 2018 +0800
KYLIN-3517 Make coprocessor update available on HBase 2.0.
Signed-off-by: shaofengshi <sh...@apache.org>
---
.../hbase/lookup/LookupTableToHFileJob.java | 24 +++++------
.../kylin/storage/hbase/steps/CubeHTableUtil.java | 46 +++++++++++-----------
.../storage/hbase/util/DeployCoprocessorCLI.java | 46 +++++++++++-----------
3 files changed, 60 insertions(+), 56 deletions(-)
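For context: HBase 2.0 deprecates the mutable HTableDescriptor/HColumnDescriptor classes in favor of the immutable TableDescriptor/ColumnFamilyDescriptor interfaces and their builders, and that is the pattern every hunk below applies. A minimal sketch of the new shape, assuming an Admin handle named admin and placeholder table and family names:

    import java.io.IOException;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
    import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
    import org.apache.hadoop.hbase.client.TableDescriptor;
    import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
    import org.apache.hadoop.hbase.util.Bytes;

    public class DescriptorMigrationSketch {
        // HBase 2.0 style: descriptors are immutable, so all mutation goes
        // through builders and ends with a build() call.
        static TableDescriptor describe() {
            ColumnFamilyDescriptor cf = ColumnFamilyDescriptorBuilder
                    .newBuilder(Bytes.toBytes("F1")) // placeholder family name
                    .setMaxVersions(1)
                    .build();
            return TableDescriptorBuilder.newBuilder(TableName.valueOf("t")) // placeholder table name
                    .setCompactionEnabled(false)
                    .setColumnFamily(cf) // attach the family to the new descriptor
                    .build();
        }

        static void create(Admin admin) throws IOException {
            // Admin.createTable accepts the immutable TableDescriptor directly.
            admin.createTable(describe());
        }
    }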
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJob.java
index 054e146..2789401 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJob.java
@@ -26,12 +26,12 @@ import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -207,24 +207,24 @@ public class LookupTableToHFileJob extends AbstractHadoopJob {
String hTableName = genHTableName(kylinConfig, admin, sourceTableName);
TableName tableName = TableName.valueOf(hTableName);
- HTableDescriptor hTableDesc = new HTableDescriptor(tableName);
- hTableDesc.setCompactionEnabled(false);
- hTableDesc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName());
- hTableDesc.setValue(IRealizationConstants.HTableTag, kylinConfig.getMetadataUrlPrefix());
- hTableDesc.setValue(IRealizationConstants.HTableCreationTime, String.valueOf(System.currentTimeMillis()));
+ TableDescriptorBuilder descBuilder = TableDescriptorBuilder.newBuilder(tableName);
+ descBuilder.setCompactionEnabled(false);
+ descBuilder.setValue(TableDescriptorBuilder.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName());
+ descBuilder.setValue(IRealizationConstants.HTableTag, kylinConfig.getMetadataUrlPrefix());
+ descBuilder.setValue(IRealizationConstants.HTableCreationTime, String.valueOf(System.currentTimeMillis()));
String commitInfo = KylinVersion.getGitCommitInfo();
if (!StringUtils.isEmpty(commitInfo)) {
- hTableDesc.setValue(IRealizationConstants.HTableGitTag, commitInfo);
+ descBuilder.setValue(IRealizationConstants.HTableGitTag, commitInfo);
}
- HColumnDescriptor cf = CubeHTableUtil.createColumnFamily(kylinConfig, HBaseLookupRowEncoder.CF_STRING, false);
- hTableDesc.addFamily(cf);
+ ColumnFamilyDescriptor cf = CubeHTableUtil.createColumnFamily(kylinConfig, HBaseLookupRowEncoder.CF_STRING, false);
+ descBuilder.modifyColumnFamily(cf);
try {
if (shardNum > 1) {
- admin.createTable(hTableDesc, getSplitsByShardNum(shardNum));
+ admin.createTable(descBuilder.build(), getSplitsByShardNum(shardNum));
} else {
- admin.createTable(hTableDesc);
+ admin.createTable(descBuilder.build());
}
} finally {
IOUtils.closeQuietly(admin);
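One subtlety in the hunk above: on TableDescriptorBuilder, setColumnFamily adds a family that does not yet exist, while modifyColumnFamily replaces one that must already exist and throws IllegalArgumentException otherwise. The patch calls modifyColumnFamily on a freshly created builder, where setColumnFamily appears to be the intended call. A sketch of the distinction, with placeholder names:

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
    import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
    import org.apache.hadoop.hbase.util.Bytes;

    public class FamilyCallsSketch {
        public static void main(String[] args) {
            byte[] fam = Bytes.toBytes("F1"); // placeholder family name
            TableDescriptorBuilder b = TableDescriptorBuilder.newBuilder(TableName.valueOf("t"));

            // Legal: the family is new, so it must be added with setColumnFamily.
            b.setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam));

            // Legal only because the family now exists; on a builder without it,
            // modifyColumnFamily throws IllegalArgumentException.
            b.modifyColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(fam)
                    .setMaxVersions(1)
                    .build());
        }
    }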
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
index f006adb..9e3703c 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
@@ -23,11 +23,12 @@ import java.io.IOException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.regionserver.BloomType;
@@ -35,6 +36,7 @@ import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
import org.apache.hadoop.hbase.security.User;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.KylinVersion;
+import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.CubeDesc;
@@ -59,25 +61,25 @@ public class CubeHTableUtil {
CubeDesc cubeDesc = cubeInstance.getDescriptor();
KylinConfig kylinConfig = cubeDesc.getConfig();
- HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(cubeSegment.getStorageLocationIdentifier()));
- tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName());
- tableDesc.setValue(IRealizationConstants.HTableTag, kylinConfig.getMetadataUrlPrefix());
- tableDesc.setValue(IRealizationConstants.HTableCreationTime, String.valueOf(System.currentTimeMillis()));
+ TableDescriptorBuilder descBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(cubeSegment.getStorageLocationIdentifier()));
+ descBuilder.setValue(TableDescriptorBuilder.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName());
+ descBuilder.setValue(IRealizationConstants.HTableTag, kylinConfig.getMetadataUrlPrefix());
+ descBuilder.setValue(IRealizationConstants.HTableCreationTime, String.valueOf(System.currentTimeMillis()));
if (!StringUtils.isEmpty(kylinConfig.getKylinOwner())) {
//HTableOwner is the team that provides kylin service
- tableDesc.setValue(IRealizationConstants.HTableOwner, kylinConfig.getKylinOwner());
+ descBuilder.setValue(IRealizationConstants.HTableOwner, kylinConfig.getKylinOwner());
}
String commitInfo = KylinVersion.getGitCommitInfo();
if (!StringUtils.isEmpty(commitInfo)) {
- tableDesc.setValue(IRealizationConstants.HTableGitTag, commitInfo);
+ descBuilder.setValue(IRealizationConstants.HTableGitTag, commitInfo);
}
//HTableUser is the cube owner, which will be the "user"
- tableDesc.setValue(IRealizationConstants.HTableUser, cubeInstance.getOwner());
+ descBuilder.setValue(IRealizationConstants.HTableUser, cubeInstance.getOwner());
- tableDesc.setValue(IRealizationConstants.HTableSegmentTag, cubeSegment.toString());
+ descBuilder.setValue(IRealizationConstants.HTableSegmentTag, cubeSegment.toString());
Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
Connection conn = HBaseConnection.get(kylinConfig.getStorageUrl());
@@ -86,12 +88,12 @@ public class CubeHTableUtil {
try {
if (User.isHBaseSecurityEnabled(conf)) {
// add coprocessor for bulk load
- tableDesc.addCoprocessor("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
+ descBuilder.addCoprocessor("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
}
for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHbaseMapping().getColumnFamily()) {
- HColumnDescriptor cf = createColumnFamily(kylinConfig, cfDesc.getName(), cfDesc.isMemoryHungry());
- tableDesc.addFamily(cf);
+ ColumnFamilyDescriptor cf = createColumnFamily(kylinConfig, cfDesc.getName(), cfDesc.isMemoryHungry());
+ descBuilder.setColumnFamily(cf);
}
if (admin.tableExists(TableName.valueOf(tableName))) {
@@ -100,9 +102,9 @@ public class CubeHTableUtil {
throw new RuntimeException("HBase table " + tableName + " exists!");
}
- DeployCoprocessorCLI.deployCoprocessor(tableDesc);
+ DeployCoprocessorCLI.deployCoprocessor(descBuilder);
- admin.createTable(tableDesc, splitKeys);
+ admin.createTable(descBuilder.build(), splitKeys);
Preconditions.checkArgument(admin.isTableAvailable(TableName.valueOf(tableName)), "table " + tableName + " created, but is not available due to some reasons");
logger.info("create hbase table " + tableName + " done.");
} finally {
@@ -136,14 +138,14 @@ public class CubeHTableUtil {
admin.deleteTable(tableName);
}
- HTableDescriptor tableDesc = new HTableDescriptor(tableName);
- tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName());
+ TableDescriptorBuilder descBuilder = TableDescriptorBuilder.newBuilder(tableName);
+ descBuilder.setValue(TableDescriptorBuilder.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName());
KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- tableDesc.addFamily(createColumnFamily(kylinConfig, cfName, false));
+ descBuilder.modifyColumnFamily(createColumnFamily(kylinConfig, cfName, false));
logger.info("creating hbase table " + tableName);
- admin.createTable(tableDesc, null);
+ admin.createTable(descBuilder.build(), null);
Preconditions.checkArgument(admin.isTableAvailable(tableName), "table " + tableName + " created, but is not available due to some reasons");
logger.info("create hbase table " + tableName + " done.");
} finally {
@@ -151,8 +153,8 @@ public class CubeHTableUtil {
}
}
- public static HColumnDescriptor createColumnFamily(KylinConfig kylinConfig, String cfName, boolean isMemoryHungry) {
- HColumnDescriptor cf = new HColumnDescriptor(cfName);
+ public static ColumnFamilyDescriptor createColumnFamily(KylinConfig kylinConfig, String cfName, boolean isMemoryHungry) {
+ ColumnFamilyDescriptorBuilder cf = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cfName));
cf.setMaxVersions(1);
if (isMemoryHungry) {
@@ -203,7 +205,7 @@ public class CubeHTableUtil {
cf.setInMemory(false);
cf.setBloomFilterType(BloomType.NONE);
cf.setScope(kylinConfig.getHBaseReplicationScope());
- return cf;
+ return cf.build();
}
}
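The rewritten createColumnFamily above keeps the old settings but routes them through ColumnFamilyDescriptorBuilder and returns the immutable result. A condensed sketch of that shape, with the config-driven compression and encoding choices reduced to fixed placeholder values:

    import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
    import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
    import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
    import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
    import org.apache.hadoop.hbase.regionserver.BloomType;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ColumnFamilySketch {
        static ColumnFamilyDescriptor createColumnFamily(String cfName) {
            ColumnFamilyDescriptorBuilder cf =
                    ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cfName));
            cf.setMaxVersions(1);                   // cube cells are written once per segment
            cf.setCompressionType(Algorithm.NONE);  // placeholder; Kylin derives this from its config
            cf.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF); // placeholder encoding
            cf.setBloomFilterType(BloomType.NONE);
            cf.setInMemory(false);
            return cf.build();                      // only the builder is mutable
        }
    }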
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
index 46363b2..362a105 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
@@ -41,11 +41,12 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.KylinVersion;
import org.apache.kylin.common.util.Bytes;
@@ -178,7 +179,7 @@ public class DeployCoprocessorCLI {
}
logger.info("Commit Information: " + commitInfo);
for (String tableName : tableNames) {
- HTableDescriptor tableDesc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
+ TableDescriptor tableDesc = hbaseAdmin.getDescriptor(TableName.valueOf(tableName));
String gitTag = tableDesc.getValue(IRealizationConstants.HTableGitTag);
if (commitInfo.equals(gitTag)) {
filteredList.add(tableName);
@@ -249,18 +250,18 @@ public class DeployCoprocessorCLI {
return result;
}
- public static void deployCoprocessor(HTableDescriptor tableDesc) {
+ public static void deployCoprocessor(TableDescriptorBuilder desBuilder) {
try {
- initHTableCoprocessor(tableDesc);
- logger.info("hbase table " + tableDesc.getTableName() + " deployed with coprocessor.");
+ initHTableCoprocessor(desBuilder);
+ logger.info("hbase table " + desBuilder.build().getTableName() + " deployed with coprocessor.");
} catch (Exception ex) {
- logger.error("Error deploying coprocessor on " + tableDesc.getTableName(), ex);
+ logger.error("Error deploying coprocessor on " + desBuilder.build().getTableName(), ex);
logger.error("Will try creating the table without coprocessor.");
}
}
- private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException {
+ private static void initHTableCoprocessor(TableDescriptorBuilder descBuilder) throws IOException {
KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
FileSystem fileSystem = FileSystem.get(hconf);
@@ -268,17 +269,18 @@ public class DeployCoprocessorCLI {
String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar();
Path hdfsCoprocessorJar = DeployCoprocessorCLI.uploadCoprocessorJar(localCoprocessorJar, fileSystem, null);
- DeployCoprocessorCLI.addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
+ DeployCoprocessorCLI.addCoprocessorOnHTable(descBuilder, hdfsCoprocessorJar);
}
- public static void addCoprocessorOnHTable(HTableDescriptor desc, Path hdfsCoprocessorJar) throws IOException {
- logger.info("Add coprocessor on " + desc.getNameAsString());
- desc.addCoprocessor(CubeEndpointClass, hdfsCoprocessorJar, 1001, null);
+ public static void addCoprocessorOnHTable(TableDescriptorBuilder descBuilder, Path hdfsCoprocessorJar) throws IOException {
+ logger.info("Add coprocessor on " + descBuilder.build().getTableName().toString());
+ descBuilder.addCoprocessor(CubeEndpointClass, hdfsCoprocessorJar, 1001, null);
}
public static boolean resetCoprocessor(String tableName, Admin hbaseAdmin, Path hdfsCoprocessorJar) throws IOException {
KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
+ TableDescriptor desc = hbaseAdmin.getDescriptor(TableName.valueOf(tableName));
+ TableDescriptorBuilder descBuilder = TableDescriptorBuilder.newBuilder(desc);
//when the table has migrated from dev env to test(prod) env, the dev server
//should not reset the coprocessor of the table.
@@ -294,30 +296,30 @@ public class DeployCoprocessorCLI {
hbaseAdmin.disableTable(TableName.valueOf(tableName));
while (desc.hasCoprocessor(CubeObserverClassOld2)) {
- desc.removeCoprocessor(CubeObserverClassOld2);
+ desc = descBuilder.removeCoprocessor(CubeObserverClassOld2).build();
}
while (desc.hasCoprocessor(CubeEndpointClass)) {
- desc.removeCoprocessor(CubeEndpointClass);
+ desc = descBuilder.removeCoprocessor(CubeEndpointClass).build();
}
while (desc.hasCoprocessor(IIEndpointClass)) {
- desc.removeCoprocessor(IIEndpointClass);
+ desc = descBuilder.removeCoprocessor(IIEndpointClass).build();
}
// remove legacy coprocessor from v1.x
while (desc.hasCoprocessor(CubeObserverClassOld)) {
- desc.removeCoprocessor(CubeObserverClassOld);
+ desc = descBuilder.removeCoprocessor(CubeObserverClassOld).build();
}
while (desc.hasCoprocessor(IIEndpointClassOld)) {
- desc.removeCoprocessor(IIEndpointClassOld);
+ desc = descBuilder.removeCoprocessor(IIEndpointClassOld).build();
}
- addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
+ addCoprocessorOnHTable(descBuilder, hdfsCoprocessorJar);
// update commit tags
String commitInfo = KylinVersion.getGitCommitInfo();
if (!StringUtils.isEmpty(commitInfo)) {
- desc.setValue(IRealizationConstants.HTableGitTag, commitInfo);
+ descBuilder.setValue(IRealizationConstants.HTableGitTag, commitInfo);
}
- hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc);
+ hbaseAdmin.modifyTable(descBuilder.build());
logger.info("Enable " + tableName);
hbaseAdmin.enableTable(TableName.valueOf(tableName));
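Because the descriptors returned by build() are immutable, the remove loops above rebuild desc after each removeCoprocessor call so that hasCoprocessor is re-evaluated against the updated state, and the final modifyTable call uses the new single-argument overload that takes the descriptor itself. A sketch of that loop shape, with a placeholder coprocessor class name:

    import org.apache.hadoop.hbase.client.TableDescriptor;
    import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

    public class RemoveCoprocessorSketch {
        static TableDescriptor removeAll(TableDescriptor desc, String className) {
            // Seed the builder from the live descriptor, then snapshot after
            // each removal so hasCoprocessor sees the updated state.
            TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(desc);
            while (desc.hasCoprocessor(className)) {
                desc = builder.removeCoprocessor(className).build();
            }
            return desc;
        }
    }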
@@ -490,9 +492,9 @@ public class DeployCoprocessorCLI {
HashSet<String> result = new HashSet<String>();
for (String tableName : tableNames) {
- HTableDescriptor tableDescriptor = null;
+ TableDescriptor tableDescriptor = null;
try {
- tableDescriptor = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
+ tableDescriptor = hbaseAdmin.getDescriptor(TableName.valueOf(tableName));
} catch (TableNotFoundException e) {
logger.warn("Table not found " + tableName, e);
continue;
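Finally, the read-only call sites migrate from Admin.getTableDescriptor(TableName) to Admin.getDescriptor(TableName), which returns the immutable TableDescriptor; value lookups such as the git-tag check behave the same. A sketch of that read path, with placeholder table and tag names:

    import java.io.IOException;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.TableNotFoundException;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.TableDescriptor;

    public class ReadDescriptorSketch {
        static String readTag(Admin admin, String tableName, String tagKey) throws IOException {
            try {
                // HBase 2.0: getDescriptor replaces the deprecated getTableDescriptor.
                TableDescriptor desc = admin.getDescriptor(TableName.valueOf(tableName));
                return desc.getValue(tagKey); // null when the tag was never set
            } catch (TableNotFoundException e) {
                return null; // mirrors the warn-and-continue handling above
            }
        }
    }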