You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/11/02 05:57:00 UTC
[1/3] incubator-kylin git commit: KYLIN-892 & KYLIN-782 Upgrade to HBase 1.1
Repository: incubator-kylin
Updated Branches:
refs/heads/KYLIN-782 [created] 5875bb0a1
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/storage/src/main/java/org/apache/kylin/storage/hbase/PingHBaseCLI.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/PingHBaseCLI.java b/storage/src/main/java/org/apache/kylin/storage/hbase/PingHBaseCLI.java
index d4e8529..4a9c574 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/PingHBaseCLI.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/PingHBaseCLI.java
@@ -1,88 +1,91 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.hbase;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.security.token.TokenUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.HadoopUtil;
-
-/**
- * @author yangli9
- *
- */
-public class PingHBaseCLI {
-
- public static void main(String[] args) throws IOException {
- String hbaseTable = args[0];
-
- System.out.println("Hello friend.");
-
- Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
- if (User.isHBaseSecurityEnabled(hconf)) {
- try {
- System.out.println("--------------Getting kerberos credential for user " + UserGroupInformation.getCurrentUser().getUserName());
- TokenUtil.obtainAndCacheToken(hconf, UserGroupInformation.getCurrentUser());
- } catch (InterruptedException e) {
- System.out.println("--------------Error while getting kerberos credential for user " + UserGroupInformation.getCurrentUser().getUserName());
- }
- }
-
- Scan scan = new Scan();
- int limit = 20;
-
- HConnection conn = null;
- HTableInterface table = null;
- ResultScanner scanner = null;
- try {
- conn = HConnectionManager.createConnection(hconf);
- table = conn.getTable(hbaseTable);
- scanner = table.getScanner(scan);
- int count = 0;
- for (Result r : scanner) {
- byte[] rowkey = r.getRow();
- System.out.println(Bytes.toStringBinary(rowkey));
- count++;
- if (count == limit)
- break;
- }
- } finally {
- if (scanner != null) {
- scanner.close();
- }
- if (table != null) {
- table.close();
- }
- if (conn != null) {
- conn.close();
- }
- }
-
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.token.TokenUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
+
+/**
+ * Command-line check: scans the HBase table named by args[0] and prints up to 20 of its row keys.
+ * @author yangli9
+ */
+public class PingHBaseCLI {
+
+ public static void main(String[] args) throws IOException, InterruptedException { // entry point; args[0] is the HBase table name
+ String hbaseTable = args[0]; // NOTE(review): no length check; running with no arguments throws ArrayIndexOutOfBoundsException
+
+ System.out.println("Hello friend.");
+
+ Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
+ if (User.isHBaseSecurityEnabled(hconf)) { // only when HBase security is enabled: call TokenUtil.obtainAndCacheToken for the current user
+ Connection conn = ConnectionFactory.createConnection(hconf); // connection used only for the token call; closed in the finally below
+ try {
+ UserProvider userProvider = UserProvider.instantiate(hconf);
+ TokenUtil.obtainAndCacheToken(conn, userProvider.create(UserGroupInformation.getCurrentUser()));
+ } finally {
+ conn.close();
+ }
+ }
+
+ Scan scan = new Scan(); // no start/stop row or filter is set on the scan
+ int limit = 20; // maximum number of row keys printed
+
+ Connection conn = null; // NOTE(review): a second connection is opened for the scan instead of reusing the one above
+ Table table = null;
+ ResultScanner scanner = null;
+ try {
+ conn = ConnectionFactory.createConnection(hconf);
+ table = conn.getTable(TableName.valueOf(hbaseTable));
+ scanner = table.getScanner(scan);
+ int count = 0;
+ for (Result r : scanner) {
+ byte[] rowkey = r.getRow();
+ System.out.println(Bytes.toStringBinary(rowkey));
+ count++;
+ if (count == limit) // stop once the cap is reached
+ break;
+ }
+ } finally { // close in reverse order of acquisition; NOTE(review): an exception from an earlier close() skips the later ones
+ if (scanner != null) {
+ scanner.close();
+ }
+ if (table != null) {
+ table.close();
+ }
+ if (conn != null) {
+ conn.close();
+ }
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/storage/src/main/java/org/apache/kylin/storage/hbase/RegionScannerAdapter.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/RegionScannerAdapter.java b/storage/src/main/java/org/apache/kylin/storage/hbase/RegionScannerAdapter.java
index e2eeed0..a07cbe4 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/RegionScannerAdapter.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/RegionScannerAdapter.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
/**
* @author yangli9
@@ -50,7 +51,7 @@ public class RegionScannerAdapter implements RegionScanner {
}
@Override
- public boolean next(List<Cell> result, int limit) throws IOException {
+ public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
return next(result);
}
@@ -60,11 +61,16 @@ public class RegionScannerAdapter implements RegionScanner {
}
@Override
- public boolean nextRaw(List<Cell> result, int limit) throws IOException {
+ public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
return next(result);
}
@Override
+ public int getBatch() {
+ return -1;
+ }
+
+ @Override
public void close() throws IOException {
scanner.close();
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
index a115753..e950e5b 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
@@ -24,7 +24,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.kv.RowValueDecoder;
@@ -52,7 +52,7 @@ public class SerializedHBaseTupleIterator implements ITupleIterator {
private ITupleIterator segmentIterator;
private int scanCount;
- public SerializedHBaseTupleIterator(HConnection conn, List<HBaseKeyRange> segmentKeyRanges, CubeInstance cube, Collection<TblColRef> dimensions, TupleFilter filter, Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context) {
+ public SerializedHBaseTupleIterator(Connection conn, List<HBaseKeyRange> segmentKeyRanges, CubeInstance cube, Collection<TblColRef> dimensions, TupleFilter filter, Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context) {
this.context = context;
int limit = context.getLimit();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
index 7a0ab15..a3ece7e 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
@@ -26,8 +26,9 @@ import java.util.List;
import java.util.Map;
import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
@@ -79,14 +80,14 @@ public class EndpointTupleIterator implements ITupleIterator {
Iterator<List<IIProtos.IIResponse.IIRow>> regionResponsesIterator = null;
ITupleIterator tupleIterator = null;
- HTableInterface table = null;
+ Table table = null;
int rowsInAllMetric = 0;
- public EndpointTupleIterator(IISegment segment, TupleFilter rootFilter, Collection<TblColRef> groupBy, List<FunctionDesc> measures, StorageContext context, HConnection conn) throws Throwable {
+ public EndpointTupleIterator(IISegment segment, TupleFilter rootFilter, Collection<TblColRef> groupBy, List<FunctionDesc> measures, StorageContext context, Connection conn) throws Throwable {
String tableName = segment.getStorageLocationIdentifier();
- table = conn.getTable(tableName);
+ table = conn.getTable(TableName.valueOf(tableName));
factTableName = segment.getIIDesc().getFactTableName();
if (rootFilter == null) {
@@ -212,7 +213,7 @@ public class EndpointTupleIterator implements ITupleIterator {
}
//TODO : async callback
- private Iterator<List<IIProtos.IIResponse.IIRow>> getResults(final IIProtos.IIRequest request, HTableInterface table) throws Throwable {
+ private Iterator<List<IIProtos.IIResponse.IIRow>> getResults(final IIProtos.IIRequest request, Table table) throws Throwable {
Map<byte[], List<IIProtos.IIResponse.IIRow>> results = table.coprocessorService(IIProtos.RowsService.class, null, null, new Batch.Call<IIProtos.RowsService, List<IIProtos.IIResponse.IIRow>>() {
public List<IIProtos.IIResponse.IIRow> call(IIProtos.RowsService rowsService) throws IOException {
ServerRpcController controller = new ServerRpcController();
@@ -235,7 +236,7 @@ public class EndpointTupleIterator implements ITupleIterator {
int index = 0;
for (int i = 0; i < columns.size(); i++) {
- TblColRef column = columns.get(i);
+ // TblColRef column = columns.get(i);
// if (!dimensions.contains(column)) {
// continue;
// }
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
index d55361b..3c91329 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
@@ -90,7 +90,7 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor, Cop
RegionScanner innerScanner = null;
HRegion region = null;
try {
- region = env.getRegion();
+ region = (HRegion) env.getRegion();
innerScanner = region.getScanner(buildScan());
region.startRegionOperation();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
index 2cecd5c..c21ee36 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
@@ -93,7 +93,7 @@ public class AggregateRegionObserver extends BaseRegionObserver {
// start/end region operation & sync on scanner is suggested by the
// javadoc of RegionScanner.nextRaw()
// FIXME: will the lock still work when a iterator is returned? is it safe? Is readonly attribute helping here? by mhb
- HRegion region = ctxt.getEnvironment().getRegion();
+ HRegion region = (HRegion) ctxt.getEnvironment().getRegion();
region.startRegionOperation();
try {
synchronized (innerScanner) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
index 65b616f..c1a0b15 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
@@ -25,6 +25,7 @@ import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.kylin.metadata.measure.MeasureAggregator;
import org.apache.kylin.storage.hbase.coprocessor.AggrKey;
import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
@@ -93,13 +94,18 @@ public class AggregationScanner implements RegionScanner {
}
@Override
+ public int getBatch() {
+ return outerScanner.getBatch();
+ }
+
+ @Override
public boolean next(List<Cell> results) throws IOException {
return outerScanner.next(results);
}
@Override
- public boolean next(List<Cell> result, int limit) throws IOException {
- return outerScanner.next(result, limit);
+ public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
+ return outerScanner.next(result, scannerContext);
}
@Override
@@ -108,8 +114,8 @@ public class AggregationScanner implements RegionScanner {
}
@Override
- public boolean nextRaw(List<Cell> result, int limit) throws IOException {
- return outerScanner.nextRaw(result, limit);
+ public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
+ return outerScanner.nextRaw(result, scannerContext);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverAggregationCache.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverAggregationCache.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverAggregationCache.java
index f609a5a..dea1d9e 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverAggregationCache.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverAggregationCache.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.kylin.metadata.measure.MeasureAggregator;
import org.apache.kylin.storage.hbase.coprocessor.AggrKey;
import org.apache.kylin.storage.hbase.coprocessor.AggregationCache;
@@ -107,7 +108,7 @@ public class ObserverAggregationCache extends AggregationCache {
}
@Override
- public boolean next(List<Cell> result, int limit) throws IOException {
+ public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
return next(result);
}
@@ -117,11 +118,16 @@ public class ObserverAggregationCache extends AggregationCache {
}
@Override
- public boolean nextRaw(List<Cell> result, int limit) throws IOException {
+ public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
return next(result);
}
@Override
+ public int getBatch() {
+ return innerScanner.getBatch();
+ }
+
+ @Override
public void close() throws IOException {
// AggregateRegionObserver.LOG.info("Kylin Scanner close()");
innerScanner.close();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
index f0f7ed5..fa2a7c1 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
@@ -23,9 +23,9 @@ import java.util.Collection;
import java.util.Map;
import java.util.Set;
-import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.debug.BackdoorToggles;
@@ -58,7 +58,7 @@ public class ObserverEnabler {
static final Map<String, Boolean> CUBE_OVERRIDES = Maps.newConcurrentMap();
public static ResultScanner scanWithCoprocessorIfBeneficial(CubeSegment segment, Cuboid cuboid, TupleFilter tupleFiler, //
- Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context, HTableInterface table, Scan scan) throws IOException {
+ Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context, Table table, Scan scan) throws IOException {
if (context.isCoprocessorEnabled() == false) {
return table.getScanner(scan);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java b/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java
index f7fcef1..50069a1 100644
--- a/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java
@@ -1,115 +1,112 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.hbase;
-
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.invertedindex.index.RawTableRecord;
-import org.apache.kylin.invertedindex.index.Slice;
-import org.apache.kylin.invertedindex.index.TableRecord;
-import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-/**
- * @author yangli9
- */
-public class InvertedIndexHBaseTest extends HBaseMetadataTestCase {
-
- IIInstance ii;
- IISegment seg;
- HConnection hconn;
-
- TableRecordInfo info;
-
- @Before
- public void setup() throws Exception {
- this.createTestMetadata();
-
- this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii");
- this.seg = ii.getFirstSegment();
-
- Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
- hconn = HConnectionManager.createConnection(hconf);
-
- this.info = new TableRecordInfo(seg);
- }
-
- @After
- public void after() throws Exception {
- this.cleanupTestMetadata();
- }
-
- @Test
- public void testLoad() throws Exception {
-
- String tableName = seg.getStorageLocationIdentifier();
- IIKeyValueCodec codec = new IIKeyValueCodec(info.getDigest());
-
- List<Slice> slices = Lists.newArrayList();
- HBaseClientKVIterator kvIterator = new HBaseClientKVIterator(hconn, tableName, IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES);
- try {
- for (Slice slice : codec.decodeKeyValue(kvIterator)) {
- slices.add(slice);
- }
- } finally {
- kvIterator.close();
- }
-
- List<TableRecord> records = iterateRecords(slices);
- dump(records);
- System.out.println(records.size() + " records");
- }
-
- private List<TableRecord> iterateRecords(List<Slice> slices) {
- List<TableRecord> records = Lists.newArrayList();
- for (Slice slice : slices) {
- for (RawTableRecord rec : slice) {
- records.add(new TableRecord((RawTableRecord) rec.clone(), info));
- }
- }
- return records;
- }
-
- private void dump(Iterable<TableRecord> records) {
- for (TableRecord rec : records) {
- System.out.println(rec.toString());
-
- byte[] x = rec.getBytes();
- String y = BytesUtil.toReadableText(x);
- System.out.println(y);
- System.out.println();
- }
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase;
+
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.kylin.common.persistence.HBaseConnection;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.invertedindex.index.RawTableRecord;
+import org.apache.kylin.invertedindex.index.Slice;
+import org.apache.kylin.invertedindex.index.TableRecord;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * @author yangli9
+ */
+public class InvertedIndexHBaseTest extends HBaseMetadataTestCase { // reads the "test_kylin_ii" inverted index back through HBaseClientKVIterator and prints the decoded records
+
+ IIInstance ii;
+ IISegment seg; // first segment of ii; supplies the storage table name used in testLoad()
+ Connection hconn; // set from HBaseConnection.get() in setup(); NOTE(review): never closed by this test - presumably owned by HBaseConnection, confirm
+
+ TableRecordInfo info;
+
+ @Before
+ public void setup() throws Exception {
+ this.createTestMetadata();
+
+ this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii");
+ this.seg = ii.getFirstSegment();
+
+ this.hconn = HBaseConnection.get();
+
+ this.info = new TableRecordInfo(seg);
+ }
+
+ @After
+ public void after() throws Exception {
+ this.cleanupTestMetadata();
+ }
+
+ @Test
+ public void testLoad() throws Exception { // NOTE(review): contains no assertions; it passes as long as decoding and dumping do not throw
+
+ String tableName = seg.getStorageLocationIdentifier();
+ IIKeyValueCodec codec = new IIKeyValueCodec(info.getDigest());
+
+ List<Slice> slices = Lists.newArrayList();
+ HBaseClientKVIterator kvIterator = new HBaseClientKVIterator(hconn, tableName, IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES);
+ try {
+ for (Slice slice : codec.decodeKeyValue(kvIterator)) {
+ slices.add(slice);
+ }
+ } finally { // the iterator is closed even if decoding throws
+ kvIterator.close();
+ }
+
+ List<TableRecord> records = iterateRecords(slices);
+ dump(records);
+ System.out.println(records.size() + " records");
+ }
+
+ private List<TableRecord> iterateRecords(List<Slice> slices) { // flattens every slice into one list of TableRecord
+ List<TableRecord> records = Lists.newArrayList();
+ for (Slice slice : slices) {
+ for (RawTableRecord rec : slice) {
+ records.add(new TableRecord((RawTableRecord) rec.clone(), info)); // stores a clone - presumably the slice iterator reuses its record object; TODO confirm
+ }
+ }
+ return records;
+ }
+
+ private void dump(Iterable<TableRecord> records) { // prints each record's toString(), then its bytes as readable text, then a blank line
+ for (TableRecord rec : records) {
+ System.out.println(rec.toString());
+
+ byte[] x = rec.getBytes();
+ String y = BytesUtil.toReadableText(x);
+ System.out.println(y);
+ System.out.println();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java
index 0454b4c..3ace91e 100644
--- a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.io.LongWritable;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.cube.kv.RowConstants;
@@ -223,102 +224,46 @@ public class AggregateRegionObserverTest {
this.input = cellInputs;
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.hadoop.hbase.regionserver.InternalScanner#next(java.util
- * .List)
- */
@Override
public boolean next(List<Cell> results) throws IOException {
return nextRaw(results);
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.hadoop.hbase.regionserver.InternalScanner#next(java.util
- * .List, int)
- */
@Override
- public boolean next(List<Cell> result, int limit) throws IOException {
+ public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
return next(result);
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.hadoop.hbase.regionserver.InternalScanner#close()
- */
@Override
public void close() throws IOException {
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.hadoop.hbase.regionserver.RegionScanner#getRegionInfo()
- */
@Override
public HRegionInfo getRegionInfo() {
return null;
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.hadoop.hbase.regionserver.RegionScanner#isFilterDone()
- */
@Override
public boolean isFilterDone() throws IOException {
return false;
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.hadoop.hbase.regionserver.RegionScanner#reseek(byte[])
- */
@Override
public boolean reseek(byte[] row) throws IOException {
return false;
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.hadoop.hbase.regionserver.RegionScanner#getMaxResultSize()
- */
@Override
public long getMaxResultSize() {
return 0;
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.hadoop.hbase.regionserver.RegionScanner#getMvccReadPoint()
- */
@Override
public long getMvccReadPoint() {
return 0;
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.hadoop.hbase.regionserver.RegionScanner#nextRaw(java.util
- * .List)
- */
@Override
public boolean nextRaw(List<Cell> result) throws IOException {
if (i < input.size()) {
@@ -328,18 +273,15 @@ public class AggregateRegionObserverTest {
return i < input.size();
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.hadoop.hbase.regionserver.RegionScanner#nextRaw(java.util
- * .List, int)
- */
@Override
- public boolean nextRaw(List<Cell> result, int limit) throws IOException {
+ public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
return nextRaw(result);
}
+ @Override
+ public int getBatch() {
+ return -1;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/storage/src/test/java/org/apache/kylin/storage/minicluster/HiveMiniClusterTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/minicluster/HiveMiniClusterTest.java b/storage/src/test/java/org/apache/kylin/storage/minicluster/HiveMiniClusterTest.java
index d17cfa6..b1f6626 100644
--- a/storage/src/test/java/org/apache/kylin/storage/minicluster/HiveMiniClusterTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/minicluster/HiveMiniClusterTest.java
@@ -27,7 +27,6 @@ import java.sql.SQLException;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.service.HiveInterface;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
@@ -47,7 +46,7 @@ public class HiveMiniClusterTest extends HiveJDBCClientTest {
public static final File HIVE_WAREHOUSE_DIR = new File(HIVE_BASE_DIR + "/warehouse");
public static final File HIVE_TESTDATA_DIR = new File(HIVE_BASE_DIR + "/testdata");
public static final File HIVE_HADOOP_TMP_DIR = new File(HIVE_BASE_DIR + "/hadooptmp");
- protected HiveInterface client;
+ //protected HiveInterface client;
protected MiniDFSCluster miniDFS;
protected MiniMRCluster miniMR;
[3/3] incubator-kylin git commit: KYLIN-892 & KYLIN-782 Upgrade to
HBase 1.1
Posted by li...@apache.org.
KYLIN-892 & KYLIN-782 Upgrade to HBase 1.1
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/5875bb0a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/5875bb0a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/5875bb0a
Branch: refs/heads/KYLIN-782
Commit: 5875bb0a1780824c2089728594ba5b478c782ddf
Parents: bff402f
Author: Yang Li <li...@apache.org>
Authored: Sun Aug 16 20:22:13 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Mon Nov 2 12:56:23 2015 +0800
----------------------------------------------------------------------
.../common/persistence/HBaseConnection.java | 251 +++----
.../common/persistence/HBaseResourceStore.java | 665 +++++++++----------
.../common/util/HBaseRegionSizeCalculator.java | 41 +-
.../kylin/common/util/BasicHadoopTest.java | 11 +-
.../kylin/job/cube/GarbageCollectionStep.java | 22 +-
.../kylin/job/hadoop/cube/CubeHFileJob.java | 18 +-
.../job/hadoop/cube/StorageCleanupJob.java | 26 +-
.../kylin/job/hadoop/hbase/CreateHTableJob.java | 8 +-
.../hadoop/invertedindex/IICreateHFileJob.java | 22 +-
.../hadoop/invertedindex/IICreateHTableJob.java | 11 +-
.../apache/kylin/job/tools/CleanHtableCLI.java | 8 +-
.../kylin/job/tools/CubeMigrationCLI.java | 24 +-
.../kylin/job/tools/DeployCoprocessorCLI.java | 625 ++++++++---------
.../job/tools/GridTableHBaseBenchmark.java | 37 +-
.../kylin/job/tools/HtableAlterMetadataCLI.java | 8 +-
.../apache/kylin/job/tools/RowCounterCLI.java | 11 +-
.../org/apache/kylin/job/ExportHBaseData.java | 18 +-
.../kylin/job/hadoop/hbase/TestHbaseClient.java | 13 +-
.../kylin/job/tools/HBaseRowDigestTest.java | 11 +-
monitor/pom.xml | 6 +
.../kylin/monitor/MonitorMetaManager.java | 49 +-
pom.xml | 17 +-
.../apache/kylin/rest/service/AclService.java | 38 +-
.../apache/kylin/rest/service/CubeService.java | 35 +-
.../apache/kylin/rest/service/QueryService.java | 21 +-
.../apache/kylin/rest/service/UserService.java | 27 +-
.../storage/hbase/CubeSegmentTupleIterator.java | 21 +-
.../kylin/storage/hbase/CubeStorageEngine.java | 4 +-
.../storage/hbase/HBaseClientKVIterator.java | 187 +++---
.../hbase/InvertedIndexStorageEngine.java | 114 ++--
.../kylin/storage/hbase/PingHBaseCLI.java | 179 ++---
.../storage/hbase/RegionScannerAdapter.java | 10 +-
.../hbase/SerializedHBaseTupleIterator.java | 4 +-
.../endpoint/EndpointTupleIterator.java | 15 +-
.../hbase/coprocessor/endpoint/IIEndpoint.java | 2 +-
.../observer/AggregateRegionObserver.java | 2 +-
.../observer/AggregationScanner.java | 14 +-
.../observer/ObserverAggregationCache.java | 10 +-
.../coprocessor/observer/ObserverEnabler.java | 4 +-
.../storage/hbase/InvertedIndexHBaseTest.java | 227 ++++---
.../observer/AggregateRegionObserverTest.java | 72 +-
.../minicluster/HiveMiniClusterTest.java | 3 +-
42 files changed, 1442 insertions(+), 1449 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java b/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java
index c4d0314..85a08a1 100644
--- a/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java
+++ b/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java
@@ -1,123 +1,128 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.common.persistence;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author yangli9
- *
- */
-public class HBaseConnection {
-
- private static final Logger logger = LoggerFactory.getLogger(HBaseConnection.class);
-
- private static final Map<String, Configuration> ConfigCache = new ConcurrentHashMap<String, Configuration>();
- private static final Map<String, HConnection> ConnPool = new ConcurrentHashMap<String, HConnection>();
-
- static {
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- for (HConnection conn : ConnPool.values()) {
- try {
- conn.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- });
- }
-
- public static HConnection get(String url) {
- // find configuration
- Configuration conf = ConfigCache.get(url);
- if (conf == null) {
- conf = HadoopUtil.newHBaseConfiguration(url);
- ConfigCache.put(url, conf);
- }
-
- HConnection connection = ConnPool.get(url);
- try {
- // I don't use DCL since recreate a connection is not a big issue.
- if (connection == null) {
- connection = HConnectionManager.createConnection(conf);
- ConnPool.put(url, connection);
- }
- } catch (Throwable t) {
- throw new StorageException("Error when open connection " + url, t);
- }
-
- return connection;
- }
-
- public static void createHTableIfNeeded(String hbaseUrl, String tableName, String... families) throws IOException {
- createHTableIfNeeded(HBaseConnection.get(hbaseUrl), tableName, families);
- }
-
- public static void createHTableIfNeeded(HConnection conn, String tableName, String... families) throws IOException {
- HBaseAdmin hbase = new HBaseAdmin(conn);
-
- try {
- boolean tableExist = false;
- try {
- hbase.getTableDescriptor(TableName.valueOf(tableName));
- tableExist = true;
- } catch (TableNotFoundException e) {
- }
-
- if (tableExist) {
- logger.debug("HTable '" + tableName + "' already exists");
- return;
- }
-
- logger.debug("Creating HTable '" + tableName + "'");
-
- HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
-
- if (null != families && families.length > 0) {
- for (String family : families) {
- HColumnDescriptor fd = new HColumnDescriptor(family);
- fd.setInMemory(true); // metadata tables are best in memory
- desc.addFamily(fd);
- }
- }
- hbase.createTable(desc);
-
- logger.debug("HTable '" + tableName + "' created");
- } finally {
- hbase.close();
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.persistence;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author yangli9
+ *
+ */
+public class HBaseConnection {
+
+ private static final Logger logger = LoggerFactory.getLogger(HBaseConnection.class);
+
+ private static final Map<String, Configuration> ConfigCache = new ConcurrentHashMap<String, Configuration>();
+ private static final Map<String, Connection> ConnPool = new ConcurrentHashMap<String, Connection>();
+
+ static {
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ for (Connection conn : ConnPool.values()) {
+ try {
+ conn.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ });
+ }
+
+ public static Connection get() {
+ return get(KylinConfig.getInstanceFromEnv().getStorageUrl());
+ }
+
+ public static Connection get(String url) {
+ // find configuration
+ Configuration conf = ConfigCache.get(url);
+ if (conf == null) {
+ conf = HadoopUtil.newHBaseConfiguration(url);
+ ConfigCache.put(url, conf);
+ }
+
+ Connection connection = ConnPool.get(url);
+ try {
+ // I don't use DCL since recreate a connection is not a big issue.
+ if (connection == null) {
+ connection = ConnectionFactory.createConnection(conf);
+ ConnPool.put(url, connection);
+ }
+ } catch (Throwable t) {
+ throw new StorageException("Error when open connection " + url, t);
+ }
+
+ return connection;
+ }
+
+ public static void createHTableIfNeeded(String hbaseUrl, String tableName, String... families) throws IOException {
+ createHTableIfNeeded(HBaseConnection.get(hbaseUrl), tableName, families);
+ }
+
+ public static void createHTableIfNeeded(Connection conn, String tableName, String... families) throws IOException {
+ Admin admin = conn.getAdmin();
+
+ try {
+ boolean tableExist = false;
+ try {
+ admin.getTableDescriptor(TableName.valueOf(tableName));
+ tableExist = true;
+ } catch (TableNotFoundException e) {
+ }
+
+ if (tableExist) {
+ logger.debug("HTable '" + tableName + "' already exists");
+ return;
+ }
+
+ logger.debug("Creating HTable '" + tableName + "'");
+
+ HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
+
+ if (null != families && families.length > 0) {
+ for (String family : families) {
+ HColumnDescriptor fd = new HColumnDescriptor(family);
+ fd.setInMemory(true); // metadata tables are best in memory
+ desc.addFamily(fd);
+ }
+ }
+ admin.createTable(desc);
+
+ logger.debug("HTable '" + tableName + "' created");
+ } finally {
+ admin.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java b/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
index 1c4a7ba..8360ff1 100644
--- a/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
+++ b/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
@@ -1,337 +1,334 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.common.persistence;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.HadoopUtil;
-
-import com.google.common.collect.Lists;
-
-public class HBaseResourceStore extends ResourceStore {
-
- private static final String DEFAULT_TABLE_NAME = "kylin_metadata";
- private static final String FAMILY = "f";
- private static final byte[] B_FAMILY = Bytes.toBytes(FAMILY);
- private static final String COLUMN = "c";
- private static final byte[] B_COLUMN = Bytes.toBytes(COLUMN);
- private static final String COLUMN_TS = "t";
- private static final byte[] B_COLUMN_TS = Bytes.toBytes(COLUMN_TS);
-
- private static final Map<String, String> TABLE_SUFFIX_MAP = new LinkedHashMap<String, String>();
-
- static {
- TABLE_SUFFIX_MAP.put(CUBE_RESOURCE_ROOT + "/", "_cube");
- TABLE_SUFFIX_MAP.put(DICT_RESOURCE_ROOT + "/", "_dict");
- TABLE_SUFFIX_MAP.put("/invertedindex/", "_invertedindex");
- TABLE_SUFFIX_MAP.put(JOB_PATH_ROOT + "/", "_job");
- TABLE_SUFFIX_MAP.put(JOB_OUTPUT_PATH_ROOT + "/", "_job_output");
- TABLE_SUFFIX_MAP.put(PROJECT_RESOURCE_ROOT + "/", "_proj");
- TABLE_SUFFIX_MAP.put(SNAPSHOT_RESOURCE_ROOT + "/", "_table_snapshot");
- TABLE_SUFFIX_MAP.put("", ""); // DEFAULT CASE
- }
-
- final String tableNameBase;
- final String hbaseUrl;
-
- // final Map<String, String> tableNameMap; // path prefix ==> HBase table name
-
- private HConnection getConnection() throws IOException {
- return HBaseConnection.get(hbaseUrl);
- }
-
- public HBaseResourceStore(KylinConfig kylinConfig) throws IOException {
- super(kylinConfig);
-
- String metadataUrl = kylinConfig.getMetadataUrl();
- // split TABLE@HBASE_URL
- int cut = metadataUrl.indexOf('@');
- tableNameBase = cut < 0 ? DEFAULT_TABLE_NAME : metadataUrl.substring(0, cut);
- hbaseUrl = cut < 0 ? metadataUrl : metadataUrl.substring(cut + 1);
-
- createHTableIfNeeded(getAllInOneTableName());
-
- // tableNameMap = new LinkedHashMap<String, String>();
- // for (Entry<String, String> entry : TABLE_SUFFIX_MAP.entrySet()) {
- // String pathPrefix = entry.getKey();
- // String tableName = tableNameBase + entry.getValue();
- // tableNameMap.put(pathPrefix, tableName);
- // createHTableIfNeeded(tableName);
- // }
-
- }
-
- private void createHTableIfNeeded(String tableName) throws IOException {
- HBaseConnection.createHTableIfNeeded(getConnection(), tableName, FAMILY);
- }
-
- private String getAllInOneTableName() {
- return tableNameBase;
- }
-
- @Override
- protected ArrayList<String> listResourcesImpl(String resPath) throws IOException {
- assert resPath.startsWith("/");
- String lookForPrefix = resPath.endsWith("/") ? resPath : resPath + "/";
- byte[] startRow = Bytes.toBytes(lookForPrefix);
- byte[] endRow = Bytes.toBytes(lookForPrefix);
- endRow[endRow.length - 1]++;
-
- ArrayList<String> result = new ArrayList<String>();
-
- HTableInterface table = getConnection().getTable(getAllInOneTableName());
- Scan scan = new Scan(startRow, endRow);
- scan.setFilter(new KeyOnlyFilter());
- try {
- ResultScanner scanner = table.getScanner(scan);
- for (Result r : scanner) {
- String path = Bytes.toString(r.getRow());
- assert path.startsWith(lookForPrefix);
- int cut = path.indexOf('/', lookForPrefix.length());
- String child = cut < 0 ? path : path.substring(0, cut);
- if (result.contains(child) == false)
- result.add(child);
- }
- } finally {
- IOUtils.closeQuietly(table);
- }
- // return null to indicate not a folder
- return result.isEmpty() ? null : result;
- }
-
- @Override
- protected boolean existsImpl(String resPath) throws IOException {
- Result r = getByScan(resPath, null, null);
- return r != null;
- }
-
- @Override
- protected List<RawResource> getAllResources(String rangeStart, String rangeEnd) throws IOException {
- byte[] startRow = Bytes.toBytes(rangeStart);
- byte[] endRow = plusZero(Bytes.toBytes(rangeEnd));
-
- Scan scan = new Scan(startRow, endRow);
- scan.addColumn(B_FAMILY, B_COLUMN_TS);
- scan.addColumn(B_FAMILY, B_COLUMN);
-
- HTableInterface table = getConnection().getTable(getAllInOneTableName());
- List<RawResource> result = Lists.newArrayList();
- try {
- ResultScanner scanner = table.getScanner(scan);
- for (Result r : scanner) {
- result.add(new RawResource(getInputStream(Bytes.toString(r.getRow()), r), getTimestamp(r)));
- }
- } catch (IOException e) {
- for (RawResource rawResource : result) {
- IOUtils.closeQuietly(rawResource.resource);
- }
- throw e;
- } finally {
- IOUtils.closeQuietly(table);
- }
- return result;
- }
-
- private InputStream getInputStream(String resPath, Result r) throws IOException {
- if (r == null) {
- return null;
- }
- byte[] value = r.getValue(B_FAMILY, B_COLUMN);
- if (value.length == 0) {
- Path redirectPath = bigCellHDFSPath(resPath);
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.persistence;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.HadoopUtil;
+
+import com.google.common.collect.Lists;
+
+public class HBaseResourceStore extends ResourceStore {
+
+ private static final String DEFAULT_TABLE_NAME = "kylin_metadata";
+ private static final String FAMILY = "f";
+ private static final byte[] B_FAMILY = Bytes.toBytes(FAMILY);
+ private static final String COLUMN = "c";
+ private static final byte[] B_COLUMN = Bytes.toBytes(COLUMN);
+ private static final String COLUMN_TS = "t";
+ private static final byte[] B_COLUMN_TS = Bytes.toBytes(COLUMN_TS);
+
+ private static final Map<String, String> TABLE_SUFFIX_MAP = new LinkedHashMap<String, String>();
+
+ static {
+ TABLE_SUFFIX_MAP.put(CUBE_RESOURCE_ROOT + "/", "_cube");
+ TABLE_SUFFIX_MAP.put(DICT_RESOURCE_ROOT + "/", "_dict");
+ TABLE_SUFFIX_MAP.put("/invertedindex/", "_invertedindex");
+ TABLE_SUFFIX_MAP.put(JOB_PATH_ROOT + "/", "_job");
+ TABLE_SUFFIX_MAP.put(JOB_OUTPUT_PATH_ROOT + "/", "_job_output");
+ TABLE_SUFFIX_MAP.put(PROJECT_RESOURCE_ROOT + "/", "_proj");
+ TABLE_SUFFIX_MAP.put(SNAPSHOT_RESOURCE_ROOT + "/", "_table_snapshot");
+ TABLE_SUFFIX_MAP.put("", ""); // DEFAULT CASE
+ }
+
+ final String tableNameBase;
+ final String hbaseUrl;
+
+ // final Map<String, String> tableNameMap; // path prefix ==> HBase table name
+
+ private Connection getConnection() throws IOException {
+ return HBaseConnection.get(hbaseUrl);
+ }
+
+ public HBaseResourceStore(KylinConfig kylinConfig) throws IOException {
+ super(kylinConfig);
+
+ String metadataUrl = kylinConfig.getMetadataUrl();
+ // split TABLE@HBASE_URL
+ int cut = metadataUrl.indexOf('@');
+ tableNameBase = cut < 0 ? DEFAULT_TABLE_NAME : metadataUrl.substring(0, cut);
+ hbaseUrl = cut < 0 ? metadataUrl : metadataUrl.substring(cut + 1);
+
+ createHTableIfNeeded(getAllInOneTableName());
+
+ // tableNameMap = new LinkedHashMap<String, String>();
+ // for (Entry<String, String> entry : TABLE_SUFFIX_MAP.entrySet()) {
+ // String pathPrefix = entry.getKey();
+ // String tableName = tableNameBase + entry.getValue();
+ // tableNameMap.put(pathPrefix, tableName);
+ // createHTableIfNeeded(tableName);
+ // }
+
+ }
+
+ private void createHTableIfNeeded(String tableName) throws IOException {
+ HBaseConnection.createHTableIfNeeded(getConnection(), tableName, FAMILY);
+ }
+
+ private String getAllInOneTableName() {
+ return tableNameBase;
+ }
+
+ @Override
+ protected ArrayList<String> listResourcesImpl(String resPath) throws IOException {
+ assert resPath.startsWith("/");
+ String lookForPrefix = resPath.endsWith("/") ? resPath : resPath + "/";
+ byte[] startRow = Bytes.toBytes(lookForPrefix);
+ byte[] endRow = Bytes.toBytes(lookForPrefix);
+ endRow[endRow.length - 1]++;
+
+ ArrayList<String> result = new ArrayList<String>();
+
+ Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
+ Scan scan = new Scan(startRow, endRow);
+ scan.setFilter(new KeyOnlyFilter());
+ try {
+ ResultScanner scanner = table.getScanner(scan);
+ for (Result r : scanner) {
+ String path = Bytes.toString(r.getRow());
+ assert path.startsWith(lookForPrefix);
+ int cut = path.indexOf('/', lookForPrefix.length());
+ String child = cut < 0 ? path : path.substring(0, cut);
+ if (result.contains(child) == false)
+ result.add(child);
+ }
+ } finally {
+ IOUtils.closeQuietly(table);
+ }
+ // return null to indicate not a folder
+ return result.isEmpty() ? null : result;
+ }
+
+ @Override
+ protected boolean existsImpl(String resPath) throws IOException {
+ Result r = getByScan(resPath, null, null);
+ return r != null;
+ }
+
+ @Override
+ protected List<RawResource> getAllResources(String rangeStart, String rangeEnd) throws IOException {
+ byte[] startRow = Bytes.toBytes(rangeStart);
+ byte[] endRow = plusZero(Bytes.toBytes(rangeEnd));
+
+ Scan scan = new Scan(startRow, endRow);
+ scan.addColumn(B_FAMILY, B_COLUMN_TS);
+ scan.addColumn(B_FAMILY, B_COLUMN);
+
+ Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
+ List<RawResource> result = Lists.newArrayList();
+ try {
+ ResultScanner scanner = table.getScanner(scan);
+ for (Result r : scanner) {
+ result.add(new RawResource(getInputStream(Bytes.toString(r.getRow()), r), getTimestamp(r)));
+ }
+ } catch (IOException e) {
+ for (RawResource rawResource : result) {
+ IOUtils.closeQuietly(rawResource.resource);
+ }
+ throw e;
+ } finally {
+ IOUtils.closeQuietly(table);
+ }
+ return result;
+ }
+
+ private InputStream getInputStream(String resPath, Result r) throws IOException {
+ if (r == null) {
+ return null;
+ }
+ byte[] value = r.getValue(B_FAMILY, B_COLUMN);
+ if (value.length == 0) {
+ Path redirectPath = bigCellHDFSPath(resPath);
Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
- FileSystem fileSystem = FileSystem.get(hconf);
-
- return fileSystem.open(redirectPath);
- } else {
- return new ByteArrayInputStream(value);
- }
- }
-
- private long getTimestamp(Result r) {
- if (r == null) {
- return 0;
- } else {
- return Bytes.toLong(r.getValue(B_FAMILY, B_COLUMN_TS));
- }
- }
-
- @Override
- protected InputStream getResourceImpl(String resPath) throws IOException {
- Result r = getByScan(resPath, B_FAMILY, B_COLUMN);
- return getInputStream(resPath, r);
- }
-
- @Override
- protected long getResourceTimestampImpl(String resPath) throws IOException {
- Result r = getByScan(resPath, B_FAMILY, B_COLUMN_TS);
- return getTimestamp(r);
- }
-
- @Override
- protected void putResourceImpl(String resPath, InputStream content, long ts) throws IOException {
- ByteArrayOutputStream bout = new ByteArrayOutputStream();
- IOUtils.copy(content, bout);
- bout.close();
-
- HTableInterface table = getConnection().getTable(getAllInOneTableName());
- try {
- byte[] row = Bytes.toBytes(resPath);
- Put put = buildPut(resPath, ts, row, bout.toByteArray(), table);
-
- table.put(put);
- table.flushCommits();
- } finally {
- IOUtils.closeQuietly(table);
- }
- }
-
- @Override
- protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) throws IOException, IllegalStateException {
- HTableInterface table = getConnection().getTable(getAllInOneTableName());
- try {
- byte[] row = Bytes.toBytes(resPath);
- byte[] bOldTS = oldTS == 0 ? null : Bytes.toBytes(oldTS);
- Put put = buildPut(resPath, newTS, row, content, table);
-
- boolean ok = table.checkAndPut(row, B_FAMILY, B_COLUMN_TS, bOldTS, put);
- if (!ok) {
- long real = getResourceTimestamp(resPath);
+ FileSystem fileSystem = FileSystem.get(hconf);
+
+ return fileSystem.open(redirectPath);
+ } else {
+ return new ByteArrayInputStream(value);
+ }
+ }
+
+ private long getTimestamp(Result r) {
+ if (r == null) {
+ return 0;
+ } else {
+ return Bytes.toLong(r.getValue(B_FAMILY, B_COLUMN_TS));
+ }
+ }
+
+ @Override
+ protected InputStream getResourceImpl(String resPath) throws IOException {
+ Result r = getByScan(resPath, B_FAMILY, B_COLUMN);
+ return getInputStream(resPath, r);
+ }
+
+ @Override
+ protected long getResourceTimestampImpl(String resPath) throws IOException {
+ Result r = getByScan(resPath, B_FAMILY, B_COLUMN_TS);
+ return getTimestamp(r);
+ }
+
+ @Override
+ protected void putResourceImpl(String resPath, InputStream content, long ts) throws IOException {
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ IOUtils.copy(content, bout);
+ bout.close();
+
+ Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
+ try {
+ byte[] row = Bytes.toBytes(resPath);
+ Put put = buildPut(resPath, ts, row, bout.toByteArray(), table);
+
+ table.put(put);
+ } finally {
+ IOUtils.closeQuietly(table);
+ }
+ }
+
+ @Override
+ protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) throws IOException, IllegalStateException {
+ Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
+ try {
+ byte[] row = Bytes.toBytes(resPath);
+ byte[] bOldTS = oldTS == 0 ? null : Bytes.toBytes(oldTS);
+ Put put = buildPut(resPath, newTS, row, content, table);
+
+ boolean ok = table.checkAndPut(row, B_FAMILY, B_COLUMN_TS, bOldTS, put);
+ if (!ok) {
+ long real = getResourceTimestamp(resPath);
throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + real + ", but it is " + oldTS);
- }
-
- table.flushCommits();
-
- return newTS;
- } finally {
- IOUtils.closeQuietly(table);
- }
- }
-
- @Override
- protected void deleteResourceImpl(String resPath) throws IOException {
- HTableInterface table = getConnection().getTable(getAllInOneTableName());
- try {
- Delete del = new Delete(Bytes.toBytes(resPath));
- table.delete(del);
- table.flushCommits();
- } finally {
- IOUtils.closeQuietly(table);
- }
- }
-
- @Override
- protected String getReadableResourcePathImpl(String resPath) {
- return getAllInOneTableName() + "(key='" + resPath + "')@" + kylinConfig.getMetadataUrl();
- }
-
- private Result getByScan(String path, byte[] family, byte[] column) throws IOException {
- byte[] startRow = Bytes.toBytes(path);
- byte[] endRow = plusZero(startRow);
-
- Scan scan = new Scan(startRow, endRow);
- if (family == null || column == null) {
- scan.setFilter(new KeyOnlyFilter());
- } else {
- scan.addColumn(family, column);
- }
-
- HTableInterface table = getConnection().getTable(getAllInOneTableName());
- try {
- ResultScanner scanner = table.getScanner(scan);
- Result result = null;
- for (Result r : scanner) {
- result = r;
- }
- return result == null || result.isEmpty() ? null : result;
- } finally {
- IOUtils.closeQuietly(table);
- }
- }
-
- private byte[] plusZero(byte[] startRow) {
- byte[] endRow = Arrays.copyOf(startRow, startRow.length + 1);
- endRow[endRow.length - 1] = 0;
- return endRow;
- }
-
- private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, HTableInterface table) throws IOException {
- Path redirectPath = bigCellHDFSPath(resPath);
+ }
+
+ return newTS;
+ } finally {
+ IOUtils.closeQuietly(table);
+ }
+ }
+
+ @Override
+ protected void deleteResourceImpl(String resPath) throws IOException {
+ Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
+ try {
+ Delete del = new Delete(Bytes.toBytes(resPath));
+ table.delete(del);
+ } finally {
+ IOUtils.closeQuietly(table);
+ }
+ }
+
+ @Override
+ protected String getReadableResourcePathImpl(String resPath) {
+ return getAllInOneTableName() + "(key='" + resPath + "')@" + kylinConfig.getMetadataUrl();
+ }
+
+ private Result getByScan(String path, byte[] family, byte[] column) throws IOException {
+ byte[] startRow = Bytes.toBytes(path);
+ byte[] endRow = plusZero(startRow);
+
+ Scan scan = new Scan(startRow, endRow);
+ if (family == null || column == null) {
+ scan.setFilter(new KeyOnlyFilter());
+ } else {
+ scan.addColumn(family, column);
+ }
+
+ Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
+ try {
+ ResultScanner scanner = table.getScanner(scan);
+ Result result = null;
+ for (Result r : scanner) {
+ result = r;
+ }
+ return result == null || result.isEmpty() ? null : result;
+ } finally {
+ IOUtils.closeQuietly(table);
+ }
+ }
+
+ private byte[] plusZero(byte[] startRow) {
+ byte[] endRow = Arrays.copyOf(startRow, startRow.length + 1);
+ endRow[endRow.length - 1] = 0;
+ return endRow;
+ }
+
+ private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, Table table) throws IOException {
+ Path redirectPath = bigCellHDFSPath(resPath);
Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
- FileSystem fileSystem = FileSystem.get(hconf);
-
- if (fileSystem.exists(redirectPath)) {
- fileSystem.delete(redirectPath, true);
- }
-
- FSDataOutputStream out = fileSystem.create(redirectPath);
-
- try {
- out.write(largeColumn);
- } finally {
- IOUtils.closeQuietly(out);
- }
-
- return redirectPath;
- }
-
- public Path bigCellHDFSPath(String resPath) {
- String hdfsWorkingDirectory = this.kylinConfig.getHdfsWorkingDirectory();
- Path redirectPath = new Path(hdfsWorkingDirectory, "resources" + resPath);
- return redirectPath;
- }
-
- private Put buildPut(String resPath, long ts, byte[] row, byte[] content, HTableInterface table) throws IOException {
- int kvSizeLimit = this.kylinConfig.getHBaseKeyValueSize();
- if (content.length > kvSizeLimit) {
- writeLargeCellToHdfs(resPath, content, table);
- content = BytesUtil.EMPTY_BYTE_ARRAY;
- }
-
- Put put = new Put(row);
- put.add(B_FAMILY, B_COLUMN, content);
- put.add(B_FAMILY, B_COLUMN_TS, Bytes.toBytes(ts));
-
- return put;
- }
-}
+ FileSystem fileSystem = FileSystem.get(hconf);
+
+ if (fileSystem.exists(redirectPath)) {
+ fileSystem.delete(redirectPath, true);
+ }
+
+ FSDataOutputStream out = fileSystem.create(redirectPath);
+
+ try {
+ out.write(largeColumn);
+ } finally {
+ IOUtils.closeQuietly(out);
+ }
+
+ return redirectPath;
+ }
+
+ public Path bigCellHDFSPath(String resPath) {
+ String hdfsWorkingDirectory = this.kylinConfig.getHdfsWorkingDirectory();
+ Path redirectPath = new Path(hdfsWorkingDirectory, "resources" + resPath);
+ return redirectPath;
+ }
+
+ private Put buildPut(String resPath, long ts, byte[] row, byte[] content, Table table) throws IOException {
+ int kvSizeLimit = this.kylinConfig.getHBaseKeyValueSize();
+ if (content.length > kvSizeLimit) {
+ writeLargeCellToHdfs(resPath, content, table);
+ content = BytesUtil.EMPTY_BYTE_ARRAY;
+ }
+
+ Put put = new Put(row);
+ put.addColumn(B_FAMILY, B_COLUMN, content);
+ put.addColumn(B_FAMILY, B_COLUMN_TS, Bytes.toBytes(ts));
+
+ return put;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java b/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java
index 093ac9e..ccbb6f0 100644
--- a/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java
+++ b/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java
@@ -23,19 +23,24 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
+import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterStatus;
-import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,30 +58,31 @@ public class HBaseRegionSizeCalculator {
/**
* Computes size of each region for table and given column families.
* */
- public HBaseRegionSizeCalculator(HTable table) throws IOException {
- this(table, new HBaseAdmin(table.getConfiguration()));
- }
-
- /** Constructor for unit testing */
- HBaseRegionSizeCalculator(HTable table, HBaseAdmin hBaseAdmin) throws IOException {
-
+ public HBaseRegionSizeCalculator(String tableName , Connection hbaseConnection) throws IOException {
+ Table table = null;
+ Admin admin = null;
+
try {
+ table = hbaseConnection.getTable(TableName.valueOf(tableName));
+ admin = hbaseConnection.getAdmin();
+
if (!enabled(table.getConfiguration())) {
logger.info("Region size calculation disabled.");
return;
}
- logger.info("Calculating region sizes for table \"" + new String(table.getTableName()) + "\".");
+ logger.info("Calculating region sizes for table \"" + table.getName() + "\".");
// Get regions for table.
- Set<HRegionInfo> tableRegionInfos = table.getRegionLocations().keySet();
+ RegionLocator regionLocator = hbaseConnection.getRegionLocator(table.getName());
+ List<HRegionLocation> regionLocationList = regionLocator.getAllRegionLocations();
Set<byte[]> tableRegions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
- for (HRegionInfo regionInfo : tableRegionInfos) {
- tableRegions.add(regionInfo.getRegionName());
+ for (HRegionLocation hRegionLocation : regionLocationList) {
+ tableRegions.add(hRegionLocation.getRegionInfo().getRegionName());
}
- ClusterStatus clusterStatus = hBaseAdmin.getClusterStatus();
+ ClusterStatus clusterStatus = admin.getClusterStatus();
Collection<ServerName> servers = clusterStatus.getServers();
final long megaByte = 1024L * 1024L;
@@ -99,7 +105,8 @@ public class HBaseRegionSizeCalculator {
}
}
} finally {
- hBaseAdmin.close();
+ IOUtils.closeQuietly(table);
+ IOUtils.closeQuietly(admin);
}
}
@@ -124,4 +131,4 @@ public class HBaseRegionSizeCalculator {
public Map<byte[], Long> getRegionSizeMap() {
return Collections.unmodifiableMap(sizeMap);
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/common/src/test/java/org/apache/kylin/common/util/BasicHadoopTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/BasicHadoopTest.java b/common/src/test/java/org/apache/kylin/common/util/BasicHadoopTest.java
index 6d2762c..481fc6c 100644
--- a/common/src/test/java/org/apache/kylin/common/util/BasicHadoopTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/BasicHadoopTest.java
@@ -21,12 +21,11 @@ package org.apache.kylin.common.util;
import java.io.File;
import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.kylin.common.persistence.HBaseConnection;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
@@ -56,16 +55,14 @@ public class BasicHadoopTest {
cf.setBlocksize(4 * 1024 * 1024); // set to 4MB
tableDesc.addFamily(cf);
- Configuration conf = HBaseConfiguration.create();
- HBaseAdmin admin = new HBaseAdmin(conf);
+ Admin admin = HBaseConnection.get().getAdmin();
admin.createTable(tableDesc);
admin.close();
}
@Test
public void testRetriveHtableHost() throws IOException {
- Configuration conf = HBaseConfiguration.create();
- HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+ Admin hbaseAdmin = HBaseConnection.get().getAdmin();
HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables();
for (HTableDescriptor table : tableDescriptors) {
String value = table.getValue("KYLIN_HOST");
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
index f2f1fc0..8c61a3a 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
@@ -24,14 +24,13 @@ import java.util.Collections;
import java.util.List;
import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.persistence.HBaseConnection;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.job.cmd.ShellCmdOutput;
import org.apache.kylin.job.exception.ExecuteException;
@@ -99,19 +98,18 @@ public class GarbageCollectionStep extends AbstractExecutable {
List<String> oldTables = getOldHTables();
if (oldTables != null && oldTables.size() > 0) {
String metadataUrlPrefix = KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
- Configuration conf = HBaseConfiguration.create();
- HBaseAdmin admin = null;
+ Admin admin = null;
try {
- admin = new HBaseAdmin(conf);
+ admin = HBaseConnection.get().getAdmin();
for (String table : oldTables) {
- if (admin.tableExists(table)) {
- HTableDescriptor tableDescriptor = admin.getTableDescriptor(Bytes.toBytes(table));
+ if (admin.tableExists(TableName.valueOf(table))) {
+ HTableDescriptor tableDescriptor = admin.getTableDescriptor(TableName.valueOf(table));
String host = tableDescriptor.getValue(IRealizationConstants.HTableTag);
if (metadataUrlPrefix.equalsIgnoreCase(host)) {
- if (admin.isTableEnabled(table)) {
- admin.disableTable(table);
+ if (admin.isTableEnabled(TableName.valueOf(table))) {
+ admin.disableTable(TableName.valueOf(table));
}
- admin.deleteTable(table);
+ admin.deleteTable(TableName.valueOf(table));
logger.debug("Dropped HBase table " + table);
output.append("Dropped HBase table " + table + " \n");
} else {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java
index 3c1e4a5..6f36eff 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java
@@ -19,11 +19,15 @@
package org.apache.kylin.job.hadoop.cube;
import org.apache.commons.cli.Options;
+import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.mapreduce.Job;
@@ -31,6 +35,7 @@ import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.HBaseConnection;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.job.constant.BatchConstants;
@@ -47,6 +52,8 @@ public class CubeHFileJob extends AbstractHadoopJob {
public int run(String[] args) throws Exception {
Options options = new Options();
+ Connection connection = null;
+ Table table = null;
try {
options.addOption(OPTION_JOB_NAME);
@@ -80,10 +87,12 @@ public class CubeHFileJob extends AbstractHadoopJob {
attachKylinPropsAndMetadata(cube, job.getConfiguration());
String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase();
- HTable htable = new HTable(conf, tableName);
+ connection = HBaseConnection.get();
+ table = connection.getTable(TableName.valueOf(tableName));
+ RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(tableName));
//Automatic config !
- HFileOutputFormat.configureIncrementalLoad(job, htable);
+ HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
// set block replication to 3 for hfiles
conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, "3");
@@ -96,6 +105,7 @@ public class CubeHFileJob extends AbstractHadoopJob {
printUsage(options);
throw e;
} finally {
+ IOUtils.closeQuietly(table);
if (job != null)
cleanupTempConfFile(job.getConfiguration());
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
index 3b25ee1..184b6cd 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
@@ -18,6 +18,13 @@
package org.apache.kylin.job.hadoop.cube;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
@@ -28,10 +35,12 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.HBaseConnection;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
@@ -50,13 +59,6 @@ import org.apache.kylin.metadata.realization.IRealizationConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.StringReader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
/**
* @author ysong1
*/
@@ -107,7 +109,7 @@ public class StorageCleanupJob extends AbstractHadoopJob {
IIManager iiManager = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
// get all kylin hbase tables
- HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+ Admin hbaseAdmin = HBaseConnection.get().getAdmin();
String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix;
HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*");
List<String> allTablesNeedToBeDropped = new ArrayList<String>();
@@ -141,9 +143,9 @@ public class StorageCleanupJob extends AbstractHadoopJob {
// drop tables
for (String htableName : allTablesNeedToBeDropped) {
log.info("Deleting HBase table " + htableName);
- if (hbaseAdmin.tableExists(htableName)) {
- hbaseAdmin.disableTable(htableName);
- hbaseAdmin.deleteTable(htableName);
+ if (hbaseAdmin.tableExists(TableName.valueOf(htableName))) {
+ hbaseAdmin.disableTable(TableName.valueOf(htableName));
+ hbaseAdmin.deleteTable(TableName.valueOf(htableName));
log.info("Deleted HBase table " + htableName);
} else {
log.info("HBase table" + htableName + " does not exist");
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
index 027c0ca..9f5e062 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
@@ -25,11 +25,10 @@ import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
@@ -42,6 +41,7 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.HBaseConnection;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -81,7 +81,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
tableDesc.setValue(IRealizationConstants.HTableTag, config.getMetadataUrlPrefix());
Configuration conf = HadoopUtil.getCurrentHBaseConfiguration();
- HBaseAdmin admin = new HBaseAdmin(conf);
+ Admin admin = HBaseConnection.get().getAdmin();
try {
if (User.isHBaseSecurityEnabled(conf)) {
@@ -139,7 +139,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
byte[][] splitKeys = getSplits(conf, partitionFilePath);
- if (admin.tableExists(tableName)) {
+ if (admin.tableExists(TableName.valueOf(tableName))) {
// admin.disableTable(tableName);
// admin.deleteTable(tableName);
throw new RuntimeException("HBase table " + tableName + " exists!");
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java
index c032bbc..fa42148 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java
@@ -19,17 +19,20 @@
package org.apache.kylin.job.hadoop.invertedindex;
import org.apache.commons.cli.Options;
+import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.HBaseConnection;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
import org.slf4j.Logger;
@@ -45,6 +48,8 @@ public class IICreateHFileJob extends AbstractHadoopJob {
public int run(String[] args) throws Exception {
Options options = new Options();
+ Connection connection = null;
+ Table table = null;
try {
options.addOption(OPTION_JOB_NAME);
@@ -69,8 +74,11 @@ public class IICreateHFileJob extends AbstractHadoopJob {
job.setMapOutputValueClass(KeyValue.class);
String tableName = getOptionValue(OPTION_HTABLE_NAME);
- HTable htable = new HTable(HBaseConfiguration.create(getConf()), tableName);
- HFileOutputFormat.configureIncrementalLoad(job, htable);
+
+ connection = HBaseConnection.get();
+ table = connection.getTable(TableName.valueOf(tableName));
+ RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(tableName));
+ HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
this.deletePath(job.getConfiguration(), output);
@@ -78,6 +86,8 @@ public class IICreateHFileJob extends AbstractHadoopJob {
} catch (Exception e) {
printUsage(options);
throw e;
+ } finally {
+ IOUtils.closeQuietly(table);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java
index 32d065a..63777ef 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java
@@ -24,11 +24,12 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.HBaseConnection;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.invertedindex.IIInstance;
@@ -78,10 +79,10 @@ public class IICreateHTableJob extends AbstractHadoopJob {
DeployCoprocessorCLI.deployCoprocessor(tableDesc);
// drop the table first
- HBaseAdmin admin = new HBaseAdmin(conf);
- if (admin.tableExists(tableName)) {
- admin.disableTable(tableName);
- admin.deleteTable(tableName);
+ Admin admin = HBaseConnection.get().getAdmin();
+ if (admin.tableExists(TableName.valueOf(tableName))) {
+ admin.disableTable(TableName.valueOf(tableName));
+ admin.deleteTable(TableName.valueOf(tableName));
}
// create table
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java b/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java
index b6e5af5..7fc1d72 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java
@@ -21,11 +21,10 @@ package org.apache.kylin.job.tools;
import java.io.IOException;
import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.persistence.HBaseConnection;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,8 +53,7 @@ public class CleanHtableCLI extends AbstractHadoopJob {
}
private void clean() throws IOException {
- Configuration conf = HBaseConfiguration.create();
- HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+ Admin hbaseAdmin = HBaseConnection.get().getAdmin();
for (HTableDescriptor descriptor : hbaseAdmin.listTables()) {
String name = descriptor.getNameAsString().toLowerCase();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java b/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java
index b07d6a9..503f07e 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java
@@ -24,14 +24,15 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.HBaseConnection;
import org.apache.kylin.common.persistence.JsonSerializer;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.Serializer;
@@ -73,7 +74,7 @@ public class CubeMigrationCLI {
private static ResourceStore srcStore;
private static ResourceStore dstStore;
private static FileSystem hdfsFS;
- private static HBaseAdmin hbaseAdmin;
+ private static Admin hbaseAdmin;
public static void main(String[] args) throws IOException, InterruptedException {
@@ -113,8 +114,7 @@ public class CubeMigrationCLI {
checkAndGetHbaseUrl();
- Configuration conf = HBaseConfiguration.create();
- hbaseAdmin = new HBaseAdmin(conf);
+ hbaseAdmin = HBaseConnection.get().getAdmin();
hdfsFS = FileSystem.get(new Configuration());
@@ -130,6 +130,8 @@ public class CubeMigrationCLI {
} else {
showOpts();
}
+
+ IOUtils.closeQuietly(hbaseAdmin);
}
public static void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String overwriteIfExists, String realExecute) throws IOException, InterruptedException {
@@ -284,10 +286,10 @@ public class CubeMigrationCLI {
case CHANGE_HTABLE_HOST: {
String tableName = (String) opt.params[0];
HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
- hbaseAdmin.disableTable(tableName);
+ hbaseAdmin.disableTable(TableName.valueOf(tableName));
desc.setValue(IRealizationConstants.HTableTag, dstConfig.getMetadataUrlPrefix());
- hbaseAdmin.modifyTable(tableName, desc);
- hbaseAdmin.enableTable(tableName);
+ hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc);
+ hbaseAdmin.enableTable(TableName.valueOf(tableName));
logger.info("CHANGE_HTABLE_HOST is completed");
break;
}
@@ -401,10 +403,10 @@ public class CubeMigrationCLI {
case CHANGE_HTABLE_HOST: {
String tableName = (String) opt.params[0];
HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
- hbaseAdmin.disableTable(tableName);
+ hbaseAdmin.disableTable(TableName.valueOf(tableName));
desc.setValue(IRealizationConstants.HTableTag, srcConfig.getMetadataUrlPrefix());
- hbaseAdmin.modifyTable(tableName, desc);
- hbaseAdmin.enableTable(tableName);
+ hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc);
+ hbaseAdmin.enableTable(TableName.valueOf(tableName));
break;
}
case COPY_FILE_IN_META: {
[2/3] incubator-kylin git commit: KYLIN-892 & KYLIN-782 Upgrade to
HBase 1.1
Posted by li...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java b/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java
index 5482684..239c7ec 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java
@@ -1,313 +1,314 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.tools;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Matcher;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author yangli9
- */
-public class DeployCoprocessorCLI {
-
- private static final Logger logger = LoggerFactory.getLogger(DeployCoprocessorCLI.class);
-
- public static final String OBSERVER_CLS_NAME = "org.apache.kylin.storage.hbase.coprocessor.observer.AggregateRegionObserver";
- public static final String ENDPOINT_CLS_NAMAE = "org.apache.kylin.storage.hbase.coprocessor.endpoint.IIEndpoint";
-
- public static void main(String[] args) throws IOException {
- KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
- FileSystem fileSystem = FileSystem.get(hconf);
- HBaseAdmin hbaseAdmin = new HBaseAdmin(hconf);
-
- String localCoprocessorJar = new File(args[0]).getAbsolutePath();
- logger.info("Identify coprocessor jar " + localCoprocessorJar);
-
- List<String> tableNames = getHTableNames(kylinConfig);
- logger.info("Identify tables " + tableNames);
-
- Set<String> oldJarPaths = getCoprocessorJarPaths(hbaseAdmin, tableNames);
- logger.info("Old coprocessor jar: " + oldJarPaths);
-
- Path hdfsCoprocessorJar = uploadCoprocessorJar(localCoprocessorJar, fileSystem, oldJarPaths);
- logger.info("New coprocessor jar: " + hdfsCoprocessorJar);
-
- List<String> processedTables = resetCoprocessorOnHTables(hbaseAdmin, hdfsCoprocessorJar, tableNames);
-
- // Don't remove old jars, missing coprocessor jar will fail hbase
- // removeOldJars(oldJarPaths, fileSystem);
-
- hbaseAdmin.close();
-
- logger.info("Processed " + processedTables);
- logger.info("Active coprocessor jar: " + hdfsCoprocessorJar);
- }
-
- public static void deployCoprocessor(HTableDescriptor tableDesc) {
- try {
- initHTableCoprocessor(tableDesc);
- logger.info("hbase table " + tableDesc.getName() + " deployed with coprocessor.");
-
- } catch (Exception ex) {
- logger.error("Error deploying coprocessor on " + tableDesc.getName(), ex);
- logger.error("Will try creating the table without coprocessor.");
- }
- }
-
- private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException {
- KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.tools;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.HBaseConnection;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author yangli9
+ */
+public class DeployCoprocessorCLI {
+
+ private static final Logger logger = LoggerFactory.getLogger(DeployCoprocessorCLI.class);
+
+ public static final String OBSERVER_CLS_NAME = "org.apache.kylin.storage.hbase.coprocessor.observer.AggregateRegionObserver";
+ public static final String ENDPOINT_CLS_NAMAE = "org.apache.kylin.storage.hbase.coprocessor.endpoint.IIEndpoint";
+
+ public static void main(String[] args) throws IOException {
+ KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
- FileSystem fileSystem = FileSystem.get(hconf);
-
- String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar();
- Path hdfsCoprocessorJar = DeployCoprocessorCLI.uploadCoprocessorJar(localCoprocessorJar, fileSystem, null);
-
- DeployCoprocessorCLI.addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
- }
-
- public static void addCoprocessorOnHTable(HTableDescriptor desc, Path hdfsCoprocessorJar) throws IOException {
- logger.info("Add coprocessor on " + desc.getNameAsString());
- desc.addCoprocessor(ENDPOINT_CLS_NAMAE, hdfsCoprocessorJar, 1000, null);
- desc.addCoprocessor(OBSERVER_CLS_NAME, hdfsCoprocessorJar, 1001, null);
- }
-
- public static void resetCoprocessor(String tableName, HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar) throws IOException {
- logger.info("Disable " + tableName);
- hbaseAdmin.disableTable(tableName);
-
- logger.info("Unset coprocessor on " + tableName);
- HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
- while (desc.hasCoprocessor(OBSERVER_CLS_NAME)) {
- desc.removeCoprocessor(OBSERVER_CLS_NAME);
- }
- while (desc.hasCoprocessor(ENDPOINT_CLS_NAMAE)) {
- desc.removeCoprocessor(ENDPOINT_CLS_NAMAE);
- }
-
- addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
- hbaseAdmin.modifyTable(tableName, desc);
-
- logger.info("Enable " + tableName);
- hbaseAdmin.enableTable(tableName);
- }
-
- private static List<String> resetCoprocessorOnHTables(HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar, List<String> tableNames) throws IOException {
- List<String> processed = new ArrayList<String>();
-
- for (String tableName : tableNames) {
- try {
- resetCoprocessor(tableName, hbaseAdmin, hdfsCoprocessorJar);
- processed.add(tableName);
- } catch (IOException ex) {
- logger.error("Error processing " + tableName, ex);
- }
- }
- return processed;
- }
-
- public static Path getNewestCoprocessorJar(KylinConfig config, FileSystem fileSystem) throws IOException {
- Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, config);
- FileStatus newestJar = null;
- for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
- if (fileStatus.getPath().toString().endsWith(".jar")) {
- if (newestJar == null) {
- newestJar = fileStatus;
- } else {
- if (newestJar.getModificationTime() < fileStatus.getModificationTime())
- newestJar = fileStatus;
- }
- }
- }
- if (newestJar == null)
- return null;
-
- Path path = newestJar.getPath().makeQualified(fileSystem.getUri(), null);
- logger.info("The newest coprocessor is " + path.toString());
- return path;
- }
-
- public static Path uploadCoprocessorJar(String localCoprocessorJar, FileSystem fileSystem, Set<String> oldJarPaths) throws IOException {
- Path uploadPath = null;
- File localCoprocessorFile = new File(localCoprocessorJar);
-
- // check existing jars
- if (oldJarPaths == null) {
- oldJarPaths = new HashSet<String>();
- }
- Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, KylinConfig.getInstanceFromEnv());
- for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
- if (fileStatus.getLen() == localCoprocessorJar.length() && fileStatus.getModificationTime() == localCoprocessorFile.lastModified()) {
- uploadPath = fileStatus.getPath();
- break;
- }
- String filename = fileStatus.getPath().toString();
- if (filename.endsWith(".jar")) {
- oldJarPaths.add(filename);
- }
- }
-
- // upload if not existing
- if (uploadPath == null) {
- // figure out a unique new jar file name
- Set<String> oldJarNames = new HashSet<String>();
- for (String path : oldJarPaths) {
- oldJarNames.add(new Path(path).getName());
- }
- String baseName = getBaseFileName(localCoprocessorJar);
- String newName = null;
- int i = 0;
- while (newName == null) {
- newName = baseName + "-" + (i++) + ".jar";
- if (oldJarNames.contains(newName))
- newName = null;
- }
-
- // upload
- uploadPath = new Path(coprocessorDir, newName);
- FileInputStream in = null;
- FSDataOutputStream out = null;
- try {
- in = new FileInputStream(localCoprocessorFile);
- out = fileSystem.create(uploadPath);
- IOUtils.copy(in, out);
- } finally {
- IOUtils.closeQuietly(in);
- IOUtils.closeQuietly(out);
- }
-
- fileSystem.setTimes(uploadPath, localCoprocessorFile.lastModified(), -1);
-
- }
-
- uploadPath = uploadPath.makeQualified(fileSystem.getUri(), null);
- return uploadPath;
- }
-
- private static String getBaseFileName(String localCoprocessorJar) {
- File localJar = new File(localCoprocessorJar);
- String baseName = localJar.getName();
- if (baseName.endsWith(".jar"))
- baseName = baseName.substring(0, baseName.length() - ".jar".length());
- return baseName;
- }
-
- private static Path getCoprocessorHDFSDir(FileSystem fileSystem, KylinConfig config) throws IOException {
- String hdfsWorkingDirectory = config.getHdfsWorkingDirectory();
- Path coprocessorDir = new Path(hdfsWorkingDirectory, "coprocessor");
- fileSystem.mkdirs(coprocessorDir);
- return coprocessorDir;
- }
-
- private static Set<String> getCoprocessorJarPaths(HBaseAdmin hbaseAdmin, List<String> tableNames) throws IOException {
- HashSet<String> result = new HashSet<String>();
-
- for (String tableName : tableNames) {
- HTableDescriptor tableDescriptor = null;
- try {
- tableDescriptor = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
- } catch (TableNotFoundException e) {
- logger.warn("Table not found " + tableName, e);
- continue;
- }
-
- Matcher keyMatcher;
- Matcher valueMatcher;
- for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> e : tableDescriptor.getValues().entrySet()) {
- keyMatcher = HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(Bytes.toString(e.getKey().get()));
- if (!keyMatcher.matches()) {
- continue;
- }
- valueMatcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(Bytes.toString(e.getValue().get()));
- if (!valueMatcher.matches()) {
- continue;
- }
-
- String jarPath = valueMatcher.group(1).trim();
- String clsName = valueMatcher.group(2).trim();
-
- if (OBSERVER_CLS_NAME.equals(clsName)) {
- result.add(jarPath);
- }
- }
- }
-
- return result;
- }
-
- private static List<String> getHTableNames(KylinConfig config) {
- CubeManager cubeMgr = CubeManager.getInstance(config);
-
- ArrayList<String> result = new ArrayList<String>();
- for (CubeInstance cube : cubeMgr.listAllCubes()) {
- for (CubeSegment seg : cube.getSegments(SegmentStatusEnum.READY)) {
- String tableName = seg.getStorageLocationIdentifier();
- if (StringUtils.isBlank(tableName) == false) {
- result.add(tableName);
- System.out.println("added new table: " + tableName);
- }
- }
- }
-
- for (IIInstance ii : IIManager.getInstance(config).listAllIIs()) {
- for (IISegment seg : ii.getSegments(SegmentStatusEnum.READY)) {
- String tableName = seg.getStorageLocationIdentifier();
- if (StringUtils.isBlank(tableName) == false) {
- result.add(tableName);
- System.out.println("added new table: " + tableName);
- }
- }
- }
-
- return result;
- }
-}
+ FileSystem fileSystem = FileSystem.get(hconf);
+ Admin hbaseAdmin = HBaseConnection.get().getAdmin();
+
+ String localCoprocessorJar = new File(args[0]).getAbsolutePath();
+ logger.info("Identify coprocessor jar " + localCoprocessorJar);
+
+ List<String> tableNames = getHTableNames(kylinConfig);
+ logger.info("Identify tables " + tableNames);
+
+ Set<String> oldJarPaths = getCoprocessorJarPaths(hbaseAdmin, tableNames);
+ logger.info("Old coprocessor jar: " + oldJarPaths);
+
+ Path hdfsCoprocessorJar = uploadCoprocessorJar(localCoprocessorJar, fileSystem, oldJarPaths);
+ logger.info("New coprocessor jar: " + hdfsCoprocessorJar);
+
+ List<String> processedTables = resetCoprocessorOnHTables(hbaseAdmin, hdfsCoprocessorJar, tableNames);
+
+ // Don't remove old jars, missing coprocessor jar will fail hbase
+ // removeOldJars(oldJarPaths, fileSystem);
+
+ hbaseAdmin.close();
+
+ logger.info("Processed " + processedTables);
+ logger.info("Active coprocessor jar: " + hdfsCoprocessorJar);
+ }
+
+ public static void deployCoprocessor(HTableDescriptor tableDesc) {
+ try {
+ initHTableCoprocessor(tableDesc);
+ logger.info("hbase table " + tableDesc.getTableName() + " deployed with coprocessor.");
+
+ } catch (Exception ex) {
+ logger.error("Error deploying coprocessor on " + tableDesc.getTableName(), ex);
+ logger.error("Will try creating the table without coprocessor.");
+ }
+ }
+
+ private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException {
+ KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
+ FileSystem fileSystem = FileSystem.get(hconf);
+
+ String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar();
+ Path hdfsCoprocessorJar = DeployCoprocessorCLI.uploadCoprocessorJar(localCoprocessorJar, fileSystem, null);
+
+ DeployCoprocessorCLI.addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
+ }
+
+ public static void addCoprocessorOnHTable(HTableDescriptor desc, Path hdfsCoprocessorJar) throws IOException {
+ logger.info("Add coprocessor on " + desc.getNameAsString());
+ desc.addCoprocessor(ENDPOINT_CLS_NAMAE, hdfsCoprocessorJar, 1000, null);
+ desc.addCoprocessor(OBSERVER_CLS_NAME, hdfsCoprocessorJar, 1001, null);
+ }
+
+ public static void resetCoprocessor(String tableName, Admin hbaseAdmin, Path hdfsCoprocessorJar) throws IOException {
+ logger.info("Disable " + tableName);
+ hbaseAdmin.disableTable(TableName.valueOf(tableName));
+
+ logger.info("Unset coprocessor on " + tableName);
+ HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
+ while (desc.hasCoprocessor(OBSERVER_CLS_NAME)) {
+ desc.removeCoprocessor(OBSERVER_CLS_NAME);
+ }
+ while (desc.hasCoprocessor(ENDPOINT_CLS_NAMAE)) {
+ desc.removeCoprocessor(ENDPOINT_CLS_NAMAE);
+ }
+
+ addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
+ hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc);
+
+ logger.info("Enable " + tableName);
+ hbaseAdmin.enableTable(TableName.valueOf(tableName));
+ }
+
+ private static List<String> resetCoprocessorOnHTables(Admin hbaseAdmin, Path hdfsCoprocessorJar, List<String> tableNames) throws IOException {
+ List<String> processed = new ArrayList<String>();
+
+ for (String tableName : tableNames) {
+ try {
+ resetCoprocessor(tableName, hbaseAdmin, hdfsCoprocessorJar);
+ processed.add(tableName);
+ } catch (IOException ex) {
+ logger.error("Error processing " + tableName, ex);
+ }
+ }
+ return processed;
+ }
+
+ public static Path getNewestCoprocessorJar(KylinConfig config, FileSystem fileSystem) throws IOException {
+ Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, config);
+ FileStatus newestJar = null;
+ for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
+ if (fileStatus.getPath().toString().endsWith(".jar")) {
+ if (newestJar == null) {
+ newestJar = fileStatus;
+ } else {
+ if (newestJar.getModificationTime() < fileStatus.getModificationTime())
+ newestJar = fileStatus;
+ }
+ }
+ }
+ if (newestJar == null)
+ return null;
+
+ Path path = newestJar.getPath().makeQualified(fileSystem.getUri(), null);
+ logger.info("The newest coprocessor is " + path.toString());
+ return path;
+ }
+
+ public static Path uploadCoprocessorJar(String localCoprocessorJar, FileSystem fileSystem, Set<String> oldJarPaths) throws IOException {
+ Path uploadPath = null;
+ File localCoprocessorFile = new File(localCoprocessorJar);
+
+ // check existing jars
+ if (oldJarPaths == null) {
+ oldJarPaths = new HashSet<String>();
+ }
+ Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, KylinConfig.getInstanceFromEnv());
+ for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
+ if (fileStatus.getLen() == localCoprocessorJar.length() && fileStatus.getModificationTime() == localCoprocessorFile.lastModified()) {
+ uploadPath = fileStatus.getPath();
+ break;
+ }
+ String filename = fileStatus.getPath().toString();
+ if (filename.endsWith(".jar")) {
+ oldJarPaths.add(filename);
+ }
+ }
+
+ // upload if not existing
+ if (uploadPath == null) {
+ // figure out a unique new jar file name
+ Set<String> oldJarNames = new HashSet<String>();
+ for (String path : oldJarPaths) {
+ oldJarNames.add(new Path(path).getName());
+ }
+ String baseName = getBaseFileName(localCoprocessorJar);
+ String newName = null;
+ int i = 0;
+ while (newName == null) {
+ newName = baseName + "-" + (i++) + ".jar";
+ if (oldJarNames.contains(newName))
+ newName = null;
+ }
+
+ // upload
+ uploadPath = new Path(coprocessorDir, newName);
+ FileInputStream in = null;
+ FSDataOutputStream out = null;
+ try {
+ in = new FileInputStream(localCoprocessorFile);
+ out = fileSystem.create(uploadPath);
+ IOUtils.copy(in, out);
+ } finally {
+ IOUtils.closeQuietly(in);
+ IOUtils.closeQuietly(out);
+ }
+
+ fileSystem.setTimes(uploadPath, localCoprocessorFile.lastModified(), -1);
+
+ }
+
+ uploadPath = uploadPath.makeQualified(fileSystem.getUri(), null);
+ return uploadPath;
+ }
+
+ private static String getBaseFileName(String localCoprocessorJar) {
+ File localJar = new File(localCoprocessorJar);
+ String baseName = localJar.getName();
+ if (baseName.endsWith(".jar"))
+ baseName = baseName.substring(0, baseName.length() - ".jar".length());
+ return baseName;
+ }
+
+ private static Path getCoprocessorHDFSDir(FileSystem fileSystem, KylinConfig config) throws IOException {
+ String hdfsWorkingDirectory = config.getHdfsWorkingDirectory();
+ Path coprocessorDir = new Path(hdfsWorkingDirectory, "coprocessor");
+ fileSystem.mkdirs(coprocessorDir);
+ return coprocessorDir;
+ }
+
+ private static Set<String> getCoprocessorJarPaths(Admin hbaseAdmin, List<String> tableNames) throws IOException {
+ HashSet<String> result = new HashSet<String>();
+
+ for (String tableName : tableNames) {
+ HTableDescriptor tableDescriptor = null;
+ try {
+ tableDescriptor = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
+ } catch (TableNotFoundException e) {
+ logger.warn("Table not found " + tableName, e);
+ continue;
+ }
+
+ Matcher keyMatcher;
+ Matcher valueMatcher;
+ for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> e : tableDescriptor.getValues().entrySet()) {
+ keyMatcher = HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(Bytes.toString(e.getKey().get()));
+ if (!keyMatcher.matches()) {
+ continue;
+ }
+ valueMatcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(Bytes.toString(e.getValue().get()));
+ if (!valueMatcher.matches()) {
+ continue;
+ }
+
+ String jarPath = valueMatcher.group(1).trim();
+ String clsName = valueMatcher.group(2).trim();
+
+ if (OBSERVER_CLS_NAME.equals(clsName)) {
+ result.add(jarPath);
+ }
+ }
+ }
+
+ return result;
+ }
+
+ private static List<String> getHTableNames(KylinConfig config) {
+ CubeManager cubeMgr = CubeManager.getInstance(config);
+
+ ArrayList<String> result = new ArrayList<String>();
+ for (CubeInstance cube : cubeMgr.listAllCubes()) {
+ for (CubeSegment seg : cube.getSegments(SegmentStatusEnum.READY)) {
+ String tableName = seg.getStorageLocationIdentifier();
+ if (StringUtils.isBlank(tableName) == false) {
+ result.add(tableName);
+ System.out.println("added new table: " + tableName);
+ }
+ }
+ }
+
+ for (IIInstance ii : IIManager.getInstance(config).listAllIIs()) {
+ for (IISegment seg : ii.getSegments(SegmentStatusEnum.READY)) {
+ String tableName = seg.getStorageLocationIdentifier();
+ if (StringUtils.isBlank(tableName) == false) {
+ result.add(tableName);
+ System.out.println("added new table: " + tableName);
+ }
+ }
+ }
+
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java b/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java
index 70e1df6..5fe5e58 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java
@@ -28,13 +28,13 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
import org.apache.kylin.common.persistence.HBaseConnection;
import org.apache.kylin.common.util.Bytes;
@@ -74,8 +74,7 @@ public class GridTableHBaseBenchmark {
public static void testGridTable(double hitRatio, double indexRatio) throws IOException {
System.out.println("Testing grid table scanning, hit ratio " + hitRatio + ", index ratio " + indexRatio);
String hbaseUrl = "hbase"; // use hbase-site.xml on classpath
-
- HConnection conn = HBaseConnection.get(hbaseUrl);
+ Connection conn = HBaseConnection.get(hbaseUrl);
createHTableIfNeeded(conn, TEST_TABLE);
prepareData(conn);
@@ -91,10 +90,10 @@ public class GridTableHBaseBenchmark {
}
- private static void testColumnScan(HConnection conn, List<Pair<Integer, Integer>> colScans) throws IOException {
+ private static void testColumnScan(Connection conn, List<Pair<Integer, Integer>> colScans) throws IOException {
Stats stats = new Stats("COLUMN_SCAN");
- HTableInterface table = conn.getTable(TEST_TABLE);
+ Table table = conn.getTable(TableName.valueOf(TEST_TABLE));
try {
stats.markStart();
@@ -122,20 +121,20 @@ public class GridTableHBaseBenchmark {
}
}
- private static void testRowScanNoIndexFullScan(HConnection conn, boolean[] hits) throws IOException {
+ private static void testRowScanNoIndexFullScan(Connection conn, boolean[] hits) throws IOException {
fullScan(conn, hits, new Stats("ROW_SCAN_NO_IDX_FULL"));
}
- private static void testRowScanNoIndexSkipScan(HConnection conn, boolean[] hits) throws IOException {
+ private static void testRowScanNoIndexSkipScan(Connection conn, boolean[] hits) throws IOException {
jumpScan(conn, hits, new Stats("ROW_SCAN_NO_IDX_SKIP"));
}
- private static void testRowScanWithIndex(HConnection conn, boolean[] hits) throws IOException {
+ private static void testRowScanWithIndex(Connection conn, boolean[] hits) throws IOException {
jumpScan(conn, hits, new Stats("ROW_SCAN_IDX"));
}
- private static void fullScan(HConnection conn, boolean[] hits, Stats stats) throws IOException {
- HTableInterface table = conn.getTable(TEST_TABLE);
+ private static void fullScan(Connection conn, boolean[] hits, Stats stats) throws IOException {
+ Table table = conn.getTable(TableName.valueOf(TEST_TABLE));
try {
stats.markStart();
@@ -156,11 +155,11 @@ public class GridTableHBaseBenchmark {
}
}
- private static void jumpScan(HConnection conn, boolean[] hits, Stats stats) throws IOException {
+ private static void jumpScan(Connection conn, boolean[] hits, Stats stats) throws IOException {
final int jumpThreshold = 6; // compensate for Scan() overhead, totally by experience
- HTableInterface table = conn.getTable(TEST_TABLE);
+ Table table = conn.getTable(TableName.valueOf(TEST_TABLE));
try {
stats.markStart();
@@ -204,8 +203,8 @@ public class GridTableHBaseBenchmark {
}
}
- private static void prepareData(HConnection conn) throws IOException {
- HTableInterface table = conn.getTable(TEST_TABLE);
+ private static void prepareData(Connection conn) throws IOException {
+ Table table = conn.getTable(TableName.valueOf(TEST_TABLE));
try {
// check how many rows existing
@@ -232,7 +231,7 @@ public class GridTableHBaseBenchmark {
byte[] rowkey = Bytes.toBytes(i);
Put put = new Put(rowkey);
byte[] cell = randomBytes();
- put.add(CF, QN, cell);
+ put.addColumn(CF, QN, cell);
table.put(put);
nBytes += cell.length;
dot(i, N_ROWS);
@@ -258,8 +257,8 @@ public class GridTableHBaseBenchmark {
return bytes;
}
- private static void createHTableIfNeeded(HConnection conn, String tableName) throws IOException {
- HBaseAdmin hbase = new HBaseAdmin(conn);
+ private static void createHTableIfNeeded(Connection conn, String tableName) throws IOException {
+ Admin hbase = conn.getAdmin();
try {
boolean tableExist = false;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java b/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java
index 53930e3..e283748 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java
@@ -23,12 +23,11 @@ import java.io.IOException;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.persistence.HBaseConnection;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,8 +70,7 @@ public class HtableAlterMetadataCLI extends AbstractHadoopJob {
}
private void alter() throws IOException {
- Configuration conf = HBaseConfiguration.create();
- HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+ Admin hbaseAdmin = HBaseConnection.get().getAdmin();
HTableDescriptor table = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
hbaseAdmin.disableTable(table.getTableName());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/job/src/main/java/org/apache/kylin/job/tools/RowCounterCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/RowCounterCLI.java b/job/src/main/java/org/apache/kylin/job/tools/RowCounterCLI.java
index 3329d27..4d44088 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/RowCounterCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/RowCounterCLI.java
@@ -22,11 +22,12 @@ import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.BytesUtil;
import org.slf4j.Logger;
@@ -69,8 +70,8 @@ public class RowCounterCLI {
logger.info("My Scan " + scan.toString());
- HConnection conn = HConnectionManager.createConnection(conf);
- HTableInterface tableInterface = conn.getTable(htableName);
+ Connection conn = ConnectionFactory.createConnection(conf);
+ Table tableInterface = conn.getTable(TableName.valueOf(htableName));
Iterator<Result> iterator = tableInterface.getScanner(scan).iterator();
int counter = 0;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/job/src/test/java/org/apache/kylin/job/ExportHBaseData.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/ExportHBaseData.java b/job/src/test/java/org/apache/kylin/job/ExportHBaseData.java
index e784a41..95a483d 100644
--- a/job/src/test/java/org/apache/kylin/job/ExportHBaseData.java
+++ b/job/src/test/java/org/apache/kylin/job/ExportHBaseData.java
@@ -22,10 +22,11 @@ import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.HBaseConnection;
import org.apache.kylin.common.util.AbstractKylinTestCase;
@@ -39,7 +40,7 @@ public class ExportHBaseData {
KylinConfig kylinConfig;
HTableDescriptor[] allTables;
Configuration config;
- HBaseAdmin hbase;
+ Admin admin;
CliCommandExecutor cli;
String exportHdfsFolder;
String exportLocalFolderParent;
@@ -75,12 +76,11 @@ public class ExportHBaseData {
int cut = metadataUrl.indexOf('@');
tableNameBase = metadataUrl.substring(0, cut);
String hbaseUrl = cut < 0 ? metadataUrl : metadataUrl.substring(cut + 1);
-
- HConnection conn = HBaseConnection.get(hbaseUrl);
+ Connection conn = HBaseConnection.get(hbaseUrl);
try {
- hbase = new HBaseAdmin(conn);
- config = hbase.getConfiguration();
- allTables = hbase.listTables();
+ admin = conn.getAdmin();
+ config = admin.getConfiguration();
+ allTables = admin.listTables();
} catch (IOException e) {
e.printStackTrace();
throw e;
@@ -89,6 +89,8 @@ public class ExportHBaseData {
public void tearDown() {
+ // close hbase admin
+ IOUtils.closeQuietly(admin);
// cleanup hdfs
try {
if (cli != null && exportHdfsFolder != null) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/job/src/test/java/org/apache/kylin/job/hadoop/hbase/TestHbaseClient.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/hbase/TestHbaseClient.java b/job/src/test/java/org/apache/kylin/job/hadoop/hbase/TestHbaseClient.java
index f2b9ed6..5a04d20 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/hbase/TestHbaseClient.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/hbase/TestHbaseClient.java
@@ -22,8 +22,11 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.kylin.common.util.Bytes;
/**
@@ -90,13 +93,15 @@ public class TestHbaseClient {
conf.set("hbase.zookeeper.quorum", "hbase_host");
conf.set("zookeeper.znode.parent", "/hbase-unsecure");
- HTable table = new HTable(conf, "test1");
+ Connection connection = ConnectionFactory.createConnection(conf);
+ Table table = connection.getTable(TableName.valueOf("test1"));
Put put = new Put(Bytes.toBytes("row1"));
- put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val1"));
- put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"), Bytes.toBytes("val2"));
+ put.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val1"));
+ put.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"), Bytes.toBytes("val2"));
table.put(put);
table.close();
+ connection.close();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/job/src/test/java/org/apache/kylin/job/tools/HBaseRowDigestTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/tools/HBaseRowDigestTest.java b/job/src/test/java/org/apache/kylin/job/tools/HBaseRowDigestTest.java
index 9f9c23c..f5f94c8 100644
--- a/job/src/test/java/org/apache/kylin/job/tools/HBaseRowDigestTest.java
+++ b/job/src/test/java/org/apache/kylin/job/tools/HBaseRowDigestTest.java
@@ -23,10 +23,11 @@ import java.io.IOException;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.kylin.common.persistence.HBaseConnection;
import org.apache.kylin.common.util.BytesUtil;
@@ -60,11 +61,11 @@ public class HBaseRowDigestTest extends HBaseMetadataTestCase {
@Test
public static void test() throws IOException {
String hbaseUrl = "hbase"; // use hbase-site.xml on classpath
- HConnection conn = null;
- HTableInterface table = null;
+ Connection conn = null;
+ Table table = null;
try {
conn = HBaseConnection.get(hbaseUrl);
- table = conn.getTable("KYLIN_II_YTYWP3CQGJ");
+ table = conn.getTable(TableName.valueOf("KYLIN_II_YTYWP3CQGJ"));
ResultScanner scanner = table.getScanner(CF, QN);
StringBuffer sb = new StringBuffer();
while (true) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/monitor/pom.xml
----------------------------------------------------------------------
diff --git a/monitor/pom.xml b/monitor/pom.xml
index 6530493..4e69f3a 100644
--- a/monitor/pom.xml
+++ b/monitor/pom.xml
@@ -39,6 +39,12 @@
<dependencies>
<dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-common</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/monitor/src/main/java/org/apache/kylin/monitor/MonitorMetaManager.java
----------------------------------------------------------------------
diff --git a/monitor/src/main/java/org/apache/kylin/monitor/MonitorMetaManager.java b/monitor/src/main/java/org/apache/kylin/monitor/MonitorMetaManager.java
index 97200fc..94b3937 100644
--- a/monitor/src/main/java/org/apache/kylin/monitor/MonitorMetaManager.java
+++ b/monitor/src/main/java/org/apache/kylin/monitor/MonitorMetaManager.java
@@ -20,18 +20,21 @@ package org.apache.kylin.monitor;
import java.io.IOException;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.kylin.common.persistence.HBaseConnection;
import org.apache.log4j.Logger;
/**
@@ -122,11 +125,10 @@ public class MonitorMetaManager {
public static String getListWithRowkey(String table, String rowkey) throws IOException {
Result result = getResultByRowKey(table, rowkey);
String fileList = null;
- if (result.list() != null) {
- for (KeyValue kv : result.list()) {
- fileList = Bytes.toString(kv.getValue());
+ if (result.listCells() != null) {
+ for (Cell cell : result.listCells()) {
+ fileList = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); // (array, offset, length): decode only this cell's value slice
}
-
}
fileList = fileList == null ? "" : fileList;
return fileList;
@@ -164,16 +166,20 @@ public class MonitorMetaManager {
* create table in hbase
*/
public static void creatTable(String tableName, String[] family) throws Exception {
- HBaseAdmin admin = new HBaseAdmin(conf);
- HTableDescriptor desc = new HTableDescriptor(tableName);
- for (int i = 0; i < family.length; i++) {
- desc.addFamily(new HColumnDescriptor(family[i]));
- }
- if (admin.tableExists(tableName)) {
- logger.info("table Exists!");
- } else {
- admin.createTable(desc);
- logger.info("create table Success!");
+ Admin admin = HBaseConnection.get().getAdmin();
+ try {
+ HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
+ for (int i = 0; i < family.length; i++) {
+ desc.addFamily(new HColumnDescriptor(family[i]));
+ }
+ if (admin.tableExists(TableName.valueOf(tableName))) {
+ logger.info("table Exists!");
+ } else {
+ admin.createTable(desc);
+ logger.info("create table Success!");
+ }
+ } finally {
+ IOUtils.closeQuietly(admin);
}
}
@@ -181,13 +187,15 @@ public class MonitorMetaManager {
* update cell in hbase
*/
public static void updateData(String tableName, String rowKey, String family, String column, String value) throws IOException {
- HTable table = new HTable(conf, Bytes.toBytes(tableName));
+ Table table = HBaseConnection.get().getTable(TableName.valueOf(tableName));
Put put = new Put(rowKey.getBytes());
- put.add(family.getBytes(), column.getBytes(), value.getBytes());
+ put.addColumn(family.getBytes(), column.getBytes(), value.getBytes());
try {
table.put(put);
} catch (IOException e) {
e.printStackTrace();
+ } finally {
+ IOUtils.closeQuietly(table);
}
logger.info("update table [" + tableName + "]");
logger.info("rowKey [" + rowKey + "]");
@@ -200,9 +208,14 @@ public class MonitorMetaManager {
* get result by rowkey
*/
public static Result getResultByRowKey(String tableName, String rowKey) throws IOException {
- HTable table = new HTable(conf, Bytes.toBytes(tableName));
+ Table table = HBaseConnection.get().getTable(TableName.valueOf(tableName));
+ // release the table handle even when the lookup throws
+ try {
Get get = new Get(Bytes.toBytes(rowKey));
Result result = table.get(get);
return result;
+ } finally {
+ IOUtils.closeQuietly(table);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d9b32be..a485548 100644
--- a/pom.xml
+++ b/pom.xml
@@ -45,12 +45,13 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<!-- Hadoop versions -->
- <hadoop2.version>2.6.0</hadoop2.version>
- <yarn.version>2.6.0</yarn.version>
+ <hadoop2.version>2.7.1</hadoop2.version>
+ <yarn.version>2.7.1</yarn.version>
<zookeeper.version>3.4.6</zookeeper.version>
- <hive.version>0.14.0</hive.version>
- <hive-hcatalog.version>0.14.0</hive-hcatalog.version>
- <hbase-hadoop2.version>0.98.8-hadoop2</hbase-hadoop2.version>
+ <hive.version>1.2.1</hive.version>
+ <hive-hcatalog.version>1.2.1</hive-hcatalog.version>
+ <hbase-hadoop2.version>1.1.1</hbase-hadoop2.version>
+ <curator.version>2.7.1</curator.version>
<!-- Dependency versions -->
<antlr.version>3.4</antlr.version>
@@ -88,9 +89,6 @@
<!-- Calcite Version -->
<calcite.version>1.4.0-incubating</calcite.version>
- <!-- Curator.version Version -->
- <curator.version>2.6.0</curator.version>
-
<!-- Sonar -->
<sonar.java.coveragePlugin>jacoco</sonar.java.coveragePlugin>
<sonar.dynamicAnalysis>reuseReports</sonar.dynamicAnalysis>
@@ -821,8 +819,7 @@
</configuration>
</plugin>
- <!-- Override the parent assembly execution to customize the assembly
- descriptor and final name. -->
+ <!-- Override the parent assembly execution to customize the assembly descriptor and final name. -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/server/src/main/java/org/apache/kylin/rest/service/AclService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/AclService.java b/server/src/main/java/org/apache/kylin/rest/service/AclService.java
index ea2a48e..8a1cf6d 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/AclService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/AclService.java
@@ -29,13 +29,14 @@ import java.util.Map;
import java.util.NavigableMap;
import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.kylin.common.KylinConfig;
@@ -130,9 +131,9 @@ public class AclService implements MutableAclService {
@Override
public List<ObjectIdentity> findChildren(ObjectIdentity parentIdentity) {
List<ObjectIdentity> oids = new ArrayList<ObjectIdentity>();
- HTableInterface htable = null;
+ Table htable = null;
try {
- htable = HBaseConnection.get(hbaseUrl).getTable(aclTableName);
+ htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(aclTableName));
Scan scan = new Scan();
SingleColumnValueFilter parentFilter = new SingleColumnValueFilter(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_PARENT_COLUMN), CompareOp.EQUAL, domainObjSerializer.serialize(new DomainObjectInfo(parentIdentity)));
@@ -179,10 +180,10 @@ public class AclService implements MutableAclService {
@Override
public Map<ObjectIdentity, Acl> readAclsById(List<ObjectIdentity> oids, List<Sid> sids) throws NotFoundException {
Map<ObjectIdentity, Acl> aclMaps = new HashMap<ObjectIdentity, Acl>();
- HTableInterface htable = null;
+ Table htable = null;
Result result = null;
try {
- htable = HBaseConnection.get(hbaseUrl).getTable(aclTableName);
+ htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(aclTableName));
for (ObjectIdentity oid : oids) {
result = htable.get(new Get(Bytes.toBytes(String.valueOf(oid.getIdentifier()))));
@@ -231,16 +232,15 @@ public class AclService implements MutableAclService {
Authentication auth = SecurityContextHolder.getContext().getAuthentication();
PrincipalSid sid = new PrincipalSid(auth);
- HTableInterface htable = null;
+ Table htable = null;
try {
- htable = HBaseConnection.get(hbaseUrl).getTable(aclTableName);
+ htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(aclTableName));
Put put = new Put(Bytes.toBytes(String.valueOf(objectIdentity.getIdentifier())));
- put.add(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_TYPE_COLUMN), Bytes.toBytes(objectIdentity.getType()));
- put.add(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_OWNER_COLUMN), sidSerializer.serialize(new SidInfo(sid)));
- put.add(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_ENTRY_INHERIT_COLUMN), Bytes.toBytes(true));
+ put.addColumn(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_TYPE_COLUMN), Bytes.toBytes(objectIdentity.getType()));
+ put.addColumn(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_OWNER_COLUMN), sidSerializer.serialize(new SidInfo(sid)));
+ put.addColumn(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_ENTRY_INHERIT_COLUMN), Bytes.toBytes(true));
htable.put(put);
- htable.flushCommits();
logger.debug("ACL of " + objectIdentity + " created successfully.");
} catch (IOException e) {
@@ -254,9 +254,9 @@ public class AclService implements MutableAclService {
@Override
public void deleteAcl(ObjectIdentity objectIdentity, boolean deleteChildren) throws ChildrenExistException {
- HTableInterface htable = null;
+ Table htable = null;
try {
- htable = HBaseConnection.get(hbaseUrl).getTable(aclTableName);
+ htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(aclTableName));
Delete delete = new Delete(Bytes.toBytes(String.valueOf(objectIdentity.getIdentifier())));
List<ObjectIdentity> children = findChildren(objectIdentity);
@@ -269,7 +269,6 @@ public class AclService implements MutableAclService {
}
htable.delete(delete);
- htable.flushCommits();
logger.debug("ACL of " + objectIdentity + " deleted successfully.");
} catch (IOException e) {
@@ -287,27 +286,26 @@ public class AclService implements MutableAclService {
throw e;
}
- HTableInterface htable = null;
+ Table htable = null;
try {
- htable = HBaseConnection.get(hbaseUrl).getTable(aclTableName);
+ htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(aclTableName));
Delete delete = new Delete(Bytes.toBytes(String.valueOf(acl.getObjectIdentity().getIdentifier())));
- delete.deleteFamily(Bytes.toBytes(ACL_ACES_FAMILY));
+ delete.addFamily(Bytes.toBytes(ACL_ACES_FAMILY));
htable.delete(delete);
Put put = new Put(Bytes.toBytes(String.valueOf(acl.getObjectIdentity().getIdentifier())));
if (null != acl.getParentAcl()) {
- put.add(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_PARENT_COLUMN), domainObjSerializer.serialize(new DomainObjectInfo(acl.getParentAcl().getObjectIdentity())));
+ put.addColumn(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_PARENT_COLUMN), domainObjSerializer.serialize(new DomainObjectInfo(acl.getParentAcl().getObjectIdentity())));
}
for (AccessControlEntry ace : acl.getEntries()) {
AceInfo aceInfo = new AceInfo(ace);
- put.add(Bytes.toBytes(ACL_ACES_FAMILY), Bytes.toBytes(aceInfo.getSidInfo().getSid()), aceSerializer.serialize(aceInfo));
+ put.addColumn(Bytes.toBytes(ACL_ACES_FAMILY), Bytes.toBytes(aceInfo.getSidInfo().getSid()), aceSerializer.serialize(aceInfo));
}
if (!put.isEmpty()) {
htable.put(put);
- htable.flushCommits();
logger.debug("ACL of " + acl.getObjectIdentity() + " updated successfully.");
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
index f115d89..2770475 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -29,9 +29,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.HBaseConnection;
import org.apache.kylin.common.util.HBaseRegionSizeCalculator;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeInstance;
@@ -430,33 +430,24 @@ public class CubeService extends BasicService {
* @throws IOException Exception when HTable resource is not closed correctly.
*/
public HBaseResponse getHTableInfo(String tableName) throws IOException {
- Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
- HTable table = null;
+ Connection conn = HBaseConnection.get();
HBaseResponse hr = null;
long tableSize = 0;
int regionCount = 0;
- try {
- table = new HTable(hconf, tableName);
-
- HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(table);
- Map<byte[], Long> sizeMap = cal.getRegionSizeMap();
+ HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(tableName, conn);
+ Map<byte[], Long> sizeMap = cal.getRegionSizeMap();
- for (long s : sizeMap.values()) {
- tableSize += s;
- }
+ for (long s : sizeMap.values()) {
+ tableSize += s;
+ }
- regionCount = sizeMap.size();
+ regionCount = sizeMap.size();
- // Set response.
- hr = new HBaseResponse();
- hr.setTableSize(tableSize);
- hr.setRegionCount(regionCount);
- } finally {
- if (null != table) {
- table.close();
- }
- }
+ // Set response.
+ hr = new HBaseResponse();
+ hr.setTableSize(tableSize);
+ hr.setRegionCount(regionCount);
return hr;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 764df4b..7d14021 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -42,10 +42,11 @@ import javax.sql.DataSource;
import org.apache.calcite.avatica.ColumnMetaData.Rep;
import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.common.persistence.HBaseConnection;
@@ -124,14 +125,13 @@ public class QueryService extends BasicService {
Query[] queryArray = new Query[queries.size()];
byte[] bytes = querySerializer.serialize(queries.toArray(queryArray));
- HTableInterface htable = null;
+ Table htable = null;
try {
- htable = HBaseConnection.get(hbaseUrl).getTable(userTableName);
+ htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
Put put = new Put(Bytes.toBytes(creator));
- put.add(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes);
+ put.addColumn(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes);
htable.put(put);
- htable.flushCommits();
} finally {
IOUtils.closeQuietly(htable);
}
@@ -157,14 +157,13 @@ public class QueryService extends BasicService {
Query[] queryArray = new Query[queries.size()];
byte[] bytes = querySerializer.serialize(queries.toArray(queryArray));
- HTableInterface htable = null;
+ Table htable = null;
try {
- htable = HBaseConnection.get(hbaseUrl).getTable(userTableName);
+ htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
Put put = new Put(Bytes.toBytes(creator));
- put.add(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes);
+ put.addColumn(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes);
htable.put(put);
- htable.flushCommits();
} finally {
IOUtils.closeQuietly(htable);
}
@@ -176,9 +175,9 @@ public class QueryService extends BasicService {
}
List<Query> queries = new ArrayList<Query>();
- HTableInterface htable = null;
+ Table htable = null;
try {
- htable = HBaseConnection.get(hbaseUrl).getTable(userTableName);
+ htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
Get get = new Get(Bytes.toBytes(creator));
get.addFamily(Bytes.toBytes(USER_QUERY_FAMILY));
Result result = htable.get(get);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/server/src/main/java/org/apache/kylin/rest/service/UserService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/UserService.java b/server/src/main/java/org/apache/kylin/rest/service/UserService.java
index d665ab9..d03cd55 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/UserService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/UserService.java
@@ -25,13 +25,14 @@ import java.util.Collection;
import java.util.List;
import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.HBaseConnection;
import org.apache.kylin.common.util.Bytes;
@@ -75,9 +76,9 @@ public class UserService implements UserManager {
@Override
public UserDetails loadUserByUsername(String username) throws UsernameNotFoundException {
- HTableInterface htable = null;
+ Table htable = null;
try {
- htable = HBaseConnection.get(hbaseUrl).getTable(userTableName);
+ htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
Get get = new Get(Bytes.toBytes(username));
get.addFamily(Bytes.toBytes(USER_AUTHORITY_FAMILY));
@@ -106,15 +107,14 @@ public class UserService implements UserManager {
@Override
public void updateUser(UserDetails user) {
- HTableInterface htable = null;
+ Table htable = null;
try {
byte[] userAuthorities = serialize(user.getAuthorities());
- htable = HBaseConnection.get(hbaseUrl).getTable(userTableName);
+ htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
Put put = new Put(Bytes.toBytes(user.getUsername()));
- put.add(Bytes.toBytes(USER_AUTHORITY_FAMILY), Bytes.toBytes(USER_AUTHORITY_COLUMN), userAuthorities);
+ put.addColumn(Bytes.toBytes(USER_AUTHORITY_FAMILY), Bytes.toBytes(USER_AUTHORITY_COLUMN), userAuthorities);
htable.put(put);
- htable.flushCommits();
} catch (IOException e) {
throw new RuntimeException(e.getMessage(), e);
} finally {
@@ -124,13 +124,12 @@ public class UserService implements UserManager {
@Override
public void deleteUser(String username) {
- HTableInterface htable = null;
+ Table htable = null;
try {
- htable = HBaseConnection.get(hbaseUrl).getTable(userTableName);
+ htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
Delete delete = new Delete(Bytes.toBytes(username));
htable.delete(delete);
- htable.flushCommits();
} catch (IOException e) {
throw new RuntimeException(e.getMessage(), e);
} finally {
@@ -145,9 +144,9 @@ public class UserService implements UserManager {
@Override
public boolean userExists(String username) {
- HTableInterface htable = null;
+ Table htable = null;
try {
- htable = HBaseConnection.get(hbaseUrl).getTable(userTableName);
+ htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
Result result = htable.get(new Get(Bytes.toBytes(username)));
return null != result && !result.isEmpty();
@@ -164,10 +163,10 @@ public class UserService implements UserManager {
s.addColumn(Bytes.toBytes(USER_AUTHORITY_FAMILY), Bytes.toBytes(USER_AUTHORITY_COLUMN));
List<String> authorities = new ArrayList<String>();
- HTableInterface htable = null;
+ Table htable = null;
ResultScanner scanner = null;
try {
- htable = HBaseConnection.get(hbaseUrl).getTable(userTableName);
+ htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
scanner = htable.getScanner(s);
for (Result result = scanner.next(); result != null; result = scanner.next()) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
index 9efbb79..6f08f8a 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
@@ -28,16 +28,16 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FuzzyRowFilter;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.kylin.common.persistence.StorageException;
import org.apache.kylin.common.util.Array;
import org.apache.kylin.common.util.Bytes;
@@ -82,7 +82,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
private final Collection<RowValueDecoder> rowValueDecoders;
private final StorageContext context;
private final String tableName;
- private final HTableInterface table;
+ private final Table table;
private final RowKeyDecoder rowKeyDecoder;
private final Iterator<HBaseKeyRange> rangeIterator;
@@ -94,7 +94,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
private int scanCount;
private int scanCountDelta;
- public CubeSegmentTupleIterator(CubeSegment cubeSeg, Collection<HBaseKeyRange> keyRanges, HConnection conn, Collection<TblColRef> dimensions, TupleFilter filter, Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context) {
+ public CubeSegmentTupleIterator(CubeSegment cubeSeg, Collection<HBaseKeyRange> keyRanges, Connection conn, Collection<TblColRef> dimensions, TupleFilter filter, Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context) {
this.cube = cubeSeg.getCubeInstance();
this.cubeSeg = cubeSeg;
this.dimensions = dimensions;
@@ -106,7 +106,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
this.rowKeyDecoder = new RowKeyDecoder(this.cubeSeg);
try {
- this.table = conn.getTable(tableName);
+ this.table = conn.getTable(TableName.valueOf(tableName));
} catch (Throwable t) {
throw new StorageException("Error when open connection to table " + tableName, t);
}
@@ -122,12 +122,11 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
private void closeScanner() {
flushScanCountDelta();
-
+
if (logger.isDebugEnabled() && scan != null) {
logger.debug("Scan " + scan.toString());
- byte[] metricsBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA);
- if (metricsBytes != null) {
- ScanMetrics scanMetrics = ProtobufUtil.toScanMetrics(metricsBytes);
+ ScanMetrics scanMetrics = scan.getScanMetrics();
+ if (scanMetrics != null) {
logger.debug("HBase Metrics: " + "count={}, ms={}, bytes={}, remote_bytes={}, regions={}, not_serving_region={}, rpc={}, rpc_retries={}, remote_rpc={}, remote_rpc_retries={}", new Object[] { scanCount, scanMetrics.sumOfMillisSecBetweenNexts, scanMetrics.countOfBytesInResults, scanMetrics.countOfBytesInRemoteResults, scanMetrics.countOfRegions, scanMetrics.countOfNSRE, scanMetrics.countOfRPCcalls, scanMetrics.countOfRPCRetries, scanMetrics.countOfRemoteRPCcalls, scanMetrics.countOfRemoteRPCRetries });
}
}
@@ -254,7 +253,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
Scan scan = new Scan();
scan.setCaching(SCAN_CACHE);
scan.setCacheBlocks(true);
- scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE));
+ scan.setScanMetricsEnabled(true);
for (RowValueDecoder valueDecoder : this.rowValueDecoders) {
HBaseColumnDesc hbaseColumn = valueDecoder.getHBaseColumn();
byte[] byteFamily = Bytes.toBytes(hbaseColumn.getColumnFamilyName());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
index 8eb7bcb..9ae296f 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
@@ -32,7 +32,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
-import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.kylin.common.persistence.HBaseConnection;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.Pair;
@@ -140,7 +140,7 @@ public class CubeStorageEngine implements IStorageEngine {
setCoprocessor(groupsCopD, valueDecoders, context); // enable coprocessor if beneficial
setLimit(filter, context);
- HConnection conn = HBaseConnection.get(context.getConnUrl());
+ Connection conn = HBaseConnection.get(context.getConnUrl());
return new SerializedHBaseTupleIterator(conn, scans, cubeInstance, dimensionsD, filterD, groupsCopD, valueDecoders, context);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseClientKVIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseClientKVIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseClientKVIterator.java
index 918fd4b..6a76baa 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseClientKVIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseClientKVIterator.java
@@ -1,93 +1,94 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.hbase;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.invertedindex.model.IIDesc;
-
-/**
- * @author yangli9
- *
- */
-public class HBaseClientKVIterator implements Iterable<Pair<ImmutableBytesWritable, ImmutableBytesWritable>>, Closeable {
-
- byte[] family;
- byte[] qualifier;
-
- HTableInterface table;
- ResultScanner scanner;
- Iterator<Result> iterator;
-
- public HBaseClientKVIterator(HConnection hconn, String tableName, byte[] family, byte[] qualifier) throws IOException {
- this.family = family;
- this.qualifier = qualifier;
-
- this.table = hconn.getTable(tableName);
- this.scanner = table.getScanner(family, qualifier);
- this.iterator = scanner.iterator();
- }
-
- @Override
- public void close() {
- IOUtils.closeQuietly(scanner);
- IOUtils.closeQuietly(table);
- }
-
- @Override
- public Iterator<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> iterator() {
- return new MyIterator();
- }
-
- private class MyIterator implements Iterator<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> {
-
- ImmutableBytesWritable key = new ImmutableBytesWritable();
- ImmutableBytesWritable value = new ImmutableBytesWritable();
- Pair<ImmutableBytesWritable, ImmutableBytesWritable> pair = new Pair<ImmutableBytesWritable, ImmutableBytesWritable>(key, value);
-
- @Override
- public boolean hasNext() {
- return iterator.hasNext();
- }
-
- @Override
- public Pair<ImmutableBytesWritable, ImmutableBytesWritable> next() {
- Result r = iterator.next();
- Cell c = r.getColumnLatestCell(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES);
- key.set(c.getRowArray(), c.getRowOffset(), c.getRowLength());
- value.set(c.getValueArray(), c.getValueOffset(), c.getValueLength());
- return pair;
- }
-
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.invertedindex.model.IIDesc;
+
+/**
+ * Iterates the row-key/value pairs of a single HBase column through a client-side scan.
+ * <p>
+ * The constructor opens the table and a scanner restricted to the given family and
+ * qualifier; {@link #close()} releases both. Every iterator handed out by
+ * {@link #iterator()} draws from that one shared scanner, so the rows can only be
+ * traversed once per instance.
+ *
+ * @author yangli9
+ */
+public class HBaseClientKVIterator implements Iterable<Pair<ImmutableBytesWritable, ImmutableBytesWritable>>, Closeable {
+
+    byte[] family;
+    byte[] qualifier;
+
+    Table table;
+    ResultScanner scanner;
+    Iterator<Result> iterator;
+
+    /**
+     * Opens {@code tableName} on the given connection and starts scanning one column.
+     *
+     * @param hconn     connection used to obtain the table; it is not closed by this class
+     * @param tableName name of the HBase table to scan
+     * @param family    column family to scan
+     * @param qualifier column qualifier to scan
+     * @throws IOException if the table or the scanner cannot be opened
+     */
+    public HBaseClientKVIterator(Connection hconn, String tableName, byte[] family, byte[] qualifier) throws IOException {
+        this.family = family;
+        this.qualifier = qualifier;
+
+        this.table = hconn.getTable(TableName.valueOf(tableName));
+        boolean opened = false;
+        try {
+            this.scanner = table.getScanner(family, qualifier);
+            this.iterator = scanner.iterator();
+            opened = true;
+        } finally {
+            // Do not leak the table (or a half-opened scanner) when the scan cannot be started.
+            if (!opened) {
+                close();
+            }
+        }
+    }
+
+    /** Closes the scanner and then the table, ignoring any failure while closing. */
+    @Override
+    public void close() {
+        IOUtils.closeQuietly(scanner);
+        IOUtils.closeQuietly(table);
+    }
+
+    /**
+     * Returns a view over the shared scanner.
+     *
+     * @return an iterator that continues from wherever the scanner currently stands
+     */
+    @Override
+    public Iterator<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> iterator() {
+        return new MyIterator();
+    }
+
+    /**
+     * Adapts the scanner's {@link Result} stream to key/value pairs.
+     * <p>
+     * The same {@code Pair} and the same two writables are returned from every call to
+     * {@link #next()}; copy their contents if they must outlive the following call.
+     */
+    private class MyIterator implements Iterator<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> {
+
+        ImmutableBytesWritable key = new ImmutableBytesWritable();
+        ImmutableBytesWritable value = new ImmutableBytesWritable();
+        Pair<ImmutableBytesWritable, ImmutableBytesWritable> pair = new Pair<ImmutableBytesWritable, ImmutableBytesWritable>(key, value);
+
+        @Override
+        public boolean hasNext() {
+            return iterator.hasNext();
+        }
+
+        @Override
+        public Pair<ImmutableBytesWritable, ImmutableBytesWritable> next() {
+            Result r = iterator.next();
+            // Read the very column the scanner was opened on, not a fixed constant,
+            // so the lookup cannot miss when a different family/qualifier is passed in.
+            Cell c = r.getColumnLatestCell(family, qualifier);
+            key.set(c.getRowArray(), c.getRowOffset(), c.getRowLength());
+            value.set(c.getValueArray(), c.getValueOffset(), c.getValueLength());
+            return pair;
+        }
+
+        /** Removal is not supported; this is a read-only scan. */
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5875bb0a/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageEngine.java
index afb49c0..e518a4c 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageEngine.java
@@ -1,57 +1,57 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.hbase;
-
-import java.util.ArrayList;
-
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.kylin.common.persistence.HBaseConnection;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.metadata.realization.SQLDigest;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.storage.IStorageEngine;
-import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.hbase.coprocessor.endpoint.EndpointTupleIterator;
-
-/**
- * @author yangli9
- */
-public class InvertedIndexStorageEngine implements IStorageEngine {
-
- private IISegment seg;
-
- public InvertedIndexStorageEngine(IIInstance ii) {
- this.seg = ii.getFirstSegment();
- }
-
- @Override
- public ITupleIterator search(StorageContext context, SQLDigest sqlDigest) {
- String tableName = seg.getStorageLocationIdentifier();
-
- //HConnection is cached, so need not be closed
- HConnection conn = HBaseConnection.get(context.getConnUrl());
- try {
- return new EndpointTupleIterator(seg, sqlDigest.filter, sqlDigest.groupbyColumns, new ArrayList<>(sqlDigest.aggregations), context, conn);
- } catch (Throwable e) {
- e.printStackTrace();
- throw new IllegalStateException("Error when connecting to II htable " + tableName, e);
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.kylin.common.persistence.HBaseConnection;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.metadata.tuple.ITupleIterator;
+import org.apache.kylin.storage.IStorageEngine;
+import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.hbase.coprocessor.endpoint.EndpointTupleIterator;
+
+/**
+ * Storage engine that answers queries against an inverted-index (II) instance by
+ * running an {@link EndpointTupleIterator} over the instance's first segment.
+ *
+ * @author yangli9
+ */
+public class InvertedIndexStorageEngine implements IStorageEngine {
+
+    private final IISegment seg;
+
+    /**
+     * @param ii inverted-index instance whose first segment will be queried
+     */
+    public InvertedIndexStorageEngine(IIInstance ii) {
+        this.seg = ii.getFirstSegment();
+    }
+
+    /**
+     * Builds a tuple iterator for the given query over the segment's HBase table.
+     *
+     * @param context   storage context; supplies the HBase connection URL
+     * @param sqlDigest digest of the query (filter, group-by columns, aggregations)
+     * @return iterator over the matching tuples
+     * @throws IllegalStateException if the iterator cannot be created; the original
+     *                               failure is attached as the cause
+     */
+    @Override
+    public ITupleIterator search(StorageContext context, SQLDigest sqlDigest) {
+        String tableName = seg.getStorageLocationIdentifier();
+
+        // Connection is cached, so need not be closed
+        Connection conn = HBaseConnection.get(context.getConnUrl());
+        try {
+            return new EndpointTupleIterator(seg, sqlDigest.filter, sqlDigest.groupbyColumns, new ArrayList<>(sqlDigest.aggregations), context, conn);
+        } catch (Throwable e) {
+            // The failure travels as the cause of the rethrown exception, so it is not
+            // printed to stderr here as well.
+            throw new IllegalStateException("Error when connecting to II htable " + tableName, e);
+        }
+    }
+}