You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/07/25 12:15:33 UTC
[14/18] kylin git commit: merge 1.5.3
merge 1.5.3
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/90a0c5ba
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/90a0c5ba
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/90a0c5ba
Branch: refs/heads/1.5.x-HBase1.x
Commit: 90a0c5baba0c0d8e7c1f84ea4b94f0886fa8c19e
Parents: 1b6ed08
Author: shaofengshi <sh...@apache.org>
Authored: Mon Jul 25 20:12:00 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Jul 25 20:13:38 2016 +0800
----------------------------------------------------------------------
.../kylin/provision/BuildCubeWithEngine.java | 3 +-
.../rest/security/MockAclHBaseStorage.java | 4 +-
.../apache/kylin/rest/service/CubeService.java | 2 +
.../apache/kylin/rest/service/QueryService.java | 7 +-
.../apache/kylin/rest/service/UserService.java | 14 +---
.../kylin/storage/hbase/HBaseConnection.java | 74 ++++++++++++++++----
.../kylin/storage/hbase/HBaseResourceStore.java | 23 ++----
.../hbase/cube/v2/CubeHBaseEndpointRPC.java | 2 +-
.../storage/hbase/steps/HBaseCuboidWriter.java | 1 -
.../storage/hbase/util/CubeMigrationCLI.java | 22 ++----
.../storage/hbase/util/StorageCleanupJob.java | 5 +-
11 files changed, 85 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/90a0c5ba/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index 4cee1ed..2a5979f 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -58,6 +58,7 @@ import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
import org.apache.kylin.job.manager.ExecutableManager;
+import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.kylin.storage.hbase.util.HBaseRegionSizeCalculator;
import org.apache.kylin.storage.hbase.util.StorageCleanupJob;
import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
@@ -419,7 +420,7 @@ public class BuildCubeWithEngine {
}
private void checkHFilesInHBase(CubeSegment segment) throws IOException {
- Configuration conf = HBaseConfiguration.create(HadoopUtil.getCurrentConfiguration());
+ Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
String tableName = segment.getStorageLocationIdentifier();
HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(tableName, conn);
http://git-wip-us.apache.org/repos/asf/kylin/blob/90a0c5ba/server-base/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java b/server-base/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java
index 492c176..16d6f9f 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java
@@ -34,8 +34,8 @@ public class MockAclHBaseStorage implements AclHBaseStorage {
private static final String aclTableName = "MOCK-ACL-TABLE";
private static final String userTableName = "MOCK-USER-TABLE";
- private HTableInterface mockedAclTable;
- private HTableInterface mockedUserTable;
+ private Table mockedAclTable;
+ private Table mockedUserTable;
private RealAclHBaseStorage realAcl;
public MockAclHBaseStorage() {
http://git-wip-us.apache.org/repos/asf/kylin/blob/90a0c5ba/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index 00b07d5..0503236 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -29,6 +29,7 @@ import java.util.Set;
import java.util.WeakHashMap;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.HTable;
import java.util.*;
import org.apache.kylin.common.KylinConfig;
@@ -65,6 +66,7 @@ import org.apache.kylin.rest.security.AclPermission;
import org.apache.kylin.source.hive.HiveSourceTableLoader;
import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityJob;
import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityUpdateJob;
+import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.kylin.storage.hbase.util.HBaseRegionSizeCalculator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/kylin/blob/90a0c5ba/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 783616d..d095f2b 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -46,7 +46,6 @@ import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
@@ -184,14 +183,10 @@ public class QueryService extends BasicService {
List<Query> queries = new ArrayList<Query>();
Table htable = null;
try {
-<<<<<<< b6296775f7793a63baf7f6a97cf4c0759d654341:server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
- HConnection conn = HBaseConnection.get(hbaseUrl);
+ org.apache.hadoop.hbase.client.Connection conn = HBaseConnection.get(hbaseUrl);
HBaseConnection.createHTableIfNeeded(conn, userTableName, USER_QUERY_FAMILY);
- htable = conn.getTable(userTableName);
-=======
htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
->>>>>>> KYLIN-1528 Create a branch for v1.5 with HBase 1.x API:server/src/main/java/org/apache/kylin/rest/service/QueryService.java
Get get = new Get(Bytes.toBytes(creator));
get.addFamily(Bytes.toBytes(USER_QUERY_FAMILY));
Result result = htable.get(get);
http://git-wip-us.apache.org/repos/asf/kylin/blob/90a0c5ba/server-base/src/main/java/org/apache/kylin/rest/service/UserService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/UserService.java b/server-base/src/main/java/org/apache/kylin/rest/service/UserService.java
index 64c2c7d..e039534 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/UserService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/UserService.java
@@ -146,17 +146,12 @@ public class UserService implements UserDetailsManager {
public void updateUser(UserDetails user) {
Table htable = null;
try {
- byte[] userAuthorities = serialize(user.getAuthorities());
htable = aclHBaseStorage.getTable(userTableName);
-<<<<<<< b6296775f7793a63baf7f6a97cf4c0759d654341:server-base/src/main/java/org/apache/kylin/rest/service/UserService.java
Pair<byte[], byte[]> pair = userToHBaseRow(user);
Put put = new Put(pair.getKey());
- put.add(Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_FAMILY), Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_COLUMN), pair.getSecond());
-=======
- Put put = new Put(Bytes.toBytes(user.getUsername()));
- put.addColumn(Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_FAMILY), Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_COLUMN), userAuthorities);
->>>>>>> KYLIN-1528 Create a branch for v1.5 with HBase 1.x API:server/src/main/java/org/apache/kylin/rest/service/UserService.java
+
+ put.addColumn(Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_FAMILY), Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_COLUMN), pair.getSecond());
htable.put(put);
} catch (IOException e) {
@@ -219,13 +214,8 @@ public class UserService implements UserDetailsManager {
Scan s = new Scan();
s.addColumn(Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_FAMILY), Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_COLUMN));
-<<<<<<< b6296775f7793a63baf7f6a97cf4c0759d654341:server-base/src/main/java/org/apache/kylin/rest/service/UserService.java
List<UserDetails> all = new ArrayList<UserDetails>();
- HTableInterface htable = null;
-=======
- List<String> authorities = new ArrayList<String>();
Table htable = null;
->>>>>>> KYLIN-1528 Create a branch for v1.5 with HBase 1.x API:server/src/main/java/org/apache/kylin/rest/service/UserService.java
ResultScanner scanner = null;
try {
htable = aclHBaseStorage.getTable(userTableName);
http://git-wip-us.apache.org/repos/asf/kylin/blob/90a0c5ba/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
index 05170a0..e7ee2f5 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
@@ -21,7 +21,7 @@ package org.apache.kylin.storage.hbase;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.*;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.util.Threads;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.StorageException;
import org.apache.kylin.engine.mr.HadoopUtil;
@@ -51,14 +52,20 @@ public class HBaseConnection {
private static final Logger logger = LoggerFactory.getLogger(HBaseConnection.class);
- private static final Map<String, Configuration> ConfigCache = new ConcurrentHashMap<String, Configuration>();
- private static final Map<String, Connection> ConnPool = new ConcurrentHashMap<String, Connection>();
+ private static final Map<String, Configuration> configCache = new ConcurrentHashMap<String, Configuration>();
+ private static final Map<String, Connection> connPool = new ConcurrentHashMap<String, Connection>();
+
+ private static final ThreadLocal<Configuration> configThreadLocal = new ThreadLocal<>();
+
+ private static ExecutorService coprocessorPool = null;
static {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
- for (Connection conn : ConnPool.values()) {
+ closeCoprocessorPool();
+
+ for (Connection conn : connPool.values()) {
try {
conn.close();
} catch (IOException e) {
@@ -68,19 +75,62 @@ public class HBaseConnection {
}
});
}
+
+ public static ExecutorService getCoprocessorPool() {
+ if (coprocessorPool != null) {
+ return coprocessorPool;
+ }
+
+ synchronized (HBaseConnection.class) {
+ if (coprocessorPool != null) {
+ return coprocessorPool;
+ }
+
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+
+ // copy from HConnectionImplementation.getBatchPool()
+ int maxThreads = config.getHBaseMaxConnectionThreads();
+ int coreThreads = config.getHBaseCoreConnectionThreads();
+ long keepAliveTime = config.getHBaseConnectionThreadPoolAliveSeconds();
+ LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(maxThreads * 100);
+ ThreadPoolExecutor tpe = new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue, //
+ Threads.newDaemonThreadFactory("kylin-coproc-"));
+ tpe.allowCoreThreadTimeOut(true);
+
+ logger.info("Creating coprocessor thread pool with max of {}, core of {}", maxThreads, coreThreads);
+
+ coprocessorPool = tpe;
+ return coprocessorPool;
+ }
+ }
+
+ private static void closeCoprocessorPool() {
+ if (coprocessorPool == null)
+ return;
+
+ coprocessorPool.shutdown();
+ try {
+ if (!coprocessorPool.awaitTermination(10, TimeUnit.SECONDS)) {
+ coprocessorPool.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ coprocessorPool.shutdownNow();
+ }
+ }
+
public static void clearConnCache() {
- ConnPool.clear();
+ connPool.clear();
}
private static final ThreadLocal<Configuration> hbaseConfig = new ThreadLocal<>();
public static Configuration getCurrentHBaseConfiguration() {
- if (hbaseConfig.get() == null) {
+ if (configThreadLocal.get() == null) {
String storageUrl = KylinConfig.getInstanceFromEnv().getStorageUrl();
- hbaseConfig.set(newHBaseConfiguration(storageUrl));
+ configThreadLocal.set(newHBaseConfiguration(storageUrl));
}
- return hbaseConfig.get();
+ return configThreadLocal.get();
}
private static Configuration newHBaseConfiguration(String url) {
@@ -128,20 +178,20 @@ public class HBaseConnection {
@SuppressWarnings("resource")
public static Connection get(String url) {
// find configuration
- Configuration conf = ConfigCache.get(url);
+ Configuration conf = configCache.get(url);
if (conf == null) {
conf = newHBaseConfiguration(url);
- ConfigCache.put(url, conf);
+ configCache.put(url, conf);
}
- Connection connection = ConnPool.get(url);
+ Connection connection = connPool.get(url);
try {
while (true) {
// I don't use DCL since recreate a connection is not a big issue.
if (connection == null || connection.isClosed()) {
logger.info("connection is null or closed, creating a new one");
connection = ConnectionFactory.createConnection(conf);
- ConnPool.put(url, connection);
+ connPool.put(url, connection);
}
if (connection == null || connection.isClosed()) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/90a0c5ba/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index f988dea..aa7a4d4 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -32,14 +32,9 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
@@ -286,7 +281,6 @@ public class HBaseResourceStore extends ResourceStore {
Delete del = new Delete(Bytes.toBytes(resPath));
table.delete(del);
- table.flushCommits();
if (hdfsResourceExist) { // remove hdfs cell value
Path redirectPath = bigCellHDFSPath(resPath);
@@ -308,7 +302,7 @@ public class HBaseResourceStore extends ResourceStore {
}
private Result getFromHTable(String path, boolean fetchContent, boolean fetchTimestamp) throws IOException {
- HTableInterface table = getConnection().getTable(getAllInOneTableName());
+ Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
try {
return internalGetFromHTable(table, path, fetchContent, fetchTimestamp);
} finally {
@@ -316,7 +310,7 @@ public class HBaseResourceStore extends ResourceStore {
}
}
- private Result internalGetFromHTable(HTableInterface table, String path, boolean fetchContent, boolean fetchTimestamp) throws IOException {
+ private Result internalGetFromHTable(Table table, String path, boolean fetchContent, boolean fetchTimestamp) throws IOException {
byte[] rowkey = Bytes.toBytes(path);
Get get = new Get(rowkey);
@@ -330,14 +324,9 @@ public class HBaseResourceStore extends ResourceStore {
get.addColumn(B_FAMILY, B_COLUMN_TS);
}
- Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
- try {
- Result result = table.get(get);
- boolean exists = result != null && (!result.isEmpty() || (result.getExists() != null && result.getExists()));
- return exists ? result : null;
- } finally {
- IOUtils.closeQuietly(table);
- }
+ Result result = table.get(get);
+ boolean exists = result != null && (!result.isEmpty() || (result.getExists() != null && result.getExists()));
+ return exists ? result : null;
}
private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, Table table) throws IOException {
http://git-wip-us.apache.org/repos/asf/kylin/blob/90a0c5ba/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 830aca7..d84074f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -360,7 +360,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
final boolean[] abnormalFinish = new boolean[1];
try {
- HTableInterface table = conn.get(cubeSeg.getStorageLocationIdentifier(), HBaseConnection.getCoprocessorPool());
+ Table table = conn.getTable(TableName.valueOf(cubeSeg.getStorageLocationIdentifier()), HBaseConnection.getCoprocessorPool());
final CubeVisitRequest request = builder.build();
final byte[] startKey = epRange.getFirst();
http://git-wip-us.apache.org/repos/asf/kylin/blob/90a0c5ba/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
index 16955dd..c990379 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
@@ -136,7 +136,6 @@ public class HBaseCuboidWriter implements ICuboidWriter {
logger.info("commit total " + puts.size() + " puts, totally cost:" + (System.currentTimeMillis() - t) + "ms");
puts.clear();
}
- logger.info("commit total " + puts.size() + " puts, totally cost:" + (System.currentTimeMillis() - t) + "ms");
puts.clear();
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/90a0c5ba/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
index 6b63e66..dfb7c78 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
@@ -27,16 +27,8 @@ import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.*;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.JsonSerializer;
import org.apache.kylin.common.persistence.RawResource;
@@ -45,7 +37,6 @@ import org.apache.kylin.common.persistence.Serializer;
import org.apache.kylin.common.restclient.Broadcaster;
import org.apache.kylin.common.restclient.RestClient;
import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
@@ -463,11 +454,7 @@ public class CubeMigrationCLI {
put.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), CellUtil.cloneValue(cell));
destAclHtable.put(put);
}
- Put put = new Put(Bytes.toBytes(cubeId));
- put.add(family, column, value);
- destAclHtable.put(put);
}
- destAclHtable.flushCommits();
} finally {
IOUtils.closeQuietly(srcAclHtable);
IOUtils.closeQuietly(destAclHtable);
@@ -533,13 +520,12 @@ public class CubeMigrationCLI {
case COPY_ACL: {
String cubeId = (String) opt.params[0];
String modelId = (String) opt.params[1];
- HTableInterface destAclHtable = null;
+ Table destAclHtable = null;
try {
- destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()).getTable(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME);
+ destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()).getTable(TableName.valueOf(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME));
destAclHtable.delete(new Delete(Bytes.toBytes(cubeId)));
destAclHtable.delete(new Delete(Bytes.toBytes(modelId)));
- destAclHtable.flushCommits();
} finally {
IOUtils.closeQuietly(destAclHtable);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/90a0c5ba/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
index d7f49df..874121d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.CliCommandExecutor;
@@ -167,10 +168,10 @@ public class StorageCleanupJob extends AbstractHadoopJob {
}
class DeleteHTableRunnable implements Callable {
- HBaseAdmin hbaseAdmin;
+ Admin hbaseAdmin;
String htableName;
- DeleteHTableRunnable(HBaseAdmin hbaseAdmin, String htableName) {
+ DeleteHTableRunnable(Admin hbaseAdmin, String htableName) {
this.hbaseAdmin = hbaseAdmin;
this.htableName = htableName;
}