You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/07/26 03:34:06 UTC
[47/55] [abbrv] kylin git commit: KYLIN-1528 Create a branch for v1.5
with HBase 1.x API
KYLIN-1528 Create a branch for v1.5 with HBase 1.x API
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/1b6ed089
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/1b6ed089
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/1b6ed089
Branch: refs/heads/1.5.x-CDH5.7
Commit: 1b6ed0899cc2efb575b24e98ddd6f31922979883
Parents: 76d3bd2
Author: shaofengshi <sh...@apache.org>
Authored: Wed Mar 23 17:07:05 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Jul 25 20:13:38 2016 +0800
----------------------------------------------------------------------
examples/test_case_data/sandbox/hbase-site.xml | 19 +-
.../test_case_data/sandbox/kylin_job_conf.xml | 86 +++--
examples/test_case_data/sandbox/mapred-site.xml | 23 +-
.../kylin/provision/BuildCubeWithEngine.java | 50 +--
pom.xml | 18 +-
.../kylin/rest/security/AclHBaseStorage.java | 4 +-
.../rest/security/MockAclHBaseStorage.java | 3 +-
.../apache/kylin/rest/security/MockHTable.java | 95 +----
.../rest/security/RealAclHBaseStorage.java | 9 +-
.../apache/kylin/rest/service/AclService.java | 25 +-
.../apache/kylin/rest/service/CubeService.java | 35 +-
.../apache/kylin/rest/service/QueryService.java | 23 +-
.../apache/kylin/rest/service/UserService.java | 23 +-
.../kylin/storage/hbase/HBaseConnection.java | 169 +++------
.../kylin/storage/hbase/HBaseResourceStore.java | 37 +-
.../storage/hbase/cube/SimpleHBaseStore.java | 20 +-
.../hbase/cube/v1/CubeSegmentTupleIterator.java | 11 +-
.../storage/hbase/cube/v1/CubeStorageQuery.java | 4 +-
.../hbase/cube/v1/RegionScannerAdapter.java | 11 +-
.../cube/v1/SerializedHBaseTupleIterator.java | 4 +-
.../observer/AggregateRegionObserver.java | 4 +-
.../observer/AggregationScanner.java | 15 +-
.../observer/ObserverAggregationCache.java | 15 +-
.../coprocessor/observer/ObserverEnabler.java | 4 +-
.../hbase/cube/v2/CubeHBaseEndpointRPC.java | 10 +-
.../storage/hbase/cube/v2/CubeHBaseScanRPC.java | 9 +-
.../coprocessor/endpoint/CubeVisitService.java | 4 +-
.../storage/hbase/steps/CubeHTableUtil.java | 16 +-
.../storage/hbase/steps/DeprecatedGCStep.java | 23 +-
.../storage/hbase/steps/HBaseCuboidWriter.java | 9 +-
.../hbase/steps/HBaseStreamingOutput.java | 9 +-
.../kylin/storage/hbase/steps/MergeGCStep.java | 23 +-
.../storage/hbase/util/CleanHtableCLI.java | 12 +-
.../storage/hbase/util/CubeMigrationCLI.java | 353 ++++++++++---------
.../hbase/util/CubeMigrationCheckCLI.java | 19 +-
.../hbase/util/DeployCoprocessorCLI.java | 23 +-
.../hbase/util/ExtendCubeToHybridCLI.java | 8 +-
.../hbase/util/GridTableHBaseBenchmark.java | 34 +-
.../kylin/storage/hbase/util/HBaseClean.java | 18 +-
.../hbase/util/HBaseRegionSizeCalculator.java | 42 +--
.../kylin/storage/hbase/util/HBaseUsage.java | 10 +-
.../storage/hbase/util/HbaseStreamingInput.java | 31 +-
.../hbase/util/HtableAlterMetadataCLI.java | 9 +-
.../storage/hbase/util/OrphanHBaseCleanJob.java | 18 +-
.../kylin/storage/hbase/util/PingHBaseCLI.java | 15 +-
.../kylin/storage/hbase/util/RowCounterCLI.java | 11 +-
.../storage/hbase/util/StorageCleanupJob.java | 16 +-
.../storage/hbase/util/UpdateHTableHostCLI.java | 17 +-
.../observer/AggregateRegionObserverTest.java | 31 +-
.../v1/filter/TestFuzzyRowFilterV2EndToEnd.java | 5 +-
50 files changed, 681 insertions(+), 801 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6ed089/examples/test_case_data/sandbox/hbase-site.xml
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/hbase-site.xml b/examples/test_case_data/sandbox/hbase-site.xml
index 46d5345..734908e 100644
--- a/examples/test_case_data/sandbox/hbase-site.xml
+++ b/examples/test_case_data/sandbox/hbase-site.xml
@@ -190,22 +190,5 @@
<name>zookeeper.znode.parent</name>
<value>/hbase-unsecure</value>
</property>
- <property>
- <name>hbase.client.pause</name>
- <value>100</value>
- <description>General client pause value. Used mostly as value to wait
- before running a retry of a failed get, region lookup, etc.
- See hbase.client.retries.number for description of how we backoff from
- this initial pause amount and how this pause works w/ retries.</description>
- </property>
- <property>
- <name>hbase.client.retries.number</name>
- <value>5</value>
- <description>Maximum retries. Used as maximum for all retryable
- operations such as the getting of a cell's value, starting a row update,
- etc. Retry interval is a rough function based on hbase.client.pause. At
- first we retry at this interval but then with backoff, we pretty quickly reach
- retrying every ten seconds. See HConstants#RETRY_BACKOFF for how the backup
- ramps up. Change this setting and hbase.client.pause to suit your workload.</description>
- </property>
+
</configuration>
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6ed089/examples/test_case_data/sandbox/kylin_job_conf.xml
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin_job_conf.xml b/examples/test_case_data/sandbox/kylin_job_conf.xml
index bd947af..6082fa9 100644
--- a/examples/test_case_data/sandbox/kylin_job_conf.xml
+++ b/examples/test_case_data/sandbox/kylin_job_conf.xml
@@ -1,20 +1,18 @@
<?xml version="1.0"?>
<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
+http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. See accompanying LICENSE file.
-->
+
<configuration>
<property>
@@ -26,44 +24,41 @@
</description>
</property>
- <property>
- <name>mapreduce.map.maxattempts</name>
- <value>2</value>
- </property>
+ <!-- uncomment the following 5 properties to enable lzo compressing
- <!--
- <property>
- <name>mapred.compress.map.output</name>
- <value>true</value>
- <description>Compress map outputs</description>
- </property>
+ <property>
+ <name>mapred.compress.map.output</name>
+ <value>true</value>
+ <description>Compress map outputs</description>
+ </property>
- <property>
- <name>mapred.map.output.compression.codec</name>
- <value>org.apache.hadoop.io.compress.SnappyCodec</value>
- <description>The compression codec to use for map outputs
- </description>
- </property>
+ <property>
+ <name>mapred.map.output.compression.codec</name>
+ <value>com.hadoop.compression.lzo.LzoCodec</value>
+ <description>The compression codec to use for map outputs
+ </description>
+ </property>
- <property>
- <name>mapred.output.compress</name>
- <value>true</value>
- <description>Compress the output of a MapReduce job</description>
- </property>
+ <property>
+ <name>mapred.output.compress</name>
+ <value>true</value>
+ <description>Compress the output of a MapReduce job</description>
+ </property>
- <property>
- <name>mapred.output.compression.codec</name>
- <value>org.apache.hadoop.io.compress.SnappyCodec</value>
- <description>The compression codec to use for job outputs
- </description>
- </property>
+ <property>
+ <name>mapred.output.compression.codec</name>
+ <value>com.hadoop.compression.lzo.LzoCodec</value>
+ <description>The compression codec to use for job outputs
+ </description>
+ </property>
- <property>
- <name>mapred.output.compression.type</name>
- <value>BLOCK</value>
- <description>The compression type to use for job outputs</description>
- </property>
--->
+ <property>
+ <name>mapred.output.compression.type</name>
+ <value>BLOCK</value>
+ <description>The compression type to use for job outputs</description>
+ </property>
+
+ !-->
<property>
<name>mapreduce.job.max.split.locations</name>
@@ -76,5 +71,4 @@
<value>2</value>
<description>Block replication</description>
</property>
-
</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6ed089/examples/test_case_data/sandbox/mapred-site.xml
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/mapred-site.xml b/examples/test_case_data/sandbox/mapred-site.xml
index 18f6feb..ff1c7eb 100644
--- a/examples/test_case_data/sandbox/mapred-site.xml
+++ b/examples/test_case_data/sandbox/mapred-site.xml
@@ -18,7 +18,7 @@
<property>
<name>io.sort.mb</name>
- <value>128</value>
+ <value>64</value>
</property>
<property>
@@ -28,12 +28,12 @@
<property>
<name>mapred.job.map.memory.mb</name>
- <value>512</value>
+ <value>250</value>
</property>
<property>
<name>mapred.job.reduce.memory.mb</name>
- <value>512</value>
+ <value>250</value>
</property>
<property>
@@ -58,7 +58,7 @@
<property>
<name>mapreduce.application.classpath</name>
- <value>/tmp/kylin/*,$HADOOP_CONF_DIR,/usr/hdp/${hdp.version}/hbase/lib/hbase-common.jar,/usr/hdp/current/hive-client/conf/,$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr-framework/hadoop/share/hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar:/usr/hdp/${hdp.version}/hadoop/lib/snappy-java-1.0.4.1.jar:/etc/hadoop/conf/secure</value>
+ <value>/tmp/kylin/*,$HADOOP_CONF_DIR,/usr/hdp/${hdp.version}/hbase/lib/hbase-common.jar,/usr/hdp/current/hive-client/conf/,$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr-framework/hadoop/share/hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar:/etc/hadoop/conf/secure</value>
</property>
<property>
@@ -81,10 +81,9 @@
<value>false</value>
</property>
- <!--the default value on hdp is 0.05, however for test environments we need to be conservative on resource -->
<property>
<name>mapreduce.job.reduce.slowstart.completedmaps</name>
- <value>1</value>
+ <value>0.05</value>
</property>
<property>
@@ -114,7 +113,7 @@
<property>
<name>mapreduce.map.java.opts</name>
- <value>-Xmx512m</value>
+ <value>-Xmx200m</value>
</property>
<property>
@@ -124,7 +123,7 @@
<property>
<name>mapreduce.map.memory.mb</name>
- <value>512</value>
+ <value>250</value>
</property>
<property>
@@ -169,7 +168,7 @@
<property>
<name>mapreduce.reduce.memory.mb</name>
- <value>512</value>
+ <value>250</value>
</property>
<property>
@@ -219,7 +218,7 @@
<property>
<name>mapreduce.task.io.sort.mb</name>
- <value>128</value>
+ <value>64</value>
</property>
<property>
@@ -234,7 +233,7 @@
<property>
<name>yarn.app.mapreduce.am.command-opts</name>
- <value>-Xmx512m</value>
+ <value>-Xmx200m</value>
</property>
<property>
@@ -244,7 +243,7 @@
<property>
<name>yarn.app.mapreduce.am.resource.mb</name>
- <value>512</value>
+ <value>250</value>
</property>
<property>
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6ed089/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index a37d7b8..4cee1ed 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ClassUtil;
@@ -421,32 +421,32 @@ public class BuildCubeWithEngine {
private void checkHFilesInHBase(CubeSegment segment) throws IOException {
Configuration conf = HBaseConfiguration.create(HadoopUtil.getCurrentConfiguration());
String tableName = segment.getStorageLocationIdentifier();
- try (HTable table = new HTable(conf, tableName)) {
- HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(table);
- Map<byte[], Long> sizeMap = cal.getRegionSizeMap();
- long totalSize = 0;
- for (Long size : sizeMap.values()) {
- totalSize += size;
- }
- if (totalSize == 0) {
- return;
- }
- Map<byte[], Pair<Integer, Integer>> countMap = cal.getRegionHFileCountMap();
- // check if there's region contains more than one hfile, which means the hfile config take effects
- boolean hasMultiHFileRegions = false;
- for (Pair<Integer, Integer> count : countMap.values()) {
- // check if hfile count is greater than store count
- if (count.getSecond() > count.getFirst()) {
- hasMultiHFileRegions = true;
- break;
- }
- }
- if (KylinConfig.getInstanceFromEnv().getHBaseHFileSizeGB() == 0 && hasMultiHFileRegions) {
- throw new IOException("hfile size set to 0, but found region contains more than one hfiles");
- } else if (KylinConfig.getInstanceFromEnv().getHBaseHFileSizeGB() > 0 && !hasMultiHFileRegions) {
- throw new IOException("hfile size set greater than 0, but all regions still has only one hfile");
+
+ HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(tableName, conn);
+ Map<byte[], Long> sizeMap = cal.getRegionSizeMap();
+ long totalSize = 0;
+ for (Long size : sizeMap.values()) {
+ totalSize += size;
+ }
+ if (totalSize == 0) {
+ return;
+ }
+
+ Map<byte[], Pair<Integer, Integer>> countMap = cal.getRegionHFileCountMap();
+ // check if there's region contains more than one hfile, which means the hfile config take effects
+ boolean hasMultiHFileRegions = false;
+ for (Pair<Integer, Integer> count : countMap.values()) {
+ // check if hfile count is greater than store count
+ if (count.getSecond() > count.getFirst()) {
+ hasMultiHFileRegions = true;
+ break;
}
}
+ if (KylinConfig.getInstanceFromEnv().getHBaseHFileSizeGB() == 0 && hasMultiHFileRegions) {
+ throw new IOException("hfile size set to 0, but found region contains more than one hfiles");
+ } else if (KylinConfig.getInstanceFromEnv().getHBaseHFileSizeGB() > 0 && !hasMultiHFileRegions) {
+ throw new IOException("hfile size set greater than 0, but all regions still has only one hfile");
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6ed089/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0715e87..025e31a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,12 +47,12 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<!-- Hadoop versions -->
- <hadoop2.version>2.6.0</hadoop2.version>
- <yarn.version>2.6.0</yarn.version>
+ <hadoop2.version>2.7.1</hadoop2.version>
+ <yarn.version>2.7.1</yarn.version>
<zookeeper.version>3.4.6</zookeeper.version>
- <hive.version>0.14.0</hive.version>
- <hive-hcatalog.version>0.14.0</hive-hcatalog.version>
- <hbase-hadoop2.version>0.98.8-hadoop2</hbase-hadoop2.version>
+ <hive.version>1.2.1</hive.version>
+ <hive-hcatalog.version>1.2.1</hive-hcatalog.version>
+ <hbase-hadoop2.version>1.1.1</hbase-hadoop2.version>
<kafka.version>0.8.1</kafka.version>
<!-- Dependency versions -->
@@ -64,6 +64,7 @@
<!-- Commons -->
<commons-cli.version>1.2</commons-cli.version>
+ <commons-codec.version>1.4</commons-codec.version>
<commons-lang.version>2.6</commons-lang.version>
<commons-lang3.version>3.4</commons-lang3.version>
<commons-collections.version>3.2.1</commons-collections.version>
@@ -102,7 +103,7 @@
<calcite.version>1.6.0</calcite.version>
<!-- Curator.version Version -->
- <curator.version>2.6.0</curator.version>
+ <curator.version>2.7.1</curator.version>
<!-- Sonar -->
<sonar.java.coveragePlugin>jacoco</sonar.java.coveragePlugin>
@@ -323,6 +324,11 @@
<version>${commons-cli.version}</version>
</dependency>
<dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ <version>${commons-codec.version}</version>
+ </dependency>
+ <dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>${commons-lang.version}</version>
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6ed089/server-base/src/main/java/org/apache/kylin/rest/security/AclHBaseStorage.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/security/AclHBaseStorage.java b/server-base/src/main/java/org/apache/kylin/rest/security/AclHBaseStorage.java
index 38f299e..bfb5fe4 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/security/AclHBaseStorage.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/security/AclHBaseStorage.java
@@ -20,7 +20,7 @@ package org.apache.kylin.rest.security;
import java.io.IOException;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Table;
/**
*/
@@ -37,6 +37,6 @@ public interface AclHBaseStorage {
String prepareHBaseTable(Class<?> clazz) throws IOException;
- HTableInterface getTable(String tableName) throws IOException;
+ Table getTable(String tableName) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6ed089/server-base/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java b/server-base/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java
index 6c8081d..492c176 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.kylin.common.KylinConfig;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.kylin.rest.service.AclService;
import org.apache.kylin.rest.service.UserService;
@@ -64,7 +65,7 @@ public class MockAclHBaseStorage implements AclHBaseStorage {
}
@Override
- public HTableInterface getTable(String tableName) throws IOException {
+ public Table getTable(String tableName) throws IOException {
if (realAcl != null) {
return realAcl.getTable(tableName);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6ed089/server-base/src/main/java/org/apache/kylin/rest/security/MockHTable.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/security/MockHTable.java b/server-base/src/main/java/org/apache/kylin/rest/security/MockHTable.java
index d0aa0ed..972eea9 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/security/MockHTable.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/security/MockHTable.java
@@ -51,7 +51,7 @@ import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
@@ -91,7 +91,7 @@ import com.google.protobuf.ServiceException;
* <li>remove some methods for loading data, checking values ...</li>
* </ul>
*/
-public class MockHTable implements HTableInterface {
+public class MockHTable implements Table {
private final String tableName;
private final List<String> columnFamilies = new ArrayList<>();
@@ -114,14 +114,6 @@ public class MockHTable implements HTableInterface {
this.columnFamilies.add(columnFamily);
}
- /**
- * {@inheritDoc}
- */
- @Override
- public byte[] getTableName() {
- return tableName.getBytes();
- }
-
@Override
public TableName getName() {
return null;
@@ -200,8 +192,8 @@ public class MockHTable implements HTableInterface {
}
@Override
- public Boolean[] exists(List<Get> gets) throws IOException {
- return new Boolean[0];
+ public boolean[] existsAll(List<Get> list) throws IOException {
+ return new boolean[0];
}
/**
@@ -306,15 +298,6 @@ public class MockHTable implements HTableInterface {
* {@inheritDoc}
*/
@Override
- public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
- // FIXME: implement
- return null;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
public ResultScanner getScanner(Scan scan) throws IOException {
final List<Result> ret = new ArrayList<Result>();
byte[] st = scan.getStartRow();
@@ -446,7 +429,7 @@ public class MockHTable implements HTableInterface {
*/
}
if (filter.hasFilterRow() && !filteredOnRowKey) {
- filter.filterRow(nkvs);
+ filter.filterRow();
}
if (filter.filterRow() || filteredOnRowKey) {
nkvs.clear();
@@ -535,6 +518,11 @@ public class MockHTable implements HTableInterface {
return false;
}
+ @Override
+ public boolean checkAndPut(byte[] bytes, byte[] bytes1, byte[] bytes2, CompareFilter.CompareOp compareOp, byte[] bytes3, Put put) throws IOException {
+ return false;
+ }
+
/**
* {@inheritDoc}
*/
@@ -555,7 +543,7 @@ public class MockHTable implements HTableInterface {
continue;
}
for (KeyValue kv : delete.getFamilyMap().get(family)) {
- if (kv.isDeleteFamily()) {
+ if (kv.isDelete()) {
data.get(row).get(kv.getFamily()).clear();
} else {
data.get(row).get(kv.getFamily()).remove(kv.getQualifier());
@@ -592,6 +580,11 @@ public class MockHTable implements HTableInterface {
return false;
}
+ @Override
+ public boolean checkAndDelete(byte[] bytes, byte[] bytes1, byte[] bytes2, CompareFilter.CompareOp compareOp, byte[] bytes3, Delete delete) throws IOException {
+ return false;
+ }
+
/**
* {@inheritDoc}
*/
@@ -605,7 +598,7 @@ public class MockHTable implements HTableInterface {
*/
@Override
public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) throws IOException {
- return incrementColumnValue(row, family, qualifier, amount, true);
+ return incrementColumnValue(row, family, qualifier, amount, null);
}
@Override
@@ -617,37 +610,6 @@ public class MockHTable implements HTableInterface {
* {@inheritDoc}
*/
@Override
- public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL) throws IOException {
- if (check(row, family, qualifier, null)) {
- Put put = new Put(row);
- put.add(family, qualifier, Bytes.toBytes(amount));
- put(put);
- return amount;
- }
- long newValue = Bytes.toLong(data.get(row).get(family).get(qualifier).lastEntry().getValue()) + amount;
- data.get(row).get(family).get(qualifier).put(System.currentTimeMillis(), Bytes.toBytes(newValue));
- return newValue;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean isAutoFlush() {
- return true;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void flushCommits() throws IOException {
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
public void close() throws IOException {
}
@@ -673,29 +635,6 @@ public class MockHTable implements HTableInterface {
* {@inheritDoc}
*/
@Override
- public void setAutoFlush(boolean autoFlush) {
- throw new NotImplementedException();
-
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
- throw new NotImplementedException();
-
- }
-
- @Override
- public void setAutoFlushTo(boolean autoFlush) {
- throw new NotImplementedException();
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
public long getWriteBufferSize() {
throw new NotImplementedException();
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6ed089/server-base/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java b/server-base/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java
index 27d9720..222eb70 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java
@@ -21,7 +21,8 @@ package org.apache.kylin.rest.security;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.rest.service.AclService;
import org.apache.kylin.rest.service.UserService;
@@ -56,11 +57,11 @@ public class RealAclHBaseStorage implements AclHBaseStorage {
}
@Override
- public HTableInterface getTable(String tableName) throws IOException {
+ public Table getTable(String tableName) throws IOException {
if (StringUtils.equals(tableName, aclTableName)) {
- return HBaseConnection.get(hbaseUrl).getTable(aclTableName);
+ return HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(aclTableName));
} else if (StringUtils.equals(tableName, userTableName)) {
- return HBaseConnection.get(hbaseUrl).getTable(userTableName);
+ return HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
} else {
throw new IllegalStateException("getTable failed" + tableName);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6ed089/server-base/src/main/java/org/apache/kylin/rest/service/AclService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/AclService.java b/server-base/src/main/java/org/apache/kylin/rest/service/AclService.java
index d693a67..3e3efec 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/AclService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/AclService.java
@@ -33,7 +33,7 @@ import javax.annotation.PostConstruct;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -124,7 +124,7 @@ public class AclService implements MutableAclService {
@Override
public List<ObjectIdentity> findChildren(ObjectIdentity parentIdentity) {
List<ObjectIdentity> oids = new ArrayList<ObjectIdentity>();
- HTableInterface htable = null;
+ Table htable = null;
try {
htable = aclHBaseStorage.getTable(aclTableName);
@@ -173,7 +173,7 @@ public class AclService implements MutableAclService {
@Override
public Map<ObjectIdentity, Acl> readAclsById(List<ObjectIdentity> oids, List<Sid> sids) throws NotFoundException {
Map<ObjectIdentity, Acl> aclMaps = new HashMap<ObjectIdentity, Acl>();
- HTableInterface htable = null;
+ Table htable = null;
Result result = null;
try {
htable = aclHBaseStorage.getTable(aclTableName);
@@ -226,17 +226,16 @@ public class AclService implements MutableAclService {
Authentication auth = SecurityContextHolder.getContext().getAuthentication();
PrincipalSid sid = new PrincipalSid(auth);
- HTableInterface htable = null;
+ Table htable = null;
try {
htable = aclHBaseStorage.getTable(aclTableName);
Put put = new Put(Bytes.toBytes(String.valueOf(objectIdentity.getIdentifier())));
- put.add(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_TYPE_COLUMN), Bytes.toBytes(objectIdentity.getType()));
- put.add(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_OWNER_COLUMN), sidSerializer.serialize(new SidInfo(sid)));
- put.add(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_ENTRY_INHERIT_COLUMN), Bytes.toBytes(true));
+ put.addColumn(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_TYPE_COLUMN), Bytes.toBytes(objectIdentity.getType()));
+ put.addColumn(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_OWNER_COLUMN), sidSerializer.serialize(new SidInfo(sid)));
+ put.addColumn(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_ENTRY_INHERIT_COLUMN), Bytes.toBytes(true));
htable.put(put);
- htable.flushCommits();
logger.debug("ACL of " + objectIdentity + " created successfully.");
} catch (IOException e) {
@@ -250,7 +249,7 @@ public class AclService implements MutableAclService {
@Override
public void deleteAcl(ObjectIdentity objectIdentity, boolean deleteChildren) throws ChildrenExistException {
- HTableInterface htable = null;
+ Table htable = null;
try {
htable = aclHBaseStorage.getTable(aclTableName);
@@ -266,7 +265,6 @@ public class AclService implements MutableAclService {
}
htable.delete(delete);
- htable.flushCommits();
logger.debug("ACL of " + objectIdentity + " deleted successfully.");
} catch (IOException e) {
@@ -284,7 +282,7 @@ public class AclService implements MutableAclService {
throw e;
}
- HTableInterface htable = null;
+ Table htable = null;
try {
htable = aclHBaseStorage.getTable(aclTableName);
@@ -295,17 +293,16 @@ public class AclService implements MutableAclService {
Put put = new Put(Bytes.toBytes(String.valueOf(acl.getObjectIdentity().getIdentifier())));
if (null != acl.getParentAcl()) {
- put.add(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_PARENT_COLUMN), domainObjSerializer.serialize(new DomainObjectInfo(acl.getParentAcl().getObjectIdentity())));
+ put.addColumn(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_PARENT_COLUMN), domainObjSerializer.serialize(new DomainObjectInfo(acl.getParentAcl().getObjectIdentity())));
}
for (AccessControlEntry ace : acl.getEntries()) {
AceInfo aceInfo = new AceInfo(ace);
- put.add(Bytes.toBytes(AclHBaseStorage.ACL_ACES_FAMILY), Bytes.toBytes(aceInfo.getSidInfo().getSid()), aceSerializer.serialize(aceInfo));
+ put.addColumn(Bytes.toBytes(AclHBaseStorage.ACL_ACES_FAMILY), Bytes.toBytes(aceInfo.getSidInfo().getSid()), aceSerializer.serialize(aceInfo));
}
if (!put.isEmpty()) {
htable.put(put);
- htable.flushCommits();
logger.debug("ACL of " + acl.getObjectIdentity() + " updated successfully.");
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6ed089/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index cfb4cf8..00b07d5 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -30,6 +30,7 @@ import java.util.WeakHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
+import java.util.*;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
@@ -64,7 +65,6 @@ import org.apache.kylin.rest.security.AclPermission;
import org.apache.kylin.source.hive.HiveSourceTableLoader;
import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityJob;
import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityUpdateJob;
-import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.kylin.storage.hbase.util.HBaseRegionSizeCalculator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -444,35 +444,24 @@ public class CubeService extends BasicService {
if (htableInfoCache.containsKey(tableName)) {
return htableInfoCache.get(tableName);
}
-
- Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
- HTable table = null;
+ Connection conn = HBaseConnection.get(this.getConfig().getStorageUrl());
HBaseResponse hr = null;
long tableSize = 0;
int regionCount = 0;
- try {
- table = new HTable(hconf, tableName);
-
- HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(table);
- Map<byte[], Long> sizeMap = cal.getRegionSizeMap();
-
- for (long s : sizeMap.values()) {
- tableSize += s;
- }
-
- regionCount = sizeMap.size();
+ HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(tableName, conn);
+ Map<byte[], Long> sizeMap = cal.getRegionSizeMap();
- // Set response.
- hr = new HBaseResponse();
- hr.setTableSize(tableSize);
- hr.setRegionCount(regionCount);
- } finally {
- if (null != table) {
- table.close();
- }
+ for (long s : sizeMap.values()) {
+ tableSize += s;
}
+ regionCount = sizeMap.size();
+
+ // Set response.
+ hr = new HBaseResponse();
+ hr.setTableSize(tableSize);
+ hr.setRegionCount(regionCount);
htableInfoCache.put(tableName, hr);
return hr;
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6ed089/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 84a5c67..783616d 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -44,9 +44,10 @@ import javax.sql.DataSource;
import org.apache.calcite.avatica.ColumnMetaData.Rep;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.kylin.common.KylinConfig;
@@ -131,14 +132,13 @@ public class QueryService extends BasicService {
Query[] queryArray = new Query[queries.size()];
byte[] bytes = querySerializer.serialize(queries.toArray(queryArray));
- HTableInterface htable = null;
+ Table htable = null;
try {
- htable = HBaseConnection.get(hbaseUrl).getTable(userTableName);
+ htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
Put put = new Put(Bytes.toBytes(creator));
- put.add(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes);
+ put.addColumn(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes);
htable.put(put);
- htable.flushCommits();
} finally {
IOUtils.closeQuietly(htable);
}
@@ -164,14 +164,13 @@ public class QueryService extends BasicService {
Query[] queryArray = new Query[queries.size()];
byte[] bytes = querySerializer.serialize(queries.toArray(queryArray));
- HTableInterface htable = null;
+ Table htable = null;
try {
- htable = HBaseConnection.get(hbaseUrl).getTable(userTableName);
+ htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
Put put = new Put(Bytes.toBytes(creator));
- put.add(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes);
+ put.addColumn(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes);
htable.put(put);
- htable.flushCommits();
} finally {
IOUtils.closeQuietly(htable);
}
@@ -183,12 +182,16 @@ public class QueryService extends BasicService {
}
List<Query> queries = new ArrayList<Query>();
- HTableInterface htable = null;
+ Table htable = null;
try {
+<<<<<<< b6296775f7793a63baf7f6a97cf4c0759d654341:server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
HConnection conn = HBaseConnection.get(hbaseUrl);
HBaseConnection.createHTableIfNeeded(conn, userTableName, USER_QUERY_FAMILY);
htable = conn.getTable(userTableName);
+=======
+ htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
+>>>>>>> KYLIN-1528 Create a branch for v1.5 with HBase 1.x API:server/src/main/java/org/apache/kylin/rest/service/QueryService.java
Get get = new Get(Bytes.toBytes(creator));
get.addFamily(Bytes.toBytes(USER_QUERY_FAMILY));
Result result = htable.get(get);
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6ed089/server-base/src/main/java/org/apache/kylin/rest/service/UserService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/UserService.java b/server-base/src/main/java/org/apache/kylin/rest/service/UserService.java
index 07c7c6f..64c2c7d 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/UserService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/UserService.java
@@ -30,7 +30,7 @@ import javax.annotation.PostConstruct;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -72,7 +72,7 @@ public class UserService implements UserDetailsManager {
@Override
public UserDetails loadUserByUsername(String username) throws UsernameNotFoundException {
- HTableInterface htable = null;
+ Table htable = null;
try {
htable = aclHBaseStorage.getTable(userTableName);
@@ -144,16 +144,21 @@ public class UserService implements UserDetailsManager {
@Override
public void updateUser(UserDetails user) {
- HTableInterface htable = null;
+ Table htable = null;
try {
+ byte[] userAuthorities = serialize(user.getAuthorities());
htable = aclHBaseStorage.getTable(userTableName);
+<<<<<<< b6296775f7793a63baf7f6a97cf4c0759d654341:server-base/src/main/java/org/apache/kylin/rest/service/UserService.java
Pair<byte[], byte[]> pair = userToHBaseRow(user);
Put put = new Put(pair.getKey());
put.add(Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_FAMILY), Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_COLUMN), pair.getSecond());
+=======
+ Put put = new Put(Bytes.toBytes(user.getUsername()));
+ put.addColumn(Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_FAMILY), Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_COLUMN), userAuthorities);
+>>>>>>> KYLIN-1528 Create a branch for v1.5 with HBase 1.x API:server/src/main/java/org/apache/kylin/rest/service/UserService.java
htable.put(put);
- htable.flushCommits();
} catch (IOException e) {
throw new RuntimeException(e.getMessage(), e);
} finally {
@@ -163,14 +168,13 @@ public class UserService implements UserDetailsManager {
@Override
public void deleteUser(String username) {
- HTableInterface htable = null;
+ Table htable = null;
try {
htable = aclHBaseStorage.getTable(userTableName);
Delete delete = new Delete(Bytes.toBytes(username));
htable.delete(delete);
- htable.flushCommits();
} catch (IOException e) {
throw new RuntimeException(e.getMessage(), e);
} finally {
@@ -185,7 +189,7 @@ public class UserService implements UserDetailsManager {
@Override
public boolean userExists(String username) {
- HTableInterface htable = null;
+ Table htable = null;
try {
htable = aclHBaseStorage.getTable(userTableName);
@@ -215,8 +219,13 @@ public class UserService implements UserDetailsManager {
Scan s = new Scan();
s.addColumn(Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_FAMILY), Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_COLUMN));
+<<<<<<< b6296775f7793a63baf7f6a97cf4c0759d654341:server-base/src/main/java/org/apache/kylin/rest/service/UserService.java
List<UserDetails> all = new ArrayList<UserDetails>();
HTableInterface htable = null;
+=======
+ List<String> authorities = new ArrayList<String>();
+ Table htable = null;
+>>>>>>> KYLIN-1528 Create a branch for v1.5 with HBase 1.x API:server/src/main/java/org/apache/kylin/rest/service/UserService.java
ResultScanner scanner = null;
try {
htable = aclHBaseStorage.getTable(userTableName);
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6ed089/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
index d4dd3ae..05170a0 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
@@ -19,16 +19,9 @@
package org.apache.kylin.storage.hbase;
import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
@@ -39,18 +32,15 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.StorageException;
import org.apache.kylin.engine.mr.HadoopUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Sets;
-
/**
* @author yangli9
*
@@ -61,19 +51,14 @@ public class HBaseConnection {
private static final Logger logger = LoggerFactory.getLogger(HBaseConnection.class);
- private static final Map<String, Configuration> configCache = new ConcurrentHashMap<String, Configuration>();
- private static final Map<String, HConnection> connPool = new ConcurrentHashMap<String, HConnection>();
- private static final ThreadLocal<Configuration> configThreadLocal = new ThreadLocal<>();
-
- private static ExecutorService coprocessorPool = null;
+ private static final Map<String, Configuration> ConfigCache = new ConcurrentHashMap<String, Configuration>();
+ private static final Map<String, Connection> ConnPool = new ConcurrentHashMap<String, Connection>();
static {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
- closeCoprocessorPool();
-
- for (HConnection conn : connPool.values()) {
+ for (Connection conn : ConnPool.values()) {
try {
conn.close();
} catch (IOException e) {
@@ -83,64 +68,24 @@ public class HBaseConnection {
}
});
}
-
- public static ExecutorService getCoprocessorPool() {
- if (coprocessorPool != null) {
- return coprocessorPool;
- }
-
- synchronized (HBaseConnection.class) {
- if (coprocessorPool != null) {
- return coprocessorPool;
- }
-
- KylinConfig config = KylinConfig.getInstanceFromEnv();
-
- // copy from HConnectionImplementation.getBatchPool()
- int maxThreads = config.getHBaseMaxConnectionThreads();
- int coreThreads = config.getHBaseCoreConnectionThreads();
- long keepAliveTime = config.getHBaseConnectionThreadPoolAliveSeconds();
- LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(maxThreads * 100);
- ThreadPoolExecutor tpe = new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue, //
- Threads.newDaemonThreadFactory("kylin-coproc-"));
- tpe.allowCoreThreadTimeOut(true);
-
- logger.info("Creating coprocessor thread pool with max of {}, core of {}", maxThreads, coreThreads);
-
- coprocessorPool = tpe;
- return coprocessorPool;
- }
- }
-
- private static void closeCoprocessorPool() {
- if (coprocessorPool == null)
- return;
-
- coprocessorPool.shutdown();
- try {
- if (!coprocessorPool.awaitTermination(10, TimeUnit.SECONDS)) {
- coprocessorPool.shutdownNow();
- }
- } catch (InterruptedException e) {
- coprocessorPool.shutdownNow();
- }
- }
-
+
public static void clearConnCache() {
- connPool.clear();
+ ConnPool.clear();
}
+ private static final ThreadLocal<Configuration> hbaseConfig = new ThreadLocal<>();
+
public static Configuration getCurrentHBaseConfiguration() {
- if (configThreadLocal.get() == null) {
+ if (hbaseConfig.get() == null) {
String storageUrl = KylinConfig.getInstanceFromEnv().getStorageUrl();
- configThreadLocal.set(newHBaseConfiguration(storageUrl));
+ hbaseConfig.set(newHBaseConfiguration(storageUrl));
}
- return configThreadLocal.get();
+ return hbaseConfig.get();
}
private static Configuration newHBaseConfiguration(String url) {
Configuration conf = HBaseConfiguration.create(HadoopUtil.getCurrentConfiguration());
-
+
// using a hbase:xxx URL is deprecated, instead hbase config is always loaded from hbase-site.xml in classpath
if (!(StringUtils.isEmpty(url) || "hbase".equals(url)))
throw new IllegalArgumentException("to use hbase storage, pls set 'kylin.storage.url=hbase' in kylin.properties");
@@ -150,7 +95,7 @@ public class HBaseConnection {
if (StringUtils.isNotEmpty(hbaseClusterFs)) {
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, hbaseClusterFs);
}
-
+
// https://issues.apache.org/jira/browse/KYLIN-953
if (StringUtils.isBlank(conf.get("hadoop.tmp.dir"))) {
conf.set("hadoop.tmp.dir", "/tmp");
@@ -167,7 +112,7 @@ public class HBaseConnection {
return conf;
}
-
+
public static String makeQualifiedPathInHBaseCluster(String path) {
try {
FileSystem fs = FileSystem.get(getCurrentHBaseConfiguration());
@@ -178,25 +123,25 @@ public class HBaseConnection {
}
// ============================================================================
-
- // returned HConnection can be shared by multiple threads and does not require close()
+
+ // returned Connection can be shared by multiple threads and does not require close()
@SuppressWarnings("resource")
- public static HConnection get(String url) {
+ public static Connection get(String url) {
// find configuration
- Configuration conf = configCache.get(url);
+ Configuration conf = ConfigCache.get(url);
if (conf == null) {
conf = newHBaseConfiguration(url);
- configCache.put(url, conf);
+ ConfigCache.put(url, conf);
}
- HConnection connection = connPool.get(url);
+ Connection connection = ConnPool.get(url);
try {
while (true) {
// I don't use DCL since recreate a connection is not a big issue.
if (connection == null || connection.isClosed()) {
logger.info("connection is null or closed, creating a new one");
- connection = HConnectionManager.createConnection(conf);
- connPool.put(url, connection);
+ connection = ConnectionFactory.createConnection(conf);
+ ConnPool.put(url, connection);
}
if (connection == null || connection.isClosed()) {
@@ -214,8 +159,8 @@ public class HBaseConnection {
return connection;
}
- public static boolean tableExists(HConnection conn, String tableName) throws IOException {
- HBaseAdmin hbase = new HBaseAdmin(conn);
+ public static boolean tableExists(Connection conn, String tableName) throws IOException {
+ Admin hbase = conn.getAdmin();
try {
return hbase.tableExists(TableName.valueOf(tableName));
} finally {
@@ -235,39 +180,23 @@ public class HBaseConnection {
deleteTable(HBaseConnection.get(hbaseUrl), tableName);
}
- public static void createHTableIfNeeded(HConnection conn, String table, String... families) throws IOException {
- HBaseAdmin hbase = new HBaseAdmin(conn);
+ public static void createHTableIfNeeded(Connection conn, String tableName, String... families) throws IOException {
+ Admin hbase = conn.getAdmin();
try {
- if (tableExists(conn, table)) {
- logger.debug("HTable '" + table + "' already exists");
- Set<String> existingFamilies = getFamilyNames(hbase.getTableDescriptor(TableName.valueOf(table)));
- boolean wait = false;
- for (String family : families) {
- if (existingFamilies.contains(family) == false) {
- logger.debug("Adding family '" + family + "' to HTable '" + table + "'");
- hbase.addColumn(table, newFamilyDescriptor(family));
- // addColumn() is async, is there a way to wait it finish?
- wait = true;
- }
- }
- if (wait) {
- try {
- Thread.sleep(10000);
- } catch (InterruptedException e) {
- logger.warn("", e);
- }
- }
+ if (tableExists(conn, tableName)) {
+ logger.debug("HTable '" + tableName + "' already exists");
return;
}
- logger.debug("Creating HTable '" + table + "'");
+ logger.debug("Creating HTable '" + tableName + "'");
- HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(table));
+ HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
if (null != families && families.length > 0) {
for (String family : families) {
- HColumnDescriptor fd = newFamilyDescriptor(family);
+ HColumnDescriptor fd = new HColumnDescriptor(family);
+ fd.setInMemory(true); // metadata tables are best in memory
desc.addFamily(fd);
}
}
@@ -275,32 +204,14 @@ public class HBaseConnection {
desc.setValue(HTABLE_UUID_TAG, UUID.randomUUID().toString());
hbase.createTable(desc);
- logger.debug("HTable '" + table + "' created");
+ logger.debug("HTable '" + tableName + "' created");
} finally {
hbase.close();
}
}
- private static Set<String> getFamilyNames(HTableDescriptor desc) {
- HashSet<String> result = Sets.newHashSet();
- for (byte[] bytes : desc.getFamiliesKeys()) {
- try {
- result.add(new String(bytes, "UTF-8"));
- } catch (UnsupportedEncodingException e) {
- logger.error(e.toString());
- }
- }
- return result;
- }
-
- private static HColumnDescriptor newFamilyDescriptor(String family) {
- HColumnDescriptor fd = new HColumnDescriptor(family);
- fd.setInMemory(true); // metadata tables are best in memory
- return fd;
- }
-
- public static void deleteTable(HConnection conn, String tableName) throws IOException {
- HBaseAdmin hbase = new HBaseAdmin(conn);
+ public static void deleteTable(Connection conn, String tableName) throws IOException {
+ Admin hbase = conn.getAdmin();
try {
if (!tableExists(conn, tableName)) {
@@ -310,10 +221,10 @@ public class HBaseConnection {
logger.debug("delete HTable '" + tableName + "'");
- if (hbase.isTableEnabled(tableName)) {
- hbase.disableTable(tableName);
+ if (hbase.isTableEnabled(TableName.valueOf(tableName))) {
+ hbase.disableTable(TableName.valueOf(tableName));
}
- hbase.deleteTable(tableName);
+ hbase.deleteTable(TableName.valueOf(tableName));
logger.debug("HTable '" + tableName + "' deleted");
} finally {
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6ed089/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index e2f3661..f988dea 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -31,10 +31,11 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -69,7 +70,7 @@ public class HBaseResourceStore extends ResourceStore {
final String tableNameBase;
final String hbaseUrl;
- private HConnection getConnection() throws IOException {
+ private Connection getConnection() throws IOException {
return HBaseConnection.get(hbaseUrl);
}
@@ -120,7 +121,7 @@ public class HBaseResourceStore extends ResourceStore {
byte[] endRow = Bytes.toBytes(lookForPrefix);
endRow[endRow.length - 1]++;
- HTableInterface table = getConnection().getTable(getAllInOneTableName());
+ Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
Scan scan = new Scan(startRow, endRow);
if ((filter != null && filter instanceof KeyOnlyFilter) == false) {
scan.addColumn(B_FAMILY, B_COLUMN_TS);
@@ -238,13 +239,12 @@ public class HBaseResourceStore extends ResourceStore {
IOUtils.copy(content, bout);
bout.close();
- HTableInterface table = getConnection().getTable(getAllInOneTableName());
+ Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
try {
byte[] row = Bytes.toBytes(resPath);
Put put = buildPut(resPath, ts, row, bout.toByteArray(), table);
table.put(put);
- table.flushCommits();
} finally {
IOUtils.closeQuietly(table);
}
@@ -252,7 +252,7 @@ public class HBaseResourceStore extends ResourceStore {
@Override
protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) throws IOException, IllegalStateException {
- HTableInterface table = getConnection().getTable(getAllInOneTableName());
+ Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
try {
byte[] row = Bytes.toBytes(resPath);
byte[] bOldTS = oldTS == 0 ? null : Bytes.toBytes(oldTS);
@@ -265,8 +265,6 @@ public class HBaseResourceStore extends ResourceStore {
throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + oldTS + ", but it is " + real);
}
- table.flushCommits();
-
return newTS;
} finally {
IOUtils.closeQuietly(table);
@@ -275,7 +273,7 @@ public class HBaseResourceStore extends ResourceStore {
@Override
protected void deleteResourceImpl(String resPath) throws IOException {
- HTableInterface table = getConnection().getTable(getAllInOneTableName());
+ Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
try {
boolean hdfsResourceExist = false;
Result result = internalGetFromHTable(table, resPath, true, false);
@@ -332,12 +330,17 @@ public class HBaseResourceStore extends ResourceStore {
get.addColumn(B_FAMILY, B_COLUMN_TS);
}
- Result result = table.get(get);
- boolean exists = result != null && (!result.isEmpty() || (result.getExists() != null && result.getExists()));
- return exists ? result : null;
+ Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
+ try {
+ Result result = table.get(get);
+ boolean exists = result != null && (!result.isEmpty() || (result.getExists() != null && result.getExists()));
+ return exists ? result : null;
+ } finally {
+ IOUtils.closeQuietly(table);
+ }
}
- private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, HTableInterface table) throws IOException {
+ private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, Table table) throws IOException {
Path redirectPath = bigCellHDFSPath(resPath);
Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
FileSystem fileSystem = FileSystem.get(hconf);
@@ -363,7 +366,7 @@ public class HBaseResourceStore extends ResourceStore {
return redirectPath;
}
- private Put buildPut(String resPath, long ts, byte[] row, byte[] content, HTableInterface table) throws IOException {
+ private Put buildPut(String resPath, long ts, byte[] row, byte[] content, Table table) throws IOException {
int kvSizeLimit = this.kylinConfig.getHBaseKeyValueSize();
if (content.length > kvSizeLimit) {
writeLargeCellToHdfs(resPath, content, table);
@@ -371,8 +374,8 @@ public class HBaseResourceStore extends ResourceStore {
}
Put put = new Put(row);
- put.add(B_FAMILY, B_COLUMN, content);
- put.add(B_FAMILY, B_COLUMN_TS, Bytes.toBytes(ts));
+ put.addColumn(B_FAMILY, B_COLUMN, content);
+ put.addColumn(B_FAMILY, B_COLUMN_TS, Bytes.toBytes(ts));
return put;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6ed089/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/SimpleHBaseStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/SimpleHBaseStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/SimpleHBaseStore.java
index 2462873..7272d8d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/SimpleHBaseStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/SimpleHBaseStore.java
@@ -26,12 +26,13 @@ import java.util.NoSuchElementException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.kv.RowConstants;
@@ -86,14 +87,13 @@ public class SimpleHBaseStore implements IGTStore {
}
private class Writer implements IGTWriter {
- final HTableInterface table;
+ final BufferedMutator table;
final ByteBuffer rowkey = ByteBuffer.allocate(50);
final ByteBuffer value = ByteBuffer.allocate(50);
Writer() throws IOException {
- HConnection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
- table = conn.getTable(htableName);
- table.setAutoFlush(false, true);
+ Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
+ table = conn.getBufferedMutator(htableName);
}
@Override
@@ -113,24 +113,24 @@ public class SimpleHBaseStore implements IGTStore {
Put put = new Put(rowkey);
put.addImmutable(CF_B, ByteBuffer.wrap(COL_B), HConstants.LATEST_TIMESTAMP, value);
- table.put(put);
+ table.mutate(put);
}
@Override
public void close() throws IOException {
- table.flushCommits();
+ table.flush();
table.close();
}
}
class Reader implements IGTScanner {
- final HTableInterface table;
+ final Table table;
final ResultScanner scanner;
int count = 0;
Reader() throws IOException {
- HConnection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
+ Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
table = conn.getTable(htableName);
Scan scan = new Scan();
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6ed089/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
index 8ac3832..21166bc 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
@@ -25,8 +25,9 @@ import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
@@ -70,7 +71,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
protected final List<RowValueDecoder> rowValueDecoders;
private final StorageContext context;
private final String tableName;
- private final HTableInterface table;
+ private final Table table;
protected CubeTupleConverter tupleConverter;
protected final Iterator<HBaseKeyRange> rangeIterator;
@@ -88,7 +89,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
private int advMeasureRowsRemaining;
private int advMeasureRowIndex;
- public CubeSegmentTupleIterator(CubeSegment cubeSeg, List<HBaseKeyRange> keyRanges, HConnection conn, //
+ public CubeSegmentTupleIterator(CubeSegment cubeSeg, List<HBaseKeyRange> keyRanges, Connection conn, //
Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, //
List<RowValueDecoder> rowValueDecoders, StorageContext context, TupleInfo returnTupleInfo) {
this.cubeSeg = cubeSeg;
@@ -108,7 +109,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
this.rangeIterator = keyRanges.iterator();
try {
- this.table = conn.getTable(tableName);
+ this.table = conn.getTable(TableName.valueOf(tableName));
} catch (Throwable t) {
throw new StorageException("Error when open connection to table " + tableName, t);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6ed089/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
index 672bcbe..f864bed 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
@@ -33,7 +33,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
-import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.Dictionary;
@@ -148,7 +148,7 @@ public class CubeStorageQuery implements IStorageQuery {
setCoprocessor(groupsCopD, valueDecoders, context); // enable coprocessor if beneficial
setLimit(filter, context);
- HConnection conn = HBaseConnection.get(context.getConnUrl());
+ Connection conn = HBaseConnection.get(context.getConnUrl());
// notice we're passing filterD down to storage instead of flatFilter
return new SerializedHBaseTupleIterator(conn, scans, cubeInstance, dimensionsD, filterD, groupsCopD, valueDecoders, context, returnTupleInfo);
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6ed089/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/RegionScannerAdapter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/RegionScannerAdapter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/RegionScannerAdapter.java
index 6342c5c..ae442fe 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/RegionScannerAdapter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/RegionScannerAdapter.java
@@ -23,9 +23,11 @@ import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
/**
* @author yangli9
@@ -50,7 +52,7 @@ public class RegionScannerAdapter implements RegionScanner {
}
@Override
- public boolean next(List<Cell> result, int limit) throws IOException {
+ public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
return next(result);
}
@@ -60,7 +62,7 @@ public class RegionScannerAdapter implements RegionScanner {
}
@Override
- public boolean nextRaw(List<Cell> result, int limit) throws IOException {
+ public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
return next(result);
}
@@ -94,4 +96,9 @@ public class RegionScannerAdapter implements RegionScanner {
return Long.MAX_VALUE;
}
+ @Override
+ public int getBatch() {
+ return -1;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6ed089/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
index e8dd5b9..d033c77 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
@@ -25,7 +25,7 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
-import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.metadata.filter.TupleFilter;
@@ -57,7 +57,7 @@ public class SerializedHBaseTupleIterator implements ITupleIterator {
private int scanCount;
private ITuple next;
- public SerializedHBaseTupleIterator(HConnection conn, List<HBaseKeyRange> segmentKeyRanges, CubeInstance cube, //
+ public SerializedHBaseTupleIterator(Connection conn, List<HBaseKeyRange> segmentKeyRanges, CubeInstance cube, //
Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, List<RowValueDecoder> rowValueDecoders, //
StorageContext context, TupleInfo returnTupleInfo) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6ed089/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java
index c7b650a..8dba1b1 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior;
@@ -99,7 +99,7 @@ public class AggregateRegionObserver extends BaseRegionObserver {
// start/end region operation & sync on scanner is suggested by the
// javadoc of RegionScanner.nextRaw()
// FIXME: will the lock still work when a iterator is returned? is it safe? Is readonly attribute helping here? by mhb
- HRegion region = ctxt.getEnvironment().getRegion();
+ Region region = ctxt.getEnvironment().getRegion();
region.startRegionOperation();
try {
synchronized (innerScanner) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6ed089/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java
index be26142..7d73a73 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java
@@ -24,7 +24,9 @@ import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.kylin.measure.MeasureAggregator;
import org.apache.kylin.storage.hbase.common.coprocessor.AggrKey;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior;
@@ -116,8 +118,8 @@ public class AggregationScanner implements RegionScanner {
}
@Override
- public boolean next(List<Cell> result, int limit) throws IOException {
- return outerScanner.next(result, limit);
+ public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
+ return outerScanner.next(result, scannerContext);
}
@Override
@@ -126,8 +128,8 @@ public class AggregationScanner implements RegionScanner {
}
@Override
- public boolean nextRaw(List<Cell> result, int limit) throws IOException {
- return outerScanner.nextRaw(result, limit);
+ public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
+ return outerScanner.nextRaw(result, scannerContext);
}
@Override
@@ -160,6 +162,11 @@ public class AggregationScanner implements RegionScanner {
return outerScanner.getMvccReadPoint();
}
+ @Override
+ public int getBatch() {
+ return outerScanner.getBatch();
+ }
+
private static class Stats {
long inputRows = 0;
long inputBytes = 0;
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6ed089/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregationCache.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregationCache.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregationCache.java
index 8404262..1809a44 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregationCache.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregationCache.java
@@ -24,12 +24,10 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.kylin.measure.MeasureAggregator;
import org.apache.kylin.storage.hbase.common.coprocessor.AggrKey;
import org.apache.kylin.storage.hbase.common.coprocessor.AggregationCache;
@@ -112,7 +110,7 @@ public class ObserverAggregationCache extends AggregationCache {
}
@Override
- public boolean next(List<Cell> result, int limit) throws IOException {
+ public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
return next(result);
}
@@ -122,7 +120,7 @@ public class ObserverAggregationCache extends AggregationCache {
}
@Override
- public boolean nextRaw(List<Cell> result, int limit) throws IOException {
+ public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
return next(result);
}
@@ -161,6 +159,11 @@ public class ObserverAggregationCache extends AggregationCache {
// AggregateRegionObserver.LOG.info("Kylin Scanner getMvccReadPoint()");
return Long.MAX_VALUE;
}
+
+ @Override
+ public int getBatch() {
+ return innerScanner.getBatch();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6ed089/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java
index f0e9bed..c69fd8b 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java
@@ -23,7 +23,7 @@ import java.util.Collection;
import java.util.Map;
import java.util.Set;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -60,7 +60,7 @@ public class ObserverEnabler {
static final Map<String, Boolean> CUBE_OVERRIDES = Maps.newConcurrentMap();
public static ResultScanner scanWithCoprocessorIfBeneficial(CubeSegment segment, Cuboid cuboid, TupleFilter tupleFiler, //
- Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context, HTableInterface table, Scan scan) throws IOException {
+ Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context, Table table, Scan scan) throws IOException {
if (context.isCoprocessorEnabled() == false) {
return table.getScanner(scan);
http://git-wip-us.apache.org/repos/asf/kylin/blob/1b6ed089/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 78ad18d..830aca7 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -23,6 +23,7 @@ import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
@@ -35,8 +36,10 @@ import javax.annotation.Nullable;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
@@ -279,7 +282,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
final ImmutableBitSet selectedColBlocks = scanRequest.getSelectedColBlocks().set(0);
// globally shared connection, does not require close
- final HConnection conn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl());
+ final Connection conn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl());
final List<IntList> hbaseColumnsToGTIntList = Lists.newArrayList();
List<List<Integer>> hbaseColumnsToGT = getHBaseColumnsGTMapping(selectedColBlocks);
@@ -357,7 +360,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
final boolean[] abnormalFinish = new boolean[1];
try {
- HTableInterface table = conn.getTable(cubeSeg.getStorageLocationIdentifier(), HBaseConnection.getCoprocessorPool());
+ HTableInterface table = conn.get(cubeSeg.getStorageLocationIdentifier(), HBaseConnection.getCoprocessorPool());
final CubeVisitRequest request = builder.build();
final byte[] startKey = epRange.getFirst();
@@ -432,5 +435,4 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
return sb.toString();
}
-
}