You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/11/16 04:15:25 UTC
[1/2] incubator-kylin git commit: KYLIN-1135 pscan use cached thread pool
Repository: incubator-kylin
Updated Branches:
refs/heads/2.x-staging 134960c62 -> 71e6bed33
KYLIN-1135 pscan use cached thread pool
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/8e8187bb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/8e8187bb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/8e8187bb
Branch: refs/heads/2.x-staging
Commit: 8e8187bbd06efcd8fa91aa30d74a2d245b683b2b
Parents: 134960c
Author: honma <ho...@ebay.com>
Authored: Mon Nov 16 11:11:40 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Mon Nov 16 11:11:40 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/common/util/BasicTest.java | 78 +++++++++++++++-----
.../hbase/cube/v2/CubeHBaseEndpointRPC.java | 4 +-
2 files changed, 61 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8e8187bb/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index 2beb2c6..d753a20 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -25,17 +25,22 @@ import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
-import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.commons.configuration.ConfigurationException;
-import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.common.collect.TreeMultiset;
/**
* <p/>
@@ -118,28 +123,63 @@ public class BasicTest {
@Test
public void test0() throws Exception {
- TreeMultiset<Long> xx = TreeMultiset.create();
- xx.add(2L);
- xx.add(1L);
- xx.add(1L);
- for (Long hi : xx) {
- System.out.println(hi);
+ ExecutorService executorService = Executors.newCachedThreadPool();
+ List<Future<?>> futures = Lists.newArrayList();
+
+ futures.add(executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ throw new RuntimeException("hi");
+ }
+ }));
+
+ futures.add(executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(2000);
+ System.out.println("finish 1");
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }));
+
+ try {
+ for (Future<?> future : futures) {
+ future.get(1, TimeUnit.HOURS);
+ }
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ System.out.println(e.getMessage());
+ }
+
+ futures.clear();
+
+ futures.add(executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(3000);
+ System.out.println("finish 2");
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }));
+
+ try {
+ for (Future<?> future : futures) {
+ future.get(1, TimeUnit.HOURS);
+ }
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ System.out.println(e.getMessage());
}
- System.out.println(Long.MAX_VALUE);
-
- IdentityHashMap<String, Void> a = new IdentityHashMap<>();
- IdentityHashMap<String, Void> b = new IdentityHashMap<>();
- String s1 = new String("s1");
- String s2 = new String("s1");
- Assert.assertEquals(s1, s2);
- Assert.assertTrue(s1 != s2);
- a.put(s1, null);
- b.put(s2, null);
}
@Test
@Ignore("convenient trial tool for dev")
public void test1() throws Exception {
+
System.out.println(org.apache.kylin.common.util.DateFormat.formatToTimeStr(1433833611000L));
System.out.println(org.apache.kylin.common.util.DateFormat.formatToTimeStr(1433250517000L));
System.out.println(org.apache.kylin.common.util.DateFormat.stringToMillis("2015-06-01 00:00:00"));
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8e8187bb/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index b606d2e..5bc4a00 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -66,6 +66,8 @@ import com.google.protobuf.HBaseZeroCopyByteString;
public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
+ private static ExecutorService executorService = Executors.newCachedThreadPool();
+
static class EndpointResultsAsGTScanner implements IGTScanner {
private GTInfo info;
private Iterator<byte[]> blocks;
@@ -155,7 +157,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
final ByteString scanRequestBytesString = HBaseZeroCopyByteString.wrap(scanRequestBytes);
logger.info("Serialized scanRequestBytes's size is " + scanRequestBytes.length);
- ExecutorService executorService = Executors.newFixedThreadPool(rawScans.size());
final List<byte[]> rowBlocks = Collections.synchronizedList(Lists.<byte[]> newArrayList());
logger.info("Total RawScan range count: " + rawScans.size());
@@ -217,7 +218,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
});
futures.add(future);
}
- executorService.shutdown();
try {
for (Future<?> future : futures) {
future.get(1, TimeUnit.HOURS);
[2/2] incubator-kylin git commit: minor: fix bugs when merging codes
Posted by ma...@apache.org.
minor: fix bugs when merging codes
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/71e6bed3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/71e6bed3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/71e6bed3
Branch: refs/heads/2.x-staging
Commit: 71e6bed33bd5d71113ab609515060efe0b49a520
Parents: 8e8187b
Author: honma <ho...@ebay.com>
Authored: Mon Nov 16 11:19:54 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Mon Nov 16 11:19:54 2015 +0800
----------------------------------------------------------------------
.../src/main/java/org/apache/kylin/cube/CubeSegment.java | 2 ++
.../org/apache/kylin/engine/mr/BatchCubingJobBuilder.java | 10 +++++-----
.../org/apache/kylin/engine/mr/BatchMergeJobBuilder.java | 6 ++++--
3 files changed, 11 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/71e6bed3/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index 076bd14..62df1e9 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -28,9 +28,11 @@ import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.util.ShardingHash;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.dict.IDictionaryAware;
import org.apache.kylin.metadata.model.IBuildable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.IStorageAware;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/71e6bed3/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
index b5a7272..12cac94 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
@@ -44,13 +44,13 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
Preconditions.checkArgument(!newSegment.isEnableSharding(), "V1 job engine does not support building sharded cubes");
this.inputSide = MRUtil.getBatchCubingInputSide(seg);
- this.outputSide = MRUtil.getBatchCubingOutputSide((CubeSegment)seg);
+ this.outputSide = MRUtil.getBatchCubingOutputSide((CubeSegment) seg);
}
public CubingJob build() {
logger.info("MR_V1 new job to BUILD segment " + seg);
- final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
+ final CubingJob result = CubingJob.createBuildJob((CubeSegment) seg, submitter, config);
final String jobId = result.getId();
final String cuboidRootPath = getCuboidRootPath(jobId);
@@ -62,7 +62,7 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
result.addTask(createBuildDictionaryStep(jobId));
// Phase 3: Build Cube
- RowKeyDesc rowKeyDesc = ((CubeSegment)seg).getCubeDesc().getRowkey();
+ RowKeyDesc rowKeyDesc = ((CubeSegment) seg).getCubeDesc().getRowkey();
final int groupRowkeyColumnsCount = rowKeyDesc.getNCuboidBuildLevels();
final int totalRowkeyColumnsCount = rowKeyDesc.getRowKeyColumns().length;
final String[] cuboidOutputTempPath = getCuboidOutputPaths(cuboidRootPath, totalRowkeyColumnsCount, groupRowkeyColumnsCount);
@@ -88,7 +88,7 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
MapReduceExecutable baseCuboidStep = new MapReduceExecutable();
StringBuilder cmd = new StringBuilder();
- appendMapReduceParameters(cmd, ((CubeSegment)seg).getCubeDesc().getModel());
+ appendMapReduceParameters(cmd, ((CubeSegment) seg).getCubeDesc().getModel());
baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_BASE_CUBOID);
@@ -112,7 +112,7 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
ndCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_N_D_CUBOID + " : " + dimNum + "-Dimension");
StringBuilder cmd = new StringBuilder();
- appendMapReduceParameters(cmd, ((CubeSegment)seg).getCubeDesc().getModel());
+ appendMapReduceParameters(cmd, ((CubeSegment) seg).getCubeDesc().getModel());
appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
appendExecCmdParameters(cmd, "segmentname", seg.getName());
appendExecCmdParameters(cmd, "input", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum - 1]);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/71e6bed3/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
index 1282e61..831aa9d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
@@ -42,13 +42,15 @@ public class BatchMergeJobBuilder extends JobBuilderSupport {
Preconditions.checkArgument(!mergeSegment.isEnableSharding(), "V1 job engine does not support merging sharded cubes");
- this.outputSide = MRUtil.getBatchMergeOutputSide(seg);
+ this.outputSide = MRUtil.getBatchMergeOutputSide(mergeSegment);
}
public CubingJob build() {
logger.info("MR_V1 new job to MERGE segment " + seg);
- final CubingJob result = CubingJob.createMergeJob(seg, submitter, config);
+ final CubeSegment cubeSegment = (CubeSegment)seg;
+
+ final CubingJob result = CubingJob.createMergeJob(cubeSegment, submitter, config);
final String jobId = result.getId();
final String cuboidRootPath = getCuboidRootPath(jobId);