You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/04/13 05:39:53 UTC
[4/8] kylin git commit: refactor
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
----------------------------------------------------------------------
diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
deleted file mode 100644
index d50baad..0000000
--- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.impl.threadpool;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.kylin.job.BaseTestExecutable;
-import org.apache.kylin.job.ErrorTestExecutable;
-import org.apache.kylin.job.FailedTestExecutable;
-import org.apache.kylin.job.SelfStopExecutable;
-import org.apache.kylin.job.SucceedTestExecutable;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.junit.Test;
-
-/**
- */
-public class DefaultSchedulerTest extends BaseSchedulerTest {
-
- @Test
- public void testSingleTaskJob() throws Exception {
- DefaultChainedExecutable job = new DefaultChainedExecutable();
- BaseTestExecutable task1 = new SucceedTestExecutable();
- job.addTask(task1);
- jobService.addJob(job);
- waitForJobFinish(job.getId());
- assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
- assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
- }
-
- @Test
- public void testSucceed() throws Exception {
- DefaultChainedExecutable job = new DefaultChainedExecutable();
- BaseTestExecutable task1 = new SucceedTestExecutable();
- BaseTestExecutable task2 = new SucceedTestExecutable();
- job.addTask(task1);
- job.addTask(task2);
- jobService.addJob(job);
- waitForJobFinish(job.getId());
- assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
- assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
- assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task2.getId()).getState());
- }
-
- @Test
- public void testSucceedAndFailed() throws Exception {
- DefaultChainedExecutable job = new DefaultChainedExecutable();
- BaseTestExecutable task1 = new SucceedTestExecutable();
- BaseTestExecutable task2 = new FailedTestExecutable();
- job.addTask(task1);
- job.addTask(task2);
- jobService.addJob(job);
- waitForJobFinish(job.getId());
- assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState());
- assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
- assertEquals(ExecutableState.ERROR, jobService.getOutput(task2.getId()).getState());
- }
-
- @Test
- public void testSucceedAndError() throws Exception {
- DefaultChainedExecutable job = new DefaultChainedExecutable();
- BaseTestExecutable task1 = new ErrorTestExecutable();
- BaseTestExecutable task2 = new SucceedTestExecutable();
- job.addTask(task1);
- job.addTask(task2);
- jobService.addJob(job);
- waitForJobFinish(job.getId());
- assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState());
- assertEquals(ExecutableState.ERROR, jobService.getOutput(task1.getId()).getState());
- assertEquals(ExecutableState.READY, jobService.getOutput(task2.getId()).getState());
- }
-
- @Test
- public void testDiscard() throws Exception {
- DefaultChainedExecutable job = new DefaultChainedExecutable();
- BaseTestExecutable task1 = new SelfStopExecutable();
- job.addTask(task1);
- jobService.addJob(job);
- waitForJobStatus(job.getId(), ExecutableState.RUNNING, 500);
- jobService.discardJob(job.getId());
- waitForJobFinish(job.getId());
- assertEquals(ExecutableState.DISCARDED, jobService.getOutput(job.getId()).getState());
- assertEquals(ExecutableState.DISCARDED, jobService.getOutput(task1.getId()).getState());
- Thread.sleep(5000);
- System.out.println(job);
- }
-
- @SuppressWarnings("rawtypes")
- @Test
- public void testSchedulerPool() throws InterruptedException {
- ScheduledExecutorService fetchPool = Executors.newScheduledThreadPool(1);
- final CountDownLatch countDownLatch = new CountDownLatch(3);
- ScheduledFuture future = fetchPool.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- countDownLatch.countDown();
- }
- }, 5, 5, TimeUnit.SECONDS);
- assertTrue("countDownLatch should reach zero in 15 secs", countDownLatch.await(20, TimeUnit.SECONDS));
- assertTrue("future should still running", future.cancel(true));
-
- final CountDownLatch countDownLatch2 = new CountDownLatch(3);
- ScheduledFuture future2 = fetchPool.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- countDownLatch2.countDown();
- throw new RuntimeException();
- }
- }, 5, 5, TimeUnit.SECONDS);
- assertFalse("countDownLatch2 should NOT reach zero in 15 secs", countDownLatch2.await(20, TimeUnit.SECONDS));
- assertFalse("future2 should has been stopped", future2.cancel(true));
-
- final CountDownLatch countDownLatch3 = new CountDownLatch(3);
- ScheduledFuture future3 = fetchPool.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- countDownLatch3.countDown();
- throw new RuntimeException();
- } catch (Exception e) {
- }
- }
- }, 5, 5, TimeUnit.SECONDS);
- assertTrue("countDownLatch3 should reach zero in 15 secs", countDownLatch3.await(20, TimeUnit.SECONDS));
- assertTrue("future3 should still running", future3.cancel(true));
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInValueProviderFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInValueProviderFactory.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInValueProviderFactory.java
index 0ae7e6a..18da37a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInValueProviderFactory.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInValueProviderFactory.java
@@ -18,7 +18,6 @@
package org.apache.kylin.metadata.filter.UDF;
-import org.apache.kylin.dimension.DimensionEncoding;
import org.apache.kylin.metadata.filter.function.Functions;
import org.apache.kylin.metadata.model.TblColRef;
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-storage/src/test/java/org/apache/kylin/storage/StorageMockUtils.java
----------------------------------------------------------------------
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/StorageMockUtils.java b/core-storage/src/test/java/org/apache/kylin/storage/StorageMockUtils.java
new file mode 100644
index 0000000..3bb4540
--- /dev/null
+++ b/core-storage/src/test/java/org/apache/kylin/storage/StorageMockUtils.java
@@ -0,0 +1,189 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ * /
+ */
+
+package org.apache.kylin.storage;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.kylin.metadata.filter.ColumnTupleFilter;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.ConstantTupleFilter;
+import org.apache.kylin.metadata.filter.LogicalTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.ParameterDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.tuple.TupleInfo;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ */
+public class StorageMockUtils {
+ public static TupleInfo newTupleInfo(List<TblColRef> groups, List<FunctionDesc> aggregations) {
+ TupleInfo info = new TupleInfo();
+ int idx = 0;
+
+ for (TblColRef col : groups) {
+ info.setField(col.getName(), col, idx++);
+ }
+
+ TableDesc sourceTable = groups.get(0).getColumnDesc().getTable();
+ for (FunctionDesc func : aggregations) {
+ TblColRef col = new TblColRef(func.newFakeRewriteColumn(sourceTable));
+ info.setField(col.getName(), col, idx++);
+ }
+
+ return info;
+ }
+
+ public static List<TblColRef> buildGroups() {
+ List<TblColRef> groups = new ArrayList<TblColRef>();
+
+ TableDesc t1 = TableDesc.mockup("DEFAULT.TEST_KYLIN_FACT");
+ ColumnDesc c1 = ColumnDesc.mockup(t1, 2, "CAL_DT", "date");
+ TblColRef cf1 = new TblColRef(c1);
+ groups.add(cf1);
+
+ TableDesc t2 = TableDesc.mockup("DEFAULT.TEST_CATEGORY_GROUPINGS");
+ ColumnDesc c2 = ColumnDesc.mockup(t2, 14, "META_CATEG_NAME", "string");
+ TblColRef cf2 = new TblColRef(c2);
+ groups.add(cf2);
+
+ return groups;
+ }
+
+ public static List<FunctionDesc> buildAggregations1() {
+ List<FunctionDesc> functions = new ArrayList<FunctionDesc>();
+
+ TableDesc t1 = TableDesc.mockup("DEFAULT.TEST_KYLIN_FACT");
+ TblColRef priceCol = new TblColRef(ColumnDesc.mockup(t1, 7, "PRICE", "decimal(19,4)"));
+
+ FunctionDesc f1 = new FunctionDesc();
+ f1.setExpression("SUM");
+ ParameterDesc p1 = new ParameterDesc();
+ p1.setType("column");
+ p1.setValue("PRICE");
+ p1.setColRefs(ImmutableList.of(priceCol));
+ f1.setParameter(p1);
+ f1.setReturnType("decimal(19,4)");
+ functions.add(f1);
+
+
+ return functions;
+ }
+
+ public static List<FunctionDesc> buildAggregations() {
+ List<FunctionDesc> functions = new ArrayList<FunctionDesc>();
+
+ TableDesc t1 = TableDesc.mockup("DEFAULT.TEST_KYLIN_FACT");
+ TblColRef priceCol = new TblColRef(ColumnDesc.mockup(t1, 7, "PRICE", "decimal(19,4)"));
+ TblColRef sellerCol = new TblColRef(ColumnDesc.mockup(t1, 9, "SELLER_ID", "bigint"));
+
+ FunctionDesc f1 = new FunctionDesc();
+ f1.setExpression("SUM");
+ ParameterDesc p1 = new ParameterDesc();
+ p1.setType("column");
+ p1.setValue("PRICE");
+ p1.setColRefs(ImmutableList.of(priceCol));
+ f1.setParameter(p1);
+ f1.setReturnType("decimal(19,4)");
+ functions.add(f1);
+
+ FunctionDesc f2 = new FunctionDesc();
+ f2.setExpression("COUNT_DISTINCT");
+ ParameterDesc p2 = new ParameterDesc();
+ p2.setType("column");
+ p2.setValue("SELLER_ID");
+ p2.setColRefs(ImmutableList.of(sellerCol));
+ f2.setParameter(p2);
+ f2.setReturnType("hllc(10)");
+ functions.add(f2);
+
+ return functions;
+ }
+
+ public static CompareTupleFilter buildTs2010Filter(TblColRef column) {
+ CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GT);
+ ColumnTupleFilter columnFilter1 = new ColumnTupleFilter(column);
+ compareFilter.addChild(columnFilter1);
+ ConstantTupleFilter constantFilter1 = new ConstantTupleFilter("2010-01-01");
+ compareFilter.addChild(constantFilter1);
+ return compareFilter;
+ }
+
+ public static CompareTupleFilter buildTs2011Filter(TblColRef column) {
+ CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GT);
+ ColumnTupleFilter columnFilter1 = new ColumnTupleFilter(column);
+ compareFilter.addChild(columnFilter1);
+ ConstantTupleFilter constantFilter1 = new ConstantTupleFilter("2011-01-01");
+ compareFilter.addChild(constantFilter1);
+ return compareFilter;
+ }
+
+ public static CompareTupleFilter buildFilter1(TblColRef column) {
+ CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.LTE);
+ ColumnTupleFilter columnFilter1 = new ColumnTupleFilter(column);
+ compareFilter.addChild(columnFilter1);
+ ConstantTupleFilter constantFilter1 = new ConstantTupleFilter("2012-05-23");
+ compareFilter.addChild(constantFilter1);
+ return compareFilter;
+ }
+
+ public static CompareTupleFilter buildFilter2(TblColRef column) {
+ CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.EQ);
+ ColumnTupleFilter columnFilter2 = new ColumnTupleFilter(column);
+ compareFilter.addChild(columnFilter2);
+ ConstantTupleFilter constantFilter2 = new ConstantTupleFilter("ClothinShoes & Accessories");
+ compareFilter.addChild(constantFilter2);
+ return compareFilter;
+ }
+
+ public static CompareTupleFilter buildFilter3(TblColRef column) {
+ CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.EQ);
+ ColumnTupleFilter columnFilter1 = new ColumnTupleFilter(column);
+ compareFilter.addChild(columnFilter1);
+ ConstantTupleFilter constantFilter1 = new ConstantTupleFilter("2012-05-23");
+ compareFilter.addChild(constantFilter1);
+ return compareFilter;
+ }
+
+
+ public static TupleFilter buildAndFilter(List<TblColRef> columns) {
+ CompareTupleFilter compareFilter1 = buildFilter1(columns.get(0));
+ CompareTupleFilter compareFilter2 = buildFilter2(columns.get(1));
+ LogicalTupleFilter andFilter = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.AND);
+ andFilter.addChild(compareFilter1);
+ andFilter.addChild(compareFilter2);
+ return andFilter;
+ }
+
+ public static TupleFilter buildOrFilter(List<TblColRef> columns) {
+ CompareTupleFilter compareFilter1 = buildFilter1(columns.get(0));
+ CompareTupleFilter compareFilter2 = buildFilter2(columns.get(1));
+ LogicalTupleFilter logicFilter = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.OR);
+ logicFilter.addChild(compareFilter1);
+ logicFilter.addChild(compareFilter2);
+ return logicFilter;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-storage/src/test/java/org/apache/kylin/storage/cache/StorageMockUtils.java
----------------------------------------------------------------------
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/cache/StorageMockUtils.java b/core-storage/src/test/java/org/apache/kylin/storage/cache/StorageMockUtils.java
deleted file mode 100644
index 5f8f08f..0000000
--- a/core-storage/src/test/java/org/apache/kylin/storage/cache/StorageMockUtils.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.cache;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.kylin.metadata.filter.ColumnTupleFilter;
-import org.apache.kylin.metadata.filter.CompareTupleFilter;
-import org.apache.kylin.metadata.filter.ConstantTupleFilter;
-import org.apache.kylin.metadata.filter.LogicalTupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.ParameterDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.tuple.TupleInfo;
-
-import com.google.common.collect.ImmutableList;
-
-/**
- */
-public class StorageMockUtils {
- public static TupleInfo newTupleInfo(List<TblColRef> groups, List<FunctionDesc> aggregations) {
- TupleInfo info = new TupleInfo();
- int idx = 0;
-
- for (TblColRef col : groups) {
- info.setField(col.getName(), col, idx++);
- }
-
- TableDesc sourceTable = groups.get(0).getColumnDesc().getTable();
- for (FunctionDesc func : aggregations) {
- TblColRef col = new TblColRef(func.newFakeRewriteColumn(sourceTable));
- info.setField(col.getName(), col, idx++);
- }
-
- return info;
- }
-
- public static List<TblColRef> buildGroups() {
- List<TblColRef> groups = new ArrayList<TblColRef>();
-
- TableDesc t1 = TableDesc.mockup("DEFAULT.TEST_KYLIN_FACT");
- ColumnDesc c1 = ColumnDesc.mockup(t1, 2, "CAL_DT", "date");
- TblColRef cf1 = new TblColRef(c1);
- groups.add(cf1);
-
- TableDesc t2 = TableDesc.mockup("DEFAULT.TEST_CATEGORY_GROUPINGS");
- ColumnDesc c2 = ColumnDesc.mockup(t2, 14, "META_CATEG_NAME", "string");
- TblColRef cf2 = new TblColRef(c2);
- groups.add(cf2);
-
- return groups;
- }
-
- public static List<FunctionDesc> buildAggregations() {
- List<FunctionDesc> functions = new ArrayList<FunctionDesc>();
-
- TableDesc t1 = TableDesc.mockup("DEFAULT.TEST_KYLIN_FACT");
- TblColRef priceCol = new TblColRef(ColumnDesc.mockup(t1, 7, "PRICE", "decimal(19,4)"));
- TblColRef sellerCol = new TblColRef(ColumnDesc.mockup(t1, 9, "SELLER_ID", "bigint"));
-
- FunctionDesc f1 = new FunctionDesc();
- f1.setExpression("SUM");
- ParameterDesc p1 = new ParameterDesc();
- p1.setType("column");
- p1.setValue("PRICE");
- p1.setColRefs(ImmutableList.of(priceCol));
- f1.setParameter(p1);
- f1.setReturnType("decimal(19,4)");
- functions.add(f1);
-
- FunctionDesc f2 = new FunctionDesc();
- f2.setExpression("COUNT_DISTINCT");
- ParameterDesc p2 = new ParameterDesc();
- p2.setType("column");
- p2.setValue("SELLER_ID");
- p2.setColRefs(ImmutableList.of(sellerCol));
- f2.setParameter(p2);
- f2.setReturnType("hllc(10)");
- functions.add(f2);
-
- return functions;
- }
-
- public static CompareTupleFilter buildTs2010Filter(TblColRef column) {
- CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GT);
- ColumnTupleFilter columnFilter1 = new ColumnTupleFilter(column);
- compareFilter.addChild(columnFilter1);
- ConstantTupleFilter constantFilter1 = new ConstantTupleFilter("2010-01-01");
- compareFilter.addChild(constantFilter1);
- return compareFilter;
- }
-
- public static CompareTupleFilter buildTs2011Filter(TblColRef column) {
- CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GT);
- ColumnTupleFilter columnFilter1 = new ColumnTupleFilter(column);
- compareFilter.addChild(columnFilter1);
- ConstantTupleFilter constantFilter1 = new ConstantTupleFilter("2011-01-01");
- compareFilter.addChild(constantFilter1);
- return compareFilter;
- }
-
- public static CompareTupleFilter buildFilter1(TblColRef column) {
- CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.LTE);
- ColumnTupleFilter columnFilter1 = new ColumnTupleFilter(column);
- compareFilter.addChild(columnFilter1);
- ConstantTupleFilter constantFilter1 = new ConstantTupleFilter("2012-05-23");
- compareFilter.addChild(constantFilter1);
- return compareFilter;
- }
-
- public static CompareTupleFilter buildFilter2(TblColRef column) {
- CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.EQ);
- ColumnTupleFilter columnFilter2 = new ColumnTupleFilter(column);
- compareFilter.addChild(columnFilter2);
- ConstantTupleFilter constantFilter2 = new ConstantTupleFilter("ClothinShoes & Accessories");
- compareFilter.addChild(constantFilter2);
- return compareFilter;
- }
-
- public static TupleFilter buildAndFilter(List<TblColRef> columns) {
- CompareTupleFilter compareFilter1 = buildFilter1(columns.get(0));
- CompareTupleFilter compareFilter2 = buildFilter2(columns.get(1));
- LogicalTupleFilter andFilter = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.AND);
- andFilter.addChild(compareFilter1);
- andFilter.addChild(compareFilter2);
- return andFilter;
- }
-
- public static TupleFilter buildOrFilter(List<TblColRef> columns) {
- CompareTupleFilter compareFilter1 = buildFilter1(columns.get(0));
- CompareTupleFilter compareFilter2 = buildFilter2(columns.get(1));
- LogicalTupleFilter logicFilter = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.OR);
- logicFilter.addChild(compareFilter1);
- logicFilter.addChild(compareFilter2);
- return logicFilter;
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/pom.xml
----------------------------------------------------------------------
diff --git a/kylin-it/pom.xml b/kylin-it/pom.xml
index 60815c7..1a3efb7 100644
--- a/kylin-it/pom.xml
+++ b/kylin-it/pom.xml
@@ -124,6 +124,13 @@
</dependency>
<dependency>
<groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-core-job</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
<artifactId>kylin-storage-hbase</artifactId>
<type>test-jar</type>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
index 4291d91..809cfb7 100644
--- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
@@ -63,8 +63,8 @@ public class ITDoggedCubeBuilderStressTest extends LocalFileMetadataTestCase {
CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty");
- flatTable = "../examples/test_case_data/localmeta/data/flatten_data_for_without_slr_left_join.csv";
- dictionaryMap = InMemCubeBuilderTest.getDictionaryMap(cube, flatTable);
+ flatTable = LOCALMETA_TEST_DATA + "/data/flatten_data_for_without_slr_left_join.csv";
+ dictionaryMap = ITInMemCubeBuilderTest.getDictionaryMap(cube, flatTable);
}
@AfterClass
@@ -84,7 +84,7 @@ public class ITDoggedCubeBuilderStressTest extends LocalFileMetadataTestCase {
{
Future<?> future = executorService.submit(doggedBuilder.buildAsRunnable(queue, new NoopWriter()));
- InMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
+ ITInMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
future.get();
}
}
@@ -101,7 +101,7 @@ public class ITDoggedCubeBuilderStressTest extends LocalFileMetadataTestCase {
@Override
public void close() {
-
+
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
new file mode 100644
index 0000000..ab9ac63
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
@@ -0,0 +1,163 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ * /
+ */
+
+package org.apache.kylin.cube.inmemcubing;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.dimension.Dictionary;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class ITDoggedCubeBuilderTest extends LocalFileMetadataTestCase {
+
+ @SuppressWarnings("unused")
+ private static final Logger logger = LoggerFactory.getLogger(ITDoggedCubeBuilderTest.class);
+
+ private static final int INPUT_ROWS = 10000;
+ private static final int SPLIT_ROWS = 5000;
+ private static final int THREADS = 4;
+
+ private static CubeInstance cube;
+ private static String flatTable;
+ private static Map<TblColRef, Dictionary<String>> dictionaryMap;
+
+ @BeforeClass
+ public static void before() throws IOException {
+ staticCreateTestMetadata();
+
+ KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
+
+ cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty");
+ flatTable = LocalFileMetadataTestCase.LOCALMETA_TEST_DATA + "/data/flatten_data_for_without_slr_left_join.csv";
+ dictionaryMap = ITInMemCubeBuilderTest.getDictionaryMap(cube, flatTable);
+ }
+
+ @AfterClass
+ public static void after() throws Exception {
+ staticCleanupTestMetadata();
+ }
+
+ @Test
+ public void test() throws Exception {
+
+ ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ long randSeed = System.currentTimeMillis();
+
+ DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
+ doggedBuilder.setConcurrentThreads(THREADS);
+ doggedBuilder.setSplitRowThreshold(SPLIT_ROWS);
+ FileRecordWriter doggedResult = new FileRecordWriter();
+
+ {
+ Future<?> future = executorService.submit(doggedBuilder.buildAsRunnable(queue, doggedResult));
+ ITInMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
+ future.get();
+ doggedResult.close();
+ }
+
+ InMemCubeBuilder inmemBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap);
+ inmemBuilder.setConcurrentThreads(THREADS);
+ FileRecordWriter inmemResult = new FileRecordWriter();
+
+ {
+ Future<?> future = executorService.submit(inmemBuilder.buildAsRunnable(queue, inmemResult));
+ ITInMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
+ future.get();
+ inmemResult.close();
+ }
+
+ fileCompare(doggedResult.file, inmemResult.file);
+ doggedResult.file.delete();
+ inmemResult.file.delete();
+ }
+
+ private void fileCompare(File file, File file2) throws IOException {
+ BufferedReader r1 = new BufferedReader(new InputStreamReader(new FileInputStream(file), "UTF-8"));
+ BufferedReader r2 = new BufferedReader(new InputStreamReader(new FileInputStream(file2), "UTF-8"));
+
+ String line1, line2;
+ do {
+ line1 = r1.readLine();
+ line2 = r2.readLine();
+
+ assertEquals(line1, line2);
+
+ } while (line1 != null || line2 != null);
+
+ r1.close();
+ r2.close();
+ }
+
+ class FileRecordWriter implements ICuboidWriter {
+
+ File file;
+ PrintWriter writer;
+
+ FileRecordWriter() throws IOException {
+ file = File.createTempFile("DoggedCubeBuilderTest_", ".data");
+ writer = new PrintWriter(file, "UTF-8");
+ }
+
+ @Override
+ public void write(long cuboidId, GTRecord record) throws IOException {
+ writer.print(cuboidId);
+ writer.print(", ");
+ writer.print(record.toString());
+ writer.println();
+ }
+
+ @Override
+ public void flush() {
+
+ }
+
+ @Override
+ public void close() {
+ writer.close();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
new file mode 100644
index 0000000..ad02f2a
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
@@ -0,0 +1,271 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ * /
+ */
+
+package org.apache.kylin.cube.inmemcubing;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
+import org.apache.kylin.dimension.Dictionary;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ */
+public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase {
+
+ private static final Logger logger = LoggerFactory.getLogger(ITInMemCubeBuilderTest.class);
+
+ private CubeInstance cube;
+ private String flatTable;
+ private Map<TblColRef, Dictionary<String>> dictionaryMap;
+
+ private int nInpRows;
+ private int nThreads;
+
+ @Before
+ public void before() throws IOException {
+ createTestMetadata();
+ }
+
+ @After
+ public void after() throws Exception {
+ cleanupTestMetadata();
+ }
+
+ @Test
+ public void testKylinCube() throws Exception {
+ testBuild("test_kylin_cube_without_slr_left_join_empty", //
+ LOCALMETA_TEST_DATA + "/data/flatten_data_for_without_slr_left_join.csv", 70000, 4);
+ }
+
+ @Test
+ public void testSSBCube() throws Exception {
+ testBuild("ssb", //
+ LOCALMETA_TEST_DATA + "/data/kylin_intermediate_ssb_19920101000000_19920201000000.csv", 1000, 1);
+ }
+
+ public void testBuild(String cubeName, String flatTable, int nInpRows, int nThreads) throws Exception {
+ KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
+
+ this.nInpRows = nInpRows;
+ this.nThreads = nThreads;
+
+ this.cube = cubeManager.getCube(cubeName);
+ this.flatTable = flatTable;
+ this.dictionaryMap = getDictionaryMap(cube, flatTable);
+
+ testBuildInner();
+ }
+
+ private void testBuildInner() throws Exception {
+
+ InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap);
+ //DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
+ cubeBuilder.setConcurrentThreads(nThreads);
+
+ ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+ try {
+ // round 1
+ {
+ Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter()));
+ feedData(cube, flatTable, queue, nInpRows);
+ future.get();
+ }
+
+ // round 2, zero input
+ {
+ Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter()));
+ feedData(cube, flatTable, queue, 0);
+ future.get();
+ }
+
+ // round 3
+ {
+ Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter()));
+ feedData(cube, flatTable, queue, nInpRows);
+ future.get();
+ }
+
+ } catch (Exception e) {
+ logger.error("stream build failed", e);
+ throw new IOException("Failed to build cube ", e);
+ }
+ }
+
+ static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<List<String>> queue, int count) throws IOException, InterruptedException {
+ feedData(cube, flatTable, queue, count, 0);
+ }
+
+ static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<List<String>> queue, int count, long randSeed) throws IOException, InterruptedException {
+ CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor(), null);
+ int nColumns = flatTableDesc.getColumnList().size();
+
+ @SuppressWarnings("unchecked")
+ Set<String>[] distinctSets = new Set[nColumns];
+ for (int i = 0; i < nColumns; i++)
+ distinctSets[i] = new TreeSet<String>();
+
+ // get distinct values on each column
+ List<String> lines = FileUtils.readLines(new File(flatTable), "UTF-8");
+ for (String line : lines) {
+ String[] row = line.trim().split(",");
+ assert row.length == nColumns;
+ for (int i = 0; i < nColumns; i++)
+ distinctSets[i].add(row[i]);
+ }
+
+ List<String[]> distincts = new ArrayList<String[]>();
+ for (int i = 0; i < nColumns; i++) {
+ distincts.add((String[]) distinctSets[i].toArray(new String[distinctSets[i].size()]));
+ }
+
+ Random rand = new Random();
+ if (randSeed != 0)
+ rand.setSeed(randSeed);
+
+ // output with random data
+ for (; count > 0; count--) {
+ ArrayList<String> row = new ArrayList<String>(nColumns);
+ for (int i = 0; i < nColumns; i++) {
+ String[] candidates = distincts.get(i);
+ row.add(candidates[rand.nextInt(candidates.length)]);
+ }
+ queue.put(row);
+ }
+ queue.put(new ArrayList<String>(0));
+ }
+
+ static Map<TblColRef, Dictionary<String>> getDictionaryMap(CubeInstance cube, String flatTable) throws IOException {
+ Map<TblColRef, Dictionary<String>> result = Maps.newHashMap();
+ CubeDesc desc = cube.getDescriptor();
+ CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(desc, null);
+ int nColumns = flatTableDesc.getColumnList().size();
+
+ List<TblColRef> columns = Cuboid.getBaseCuboid(desc).getColumns();
+ for (int c = 0; c < columns.size(); c++) {
+ TblColRef col = columns.get(c);
+ if (desc.getRowkey().isUseDictionary(col)) {
+ logger.info("Building dictionary for " + col);
+ List<byte[]> valueList = readValueList(flatTable, nColumns, flatTableDesc.getRowKeyColumnIndexes()[c]);
+ Dictionary<String> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(col.getType(), new IterableDictionaryValueEnumerator(valueList));
+ result.put(col, dict);
+ }
+ }
+
+ for (int measureIdx = 0; measureIdx < cube.getDescriptor().getMeasures().size(); measureIdx++) {
+ MeasureDesc measureDesc = cube.getDescriptor().getMeasures().get(measureIdx);
+ FunctionDesc func = measureDesc.getFunction();
+ List<TblColRef> dictCols = func.getMeasureType().getColumnsNeedDictionary(func);
+ if (dictCols.isEmpty())
+ continue;
+
+ int[] flatTableIdx = flatTableDesc.getMeasureColumnIndexes()[measureIdx];
+ List<TblColRef> paramCols = func.getParameter().getColRefs();
+ for (int i = 0; i < paramCols.size(); i++) {
+ TblColRef col = paramCols.get(i);
+ if (dictCols.contains(col)) {
+ int colIdxOnFlat = flatTableIdx[i];
+ logger.info("Building dictionary for " + col);
+ List<byte[]> valueList = readValueList(flatTable, nColumns, colIdxOnFlat);
+ Dictionary<String> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(col.getType(), new IterableDictionaryValueEnumerator(valueList));
+
+ result.put(col, dict);
+ }
+ }
+ }
+
+ return result;
+ }
+
+ private static List<byte[]> readValueList(String flatTable, int nColumns, int c) throws IOException {
+ List<byte[]> result = Lists.newArrayList();
+ List<String> lines = FileUtils.readLines(new File(flatTable), "UTF-8");
+ for (String line : lines) {
+ String[] row = line.trim().split(",");
+ if (row.length != nColumns) {
+ throw new IllegalStateException();
+ }
+ if (row[c] != null) {
+ result.add(Bytes.toBytes(row[c]));
+ }
+ }
+ return result;
+ }
+
+ class ConsoleGTRecordWriter implements ICuboidWriter {
+
+ boolean verbose = false;
+
+ @Override
+ public void write(long cuboidId, GTRecord record) throws IOException {
+ if (verbose)
+ System.out.println(record.toString());
+ }
+
+ @Override
+ public void flush() {
+ if (verbose) {
+ System.out.println("flush");
+ }
+ }
+
+ @Override
+ public void close() {
+ if (verbose) {
+ System.out.println("close");
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java b/kylin-it/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java
new file mode 100644
index 0000000..ad1ddd3
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java
@@ -0,0 +1,154 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ * /
+ */
+
+package org.apache.kylin.job.impl.threadpool;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kylin.job.BaseTestExecutable;
+import org.apache.kylin.job.ErrorTestExecutable;
+import org.apache.kylin.job.FailedTestExecutable;
+import org.apache.kylin.job.SelfStopExecutable;
+import org.apache.kylin.job.SucceedTestExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ */
+public class ITDefaultSchedulerTest extends BaseSchedulerTest {
+
+ @Test
+ public void testSingleTaskJob() throws Exception {
+ DefaultChainedExecutable job = new DefaultChainedExecutable();
+ BaseTestExecutable task1 = new SucceedTestExecutable();
+ job.addTask(task1);
+ jobService.addJob(job);
+ waitForJobFinish(job.getId());
+ Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
+ Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
+ }
+
+ @Test
+ public void testSucceed() throws Exception {
+ DefaultChainedExecutable job = new DefaultChainedExecutable();
+ BaseTestExecutable task1 = new SucceedTestExecutable();
+ BaseTestExecutable task2 = new SucceedTestExecutable();
+ job.addTask(task1);
+ job.addTask(task2);
+ jobService.addJob(job);
+ waitForJobFinish(job.getId());
+ Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
+ Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
+ Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task2.getId()).getState());
+ }
+
+ @Test
+ public void testSucceedAndFailed() throws Exception {
+ DefaultChainedExecutable job = new DefaultChainedExecutable();
+ BaseTestExecutable task1 = new SucceedTestExecutable();
+ BaseTestExecutable task2 = new FailedTestExecutable();
+ job.addTask(task1);
+ job.addTask(task2);
+ jobService.addJob(job);
+ waitForJobFinish(job.getId());
+ Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState());
+ Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
+ Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(task2.getId()).getState());
+ }
+
+ @Test
+ public void testSucceedAndError() throws Exception {
+ DefaultChainedExecutable job = new DefaultChainedExecutable();
+ BaseTestExecutable task1 = new ErrorTestExecutable();
+ BaseTestExecutable task2 = new SucceedTestExecutable();
+ job.addTask(task1);
+ job.addTask(task2);
+ jobService.addJob(job);
+ waitForJobFinish(job.getId());
+ Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState());
+ Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(task1.getId()).getState());
+ Assert.assertEquals(ExecutableState.READY, jobService.getOutput(task2.getId()).getState());
+ }
+
+ @Test
+ public void testDiscard() throws Exception {
+ DefaultChainedExecutable job = new DefaultChainedExecutable();
+ BaseTestExecutable task1 = new SelfStopExecutable();
+ job.addTask(task1);
+ jobService.addJob(job);
+ waitForJobStatus(job.getId(), ExecutableState.RUNNING, 500);
+ jobService.discardJob(job.getId());
+ waitForJobFinish(job.getId());
+ Assert.assertEquals(ExecutableState.DISCARDED, jobService.getOutput(job.getId()).getState());
+ Assert.assertEquals(ExecutableState.DISCARDED, jobService.getOutput(task1.getId()).getState());
+ Thread.sleep(5000);
+ System.out.println(job);
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testSchedulerPool() throws InterruptedException {
+ ScheduledExecutorService fetchPool = Executors.newScheduledThreadPool(1);
+ final CountDownLatch countDownLatch = new CountDownLatch(3);
+ ScheduledFuture future = fetchPool.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ countDownLatch.countDown();
+ }
+ }, 5, 5, TimeUnit.SECONDS);
+ assertTrue("countDownLatch should reach zero in 15 secs", countDownLatch.await(20, TimeUnit.SECONDS));
+ assertTrue("future should still running", future.cancel(true));
+
+ final CountDownLatch countDownLatch2 = new CountDownLatch(3);
+ ScheduledFuture future2 = fetchPool.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ countDownLatch2.countDown();
+ throw new RuntimeException();
+ }
+ }, 5, 5, TimeUnit.SECONDS);
+ assertFalse("countDownLatch2 should NOT reach zero in 15 secs", countDownLatch2.await(20, TimeUnit.SECONDS));
+ assertFalse("future2 should has been stopped", future2.cancel(true));
+
+ final CountDownLatch countDownLatch3 = new CountDownLatch(3);
+ ScheduledFuture future3 = fetchPool.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ countDownLatch3.countDown();
+ throw new RuntimeException();
+ } catch (Exception e) {
+ }
+ }
+ }, 5, 5, TimeUnit.SECONDS);
+ assertTrue("countDownLatch3 should reach zero in 15 secs", countDownLatch3.await(20, TimeUnit.SECONDS));
+ assertTrue("future3 should still running", future3.cancel(true));
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index c945485..e1cbe1f 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -99,12 +99,12 @@ public class BuildCubeWithEngine {
logger.info("Will not use fast build mode");
}
- System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
+ System.setProperty(KylinConfig.KYLIN_CONF,HBaseMetadataTestCase.SANDBOX_TEST_DATA);
if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
}
- HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+ HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
try {
//check hdfs permission
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java
index 5ab5e83..d862dbf 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java
@@ -80,7 +80,7 @@ public class BuildCubeWithSpark {
public static void beforeClass() throws Exception {
logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
- System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
+ System.setProperty(KylinConfig.KYLIN_CONF, HBaseMetadataTestCase.SANDBOX_TEST_DATA);
if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
}
@@ -89,7 +89,7 @@ public class BuildCubeWithSpark {
@Before
public void before() throws Exception {
- HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+ HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
DeployUtil.initCliWorkDir();
DeployUtil.deployMetadata();
@@ -117,7 +117,7 @@ public class BuildCubeWithSpark {
@Test
public void test() throws Exception {
final CubeSegment segment = createSegment();
- String confPath = new File(AbstractKylinTestCase.SANDBOX_TEST_DATA).getAbsolutePath();
+ String confPath = new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath();
KylinConfig.getInstanceFromEnv().getCoprocessorLocalJar();
String coprocessor = KylinConfig.getInstanceFromEnv().getCoprocessorLocalJar();
logger.info("confPath location:" + confPath);
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index eeff999..99da26f 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -67,11 +67,11 @@ public class BuildCubeWithStream {
public static void beforeClass() throws Exception {
logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
- System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
+ System.setProperty(KylinConfig.KYLIN_CONF,HBaseMetadataTestCase.SANDBOX_TEST_DATA);
if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
}
- HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+ HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
}
public void before() throws Exception {
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java
index 4b8ce24..643b122 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java
@@ -92,7 +92,7 @@ public class BuildIIWithEngine {
public static void beforeClass() throws Exception {
logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
- System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
+ System.setProperty(KylinConfig.KYLIN_CONF,HBaseMetadataTestCase.SANDBOX_TEST_DATA);
if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
}
@@ -100,7 +100,7 @@ public class BuildIIWithEngine {
@Before
public void before() throws Exception {
- HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+ HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
//DeployUtil.initCliWorkDir();
// DeployUtil.deployMetadata();
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java
index 9b7cd14..ace1a2f 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java
@@ -109,7 +109,7 @@ public class BuildIIWithStream {
if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
}
- HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+ HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
}
public void before() throws Exception {
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
index ec3e60f..46aa68d 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
@@ -19,11 +19,9 @@
package org.apache.kylin.query;
import java.io.BufferedReader;
-import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
-import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
@@ -38,22 +36,18 @@ import java.sql.Types;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.logging.LogManager;
-import com.google.common.base.Throwables;
import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.exception.ExceptionContext;
import org.apache.kylin.common.KylinConfig;
import org.dbunit.Assertion;
import org.dbunit.database.DatabaseConfig;
import org.dbunit.database.DatabaseConnection;
import org.dbunit.database.IDatabaseConnection;
import org.dbunit.dataset.DataSetException;
-import org.dbunit.dataset.DefaultTable;
import org.dbunit.dataset.ITable;
import org.dbunit.dataset.SortedTable;
import org.dbunit.dataset.datatype.DataType;
@@ -63,8 +57,6 @@ import org.dbunit.ext.h2.H2DataTypeFactory;
import org.junit.Assert;
import com.google.common.io.Files;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
*/
@@ -287,7 +279,6 @@ public class KylinTestBase {
return ret;
}
-
protected void execQueryUsingH2(String queryFolder, boolean needSort) throws Exception {
printInfo("---------- Running H2 queries: " + queryFolder);
@@ -349,7 +340,6 @@ public class KylinTestBase {
h2Conn.getConfig().setProperty(DatabaseConfig.PROPERTY_DATATYPE_FACTORY, new TestH2DataTypeFactory());
ITable h2Table = executeQuery(h2Conn, queryName, sql, needSort);
-
try {
// compare the result
Assert.assertEquals(h2Table.getRowCount(), kylinTable.getRowCount());
@@ -427,7 +417,6 @@ public class KylinTestBase {
h2Conn.getConfig().setProperty(DatabaseConfig.PROPERTY_DATATYPE_FACTORY, new TestH2DataTypeFactory());
ITable h2Table = executeDynamicQuery(h2Conn, queryName, sql, parameters, needSort);
-
// compare the result
Assertion.assertEquals(h2Table, kylinTable);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
index 15e435e..136342d 100644
--- a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
@@ -38,7 +38,7 @@ import org.apache.kylin.metadata.tuple.ITupleIterator;
import org.apache.kylin.storage.IStorageQuery;
import org.apache.kylin.storage.StorageContext;
import org.apache.kylin.storage.StorageFactory;
-import org.apache.kylin.storage.cache.StorageMockUtils;
+import org.apache.kylin.storage.StorageMockUtils;
import org.apache.kylin.storage.exception.ScanOutOfLimitException;
import org.junit.After;
import org.junit.AfterClass;
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/source-hive/pom.xml
----------------------------------------------------------------------
diff --git a/source-hive/pom.xml b/source-hive/pom.xml
index eb36eed..796b9c1 100644
--- a/source-hive/pom.xml
+++ b/source-hive/pom.xml
@@ -44,6 +44,13 @@
<!-- Env & Test -->
<dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-core-common</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/source-hive/src/test/java/org/apache/kylin/source/hive/HiveCmdBuilderTest.java
----------------------------------------------------------------------
diff --git a/source-hive/src/test/java/org/apache/kylin/source/hive/HiveCmdBuilderTest.java b/source-hive/src/test/java/org/apache/kylin/source/hive/HiveCmdBuilderTest.java
index 70c11b3..83c50c0 100644
--- a/source-hive/src/test/java/org/apache/kylin/source/hive/HiveCmdBuilderTest.java
+++ b/source-hive/src/test/java/org/apache/kylin/source/hive/HiveCmdBuilderTest.java
@@ -24,18 +24,16 @@ import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-/**
- * Created by dongli on 2/22/16.
- */
public class HiveCmdBuilderTest {
@Before
public void setup() {
- System.setProperty("KYLIN_CONF", "../examples/test_case_data/localmeta");
+ System.setProperty("KYLIN_CONF", LocalFileMetadataTestCase.LOCALMETA_TEST_DATA);
}
@After
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 6bbb0b7..1d3da28 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -19,6 +19,7 @@
package org.apache.kylin.storage.hbase.cube.v2;
import java.io.IOException;
+import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
@@ -56,6 +57,7 @@ import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRange;
import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.storage.hbase.HBaseConnection;
@@ -250,20 +252,21 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
}
@Override
- public IGTScanner getGTScanner(final List<GTScanRequest> scanRequests) throws IOException {
+ public IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOException {
final String toggle = BackdoorToggles.getCoprocessorBehavior() == null ? CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.toString() : BackdoorToggles.getCoprocessorBehavior();
+
logger.debug("New scanner for current segment {} will use {} as endpoint's behavior", cubeSeg, toggle);
short cuboidBaseShard = cubeSeg.getCuboidBaseShard(this.cuboid.getId());
short shardNum = cubeSeg.getCuboidShardNum(this.cuboid.getId());
int totalShards = cubeSeg.getTotalShards();
- final List<ByteString> scanRequestByteStrings = Lists.newArrayList();
- final List<ByteString> rawScanByteStrings = Lists.newArrayList();
+ ByteString scanRequestByteString = null;
+ ByteString rawScanByteString = null;
// primary key (also the 0th column block) is always selected
- final ImmutableBitSet selectedColBlocks = scanRequests.get(0).getSelectedColBlocks().set(0);
+ final ImmutableBitSet selectedColBlocks = scanRequest.getSelectedColBlocks().set(0);
// globally shared connection, does not require close
final HConnection conn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl());
@@ -274,65 +277,89 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
hbaseColumnsToGTIntList.add(IntList.newBuilder().addAllInts(list).build());
}
- for (GTScanRequest req : scanRequests) {
- ByteBuffer buffer = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE);
- GTScanRequest.serializer.serialize(req, buffer);
- buffer.flip();
- scanRequestByteStrings.add(HBaseZeroCopyByteString.wrap(buffer.array(), buffer.position(), buffer.limit()));
+ //TODO: raw scan can be constructed at region side to reduce traffic
+ List<RawScan> rawScans = preparedHBaseScans(scanRequest.getGTScanRanges(), selectedColBlocks);
+ int rawScanBufferSize = BytesSerializer.SERIALIZE_BUFFER_SIZE;
+ while (true) {
+ try {
+ ByteBuffer rawScanBuffer = ByteBuffer.allocate(rawScanBufferSize);
+ BytesUtil.writeVInt(rawScans.size(), rawScanBuffer);
+ for (RawScan rs : rawScans) {
+ RawScan.serializer.serialize(rs, rawScanBuffer);
+ }
+ rawScanBuffer.flip();
+ rawScanByteString = HBaseZeroCopyByteString.wrap(rawScanBuffer.array(), rawScanBuffer.position(), rawScanBuffer.limit());
+ break;
+ } catch (BufferOverflowException boe) {
+ logger.info("Buffer size {} cannot hold the raw scans, resizing to 4 times", rawScanBufferSize);
+ rawScanBufferSize *= 4;
+ }
+ }
+ scanRequest.setGTScanRanges(Lists.<GTScanRange> newArrayList());//since raw scans are sent to coprocessor, we don't need to duplicate sending it
- RawScan rawScan = preparedHBaseScan(req.getPkStart(), req.getPkEnd(), req.getFuzzyKeys(), selectedColBlocks);
+ int scanRequestBufferSize = BytesSerializer.SERIALIZE_BUFFER_SIZE;
+ while (true) {
+ try {
+ ByteBuffer buffer = ByteBuffer.allocate(scanRequestBufferSize);
+ GTScanRequest.serializer.serialize(scanRequest, buffer);
+ buffer.flip();
+ scanRequestByteString = HBaseZeroCopyByteString.wrap(buffer.array(), buffer.position(), buffer.limit());
+ break;
+ } catch (BufferOverflowException boe) {
+ logger.info("Buffer size {} cannot hold the scan request, resizing to 4 times", scanRequestBufferSize);
+ scanRequestBufferSize *= 4;
+ }
+ }
- ByteBuffer rawScanBuffer = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE);
- RawScan.serializer.serialize(rawScan, rawScanBuffer);
- rawScanBuffer.flip();
- rawScanByteStrings.add(HBaseZeroCopyByteString.wrap(rawScanBuffer.array(), rawScanBuffer.position(), rawScanBuffer.limit()));
+ logger.debug("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", scanRequestByteString.size(), rawScanByteString.size());
- logger.debug("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", buffer.limit() - buffer.position(), rawScanBuffer.limit() - rawScanBuffer.position());
- logger.info("The scan {} for segment {} is as below, shard part of start/end key is set to 0", Integer.toHexString(System.identityHashCode(req)), cubeSeg);
- logScan(rawScan, cubeSeg.getStorageLocationIdentifier());
+ logger.info("The scan {} for segment {} is as below, shard part of start/end key is set to 0", Integer.toHexString(System.identityHashCode(scanRequest)), cubeSeg);
+ for (RawScan rs : rawScans) {
+ logScan(rs, cubeSeg.getStorageLocationIdentifier());
}
- logger.debug("Submitting rpc to {} shards starting from shard {}, scan requests count {}", new Object[] { shardNum, cuboidBaseShard, scanRequests.size() });
+ logger.debug("Submitting rpc to {} shards starting from shard {}, scan range count {}", new Object[] { shardNum, cuboidBaseShard, rawScans.size() });
final AtomicInteger totalScannedCount = new AtomicInteger(0);
- final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(scanRequests.size() * shardNum);
+ final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(shardNum);
+ final String currentThreadName = Thread.currentThread().getName();
for (final Pair<byte[], byte[]> epRange : getEPKeyRanges(cuboidBaseShard, shardNum, totalShards)) {
+ final ByteString finalScanRequestByteString = scanRequestByteString;
+ final ByteString finalRawScanByteString = rawScanByteString;
executorService.submit(new Runnable() {
@Override
public void run() {
- for (int i = 0; i < scanRequests.size(); ++i) {
- CubeVisitProtos.CubeVisitRequest.Builder builder = CubeVisitProtos.CubeVisitRequest.newBuilder();
- builder.setGtScanRequest(scanRequestByteStrings.get(i)).setHbaseRawScan(rawScanByteStrings.get(i));
- for (IntList intList : hbaseColumnsToGTIntList) {
- builder.addHbaseColumnsToGT(intList);
- }
- builder.setRowkeyPreambleSize(cubeSeg.getRowKeyPreambleSize());
- builder.setBehavior(toggle);
+ CubeVisitProtos.CubeVisitRequest.Builder builder = CubeVisitProtos.CubeVisitRequest.newBuilder();
+ builder.setGtScanRequest(finalScanRequestByteString).setHbaseRawScan(finalRawScanByteString);
+ for (IntList intList : hbaseColumnsToGTIntList) {
+ builder.addHbaseColumnsToGT(intList);
+ }
+ builder.setRowkeyPreambleSize(cubeSeg.getRowKeyPreambleSize());
+ builder.setBehavior(toggle);
+
+ Map<byte[], CubeVisitProtos.CubeVisitResponse> results;
+ try {
+ results = getResults(builder.build(), conn.getTable(cubeSeg.getStorageLocationIdentifier()), epRange.getFirst(), epRange.getSecond());
+ } catch (Throwable throwable) {
+ throw new RuntimeException("<sub-thread for GTScanRequest " + Integer.toHexString(System.identityHashCode(scanRequest)) + "> " + "Error when visiting cubes by endpoint", throwable);
+ }
- Map<byte[], CubeVisitProtos.CubeVisitResponse> results;
+ for (Map.Entry<byte[], CubeVisitProtos.CubeVisitResponse> result : results.entrySet()) {
+ totalScannedCount.addAndGet(result.getValue().getStats().getScannedRowCount());
+ logger.info("<sub-thread for GTScanRequest " + Integer.toHexString(System.identityHashCode(scanRequest)) + "> " + getStatsString(result));
try {
- results = getResults(builder.build(), conn.getTable(cubeSeg.getStorageLocationIdentifier()), epRange.getFirst(), epRange.getSecond());
- } catch (Throwable throwable) {
- throw new RuntimeException("<sub-thread for GTScanRequest " + Integer.toHexString(System.identityHashCode(scanRequests.get(i))) + "> " + "Error when visiting cubes by endpoint", throwable);
- }
-
- for (Map.Entry<byte[], CubeVisitProtos.CubeVisitResponse> result : results.entrySet()) {
- totalScannedCount.addAndGet(result.getValue().getStats().getScannedRowCount());
- logger.info("<sub-thread for GTScanRequest " + Integer.toHexString(System.identityHashCode(scanRequests.get(i))) + "> " + getStatsString(result));
- try {
- epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getValue().getCompressedRows())));
- } catch (IOException | DataFormatException e) {
- throw new RuntimeException("Error when decompressing", e);
- }
+ epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getValue().getCompressedRows())));
+ } catch (IOException | DataFormatException e) {
+ throw new RuntimeException("Error when decompressing", e);
}
}
}
});
}
- return new EndpointResultsAsGTScanner(fullGTInfo, epResultItr, scanRequests.get(0).getColumns(), totalScannedCount.get());
+ return new EndpointResultsAsGTScanner(fullGTInfo, epResultItr, scanRequest.getColumns(), totalScannedCount.get());
}
private String getStatsString(Map.Entry<byte[], CubeVisitProtos.CubeVisitResponse> result) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
index 8563a5e..49e8593 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
@@ -18,7 +18,6 @@
package org.apache.kylin.storage.hbase.cube.v2;
-import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -43,16 +42,15 @@ import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
import org.apache.kylin.cube.model.HBaseMappingDesc;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.gridtable.GTScanRequest;
-import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.gridtable.GTScanRange;
+import org.apache.kylin.gridtable.IGTStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-public abstract class CubeHBaseRPC {
+public abstract class CubeHBaseRPC implements IGTStorage {
public static final Logger logger = LoggerFactory.getLogger(CubeHBaseRPC.class);
@@ -71,8 +69,6 @@ public abstract class CubeHBaseRPC {
this.fuzzyMaskEncoder = new FuzzyMaskEncoder(cubeSeg, cuboid);
}
- abstract IGTScanner getGTScanner(final List<GTScanRequest> scanRequests) throws IOException;
-
public static Scan buildScan(RawScan rawScan) {
Scan scan = new Scan();
scan.setCaching(rawScan.hbaseCaching);
@@ -96,7 +92,7 @@ public abstract class CubeHBaseRPC {
return scan;
}
- protected RawScan preparedHBaseScan(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> fuzzyKeys, ImmutableBitSet selectedColBlocks) {
+ private RawScan preparedHBaseScan(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> fuzzyKeys, ImmutableBitSet selectedColBlocks) {
final List<Pair<byte[], byte[]>> selectedColumns = makeHBaseColumns(selectedColBlocks);
LazyRowKeyEncoder encoder = new LazyRowKeyEncoder(cubeSeg, cuboid);
@@ -123,46 +119,12 @@ public abstract class CubeHBaseRPC {
return new RawScan(start, end, selectedColumns, hbaseFuzzyKeys, hbaseCaching, hbaseMaxResultSize);
}
- protected List<RawScan> preparedHBaseScans(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> fuzzyKeys, ImmutableBitSet selectedColBlocks) {
- final List<Pair<byte[], byte[]>> selectedColumns = makeHBaseColumns(selectedColBlocks);
- List<RawScan> ret = Lists.newArrayList();
-
- LazyRowKeyEncoder encoder = new LazyRowKeyEncoder(cubeSeg, cuboid);
- byte[] start = encoder.createBuf();
- byte[] end = encoder.createBuf();
- List<byte[]> startKeys;
- List<byte[]> endKeys;
-
- encoder.setBlankByte(RowConstants.ROWKEY_LOWER_BYTE);
- encoder.encode(pkStart, pkStart.getInfo().getPrimaryKey(), start);
- startKeys = encoder.getRowKeysDifferentShards(start);
-
- encoder.setBlankByte(RowConstants.ROWKEY_UPPER_BYTE);
- encoder.encode(pkEnd, pkEnd.getInfo().getPrimaryKey(), end);
- endKeys = encoder.getRowKeysDifferentShards(end);
- endKeys = Lists.transform(endKeys, new Function<byte[], byte[]>() {
- @Override
- public byte[] apply(byte[] input) {
- byte[] shardEnd = new byte[input.length + 1];//append extra 0 to the end key to make it inclusive while scanning
- System.arraycopy(input, 0, shardEnd, 0, input.length);
- return shardEnd;
- }
- });
-
- Preconditions.checkState(startKeys.size() == endKeys.size());
- List<Pair<byte[], byte[]>> hbaseFuzzyKeys = translateFuzzyKeys(fuzzyKeys);
-
- KylinConfig config = cubeSeg.getCubeDesc().getConfig();
- int hbaseCaching = config.getHBaseScanCacheRows();
- int hbaseMaxResultSize = config.getHBaseScanMaxResultSize();
- if (isMemoryHungry(selectedColBlocks))
- hbaseCaching /= 10;
-
- for (short i = 0; i < startKeys.size(); ++i) {
- ret.add(new RawScan(startKeys.get(i), endKeys.get(i), selectedColumns, hbaseFuzzyKeys, hbaseCaching, hbaseMaxResultSize));
+ protected List<RawScan> preparedHBaseScans(List<GTScanRange> ranges, ImmutableBitSet selectedColBlocks) {
+ List<RawScan> allRawScans = Lists.newArrayList();
+ for (GTScanRange range : ranges) {
+ allRawScans.add(preparedHBaseScan(range.pkStart, range.pkEnd, range.fuzzyKeys, selectedColBlocks));
}
- return ret;
-
+ return allRawScans;
}
private boolean isMemoryHungry(ImmutableBitSet selectedColBlocks) {