You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/11/05 09:57:23 UTC
incubator-kylin git commit: create a cubesegment with enableshard set true
Repository: incubator-kylin
Updated Branches:
refs/heads/KYLIN-1126 ad3622e12 -> b6f6311bc
create a cubesegment with enableshard set true
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/b6f6311b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/b6f6311b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/b6f6311b
Branch: refs/heads/KYLIN-1126
Commit: b6f6311bc2901382ce65cca0eaa3802f45a3598d
Parents: ad3622e
Author: honma <ho...@ebay.com>
Authored: Thu Nov 5 17:01:35 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu Nov 5 17:01:35 2015 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/cube/CubeManager.java | 1 +
.../hbase/cube/v2/CubeHBaseEndpointRPC.java | 17 +++++++++++++----
.../kylin/storage/hbase/steps/CreateHTableJob.java | 5 ++---
.../kylin/storage/hbase/steps/CubeHTableUtil.java | 3 ++-
.../storage/hbase/steps/HBaseStreamingOutput.java | 7 +------
5 files changed, 19 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b6f6311b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 2232f01..3ead061 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -568,6 +568,7 @@ public class CubeManager implements IRealizationProvider {
segment.setDateRangeEnd(endDate);
segment.setStatus(SegmentStatusEnum.NEW);
segment.setStorageLocationIdentifier(generateStorageLocation());
+ segment.setEnableSharding(true);
segment.setCubeInstance(cubeInstance);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b6f6311b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index a8894a1..b606d2e 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -25,9 +25,12 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.DataFormatException;
@@ -163,12 +166,13 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
final AtomicInteger totalScannedCount = new AtomicInteger(0);
final String toggle = BackdoorToggles.getCoprocessorBehavior() == null ? CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.toString() : BackdoorToggles.getCoprocessorBehavior();
logger.info("The execution of this query will use " + toggle + " as endpoint's behavior");
+ List<Future<?>> futures = Lists.newArrayList();
for (int i = 0; i < rawScans.size(); ++i) {
final int shardIndex = i;
final RawScan rawScan = rawScans.get(i);
- executorService.submit(new Runnable() {
+ Future<?> future = executorService.submit(new Runnable() {
@Override
public void run() {
final byte[] rawScanBytes = KryoUtils.serialize(rawScan);
@@ -177,7 +181,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
for (IntList intList : hbaseColumnsToGTIntList) {
builder.addHbaseColumnsToGT(intList);
}
-
+ builder.setRowkeyPreambleSize(cubeSeg.getRowKeyPreambleSize());
builder.setBehavior(toggle);
Collection<CubeVisitProtos.CubeVisitResponse> results;
@@ -211,14 +215,19 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
rowBlocks.addAll(part);
}
});
+ futures.add(future);
}
executorService.shutdown();
try {
- if (!executorService.awaitTermination(1, TimeUnit.HOURS)) {
- throw new RuntimeException("Visiting cube by endpoint timeout");
+ for (Future<?> future : futures) {
+ future.get(1, TimeUnit.HOURS);
}
} catch (InterruptedException e) {
throw new RuntimeException("Visiting cube by endpoint gets interrupted");
+ } catch (ExecutionException e) {
+ throw new RuntimeException("Visiting cube throw exception", e);
+ } catch (TimeoutException e) {
+ throw new RuntimeException("Visiting cube by endpoint timeout");
}
return new EndpointResultsAsGTScanner(fullGTInfo, rowBlocks.iterator(), scanRequest.getColumns(), totalScannedCount.get());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b6f6311b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index 0ed374f..8bff4d1 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -83,7 +83,6 @@ public class CreateHTableJob extends AbstractHadoopJob {
CubeDesc cubeDesc = null;
String segmentName = null;
KylinConfig kylinConfig;
- public static final boolean ENABLE_CUBOID_SHARDING = true;
@Override
public int run(String[] args) throws Exception {
@@ -267,7 +266,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
nRegion = Math.max(kylinConfig.getHBaseRegionCountMin(), nRegion);
nRegion = Math.min(kylinConfig.getHBaseRegionCountMax(), nRegion);
- if (ENABLE_CUBOID_SHARDING) {//&& (nRegion > 1)) {
+ if (cubeSegment.isEnableSharding()) {//&& (nRegion > 1)) {
//use prime nRegions to help random sharding
int original = nRegion;
nRegion = Primes.nextPrime(nRegion);//return 2 for input 1
@@ -289,7 +288,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
logger.info("Expecting " + nRegion + " regions.");
logger.info("Expecting " + mbPerRegion + " MB per region.");
- if (ENABLE_CUBOID_SHARDING) {
+ if (cubeSegment.isEnableSharding()) {
//each cuboid will be split into different number of shards
HashMap<Long, Short> cuboidShards = Maps.newHashMap();
double[] regionSizes = new double[nRegion];
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b6f6311b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
index 221be8a..cdc259b 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
@@ -11,6 +11,7 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
+import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
import org.apache.hadoop.hbase.security.User;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.model.CubeDesc;
@@ -33,7 +34,7 @@ public class CubeHTableUtil {
HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
// https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/regionserver/ConstantSizeRegionSplitPolicy.html
- tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, ConstantSizeRegionSplitPolicy.class.getName());
+ tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName());
tableDesc.setValue(IRealizationConstants.HTableTag, kylinConfig.getMetadataUrlPrefix());
tableDesc.setValue(IRealizationConstants.HTableCreationTime, String.valueOf(System.currentTimeMillis()));
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b6f6311b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
index 2d4fb90..896bc72 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
@@ -41,8 +41,6 @@ import org.apache.kylin.storage.hbase.HBaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
-
/**
*/
public class HBaseStreamingOutput implements IStreamingOutput {
@@ -53,10 +51,7 @@ public class HBaseStreamingOutput implements IStreamingOutput {
public ICuboidWriter getCuboidWriter(IBuildable buildable) {
try {
CubeSegment cubeSegment = (CubeSegment) buildable;
-
- //If ever attempt to enable sharding on streaming please also check //TODO:shardingonstreaming
- Preconditions.checkArgument(!cubeSegment.isEnableSharding(), "Streaming table not allowed to use sharding");
-
+
final HTableInterface hTable;
hTable = createHTable(cubeSegment);
return new HBaseCuboidWriter(cubeSegment, hTable);