You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/03/02 10:16:14 UTC
[3/3] kylin git commit: KYLIN-920 & KYLIN-782 & KYLIN-1422 Upgrade to
HBase 1.1 (with help from murkrishn )
KYLIN-920 & KYLIN-782 & KYLIN-1422 Upgrade to HBase 1.1 (with help from murkrishn <mu...@ebay.com>)
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4313900e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4313900e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4313900e
Branch: refs/heads/1.x-HBase1.1.3
Commit: 4313900e67076c6d517d29bc0b78095e0e37b4ad
Parents: f6bc652
Author: Yang Li <li...@apache.org>
Authored: Sun Aug 16 20:22:13 2015 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Wed Mar 2 17:15:28 2016 +0800
----------------------------------------------------------------------
.../org/apache/kylin/common/KylinConfig.java | 5 +
.../common/persistence/HBaseConnection.java | 31 +-
.../common/persistence/HBaseResourceStore.java | 31 +-
.../common/util/HBaseRegionSizeCalculator.java | 41 +-
.../kylin/common/util/BasicHadoopTest.java | 11 +-
.../kylin/job/cube/GarbageCollectionStep.java | 22 +-
.../kylin/job/hadoop/cube/CubeHFileJob.java | 22 +-
.../job/hadoop/cube/StorageCleanupJob.java | 26 +-
.../kylin/job/hadoop/hbase/CreateHTableJob.java | 8 +-
.../hadoop/invertedindex/IICreateHFileJob.java | 22 +-
.../hadoop/invertedindex/IICreateHTableJob.java | 11 +-
.../apache/kylin/job/tools/CleanHtableCLI.java | 8 +-
.../kylin/job/tools/CubeMigrationCLI.java | 67 +-
.../kylin/job/tools/DeployCoprocessorCLI.java | 769 ++++++++++---------
.../job/tools/GridTableHBaseBenchmark.java | 37 +-
.../kylin/job/tools/HtableAlterMetadataCLI.java | 8 +-
.../apache/kylin/job/tools/RowCounterCLI.java | 11 +-
.../kylin/job/BuildCubeWithEngineTest.java | 3 +-
.../org/apache/kylin/job/ExportHBaseData.java | 18 +-
.../kylin/job/hadoop/hbase/TestHbaseClient.java | 13 +-
.../kylin/job/tools/HBaseRowDigestTest.java | 11 +-
monitor/pom.xml | 6 +
.../kylin/monitor/MonitorMetaManager.java | 49 +-
pom.xml | 20 +-
.../apache/kylin/rest/service/AclService.java | 38 +-
.../apache/kylin/rest/service/CubeService.java | 35 +-
.../apache/kylin/rest/service/QueryService.java | 21 +-
.../apache/kylin/rest/service/UserService.java | 27 +-
.../storage/filter/BitMapFilterEvaluator.java | 1 -
.../storage/hbase/CubeSegmentTupleIterator.java | 50 +-
.../kylin/storage/hbase/CubeStorageEngine.java | 4 +-
.../storage/hbase/HBaseClientKVIterator.java | 187 ++---
.../hbase/InvertedIndexStorageEngine.java | 114 +--
.../kylin/storage/hbase/PingHBaseCLI.java | 179 ++---
.../storage/hbase/RegionScannerAdapter.java | 10 +-
.../hbase/SerializedHBaseTupleIterator.java | 4 +-
.../endpoint/EndpointTupleIterator.java | 15 +-
.../hbase/coprocessor/endpoint/IIEndpoint.java | 2 +-
.../observer/AggregateRegionObserver.java | 2 +-
.../observer/AggregationScanner.java | 14 +-
.../observer/ObserverAggregationCache.java | 10 +-
.../coprocessor/observer/ObserverEnabler.java | 4 +-
.../storage/hbase/filter/FuzzyRowFilter.java | 636 +++++++++++++++
.../storage/hbase/filter/UnsafeAccess.java | 96 +++
.../storage/hbase/InvertedIndexHBaseTest.java | 227 +++---
.../observer/AggregateRegionObserverTest.java | 72 +-
.../minicluster/HiveMiniClusterTest.java | 3 +-
47 files changed, 1877 insertions(+), 1124 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/KylinConfig.java b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
index c58d419..c5f4bfe 100644
--- a/common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -530,6 +530,11 @@ public class KylinConfig {
public int getHBaseScanMaxResultSize() {
return Integer.parseInt(this.getOptional("kylin.hbase.scan.max_result_size", "" + (5 * 1024 * 1024))); // 5 MB
}
+
+ public String getPatchedFuzzyRowFilterVersion()
+ {
+ return this.getOptional("kylin.hbase.filter.fuzzy.row.filter.version","1.1.3");
+ }
public boolean isQueryIgnoreUnknownFunction() {
return Boolean.parseBoolean(this.getOptional("kylin.query.ignore_unknown_function", "false"));
http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java b/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java
index 5b8fe54..a3d8166 100644
--- a/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java
+++ b/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java
@@ -27,9 +27,10 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HadoopUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,13 +43,13 @@ public class HBaseConnection {
private static final Logger logger = LoggerFactory.getLogger(HBaseConnection.class);
- private static final Map<String, HConnection> ConnPool = new ConcurrentHashMap<String, HConnection>();
+ private static final Map<String, Connection> ConnPool = new ConcurrentHashMap<String, Connection>();
static {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
- for (HConnection conn : ConnPool.values()) {
+ for (Connection conn : ConnPool.values()) {
try {
conn.close();
} catch (IOException e) {
@@ -62,16 +63,20 @@ public class HBaseConnection {
public static void clearCache() {
ConnPool.clear();
}
+
+ public static Connection get() {
+ return get(KylinConfig.getInstanceFromEnv().getStorageUrl());
+ }
- public static HConnection get(String url) {
+ public static Connection get(String url) {
- HConnection connection = ConnPool.get(url);
+ Connection connection = ConnPool.get(url);
try {
// I don't use DCL since recreate a connection is not a big issue.
if (connection == null) {
// find configuration
Configuration conf = HadoopUtil.getCurrentHBaseConfiguration();
- connection = HConnectionManager.createConnection(conf);
+ connection = ConnectionFactory.createConnection(conf);
ConnPool.put(url, connection);
}
} catch (Throwable t) {
@@ -85,13 +90,13 @@ public class HBaseConnection {
createHTableIfNeeded(HBaseConnection.get(hbaseUrl), tableName, families);
}
- public static void createHTableIfNeeded(HConnection conn, String tableName, String... families) throws IOException {
- HBaseAdmin hbase = new HBaseAdmin(conn);
+ public static void createHTableIfNeeded(Connection conn, String tableName, String... families) throws IOException {
+ Admin admin = conn.getAdmin();
try {
boolean tableExist = false;
try {
- hbase.getTableDescriptor(TableName.valueOf(tableName));
+ admin.getTableDescriptor(TableName.valueOf(tableName));
tableExist = true;
} catch (TableNotFoundException e) {
}
@@ -112,11 +117,11 @@ public class HBaseConnection {
desc.addFamily(fd);
}
}
- hbase.createTable(desc);
+ admin.createTable(desc);
logger.debug("HTable '" + tableName + "' created");
} finally {
- hbase.close();
+ admin.close();
}
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java b/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
index d1ff27a..0c06847 100644
--- a/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
+++ b/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
@@ -34,13 +34,14 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Bytes;
@@ -77,7 +78,7 @@ public class HBaseResourceStore extends ResourceStore {
// final Map<String, String> tableNameMap; // path prefix ==> HBase table name
- private HConnection getConnection() throws IOException {
+ private Connection getConnection() throws IOException {
return HBaseConnection.get(hbaseUrl);
}
@@ -114,7 +115,7 @@ public class HBaseResourceStore extends ResourceStore {
ArrayList<String> result = new ArrayList<String>();
- HTableInterface table = getConnection().getTable(getAllInOneTableName());
+ Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
Scan scan = new Scan(startRow, endRow);
scan.setFilter(new KeyOnlyFilter());
try {
@@ -150,7 +151,7 @@ public class HBaseResourceStore extends ResourceStore {
scan.addColumn(B_FAMILY, B_COLUMN);
tuneScanParameters(scan);
- HTableInterface table = getConnection().getTable(getAllInOneTableName());
+ Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
List<RawResource> result = Lists.newArrayList();
try {
ResultScanner scanner = table.getScanner(scan);
@@ -219,13 +220,12 @@ public class HBaseResourceStore extends ResourceStore {
IOUtils.copy(content, bout);
bout.close();
- HTableInterface table = getConnection().getTable(getAllInOneTableName());
+ Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
try {
byte[] row = Bytes.toBytes(resPath);
Put put = buildPut(resPath, ts, row, bout.toByteArray(), table);
table.put(put);
- table.flushCommits();
} finally {
IOUtils.closeQuietly(table);
}
@@ -233,7 +233,7 @@ public class HBaseResourceStore extends ResourceStore {
@Override
protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) throws IOException, IllegalStateException {
- HTableInterface table = getConnection().getTable(getAllInOneTableName());
+ Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
try {
byte[] row = Bytes.toBytes(resPath);
byte[] bOldTS = oldTS == 0 ? null : Bytes.toBytes(oldTS);
@@ -245,8 +245,6 @@ public class HBaseResourceStore extends ResourceStore {
throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + real + ", but it is " + oldTS);
}
- table.flushCommits();
-
return newTS;
} finally {
IOUtils.closeQuietly(table);
@@ -255,11 +253,10 @@ public class HBaseResourceStore extends ResourceStore {
@Override
protected void deleteResourceImpl(String resPath) throws IOException {
- HTableInterface table = getConnection().getTable(getAllInOneTableName());
+ Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
try {
Delete del = new Delete(Bytes.toBytes(resPath));
table.delete(del);
- table.flushCommits();
} finally {
IOUtils.closeQuietly(table);
}
@@ -284,7 +281,7 @@ public class HBaseResourceStore extends ResourceStore {
scan.addColumn(B_FAMILY, B_COLUMN_TS);
}
- HTableInterface table = getConnection().getTable(getAllInOneTableName());
+ Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
try {
ResultScanner scanner = table.getScanner(scan);
Result result = null;
@@ -303,7 +300,7 @@ public class HBaseResourceStore extends ResourceStore {
return endRow;
}
- private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, HTableInterface table) throws IOException {
+ private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, Table table) throws IOException {
Path redirectPath = bigCellHDFSPath(resPath);
Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
FileSystem fileSystem = FileSystem.get(hconf);
@@ -329,7 +326,7 @@ public class HBaseResourceStore extends ResourceStore {
return redirectPath;
}
- private Put buildPut(String resPath, long ts, byte[] row, byte[] content, HTableInterface table) throws IOException {
+ private Put buildPut(String resPath, long ts, byte[] row, byte[] content, Table table) throws IOException {
int kvSizeLimit = this.kylinConfig.getHBaseKeyValueSize();
if (content.length > kvSizeLimit) {
writeLargeCellToHdfs(resPath, content, table);
@@ -337,8 +334,8 @@ public class HBaseResourceStore extends ResourceStore {
}
Put put = new Put(row);
- put.add(B_FAMILY, B_COLUMN, content);
- put.add(B_FAMILY, B_COLUMN_TS, Bytes.toBytes(ts));
+ put.addColumn(B_FAMILY, B_COLUMN, content);
+ put.addColumn(B_FAMILY, B_COLUMN_TS, Bytes.toBytes(ts));
return put;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java b/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java
index fccd042..80f0502 100644
--- a/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java
+++ b/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java
@@ -23,19 +23,24 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
+import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterStatus;
-import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,30 +61,31 @@ public class HBaseRegionSizeCalculator {
/**
* Computes size of each region for table and given column families.
* */
- public HBaseRegionSizeCalculator(HTable table) throws IOException {
- this(table, new HBaseAdmin(table.getConfiguration()));
- }
-
- /** Constructor for unit testing */
- HBaseRegionSizeCalculator(HTable table, HBaseAdmin hBaseAdmin) throws IOException {
-
+ public HBaseRegionSizeCalculator(String tableName , Connection hbaseConnection) throws IOException {
+ Table table = null;
+ Admin admin = null;
+
try {
+ table = hbaseConnection.getTable(TableName.valueOf(tableName));
+ admin = hbaseConnection.getAdmin();
+
if (!enabled(table.getConfiguration())) {
logger.info("Region size calculation disabled.");
return;
}
- logger.info("Calculating region sizes for table \"" + new String(table.getTableName()) + "\".");
+ logger.info("Calculating region sizes for table \"" + table.getName() + "\".");
// Get regions for table.
- Set<HRegionInfo> tableRegionInfos = table.getRegionLocations().keySet();
+ RegionLocator regionLocator = hbaseConnection.getRegionLocator(table.getName());
+ List<HRegionLocation> regionLocationList = regionLocator.getAllRegionLocations();
Set<byte[]> tableRegions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
- for (HRegionInfo regionInfo : tableRegionInfos) {
- tableRegions.add(regionInfo.getRegionName());
+ for (HRegionLocation hRegionLocation : regionLocationList) {
+ tableRegions.add(hRegionLocation.getRegionInfo().getRegionName());
}
- ClusterStatus clusterStatus = hBaseAdmin.getClusterStatus();
+ ClusterStatus clusterStatus = admin.getClusterStatus();
Collection<ServerName> servers = clusterStatus.getServers();
final long megaByte = 1024L * 1024L;
@@ -103,7 +109,8 @@ public class HBaseRegionSizeCalculator {
}
}
} finally {
- hBaseAdmin.close();
+ IOUtils.closeQuietly(table);
+ IOUtils.closeQuietly(admin);
}
}
@@ -129,7 +136,9 @@ public class HBaseRegionSizeCalculator {
return Collections.unmodifiableMap(sizeMap);
}
+
public Map<byte[], Pair<Integer, Integer>> getRegionHFileCountMap() {
return Collections.unmodifiableMap(countMap);
}
}
+
http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/common/src/test/java/org/apache/kylin/common/util/BasicHadoopTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/BasicHadoopTest.java b/common/src/test/java/org/apache/kylin/common/util/BasicHadoopTest.java
index 6d2762c..481fc6c 100644
--- a/common/src/test/java/org/apache/kylin/common/util/BasicHadoopTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/BasicHadoopTest.java
@@ -21,12 +21,11 @@ package org.apache.kylin.common.util;
import java.io.File;
import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.kylin.common.persistence.HBaseConnection;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
@@ -56,16 +55,14 @@ public class BasicHadoopTest {
cf.setBlocksize(4 * 1024 * 1024); // set to 4MB
tableDesc.addFamily(cf);
- Configuration conf = HBaseConfiguration.create();
- HBaseAdmin admin = new HBaseAdmin(conf);
+ Admin admin = HBaseConnection.get().getAdmin();
admin.createTable(tableDesc);
admin.close();
}
@Test
public void testRetriveHtableHost() throws IOException {
- Configuration conf = HBaseConfiguration.create();
- HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+ Admin hbaseAdmin = HBaseConnection.get().getAdmin();
HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables();
for (HTableDescriptor table : tableDescriptors) {
String value = table.getValue("KYLIN_HOST");
http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
index b076aa7..3852eeb 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
@@ -24,14 +24,13 @@ import java.util.Collections;
import java.util.List;
import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.persistence.HBaseConnection;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.job.cmd.ShellCmdOutput;
import org.apache.kylin.job.common.HiveCmdBuilder;
@@ -106,19 +105,18 @@ public class GarbageCollectionStep extends AbstractExecutable {
List<String> oldTables = getOldHTables();
if (oldTables != null && oldTables.size() > 0) {
String metadataUrlPrefix = KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
- Configuration conf = HBaseConfiguration.create();
- HBaseAdmin admin = null;
+ Admin admin = null;
try {
- admin = new HBaseAdmin(conf);
+ admin = HBaseConnection.get().getAdmin();
for (String table : oldTables) {
- if (admin.tableExists(table)) {
- HTableDescriptor tableDescriptor = admin.getTableDescriptor(Bytes.toBytes(table));
+ if (admin.tableExists(TableName.valueOf(table))) {
+ HTableDescriptor tableDescriptor = admin.getTableDescriptor(TableName.valueOf(table));
String host = tableDescriptor.getValue(IRealizationConstants.HTableTag);
if (metadataUrlPrefix.equalsIgnoreCase(host)) {
- if (admin.isTableEnabled(table)) {
- admin.disableTable(table);
+ if (admin.isTableEnabled(TableName.valueOf(table))) {
+ admin.disableTable(TableName.valueOf(table));
}
- admin.deleteTable(table);
+ admin.deleteTable(TableName.valueOf(table));
logger.debug("Dropped HBase table " + table);
output.append("Dropped HBase table " + table + " \n");
} else {
http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java
index d5b83ef..55d926a 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java
@@ -21,12 +21,16 @@ package org.apache.kylin.job.hadoop.cube;
import java.io.IOException;
import org.apache.commons.cli.Options;
+import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.io.SequenceFile;
@@ -38,6 +42,7 @@ import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.HBaseConnection;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.job.constant.BatchConstants;
@@ -54,6 +59,8 @@ public class CubeHFileJob extends AbstractHadoopJob {
public int run(String[] args) throws Exception {
Options options = new Options();
+ Connection connection = null;
+ Table table = null;
try {
options.addOption(OPTION_JOB_NAME);
@@ -90,12 +97,16 @@ public class CubeHFileJob extends AbstractHadoopJob {
attachKylinPropsAndMetadata(cube, job.getConfiguration());
String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase();
- HTable htable = new HTable(conf, tableName);
+ connection = HBaseConnection.get();
+ table = connection.getTable(TableName.valueOf(tableName));
+ RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(tableName));
- // Automatic config !
- HFileOutputFormat.configureIncrementalLoad(job, htable);
+
+ //Automatic config !
+ HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
reconfigurePartitions(conf, partitionFilePath);
+
// set block replication to 3 for hfiles
conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, "3");
@@ -107,6 +118,7 @@ public class CubeHFileJob extends AbstractHadoopJob {
printUsage(options);
throw e;
} finally {
+ IOUtils.closeQuietly(table);
if (job != null)
cleanupTempConfFile(job.getConfiguration());
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
index 0c87fc6..70705b7 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
@@ -18,6 +18,13 @@
package org.apache.kylin.job.hadoop.cube;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
@@ -28,10 +35,12 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.HBaseConnection;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
@@ -52,13 +61,6 @@ import org.apache.kylin.metadata.realization.IRealizationConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.StringReader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
/**
* @author ysong1
*/
@@ -109,7 +111,7 @@ public class StorageCleanupJob extends AbstractHadoopJob {
IIManager iiManager = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
// get all kylin hbase tables
- HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+ Admin hbaseAdmin = HBaseConnection.get().getAdmin();
String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix;
HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*");
List<String> allTablesNeedToBeDropped = new ArrayList<String>();
@@ -143,9 +145,9 @@ public class StorageCleanupJob extends AbstractHadoopJob {
// drop tables
for (String htableName : allTablesNeedToBeDropped) {
log.info("Deleting HBase table " + htableName);
- if (hbaseAdmin.tableExists(htableName)) {
- hbaseAdmin.disableTable(htableName);
- hbaseAdmin.deleteTable(htableName);
+ if (hbaseAdmin.tableExists(TableName.valueOf(htableName))) {
+ hbaseAdmin.disableTable(TableName.valueOf(htableName));
+ hbaseAdmin.deleteTable(TableName.valueOf(htableName));
log.info("Deleted HBase table " + htableName);
} else {
log.info("HBase table" + htableName + " does not exist");
http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
index 027c0ca..9f5e062 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
@@ -25,11 +25,10 @@ import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
@@ -42,6 +41,7 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.HBaseConnection;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -81,7 +81,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
tableDesc.setValue(IRealizationConstants.HTableTag, config.getMetadataUrlPrefix());
Configuration conf = HadoopUtil.getCurrentHBaseConfiguration();
- HBaseAdmin admin = new HBaseAdmin(conf);
+ Admin admin = HBaseConnection.get().getAdmin();
try {
if (User.isHBaseSecurityEnabled(conf)) {
@@ -139,7 +139,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
byte[][] splitKeys = getSplits(conf, partitionFilePath);
- if (admin.tableExists(tableName)) {
+ if (admin.tableExists(TableName.valueOf(tableName))) {
// admin.disableTable(tableName);
// admin.deleteTable(tableName);
throw new RuntimeException("HBase table " + tableName + " exists!");
http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java
index c032bbc..fa42148 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java
@@ -19,17 +19,20 @@
package org.apache.kylin.job.hadoop.invertedindex;
import org.apache.commons.cli.Options;
+import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.HBaseConnection;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
import org.slf4j.Logger;
@@ -45,6 +48,8 @@ public class IICreateHFileJob extends AbstractHadoopJob {
public int run(String[] args) throws Exception {
Options options = new Options();
+ Connection connection = null;
+ Table table = null;
try {
options.addOption(OPTION_JOB_NAME);
@@ -69,8 +74,11 @@ public class IICreateHFileJob extends AbstractHadoopJob {
job.setMapOutputValueClass(KeyValue.class);
String tableName = getOptionValue(OPTION_HTABLE_NAME);
- HTable htable = new HTable(HBaseConfiguration.create(getConf()), tableName);
- HFileOutputFormat.configureIncrementalLoad(job, htable);
+
+ connection = HBaseConnection.get();
+ table = connection.getTable(TableName.valueOf(tableName));
+ RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(tableName));
+ HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
this.deletePath(job.getConfiguration(), output);
@@ -78,6 +86,8 @@ public class IICreateHFileJob extends AbstractHadoopJob {
} catch (Exception e) {
printUsage(options);
throw e;
+ } finally {
+ IOUtils.closeQuietly(table);
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java
index 32d065a..63777ef 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java
@@ -24,11 +24,12 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.HBaseConnection;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.invertedindex.IIInstance;
@@ -78,10 +79,10 @@ public class IICreateHTableJob extends AbstractHadoopJob {
DeployCoprocessorCLI.deployCoprocessor(tableDesc);
// drop the table first
- HBaseAdmin admin = new HBaseAdmin(conf);
- if (admin.tableExists(tableName)) {
- admin.disableTable(tableName);
- admin.deleteTable(tableName);
+ Admin admin = HBaseConnection.get().getAdmin();
+ if (admin.tableExists(TableName.valueOf(tableName))) {
+ admin.disableTable(TableName.valueOf(tableName));
+ admin.deleteTable(TableName.valueOf(tableName));
}
// create table
http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java b/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java
index b6e5af5..7fc1d72 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java
@@ -21,11 +21,10 @@ package org.apache.kylin.job.tools;
import java.io.IOException;
import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.persistence.HBaseConnection;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,8 +53,7 @@ public class CleanHtableCLI extends AbstractHadoopJob {
}
private void clean() throws IOException {
- Configuration conf = HBaseConfiguration.create();
- HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+ Admin hbaseAdmin = HBaseConnection.get().getAdmin();
for (HTableDescriptor descriptor : hbaseAdmin.listTables()) {
String name = descriptor.getNameAsString().toLowerCase();
http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java b/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java
index 44bc2c3..4c25d8b 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java
@@ -18,15 +18,31 @@
package org.apache.kylin.job.tools;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hdfs.web.JsonUtil;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.*;
+import org.apache.kylin.common.persistence.HBaseConnection;
+import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.RawResource;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.Serializer;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -47,11 +63,6 @@ import org.apache.kylin.metadata.realization.RealizationType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
/**
* Created by honma on 9/3/14.
* <p/>
@@ -71,7 +82,7 @@ public class CubeMigrationCLI {
private static ResourceStore srcStore;
private static ResourceStore dstStore;
private static FileSystem hdfsFS;
- private static HBaseAdmin hbaseAdmin;
+ private static Admin hbaseAdmin;
public static final String ACL_INFO_FAMILY = "i";
private static final String ACL_TABLE_NAME = "_acl";
@@ -117,8 +128,7 @@ public class CubeMigrationCLI {
checkAndGetHbaseUrl();
- Configuration conf = HBaseConfiguration.create();
- hbaseAdmin = new HBaseAdmin(conf);
+ hbaseAdmin = HBaseConnection.get().getAdmin();
hdfsFS = FileSystem.get(new Configuration());
@@ -141,6 +151,8 @@ public class CubeMigrationCLI {
} else {
showOpts();
}
+
+ IOUtils.closeQuietly(hbaseAdmin);
}
public static void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String copyAcl, String purgeAndDisable, String overwriteIfExists, String realExecute) throws IOException, InterruptedException {
@@ -304,10 +316,10 @@ public class CubeMigrationCLI {
case CHANGE_HTABLE_HOST: {
String tableName = (String) opt.params[0];
HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
- hbaseAdmin.disableTable(tableName);
+ hbaseAdmin.disableTable(TableName.valueOf(tableName));
desc.setValue(IRealizationConstants.HTableTag, dstConfig.getMetadataUrlPrefix());
- hbaseAdmin.modifyTable(tableName, desc);
- hbaseAdmin.enableTable(tableName);
+ hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc);
+ hbaseAdmin.enableTable(TableName.valueOf(tableName));
logger.info("CHANGE_HTABLE_HOST is completed");
break;
}
@@ -418,14 +430,14 @@ public class CubeMigrationCLI {
Serializer<ProjectInstance> projectSerializer = new JsonSerializer<ProjectInstance>(ProjectInstance.class);
ProjectInstance project = dstStore.getResource(projectResPath, ProjectInstance.class, projectSerializer);
String projUUID = project.getUuid();
- HTableInterface srcAclHtable = null;
- HTableInterface destAclHtable = null;
+ Table srcAclHtable = null;
+ Table destAclHtable = null;
try {
- srcAclHtable = HBaseConnection.get(srcConfig.getMetadataUrl()).getTable(srcConfig.getMetadataUrlPrefix() + "_acl");
- destAclHtable = HBaseConnection.get(dstConfig.getMetadataUrl()).getTable(dstConfig.getMetadataUrlPrefix() + "_acl");
+ srcAclHtable = HBaseConnection.get(srcConfig.getMetadataUrl()).getTable(TableName.valueOf(srcConfig.getMetadataUrlPrefix() + "_acl"));
+ destAclHtable = HBaseConnection.get(dstConfig.getMetadataUrl()).getTable(TableName.valueOf(dstConfig.getMetadataUrlPrefix() + "_acl"));
// cube acl
- Result result = srcAclHtable.get(new Get(Bytes.toBytes(cubeId)));
+ Result result = srcAclHtable.get(new Get(Bytes.toBytes(cubeId)));
if (result.listCells() != null) {
for (Cell cell : result.listCells()) {
byte[] family = CellUtil.cloneFamily(cell);
@@ -438,11 +450,10 @@ public class CubeMigrationCLI {
value = Bytes.toBytes(valueString);
}
Put put = new Put(Bytes.toBytes(cubeId));
- put.add(family, column, value);
+ put.addColumn(family, column, value);
destAclHtable.put(put);
}
}
- destAclHtable.flushCommits();
} finally {
IOUtils.closeQuietly(srcAclHtable);
IOUtils.closeQuietly(destAclHtable);
@@ -469,10 +480,10 @@ public class CubeMigrationCLI {
case CHANGE_HTABLE_HOST: {
String tableName = (String) opt.params[0];
HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
- hbaseAdmin.disableTable(tableName);
+ hbaseAdmin.disableTable(TableName.valueOf(tableName));
desc.setValue(IRealizationConstants.HTableTag, srcConfig.getMetadataUrlPrefix());
- hbaseAdmin.modifyTable(tableName, desc);
- hbaseAdmin.enableTable(tableName);
+ hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc);
+ hbaseAdmin.enableTable(TableName.valueOf(tableName));
break;
}
case COPY_FILE_IN_META: {
@@ -502,13 +513,11 @@ public class CubeMigrationCLI {
case COPY_ACL: {
String cubeId = (String) opt.params[0];
String modelId = (String) opt.params[1];
- HTableInterface destAclHtable = null;
+ Table destAclHtable = null;
try {
- destAclHtable = HBaseConnection.get(dstConfig.getMetadataUrl()).getTable(dstConfig.getMetadataUrlPrefix() + "_acl");
-
+ destAclHtable = HBaseConnection.get(dstConfig.getMetadataUrl()).getTable(TableName.valueOf(dstConfig.getMetadataUrlPrefix() + "_acl"));
destAclHtable.delete(new Delete(Bytes.toBytes(cubeId)));
destAclHtable.delete(new Delete(Bytes.toBytes(modelId)));
- destAclHtable.flushCommits();
} finally {
IOUtils.closeQuietly(destAclHtable);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java b/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java
index bf655a0..28f52f2 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java
@@ -1,384 +1,385 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.tools;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Matcher;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- * @author yangli9
- */
-public class DeployCoprocessorCLI {
-
- public static final String CubeObserverClassV2 = "org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.AggregateRegionObserver";
- public static final String CubeEndpointClassV2 = "org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.CubeVisitService";
- public static final String IIEndpointClassV2 = "org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.IIEndpoint";
- public static final String OBSERVER_CLS_NAME = "org.apache.kylin.storage.hbase.coprocessor.observer.AggregateRegionObserver";
- public static final String ENDPOINT_CLS_NAMAE = "org.apache.kylin.storage.hbase.coprocessor.endpoint.IIEndpoint";
- private static final Logger logger = LoggerFactory.getLogger(DeployCoprocessorCLI.class);
-
- public static void main(String[] args) throws IOException {
- KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
- FileSystem fileSystem = FileSystem.get(hconf);
- HBaseAdmin hbaseAdmin = new HBaseAdmin(hconf);
-
- String localCoprocessorJar = new File(args[0]).getAbsolutePath();
- logger.info("Identify coprocessor jar " + localCoprocessorJar);
-
- List<String> tableNames = getHTableNames(kylinConfig);
- logger.info("Identify tables " + tableNames);
-
- if (args.length <= 1) {
- printUsageAndExit();
- }
-
- String filterType = args[1].toLowerCase();
- if (filterType.equals("-table")) {
- tableNames = filterByTables(tableNames, Arrays.asList(args).subList(2, args.length));
- } else if (filterType.equals("-cube")) {
- tableNames = filterByCubes(tableNames, Arrays.asList(args).subList(2, args.length));
- } else if (!filterType.equals("all")) {
- printUsageAndExit();
- }
-
- logger.info("Will execute tables " + tableNames);
-
- Set<String> oldJarPaths = getCoprocessorJarPaths(hbaseAdmin, tableNames);
- logger.info("Old coprocessor jar: " + oldJarPaths);
-
- Path hdfsCoprocessorJar = uploadCoprocessorJar(localCoprocessorJar, fileSystem, oldJarPaths);
- logger.info("New coprocessor jar: " + hdfsCoprocessorJar);
-
- List<String> processedTables = resetCoprocessorOnHTables(hbaseAdmin, hdfsCoprocessorJar, tableNames);
-
- // Don't remove old jars, missing coprocessor jar will fail hbase
- // removeOldJars(oldJarPaths, fileSystem);
-
- hbaseAdmin.close();
-
- logger.info("Processed " + processedTables);
- logger.info("Active coprocessor jar: " + hdfsCoprocessorJar);
- }
-
- private static void printUsageAndExit() {
- logger.warn("Probe run, exiting.");
- logger.info("Usage: bin/kylin.sh org.apache.kylin.job.tools.DeployCoprocessorCLI JAR_FILE all|-cube CUBE1 CUBE2|-table TABLE1 TABLE2");
- System.exit(0);
- }
-
- private static List<String> filterByCubes(List<String> allTableNames, List<String> cubeNames) {
- CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
- List<String> result = Lists.newArrayList();
- for (String c : cubeNames) {
- c = c.trim();
- if (c.endsWith(","))
- c = c.substring(0, c.length() - 1);
-
- CubeInstance cubeInstance = cubeManager.getCube(c);
- for (CubeSegment segment : cubeInstance.getSegments()) {
- String tableName = segment.getStorageLocationIdentifier();
- if (allTableNames.contains(tableName)) {
- result.add(tableName);
- }
- }
- }
- return result;
- }
-
- private static List<String> filterByTables(List<String> allTableNames, List<String> tableNames) {
- List<String> result = Lists.newArrayList();
- for (String t : tableNames) {
- t = t.trim();
- if (t.endsWith(","))
- t = t.substring(0, t.length() - 1);
-
- if (allTableNames.contains(t)) {
- result.add(t);
- }
- }
- return result;
- }
-
- public static void deployCoprocessor(HTableDescriptor tableDesc) {
- try {
- initHTableCoprocessor(tableDesc);
- logger.info("hbase table " + tableDesc.getName() + " deployed with coprocessor.");
-
- } catch (Exception ex) {
- logger.error("Error deploying coprocessor on " + tableDesc.getName(), ex);
- logger.error("Will try creating the table without coprocessor.");
- }
- }
-
- private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException {
- KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
- FileSystem fileSystem = FileSystem.get(hconf);
-
- String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar();
- Path hdfsCoprocessorJar = DeployCoprocessorCLI.uploadCoprocessorJar(localCoprocessorJar, fileSystem, null);
-
- DeployCoprocessorCLI.addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
- }
-
- public static void addCoprocessorOnHTable(HTableDescriptor desc, Path hdfsCoprocessorJar) throws IOException {
- logger.info("Add coprocessor on " + desc.getNameAsString());
- desc.addCoprocessor(ENDPOINT_CLS_NAMAE, hdfsCoprocessorJar, 1000, null);
- desc.addCoprocessor(OBSERVER_CLS_NAME, hdfsCoprocessorJar, 1001, null);
- }
-
- public static void resetCoprocessor(String tableName, HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar) throws IOException {
- logger.info("Disable " + tableName);
- hbaseAdmin.disableTable(tableName);
-
- logger.info("Unset coprocessor on " + tableName);
- HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
-
- // remove coprocessors of 1.x version
- while (desc.hasCoprocessor(OBSERVER_CLS_NAME)) {
- desc.removeCoprocessor(OBSERVER_CLS_NAME);
- }
- while (desc.hasCoprocessor(ENDPOINT_CLS_NAMAE)) {
- desc.removeCoprocessor(ENDPOINT_CLS_NAMAE);
- }
- // remove coprocessors of 2.x version
- while (desc.hasCoprocessor(CubeObserverClassV2)) {
- desc.removeCoprocessor(CubeObserverClassV2);
- }
- while (desc.hasCoprocessor(CubeEndpointClassV2)) {
- desc.removeCoprocessor(CubeEndpointClassV2);
- }
- while (desc.hasCoprocessor(IIEndpointClassV2)) {
- desc.removeCoprocessor(IIEndpointClassV2);
- }
-
- addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
- hbaseAdmin.modifyTable(tableName, desc);
-
- logger.info("Enable " + tableName);
- hbaseAdmin.enableTable(tableName);
- }
-
- private static List<String> resetCoprocessorOnHTables(HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar, List<String> tableNames) throws IOException {
- List<String> processed = new ArrayList<String>();
-
- for (String tableName : tableNames) {
- try {
- resetCoprocessor(tableName, hbaseAdmin, hdfsCoprocessorJar);
- processed.add(tableName);
- } catch (IOException ex) {
- logger.error("Error processing " + tableName, ex);
- }
- }
- return processed;
- }
-
- public static Path getNewestCoprocessorJar(KylinConfig config, FileSystem fileSystem) throws IOException {
- Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, config);
- FileStatus newestJar = null;
- for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
- if (fileStatus.getPath().toString().endsWith(".jar")) {
- if (newestJar == null) {
- newestJar = fileStatus;
- } else {
- if (newestJar.getModificationTime() < fileStatus.getModificationTime())
- newestJar = fileStatus;
- }
- }
- }
- if (newestJar == null)
- return null;
-
- Path path = newestJar.getPath().makeQualified(fileSystem.getUri(), null);
- logger.info("The newest coprocessor is " + path.toString());
- return path;
- }
-
- public static Path uploadCoprocessorJar(String localCoprocessorJar, FileSystem fileSystem, Set<String> oldJarPaths) throws IOException {
- Path uploadPath = null;
- File localCoprocessorFile = new File(localCoprocessorJar);
-
- // check existing jars
- if (oldJarPaths == null) {
- oldJarPaths = new HashSet<String>();
- }
- Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, KylinConfig.getInstanceFromEnv());
- for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
- if (fileStatus.getLen() == localCoprocessorJar.length() && fileStatus.getModificationTime() == localCoprocessorFile.lastModified()) {
- uploadPath = fileStatus.getPath();
- break;
- }
- String filename = fileStatus.getPath().toString();
- if (filename.endsWith(".jar")) {
- oldJarPaths.add(filename);
- }
- }
-
- // upload if not existing
- if (uploadPath == null) {
- // figure out a unique new jar file name
- Set<String> oldJarNames = new HashSet<String>();
- for (String path : oldJarPaths) {
- oldJarNames.add(new Path(path).getName());
- }
- String baseName = getBaseFileName(localCoprocessorJar);
- String newName = null;
- int i = 0;
- while (newName == null) {
- newName = baseName + "-" + (i++) + ".jar";
- if (oldJarNames.contains(newName))
- newName = null;
- }
-
- // upload
- uploadPath = new Path(coprocessorDir, newName);
- FileInputStream in = null;
- FSDataOutputStream out = null;
- try {
- in = new FileInputStream(localCoprocessorFile);
- out = fileSystem.create(uploadPath);
- IOUtils.copy(in, out);
- } finally {
- IOUtils.closeQuietly(in);
- IOUtils.closeQuietly(out);
- }
-
- fileSystem.setTimes(uploadPath, localCoprocessorFile.lastModified(), -1);
-
- }
-
- uploadPath = uploadPath.makeQualified(fileSystem.getUri(), null);
- return uploadPath;
- }
-
- private static String getBaseFileName(String localCoprocessorJar) {
- File localJar = new File(localCoprocessorJar);
- String baseName = localJar.getName();
- if (baseName.endsWith(".jar"))
- baseName = baseName.substring(0, baseName.length() - ".jar".length());
- return baseName;
- }
-
- private static Path getCoprocessorHDFSDir(FileSystem fileSystem, KylinConfig config) throws IOException {
- String hdfsWorkingDirectory = config.getHdfsWorkingDirectory();
- Path coprocessorDir = new Path(hdfsWorkingDirectory, "coprocessor");
- fileSystem.mkdirs(coprocessorDir);
- return coprocessorDir;
- }
-
- private static Set<String> getCoprocessorJarPaths(HBaseAdmin hbaseAdmin, List<String> tableNames) throws IOException {
- HashSet<String> result = new HashSet<String>();
-
- for (String tableName : tableNames) {
- HTableDescriptor tableDescriptor = null;
- try {
- tableDescriptor = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
- } catch (TableNotFoundException e) {
- logger.warn("Table not found " + tableName, e);
- continue;
- }
-
- Matcher keyMatcher;
- Matcher valueMatcher;
- for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> e : tableDescriptor.getValues().entrySet()) {
- keyMatcher = HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(Bytes.toString(e.getKey().get()));
- if (!keyMatcher.matches()) {
- continue;
- }
- valueMatcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(Bytes.toString(e.getValue().get()));
- if (!valueMatcher.matches()) {
- continue;
- }
-
- String jarPath = valueMatcher.group(1).trim();
- String clsName = valueMatcher.group(2).trim();
-
- if (OBSERVER_CLS_NAME.equals(clsName)) {
- result.add(jarPath);
- }
- }
- }
-
- return result;
- }
-
- private static List<String> getHTableNames(KylinConfig config) {
- CubeManager cubeMgr = CubeManager.getInstance(config);
-
- ArrayList<String> result = new ArrayList<String>();
- for (CubeInstance cube : cubeMgr.listAllCubes()) {
- for (CubeSegment seg : cube.getSegments(SegmentStatusEnum.READY)) {
- String tableName = seg.getStorageLocationIdentifier();
- if (StringUtils.isBlank(tableName) == false) {
- result.add(tableName);
- System.out.println("added new table: " + tableName);
- }
- }
- }
-
- for (IIInstance ii : IIManager.getInstance(config).listAllIIs()) {
- for (IISegment seg : ii.getSegments(SegmentStatusEnum.READY)) {
- String tableName = seg.getStorageLocationIdentifier();
- if (StringUtils.isBlank(tableName) == false) {
- result.add(tableName);
- System.out.println("added new table: " + tableName);
- }
- }
- }
-
- return result;
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.tools;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.HBaseConnection;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * @author yangli9
+ */
+public class DeployCoprocessorCLI {
+
+ public static final String CubeObserverClassV2 = "org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.AggregateRegionObserver";
+ public static final String CubeEndpointClassV2 = "org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.CubeVisitService";
+ public static final String IIEndpointClassV2 = "org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.IIEndpoint";
+ public static final String OBSERVER_CLS_NAME = "org.apache.kylin.storage.hbase.coprocessor.observer.AggregateRegionObserver";
+ public static final String ENDPOINT_CLS_NAMAE = "org.apache.kylin.storage.hbase.coprocessor.endpoint.IIEndpoint";
+ private static final Logger logger = LoggerFactory.getLogger(DeployCoprocessorCLI.class);
+
+ public static void main(String[] args) throws IOException {
+ KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
+ FileSystem fileSystem = FileSystem.get(hconf);
+ Admin hbaseAdmin = HBaseConnection.get().getAdmin();
+
+ String localCoprocessorJar = new File(args[0]).getAbsolutePath();
+ logger.info("Identify coprocessor jar " + localCoprocessorJar);
+
+ List<String> tableNames = getHTableNames(kylinConfig);
+ logger.info("Identify tables " + tableNames);
+
+ if (args.length <= 1) {
+ printUsageAndExit();
+ }
+
+ String filterType = args[1].toLowerCase();
+ if (filterType.equals("-table")) {
+ tableNames = filterByTables(tableNames, Arrays.asList(args).subList(2, args.length));
+ } else if (filterType.equals("-cube")) {
+ tableNames = filterByCubes(tableNames, Arrays.asList(args).subList(2, args.length));
+ } else if (!filterType.equals("all")) {
+ printUsageAndExit();
+ }
+
+ logger.info("Will execute tables " + tableNames);
+
+ Set<String> oldJarPaths = getCoprocessorJarPaths(hbaseAdmin, tableNames);
+ logger.info("Old coprocessor jar: " + oldJarPaths);
+
+ Path hdfsCoprocessorJar = uploadCoprocessorJar(localCoprocessorJar, fileSystem, oldJarPaths);
+ logger.info("New coprocessor jar: " + hdfsCoprocessorJar);
+
+ List<String> processedTables = resetCoprocessorOnHTables(hbaseAdmin, hdfsCoprocessorJar, tableNames);
+
+ // Don't remove old jars, missing coprocessor jar will fail hbase
+ // removeOldJars(oldJarPaths, fileSystem);
+
+ hbaseAdmin.close();
+
+ logger.info("Processed " + processedTables);
+ logger.info("Active coprocessor jar: " + hdfsCoprocessorJar);
+ }
+
+ private static void printUsageAndExit() {
+ logger.warn("Probe run, exiting.");
+ logger.info("Usage: bin/kylin.sh org.apache.kylin.job.tools.DeployCoprocessorCLI JAR_FILE all|-cube CUBE1 CUBE2|-table TABLE1 TABLE2");
+ System.exit(0);
+ }
+
+ private static List<String> filterByCubes(List<String> allTableNames, List<String> cubeNames) {
+ CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+ List<String> result = Lists.newArrayList();
+ for (String c : cubeNames) {
+ c = c.trim();
+ if (c.endsWith(","))
+ c = c.substring(0, c.length() - 1);
+
+ CubeInstance cubeInstance = cubeManager.getCube(c);
+ for (CubeSegment segment : cubeInstance.getSegments()) {
+ String tableName = segment.getStorageLocationIdentifier();
+ if (allTableNames.contains(tableName)) {
+ result.add(tableName);
+ }
+ }
+ }
+ return result;
+ }
+
+ private static List<String> filterByTables(List<String> allTableNames, List<String> tableNames) {
+ List<String> result = Lists.newArrayList();
+ for (String t : tableNames) {
+ t = t.trim();
+ if (t.endsWith(","))
+ t = t.substring(0, t.length() - 1);
+
+ if (allTableNames.contains(t)) {
+ result.add(t);
+ }
+ }
+ return result;
+ }
+
+ public static void deployCoprocessor(HTableDescriptor tableDesc) {
+ try {
+ initHTableCoprocessor(tableDesc);
+ logger.info("hbase table " + tableDesc.getTableName() + " deployed with coprocessor.");
+
+ } catch (Exception ex) {
+ logger.error("Error deploying coprocessor on " + tableDesc.getTableName(), ex);
+ logger.error("Will try creating the table without coprocessor.");
+ }
+ }
+
+ private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException {
+ KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
+ FileSystem fileSystem = FileSystem.get(hconf);
+
+ String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar();
+ Path hdfsCoprocessorJar = DeployCoprocessorCLI.uploadCoprocessorJar(localCoprocessorJar, fileSystem, null);
+
+ DeployCoprocessorCLI.addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
+ }
+
// Registers Kylin's endpoint and observer coprocessors (from the given HDFS
// jar) on the table descriptor, with coprocessor priorities 1000 and 1001.
// NOTE(review): "NAMAE" is a long-standing typo in the constant's name; it is
// kept as-is because the constant is referenced throughout this class.
public static void addCoprocessorOnHTable(HTableDescriptor desc, Path hdfsCoprocessorJar) throws IOException {
    logger.info("Add coprocessor on " + desc.getNameAsString());
    desc.addCoprocessor(ENDPOINT_CLS_NAMAE, hdfsCoprocessorJar, 1000, null);
    desc.addCoprocessor(OBSERVER_CLS_NAME, hdfsCoprocessorJar, 1001, null);
}
+
+ public static void resetCoprocessor(String tableName, Admin hbaseAdmin, Path hdfsCoprocessorJar) throws IOException {
+ logger.info("Disable " + tableName);
+ hbaseAdmin.disableTable(TableName.valueOf(tableName));
+
+ logger.info("Unset coprocessor on " + tableName);
+ HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
+
+ // remove coprocessors of 1.x version
+ while (desc.hasCoprocessor(OBSERVER_CLS_NAME)) {
+ desc.removeCoprocessor(OBSERVER_CLS_NAME);
+ }
+ while (desc.hasCoprocessor(ENDPOINT_CLS_NAMAE)) {
+ desc.removeCoprocessor(ENDPOINT_CLS_NAMAE);
+ }
+ // remove coprocessors of 2.x version
+ while (desc.hasCoprocessor(CubeObserverClassV2)) {
+ desc.removeCoprocessor(CubeObserverClassV2);
+ }
+ while (desc.hasCoprocessor(CubeEndpointClassV2)) {
+ desc.removeCoprocessor(CubeEndpointClassV2);
+ }
+ while (desc.hasCoprocessor(IIEndpointClassV2)) {
+ desc.removeCoprocessor(IIEndpointClassV2);
+ }
+
+ addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
+ hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc);
+
+ logger.info("Enable " + tableName);
+ hbaseAdmin.enableTable(TableName.valueOf(tableName));
+ }
+
// Resets the coprocessor on each table in turn, continuing past per-table
// IOExceptions so one broken table does not abort the whole deployment.
// Returns the subset of tableNames that were updated successfully.
private static List<String> resetCoprocessorOnHTables(Admin hbaseAdmin, Path hdfsCoprocessorJar, List<String> tableNames) throws IOException {
    List<String> processed = new ArrayList<String>();

    for (String tableName : tableNames) {
        try {
            resetCoprocessor(tableName, hbaseAdmin, hdfsCoprocessorJar);
            processed.add(tableName);
        } catch (IOException ex) {
            // Log and keep going; the failed table keeps its old coprocessor.
            logger.error("Error processing " + tableName, ex);
        }
    }
    return processed;
}
+
+ public static Path getNewestCoprocessorJar(KylinConfig config, FileSystem fileSystem) throws IOException {
+ Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, config);
+ FileStatus newestJar = null;
+ for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
+ if (fileStatus.getPath().toString().endsWith(".jar")) {
+ if (newestJar == null) {
+ newestJar = fileStatus;
+ } else {
+ if (newestJar.getModificationTime() < fileStatus.getModificationTime())
+ newestJar = fileStatus;
+ }
+ }
+ }
+ if (newestJar == null)
+ return null;
+
+ Path path = newestJar.getPath().makeQualified(fileSystem.getUri(), null);
+ logger.info("The newest coprocessor is " + path.toString());
+ return path;
+ }
+
+ public static Path uploadCoprocessorJar(String localCoprocessorJar, FileSystem fileSystem, Set<String> oldJarPaths) throws IOException {
+ Path uploadPath = null;
+ File localCoprocessorFile = new File(localCoprocessorJar);
+
+ // check existing jars
+ if (oldJarPaths == null) {
+ oldJarPaths = new HashSet<String>();
+ }
+ Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, KylinConfig.getInstanceFromEnv());
+ for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
+ if (fileStatus.getLen() == localCoprocessorJar.length() && fileStatus.getModificationTime() == localCoprocessorFile.lastModified()) {
+ uploadPath = fileStatus.getPath();
+ break;
+ }
+ String filename = fileStatus.getPath().toString();
+ if (filename.endsWith(".jar")) {
+ oldJarPaths.add(filename);
+ }
+ }
+
+ // upload if not existing
+ if (uploadPath == null) {
+ // figure out a unique new jar file name
+ Set<String> oldJarNames = new HashSet<String>();
+ for (String path : oldJarPaths) {
+ oldJarNames.add(new Path(path).getName());
+ }
+ String baseName = getBaseFileName(localCoprocessorJar);
+ String newName = null;
+ int i = 0;
+ while (newName == null) {
+ newName = baseName + "-" + (i++) + ".jar";
+ if (oldJarNames.contains(newName))
+ newName = null;
+ }
+
+ // upload
+ uploadPath = new Path(coprocessorDir, newName);
+ FileInputStream in = null;
+ FSDataOutputStream out = null;
+ try {
+ in = new FileInputStream(localCoprocessorFile);
+ out = fileSystem.create(uploadPath);
+ IOUtils.copy(in, out);
+ } finally {
+ IOUtils.closeQuietly(in);
+ IOUtils.closeQuietly(out);
+ }
+
+ fileSystem.setTimes(uploadPath, localCoprocessorFile.lastModified(), -1);
+
+ }
+
+ uploadPath = uploadPath.makeQualified(fileSystem.getUri(), null);
+ return uploadPath;
+ }
+
+ private static String getBaseFileName(String localCoprocessorJar) {
+ File localJar = new File(localCoprocessorJar);
+ String baseName = localJar.getName();
+ if (baseName.endsWith(".jar"))
+ baseName = baseName.substring(0, baseName.length() - ".jar".length());
+ return baseName;
+ }
+
+ private static Path getCoprocessorHDFSDir(FileSystem fileSystem, KylinConfig config) throws IOException {
+ String hdfsWorkingDirectory = config.getHdfsWorkingDirectory();
+ Path coprocessorDir = new Path(hdfsWorkingDirectory, "coprocessor");
+ fileSystem.mkdirs(coprocessorDir);
+ return coprocessorDir;
+ }
+
+ private static Set<String> getCoprocessorJarPaths(Admin hbaseAdmin, List<String> tableNames) throws IOException {
+ HashSet<String> result = new HashSet<String>();
+
+ for (String tableName : tableNames) {
+ HTableDescriptor tableDescriptor = null;
+ try {
+ tableDescriptor = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
+ } catch (TableNotFoundException e) {
+ logger.warn("Table not found " + tableName, e);
+ continue;
+ }
+
+ Matcher keyMatcher;
+ Matcher valueMatcher;
+ for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> e : tableDescriptor.getValues().entrySet()) {
+ keyMatcher = HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(Bytes.toString(e.getKey().get()));
+ if (!keyMatcher.matches()) {
+ continue;
+ }
+ valueMatcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(Bytes.toString(e.getValue().get()));
+ if (!valueMatcher.matches()) {
+ continue;
+ }
+
+ String jarPath = valueMatcher.group(1).trim();
+ String clsName = valueMatcher.group(2).trim();
+
+ if (OBSERVER_CLS_NAME.equals(clsName)) {
+ result.add(jarPath);
+ }
+ }
+ }
+
+ return result;
+ }
+
+ private static List<String> getHTableNames(KylinConfig config) {
+ CubeManager cubeMgr = CubeManager.getInstance(config);
+
+ ArrayList<String> result = new ArrayList<String>();
+ for (CubeInstance cube : cubeMgr.listAllCubes()) {
+ for (CubeSegment seg : cube.getSegments(SegmentStatusEnum.READY)) {
+ String tableName = seg.getStorageLocationIdentifier();
+ if (StringUtils.isBlank(tableName) == false) {
+ result.add(tableName);
+ System.out.println("added new table: " + tableName);
+ }
+ }
+ }
+
+ for (IIInstance ii : IIManager.getInstance(config).listAllIIs()) {
+ for (IISegment seg : ii.getSegments(SegmentStatusEnum.READY)) {
+ String tableName = seg.getStorageLocationIdentifier();
+ if (StringUtils.isBlank(tableName) == false) {
+ result.add(tableName);
+ System.out.println("added new table: " + tableName);
+ }
+ }
+ }
+
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java b/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java
index 70e1df6..5fe5e58 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java
@@ -28,13 +28,13 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
import org.apache.kylin.common.persistence.HBaseConnection;
import org.apache.kylin.common.util.Bytes;
@@ -74,8 +74,7 @@ public class GridTableHBaseBenchmark {
public static void testGridTable(double hitRatio, double indexRatio) throws IOException {
System.out.println("Testing grid table scanning, hit ratio " + hitRatio + ", index ratio " + indexRatio);
String hbaseUrl = "hbase"; // use hbase-site.xml on classpath
-
- HConnection conn = HBaseConnection.get(hbaseUrl);
+ Connection conn = HBaseConnection.get(hbaseUrl);
createHTableIfNeeded(conn, TEST_TABLE);
prepareData(conn);
@@ -91,10 +90,10 @@ public class GridTableHBaseBenchmark {
}
- private static void testColumnScan(HConnection conn, List<Pair<Integer, Integer>> colScans) throws IOException {
+ private static void testColumnScan(Connection conn, List<Pair<Integer, Integer>> colScans) throws IOException {
Stats stats = new Stats("COLUMN_SCAN");
- HTableInterface table = conn.getTable(TEST_TABLE);
+ Table table = conn.getTable(TableName.valueOf(TEST_TABLE));
try {
stats.markStart();
@@ -122,20 +121,20 @@ public class GridTableHBaseBenchmark {
}
}
- private static void testRowScanNoIndexFullScan(HConnection conn, boolean[] hits) throws IOException {
+ private static void testRowScanNoIndexFullScan(Connection conn, boolean[] hits) throws IOException {
fullScan(conn, hits, new Stats("ROW_SCAN_NO_IDX_FULL"));
}
- private static void testRowScanNoIndexSkipScan(HConnection conn, boolean[] hits) throws IOException {
+ private static void testRowScanNoIndexSkipScan(Connection conn, boolean[] hits) throws IOException {
jumpScan(conn, hits, new Stats("ROW_SCAN_NO_IDX_SKIP"));
}
- private static void testRowScanWithIndex(HConnection conn, boolean[] hits) throws IOException {
+ private static void testRowScanWithIndex(Connection conn, boolean[] hits) throws IOException {
jumpScan(conn, hits, new Stats("ROW_SCAN_IDX"));
}
- private static void fullScan(HConnection conn, boolean[] hits, Stats stats) throws IOException {
- HTableInterface table = conn.getTable(TEST_TABLE);
+ private static void fullScan(Connection conn, boolean[] hits, Stats stats) throws IOException {
+ Table table = conn.getTable(TableName.valueOf(TEST_TABLE));
try {
stats.markStart();
@@ -156,11 +155,11 @@ public class GridTableHBaseBenchmark {
}
}
- private static void jumpScan(HConnection conn, boolean[] hits, Stats stats) throws IOException {
+ private static void jumpScan(Connection conn, boolean[] hits, Stats stats) throws IOException {
final int jumpThreshold = 6; // compensate for Scan() overhead, totally by experience
- HTableInterface table = conn.getTable(TEST_TABLE);
+ Table table = conn.getTable(TableName.valueOf(TEST_TABLE));
try {
stats.markStart();
@@ -204,8 +203,8 @@ public class GridTableHBaseBenchmark {
}
}
- private static void prepareData(HConnection conn) throws IOException {
- HTableInterface table = conn.getTable(TEST_TABLE);
+ private static void prepareData(Connection conn) throws IOException {
+ Table table = conn.getTable(TableName.valueOf(TEST_TABLE));
try {
// check how many rows existing
@@ -232,7 +231,7 @@ public class GridTableHBaseBenchmark {
byte[] rowkey = Bytes.toBytes(i);
Put put = new Put(rowkey);
byte[] cell = randomBytes();
- put.add(CF, QN, cell);
+ put.addColumn(CF, QN, cell);
table.put(put);
nBytes += cell.length;
dot(i, N_ROWS);
@@ -258,8 +257,8 @@ public class GridTableHBaseBenchmark {
return bytes;
}
- private static void createHTableIfNeeded(HConnection conn, String tableName) throws IOException {
- HBaseAdmin hbase = new HBaseAdmin(conn);
+ private static void createHTableIfNeeded(Connection conn, String tableName) throws IOException {
+ Admin hbase = conn.getAdmin();
try {
boolean tableExist = false;
http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java b/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java
index 53930e3..e283748 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java
@@ -23,12 +23,11 @@ import java.io.IOException;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.persistence.HBaseConnection;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,8 +70,7 @@ public class HtableAlterMetadataCLI extends AbstractHadoopJob {
}
private void alter() throws IOException {
- Configuration conf = HBaseConfiguration.create();
- HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+ Admin hbaseAdmin = HBaseConnection.get().getAdmin();
HTableDescriptor table = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
hbaseAdmin.disableTable(table.getTableName());