You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/11/05 09:57:23 UTC

incubator-kylin git commit: create a CubeSegment with enableSharding set to true

Repository: incubator-kylin
Updated Branches:
  refs/heads/KYLIN-1126 ad3622e12 -> b6f6311bc


create a CubeSegment with enableSharding set to true


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/b6f6311b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/b6f6311b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/b6f6311b

Branch: refs/heads/KYLIN-1126
Commit: b6f6311bc2901382ce65cca0eaa3802f45a3598d
Parents: ad3622e
Author: honma <ho...@ebay.com>
Authored: Thu Nov 5 17:01:35 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu Nov 5 17:01:35 2015 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/cube/CubeManager.java    |  1 +
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java        | 17 +++++++++++++----
 .../kylin/storage/hbase/steps/CreateHTableJob.java |  5 ++---
 .../kylin/storage/hbase/steps/CubeHTableUtil.java  |  3 ++-
 .../storage/hbase/steps/HBaseStreamingOutput.java  |  7 +------
 5 files changed, 19 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b6f6311b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 2232f01..3ead061 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -568,6 +568,7 @@ public class CubeManager implements IRealizationProvider {
         segment.setDateRangeEnd(endDate);
         segment.setStatus(SegmentStatusEnum.NEW);
         segment.setStorageLocationIdentifier(generateStorageLocation());
+        segment.setEnableSharding(true);
 
         segment.setCubeInstance(cubeInstance);
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b6f6311b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index a8894a1..b606d2e 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -25,9 +25,12 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.zip.DataFormatException;
 
@@ -163,12 +166,13 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         final AtomicInteger totalScannedCount = new AtomicInteger(0);
         final String toggle = BackdoorToggles.getCoprocessorBehavior() == null ? CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.toString() : BackdoorToggles.getCoprocessorBehavior();
         logger.info("The execution of this query will use " + toggle + " as endpoint's behavior");
+        List<Future<?>> futures = Lists.newArrayList();
 
         for (int i = 0; i < rawScans.size(); ++i) {
             final int shardIndex = i;
             final RawScan rawScan = rawScans.get(i);
 
-            executorService.submit(new Runnable() {
+            Future<?> future = executorService.submit(new Runnable() {
                 @Override
                 public void run() {
                     final byte[] rawScanBytes = KryoUtils.serialize(rawScan);
@@ -177,7 +181,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                     for (IntList intList : hbaseColumnsToGTIntList) {
                         builder.addHbaseColumnsToGT(intList);
                     }
-
+                    builder.setRowkeyPreambleSize(cubeSeg.getRowKeyPreambleSize());
                     builder.setBehavior(toggle);
 
                     Collection<CubeVisitProtos.CubeVisitResponse> results;
@@ -211,14 +215,19 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                     rowBlocks.addAll(part);
                 }
             });
+            futures.add(future);
         }
         executorService.shutdown();
         try {
-            if (!executorService.awaitTermination(1, TimeUnit.HOURS)) {
-                throw new RuntimeException("Visiting cube by endpoint timeout");
+            for (Future<?> future : futures) {
+                future.get(1, TimeUnit.HOURS);
             }
         } catch (InterruptedException e) {
             throw new RuntimeException("Visiting cube by endpoint gets interrupted");
+        } catch (ExecutionException e) {
+            throw new RuntimeException("Visiting cube throw exception", e);
+        } catch (TimeoutException e) {
+            throw new RuntimeException("Visiting cube by endpoint timeout");
         }
 
         return new EndpointResultsAsGTScanner(fullGTInfo, rowBlocks.iterator(), scanRequest.getColumns(), totalScannedCount.get());

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b6f6311b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index 0ed374f..8bff4d1 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -83,7 +83,6 @@ public class CreateHTableJob extends AbstractHadoopJob {
     CubeDesc cubeDesc = null;
     String segmentName = null;
     KylinConfig kylinConfig;
-    public static final boolean ENABLE_CUBOID_SHARDING = true;
 
     @Override
     public int run(String[] args) throws Exception {
@@ -267,7 +266,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
         nRegion = Math.max(kylinConfig.getHBaseRegionCountMin(), nRegion);
         nRegion = Math.min(kylinConfig.getHBaseRegionCountMax(), nRegion);
 
-        if (ENABLE_CUBOID_SHARDING) {//&& (nRegion > 1)) {
+        if (cubeSegment.isEnableSharding()) {//&& (nRegion > 1)) {
             //use prime nRegions to help random sharding
             int original = nRegion;
             nRegion = Primes.nextPrime(nRegion);//return 2 for input 1
@@ -289,7 +288,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
         logger.info("Expecting " + nRegion + " regions.");
         logger.info("Expecting " + mbPerRegion + " MB per region.");
 
-        if (ENABLE_CUBOID_SHARDING) {
+        if (cubeSegment.isEnableSharding()) {
             //each cuboid will be split into different number of shards
             HashMap<Long, Short> cuboidShards = Maps.newHashMap();
             double[] regionSizes = new double[nRegion];

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b6f6311b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
index 221be8a..cdc259b 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
@@ -11,6 +11,7 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
+import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.model.CubeDesc;
@@ -33,7 +34,7 @@ public class CubeHTableUtil {
 
         HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
         // https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/regionserver/ConstantSizeRegionSplitPolicy.html
-        tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, ConstantSizeRegionSplitPolicy.class.getName());
+        tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName());
         tableDesc.setValue(IRealizationConstants.HTableTag, kylinConfig.getMetadataUrlPrefix());
         tableDesc.setValue(IRealizationConstants.HTableCreationTime, String.valueOf(System.currentTimeMillis()));
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b6f6311b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
index 2d4fb90..896bc72 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
@@ -41,8 +41,6 @@ import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-
 /**
  */
 public class HBaseStreamingOutput implements IStreamingOutput {
@@ -53,10 +51,7 @@ public class HBaseStreamingOutput implements IStreamingOutput {
     public ICuboidWriter getCuboidWriter(IBuildable buildable) {
         try {
             CubeSegment cubeSegment = (CubeSegment) buildable;
-            
-            //If ever attempt to enable sharding on streaming please also check //TODO:shardingonstreaming
-            Preconditions.checkArgument(!cubeSegment.isEnableSharding(), "Streaming table not allowed to use sharding");
-            
+
             final HTableInterface hTable;
             hTable = createHTable(cubeSegment);
             return new HBaseCuboidWriter(cubeSegment, hTable);