You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/11/16 04:15:25 UTC

[1/2] incubator-kylin git commit: KYLIN-1135 pscan use cached thread pool

Repository: incubator-kylin
Updated Branches:
  refs/heads/2.x-staging 134960c62 -> 71e6bed33


KYLIN-1135 pscan use cached thread pool


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/8e8187bb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/8e8187bb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/8e8187bb

Branch: refs/heads/2.x-staging
Commit: 8e8187bbd06efcd8fa91aa30d74a2d245b683b2b
Parents: 134960c
Author: honma <ho...@ebay.com>
Authored: Mon Nov 16 11:11:40 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Mon Nov 16 11:11:40 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/util/BasicTest.java | 78 +++++++++++++++-----
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java     |  4 +-
 2 files changed, 61 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8e8187bb/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index 2beb2c6..d753a20 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -25,17 +25,22 @@ import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.HashMap;
-import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.configuration.ConfigurationException;
-import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.TreeMultiset;
 
 /**
  * <p/>
@@ -118,28 +123,63 @@ public class BasicTest {
     @Test
     public void test0() throws Exception {
 
-        TreeMultiset<Long> xx = TreeMultiset.create();
-        xx.add(2L);
-        xx.add(1L);
-        xx.add(1L);
-        for (Long hi : xx) {
-            System.out.println(hi);
+        ExecutorService executorService = Executors.newCachedThreadPool();
+        List<Future<?>> futures = Lists.newArrayList();
+
+        futures.add(executorService.submit(new Runnable() {
+            @Override
+            public void run() {
+                throw new RuntimeException("hi");
+            }
+        }));
+
+        futures.add(executorService.submit(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    Thread.sleep(2000);
+                    System.out.println("finish 1");
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }));
+
+        try {
+            for (Future<?> future : futures) {
+                future.get(1, TimeUnit.HOURS);
+            }
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            System.out.println(e.getMessage());
+        }
+
+        futures.clear();
+
+        futures.add(executorService.submit(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    Thread.sleep(3000);
+                    System.out.println("finish 2");
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }));
+
+        try {
+            for (Future<?> future : futures) {
+                future.get(1, TimeUnit.HOURS);
+            }
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            System.out.println(e.getMessage());
         }
-        System.out.println(Long.MAX_VALUE);
-
-        IdentityHashMap<String, Void> a = new IdentityHashMap<>();
-        IdentityHashMap<String, Void> b = new IdentityHashMap<>();
-        String s1 = new String("s1");
-        String s2 = new String("s1");
-        Assert.assertEquals(s1, s2);
-        Assert.assertTrue(s1 != s2);
-        a.put(s1, null);
-        b.put(s2, null);
     }
 
     @Test
     @Ignore("convenient trial tool for dev")
     public void test1() throws Exception {
+
         System.out.println(org.apache.kylin.common.util.DateFormat.formatToTimeStr(1433833611000L));
         System.out.println(org.apache.kylin.common.util.DateFormat.formatToTimeStr(1433250517000L));
         System.out.println(org.apache.kylin.common.util.DateFormat.stringToMillis("2015-06-01 00:00:00"));

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8e8187bb/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index b606d2e..5bc4a00 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -66,6 +66,8 @@ import com.google.protobuf.HBaseZeroCopyByteString;
 
 public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
+    private static ExecutorService executorService = Executors.newCachedThreadPool();
+
     static class EndpointResultsAsGTScanner implements IGTScanner {
         private GTInfo info;
         private Iterator<byte[]> blocks;
@@ -155,7 +157,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         final ByteString scanRequestBytesString = HBaseZeroCopyByteString.wrap(scanRequestBytes);
         logger.info("Serialized scanRequestBytes's size is " + scanRequestBytes.length);
 
-        ExecutorService executorService = Executors.newFixedThreadPool(rawScans.size());
         final List<byte[]> rowBlocks = Collections.synchronizedList(Lists.<byte[]> newArrayList());
 
         logger.info("Total RawScan range count: " + rawScans.size());
@@ -217,7 +218,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
             });
             futures.add(future);
         }
-        executorService.shutdown();
         try {
             for (Future<?> future : futures) {
                 future.get(1, TimeUnit.HOURS);


[2/2] incubator-kylin git commit: minor: fix bugs when merging codes

Posted by ma...@apache.org.
minor: fix bugs when merging codes


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/71e6bed3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/71e6bed3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/71e6bed3

Branch: refs/heads/2.x-staging
Commit: 71e6bed33bd5d71113ab609515060efe0b49a520
Parents: 8e8187b
Author: honma <ho...@ebay.com>
Authored: Mon Nov 16 11:19:54 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Mon Nov 16 11:19:54 2015 +0800

----------------------------------------------------------------------
 .../src/main/java/org/apache/kylin/cube/CubeSegment.java  |  2 ++
 .../org/apache/kylin/engine/mr/BatchCubingJobBuilder.java | 10 +++++-----
 .../org/apache/kylin/engine/mr/BatchMergeJobBuilder.java  |  6 ++++--
 3 files changed, 11 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/71e6bed3/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index 076bd14..62df1e9 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -28,9 +28,11 @@ import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.ShardingHash;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.IDictionaryAware;
 import org.apache.kylin.metadata.model.IBuildable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.IStorageAware;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/71e6bed3/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
index b5a7272..12cac94 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
@@ -44,13 +44,13 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
         Preconditions.checkArgument(!newSegment.isEnableSharding(), "V1 job engine does not support building sharded cubes");
 
         this.inputSide = MRUtil.getBatchCubingInputSide(seg);
-        this.outputSide = MRUtil.getBatchCubingOutputSide((CubeSegment)seg);
+        this.outputSide = MRUtil.getBatchCubingOutputSide((CubeSegment) seg);
     }
 
     public CubingJob build() {
         logger.info("MR_V1 new job to BUILD segment " + seg);
 
-        final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
+        final CubingJob result = CubingJob.createBuildJob((CubeSegment) seg, submitter, config);
         final String jobId = result.getId();
         final String cuboidRootPath = getCuboidRootPath(jobId);
 
@@ -62,7 +62,7 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
         result.addTask(createBuildDictionaryStep(jobId));
 
         // Phase 3: Build Cube
-        RowKeyDesc rowKeyDesc = ((CubeSegment)seg).getCubeDesc().getRowkey();
+        RowKeyDesc rowKeyDesc = ((CubeSegment) seg).getCubeDesc().getRowkey();
         final int groupRowkeyColumnsCount = rowKeyDesc.getNCuboidBuildLevels();
         final int totalRowkeyColumnsCount = rowKeyDesc.getRowKeyColumns().length;
         final String[] cuboidOutputTempPath = getCuboidOutputPaths(cuboidRootPath, totalRowkeyColumnsCount, groupRowkeyColumnsCount);
@@ -88,7 +88,7 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
         MapReduceExecutable baseCuboidStep = new MapReduceExecutable();
 
         StringBuilder cmd = new StringBuilder();
-        appendMapReduceParameters(cmd, ((CubeSegment)seg).getCubeDesc().getModel());
+        appendMapReduceParameters(cmd, ((CubeSegment) seg).getCubeDesc().getModel());
 
         baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_BASE_CUBOID);
 
@@ -112,7 +112,7 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
         ndCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_N_D_CUBOID + " : " + dimNum + "-Dimension");
         StringBuilder cmd = new StringBuilder();
 
-        appendMapReduceParameters(cmd, ((CubeSegment)seg).getCubeDesc().getModel());
+        appendMapReduceParameters(cmd, ((CubeSegment) seg).getCubeDesc().getModel());
         appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
         appendExecCmdParameters(cmd, "segmentname", seg.getName());
         appendExecCmdParameters(cmd, "input", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum - 1]);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/71e6bed3/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
index 1282e61..831aa9d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
@@ -42,13 +42,15 @@ public class BatchMergeJobBuilder extends JobBuilderSupport {
 
         Preconditions.checkArgument(!mergeSegment.isEnableSharding(), "V1 job engine does not support merging sharded cubes");
 
-        this.outputSide = MRUtil.getBatchMergeOutputSide(seg);
+        this.outputSide = MRUtil.getBatchMergeOutputSide(mergeSegment);
     }
 
     public CubingJob build() {
         logger.info("MR_V1 new job to MERGE segment " + seg);
 
-        final CubingJob result = CubingJob.createMergeJob(seg, submitter, config);
+        final CubeSegment cubeSegment = (CubeSegment)seg;
+
+        final CubingJob result = CubingJob.createMergeJob(cubeSegment, submitter, config);
         final String jobId = result.getId();
         final String cuboidRootPath = getCuboidRootPath(jobId);