You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/05/26 17:20:53 UTC
[07/32] incubator-kylin git commit: KYLIN-697 non-integration tests
all passed
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b6e6e2e/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
deleted file mode 100644
index 6bceab7..0000000
--- a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.job;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.hive.hcatalog.data.schema.HCatSchema;
-import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.AbstractKylinTestCase;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.dict.lookup.HiveTableReader;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
-import org.apache.kylin.job.common.ShellExecutable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.hadoop.cube.StorageCleanupJob;
-import org.apache.kylin.job.hadoop.invertedindex.IICreateHTableJob;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.RealizationStatusEnum;
-import org.apache.kylin.streaming.StreamMessage;
-import org.apache.kylin.streaming.invertedindex.IIStreamBuilder;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingDeque;
-
-import static org.junit.Assert.fail;
-
-/**
- */
-public class BuildIIWithStreamTest {
-
- private static final Logger logger = LoggerFactory.getLogger(BuildIIWithStreamTest.class);
-
- private static final String[] II_NAME = new String[] { "test_kylin_ii_left_join", "test_kylin_ii_inner_join" };
- private IIManager iiManager;
- private KylinConfig kylinConfig;
-
- @BeforeClass
- public static void beforeClass() throws Exception {
- logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
- ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
- System.setProperty("hdp.version", "2.2.0.0-2041"); // mapred-site.xml ref this
- }
-
- @Before
- public void before() throws Exception {
- HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
- DeployUtil.overrideJobJarLocations();
-
- kylinConfig = KylinConfig.getInstanceFromEnv();
- iiManager = IIManager.getInstance(kylinConfig);
- iiManager = IIManager.getInstance(kylinConfig);
- for (String iiInstance : II_NAME) {
-
- IIInstance ii = iiManager.getII(iiInstance);
- if (ii.getStatus() != RealizationStatusEnum.DISABLED) {
- ii.setStatus(RealizationStatusEnum.DISABLED);
- iiManager.updateII(ii,true);
- }
- }
- }
-
- @AfterClass
- public static void afterClass() throws Exception {
- backup();
- }
-
- private static int cleanupOldStorage() throws Exception {
- String[] args = { "--delete", "true" };
- int exitCode = ToolRunner.run(new StorageCleanupJob(), args);
- return exitCode;
- }
-
- private static void backup() throws Exception {
- int exitCode = cleanupOldStorage();
- if (exitCode == 0) {
- exportHBaseData();
- }
- }
-
- private static void exportHBaseData() throws IOException {
- ExportHBaseData export = new ExportHBaseData();
- export.exportTables();
- }
-
- private String createIntermediateTable(IIDesc desc, KylinConfig kylinConfig) throws IOException {
- IIJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(desc);
- JobEngineConfig jobEngineConfig = new JobEngineConfig(kylinConfig);
- final String uuid = UUID.randomUUID().toString();
- final String dropTableHql = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc, uuid);
- final String createTableHql = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, jobEngineConfig.getHdfsWorkingDirectory() + "/kylin-" + uuid, uuid);
- String insertDataHqls;
- try {
- insertDataHqls = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, uuid, jobEngineConfig);
- } catch (IOException e1) {
- e1.printStackTrace();
- throw new RuntimeException("Failed to generate insert data SQL for intermediate table.");
- }
-
- ShellExecutable step = new ShellExecutable();
- StringBuffer buf = new StringBuffer();
- buf.append("hive -e \"");
- buf.append(dropTableHql + "\n");
- buf.append(createTableHql + "\n");
- buf.append(insertDataHqls + "\n");
- buf.append("\"");
-
- step.setCmd(buf.toString());
- logger.info(step.getCmd());
- step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
- kylinConfig.getCliCommandExecutor().execute(step.getCmd(), null);
- return intermediateTableDesc.getTableName(uuid);
- }
-
- private void clearSegment(String iiName) throws Exception {
- IIInstance ii = iiManager.getII(iiName);
- ii.getSegments().clear();
- iiManager.updateII(ii,true);
- }
-
- private IISegment createSegment(String iiName) throws Exception {
- clearSegment(iiName);
- SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
- f.setTimeZone(TimeZone.getTimeZone("GMT"));
-
- long date1 = 0;
- long date2 = f.parse("2015-01-01").getTime();
- return buildSegment(iiName, date1, date2);
- }
-
- private IISegment buildSegment(String iiName, long startDate, long endDate) throws Exception {
- IIInstance iiInstance = iiManager.getII(iiName);
- IISegment segment = iiManager.buildSegment(iiInstance, startDate, endDate);
- iiInstance.getSegments().add(segment);
- iiManager.updateII(iiInstance,true);
- return segment;
- }
-
- private void buildII(String iiName) throws Exception {
- final IIDesc desc = iiManager.getII(iiName).getDescriptor();
- final String tableName = createIntermediateTable(desc, kylinConfig);
- logger.info("intermediate table name:" + tableName);
- final Configuration conf = new Configuration();
- HCatInputFormat.setInput(conf, "default", tableName);
- final HCatSchema tableSchema = HCatInputFormat.getTableSchema(conf);
- logger.info(StringUtils.join(tableSchema.getFieldNames(), "\n"));
- HiveTableReader reader = new HiveTableReader("default", tableName);
- final List<TblColRef> tblColRefs = desc.listAllColumns();
- for (TblColRef tblColRef : tblColRefs) {
- if (desc.isMetricsCol(tblColRef)) {
- logger.info("matrix:" + tblColRef.getName());
- } else {
- logger.info("measure:" + tblColRef.getName());
- }
- }
- LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<StreamMessage>();
- final IISegment segment = createSegment(iiName);
- String[] args = new String[] { "-iiname", iiName, "-htablename", segment.getStorageLocationIdentifier() };
- ToolRunner.run(new IICreateHTableJob(), args);
-
- ExecutorService executorService = Executors.newSingleThreadExecutor();
- final IIStreamBuilder streamBuilder = new IIStreamBuilder(queue, iiName, segment.getStorageLocationIdentifier(), segment.getIIDesc(), 0);
-
- List<String[]> sorted = getSortedRows(reader, desc.getTimestampColumn());
- int count = sorted.size();
- for (String[] row : sorted) {
- logger.info("another row: " + StringUtils.join(row, ","));
- queue.put(parse(row));
- }
-
- reader.close();
- logger.info("total record count:" + count + " htable:" + segment.getStorageLocationIdentifier());
- queue.put(StreamMessage.EOF);
- final Future<?> future = executorService.submit(streamBuilder);
- try {
- future.get();
- } catch (Exception e) {
- logger.error("stream build failed", e);
- fail("stream build failed");
- }
-
- logger.info("stream build finished, htable name:" + segment.getStorageLocationIdentifier());
- }
-
- @Test
- public void test() throws Exception {
- for (String iiName : II_NAME) {
- buildII(iiName);
- IIInstance ii = iiManager.getII(iiName);
- if (ii.getStatus() != RealizationStatusEnum.READY) {
- ii.setStatus(RealizationStatusEnum.READY);
- iiManager.updateII(ii,true);
- }
- }
- }
-
- private StreamMessage parse(String[] row) {
- return new StreamMessage(System.currentTimeMillis(), StringUtils.join(row, ",").getBytes());
- }
-
- private List<String[]> getSortedRows(HiveTableReader reader, final int tsCol) throws IOException {
- List<String[]> unsorted = Lists.newArrayList();
- while (reader.next()) {
- unsorted.add(reader.getRow());
- }
- Collections.sort(unsorted, new Comparator<String[]>() {
- @Override
- public int compare(String[] o1, String[] o2) {
- long t1 = DateFormat.stringToMillis(o1[tsCol]);
- long t2 = DateFormat.stringToMillis(o2[tsCol]);
- return Long.compare(t1, t2);
- }
- });
- return unsorted;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b6e6e2e/job/src/test/java/org/apache/kylin/job/IIStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/IIStreamBuilderTest.java b/job/src/test/java/org/apache/kylin/job/IIStreamBuilderTest.java
deleted file mode 100644
index 6aaeaa2..0000000
--- a/job/src/test/java/org/apache/kylin/job/IIStreamBuilderTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.job;
-
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.AbstractKylinTestCase;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.job.hadoop.cube.StorageCleanupJob;
-import org.apache.kylin.job.streaming.StreamingBootstrap;
-import org.junit.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-
-/**
- */
-public class IIStreamBuilderTest {
-
- private static final Logger logger = LoggerFactory.getLogger(IIStreamBuilderTest.class);
-
- private KylinConfig kylinConfig;
-
- @BeforeClass
- public static void beforeClass() throws Exception {
- ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
- System.setProperty("hdp.version", "2.2.0.0-2041"); // mapred-site.xml ref this
- }
-
- @AfterClass
- public static void afterClass() throws Exception {
-// backup();
- }
-
- private static void backup() throws Exception {
- int exitCode = cleanupOldStorage();
- if (exitCode == 0) {
- exportHBaseData();
- }
- }
-
- private static int cleanupOldStorage() throws Exception {
- String[] args = {"--delete", "true"};
-
- int exitCode = ToolRunner.run(new StorageCleanupJob(), args);
- return exitCode;
- }
-
- private static void exportHBaseData() throws IOException {
- ExportHBaseData export = new ExportHBaseData();
- export.exportTables();
- }
-
- @Before
- public void before() throws Exception {
- HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
-
- kylinConfig = KylinConfig.getInstanceFromEnv();
- DeployUtil.initCliWorkDir();
- DeployUtil.deployMetadata();
- DeployUtil.overrideJobJarLocations();
- }
-
- @Test
- public void test() throws Exception {
- final StreamingBootstrap bootstrap = StreamingBootstrap.getInstance(kylinConfig);
- bootstrap.start("eagle", 0);
- Thread.sleep(30 * 60 * 1000);
- logger.info("time is up, stop streaming");
- bootstrap.stop();
- Thread.sleep(5 * 1000);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b6e6e2e/job/src/test/java/org/apache/kylin/job/ITBuildCubeWithEngineTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/ITBuildCubeWithEngineTest.java b/job/src/test/java/org/apache/kylin/job/ITBuildCubeWithEngineTest.java
new file mode 100644
index 0000000..e99bd01
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/ITBuildCubeWithEngineTest.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractKylinTestCase;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.cube.CubingJob;
+import org.apache.kylin.job.cube.CubingJobBuilder;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
+import org.apache.kylin.job.manager.ExecutableManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.lang.reflect.Method;
+import java.text.SimpleDateFormat;
+import java.util.List;
+import java.util.TimeZone;
+import java.util.concurrent.*;
+
+import static org.junit.Assert.assertEquals;
+
+public class ITBuildCubeWithEngineTest {
+
+ private JobEngineConfig jobEngineConfig;
+
+ private CubeManager cubeManager;
+
+ private DefaultScheduler scheduler;
+
+ protected ExecutableManager jobService;
+
+ private static final Log logger = LogFactory.getLog(ITBuildCubeWithEngineTest.class);
+
+    /**
+     * Blocks the calling thread until the job looked up through {@code jobService}
+     * reports {@link ExecutableState#SUCCEED} or {@link ExecutableState#ERROR},
+     * polling once every five seconds.
+     *
+     * @param jobId id of the job to poll
+     * @throws IllegalStateException if the calling thread is interrupted while waiting;
+     *                               the interrupt flag is restored before throwing
+     */
+    protected void waitForJob(String jobId) {
+        while (true) {
+            final AbstractExecutable job = jobService.getJob(jobId);
+            // Read the status once so both comparisons see the same value.
+            final ExecutableState status = job.getStatus();
+            if (status == ExecutableState.SUCCEED || status == ExecutableState.ERROR) {
+                return;
+            }
+            try {
+                Thread.sleep(5000);
+            } catch (InterruptedException e) {
+                // Swallowing the interrupt would keep this loop polling forever;
+                // restore the flag and stop waiting instead.
+                Thread.currentThread().interrupt();
+                throw new IllegalStateException("Interrupted while waiting for job '" + jobId + "'", e);
+            }
+        }
+    }
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+ ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+ System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
+ System.setProperty("hdp.version", "2.2.0.0-2041"); // mapred-site.xml ref this
+ }
+
+ @Before
+ public void before() throws Exception {
+ HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+
+ DeployUtil.initCliWorkDir();
+ DeployUtil.deployMetadata();
+ DeployUtil.overrideJobJarLocations();
+
+
+ final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ jobService = ExecutableManager.getInstance(kylinConfig);
+ scheduler = DefaultScheduler.getInstance();
+ scheduler.init(new JobEngineConfig(kylinConfig));
+ if (!scheduler.hasStarted()) {
+ throw new RuntimeException("scheduler has not been started");
+ }
+ cubeManager = CubeManager.getInstance(kylinConfig);
+ jobEngineConfig = new JobEngineConfig(kylinConfig);
+ for (String jobId : jobService.getAllJobIds()) {
+ if(jobService.getJob(jobId) instanceof CubingJob){
+ jobService.deleteJob(jobId);
+ }
+ }
+
+ }
+
+ @After
+ public void after() {
+ HBaseMetadataTestCase.staticCleanupTestMetadata();
+ }
+
+ @Test
+ public void test() throws Exception {
+ DeployUtil.prepareTestData("left", "test_kylin_cube_with_slr_left_join_empty");
+ testInner();
+ testLeft();
+ }
+
+ private void testInner() throws Exception {
+ String[] testCase = new String[]{
+ "testInnerJoinCube",
+ "testInnerJoinCube2",
+ };
+ runTestAndAssertSucceed(testCase);
+ }
+
+ private void testLeft() throws Exception {
+ String[] testCase = new String[]{
+ "testLeftJoinCube",
+ "testLeftJoinCube2",
+ };
+ runTestAndAssertSucceed(testCase);
+ }
+
+    /**
+     * Runs the named build methods concurrently, one pool thread per name, waits for
+     * all of them to finish, and asserts that every job id they return ended in
+     * {@link ExecutableState#SUCCEED}.
+     *
+     * @param testCase names of no-argument methods of this class, invoked through {@link TestCallable}
+     * @throws Exception if waiting is interrupted or any of the invoked methods failed
+     */
+    private void runTestAndAssertSucceed(String[] testCase) throws Exception {
+        ExecutorService executorService = Executors.newFixedThreadPool(testCase.length);
+        try {
+            final CountDownLatch countDownLatch = new CountDownLatch(testCase.length);
+            List<Future<List<String>>> tasks = Lists.newArrayListWithExpectedSize(testCase.length);
+            for (String methodName : testCase) {
+                tasks.add(executorService.submit(new TestCallable(methodName, countDownLatch)));
+            }
+            countDownLatch.await();
+            for (Future<List<String>> task : tasks) {
+                for (String jobId : task.get()) {
+                    assertJobSucceed(jobId);
+                }
+            }
+        } catch (Exception ex) {
+            // Use the (message, throwable) overload so the stack trace is logged as well.
+            logger.error("Cube build test failed", ex);
+            throw ex;
+        } finally {
+            // Release the pool threads on every path; otherwise each call leaks its workers.
+            executorService.shutdownNow();
+        }
+    }
+
+ private void assertJobSucceed(String jobId) {
+ assertEquals("The job '" + jobId + "' is failed.", ExecutableState.SUCCEED, jobService.getOutput(jobId).getState());
+ }
+
+ private class TestCallable implements Callable<List<String>> {
+
+ private final String methodName;
+ private final CountDownLatch countDownLatch;
+
+ public TestCallable(String methodName, CountDownLatch countDownLatch) {
+ this.methodName = methodName;
+ this.countDownLatch = countDownLatch;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public List<String> call() throws Exception {
+ try {
+ final Method method = ITBuildCubeWithEngineTest.class.getDeclaredMethod(methodName);
+ method.setAccessible(true);
+ return (List<String>) method.invoke(ITBuildCubeWithEngineTest.this);
+ } finally {
+ countDownLatch.countDown();
+ }
+ }
+ }
+
+ @SuppressWarnings("unused") // called by reflection
+ private List<String> testInnerJoinCube2() throws Exception {
+ clearSegment("test_kylin_cube_with_slr_empty");
+ SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
+ f.setTimeZone(TimeZone.getTimeZone("GMT"));
+ long date1 = 0;
+ long date2 = f.parse("2013-01-01").getTime();
+ long date3 = f.parse("2022-01-01").getTime();
+ List<String> result = Lists.newArrayList();
+ result.add(buildSegment("test_kylin_cube_with_slr_empty", date1, date2));
+ result.add(buildSegment("test_kylin_cube_with_slr_empty", date2, date3));
+ return result;
+ }
+
+ @SuppressWarnings("unused") // called by reflection
+ private List<String> testInnerJoinCube() throws Exception {
+ clearSegment("test_kylin_cube_without_slr_empty");
+
+
+ SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
+ f.setTimeZone(TimeZone.getTimeZone("GMT"));
+
+ // this cube's start date is 0, end date is 20501112000000
+ long date1 = 0;
+ long date2 = f.parse("2013-01-01").getTime();
+
+
+ // this cube doesn't support incremental build, always do full build
+
+ List<String> result = Lists.newArrayList();
+ result.add(buildSegment("test_kylin_cube_without_slr_empty", date1, date2));
+ return result;
+ }
+
+ @SuppressWarnings("unused") // called by reflection
+ private List<String> testLeftJoinCube2() throws Exception {
+ SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
+ f.setTimeZone(TimeZone.getTimeZone("GMT"));
+ List<String> result = Lists.newArrayList();
+ final String cubeName = "test_kylin_cube_without_slr_left_join_empty";
+ // this cube's start date is 0, end date is 20120601000000
+ long dateStart = cubeManager.getCube(cubeName).getDescriptor().getModel().getPartitionDesc().getPartitionDateStart();
+ long dateEnd = f.parse("2012-06-01").getTime();
+
+ clearSegment(cubeName);
+ result.add(buildSegment(cubeName, dateStart, dateEnd));
+
+ // then submit an append job, start date is 20120601000000, end
+ // date is 20220101000000
+ dateStart = f.parse("2012-06-01").getTime();
+ dateEnd = f.parse("2022-01-01").getTime();
+ result.add(buildSegment(cubeName, dateStart, dateEnd));
+ return result;
+
+ }
+
+ @SuppressWarnings("unused") // called by reflection
+ private List<String> testLeftJoinCube() throws Exception {
+ String cubeName = "test_kylin_cube_with_slr_left_join_empty";
+ clearSegment(cubeName);
+
+ SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
+ f.setTimeZone(TimeZone.getTimeZone("GMT"));
+ long dateStart = cubeManager.getCube(cubeName).getDescriptor().getModel().getPartitionDesc().getPartitionDateStart();
+ long dateEnd = f.parse("2050-11-12").getTime();
+
+ // this cube's start date is 0, end date is 20501112000000
+ List<String> result = Lists.newArrayList();
+ result.add(buildSegment(cubeName, dateStart, dateEnd));
+ return result;
+
+ }
+
+ private void clearSegment(String cubeName) throws Exception {
+ CubeInstance cube = cubeManager.getCube(cubeName);
+ cube.getSegments().clear();
+ cubeManager.updateCube(cube,true);
+ }
+
+
+    /**
+     * Appends a segment ending at {@code endDate} to the named cube, submits a cubing
+     * job for it and waits until that job has finished.
+     *
+     * @param cubeName  name of the cube to extend
+     * @param startDate not read by this method; only {@code endDate} is passed to
+     *                  {@code appendSegments}
+     * @param endDate   end of the new segment, in epoch milliseconds as produced by the callers
+     * @return the id of the submitted cubing job
+     */
+    private String buildSegment(String cubeName, long startDate, long endDate) throws Exception {
+        final CubeInstance cube = cubeManager.getCube(cubeName);
+        final CubeSegment newSegment = cubeManager.appendSegments(cube, endDate);
+        final CubingJob cubingJob = new CubingJobBuilder(jobEngineConfig).buildJob(newSegment);
+        jobService.addJob(cubingJob);
+        final String jobId = cubingJob.getId();
+        waitForJob(jobId);
+        return jobId;
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b6e6e2e/job/src/test/java/org/apache/kylin/job/ITBuildCubeWithStreamTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/ITBuildCubeWithStreamTest.java b/job/src/test/java/org/apache/kylin/job/ITBuildCubeWithStreamTest.java
new file mode 100644
index 0000000..8a26397
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/ITBuildCubeWithStreamTest.java
@@ -0,0 +1,224 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.job;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.SetMultimap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hive.hcatalog.data.schema.HCatSchema;
+import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractKylinTestCase;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.DimensionDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.dict.lookup.HiveTableReader;
+import org.apache.kylin.job.hadoop.cubev2.IGTRecordWriter;
+import org.apache.kylin.job.hadoop.cubev2.InMemCubeBuilder;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.junit.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/**
+ *
+ * This class is going to be deleted
+ */
+@Ignore("For dev testing")
+public class ITBuildCubeWithStreamTest {
+
+ private static final Logger logger = LoggerFactory.getLogger(ITBuildCubeWithStreamTest.class);
+
+ private KylinConfig kylinConfig;
+ private CubeManager cubeManager;
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+ ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+ System.setProperty("hdp.version", "2.2.0.0-2041"); // mapred-site.xml ref this
+ }
+
+ @Before
+ public void before() throws Exception {
+ HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+ DeployUtil.overrideJobJarLocations();
+
+ kylinConfig = KylinConfig.getInstanceFromEnv();
+ cubeManager = CubeManager.getInstance(kylinConfig);
+
+ }
+
+ @After
+ public void after() {
+ HBaseMetadataTestCase.staticCleanupTestMetadata();
+ }
+
+    /**
+     * Reads rows from an existing Hive intermediate table, hands them to an
+     * {@link InMemCubeBuilder} through a bounded queue and waits for the builder to finish.
+     * At most 200000 rows are read; an empty list is queued after the last row
+     * (presumably the end-of-input marker for the builder; confirm against InMemCubeBuilder).
+     *
+     * @throws IOException if the builder task fails
+     */
+    @Test
+    public void test() throws Exception {
+        CubeInstance cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty");
+        final CubeDesc desc = cube.getDescriptor();
+        // cube.getSegments().clear();
+        // cubeManager.updateCube(cube);
+
+        CubeSegment cubeSegment = cube.getSegment("19700101000000_20150401000000", SegmentStatusEnum.NEW);
+        Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap();
+
+        for (DimensionDesc dim : desc.getDimensions()) {
+            // dictionary
+            for (TblColRef col : dim.getColumnRefs()) {
+                if (desc.getRowkey().isUseDictionary(col)) {
+                    // Look the dictionary up once and reuse it for the null check and the map entry.
+                    Dictionary<?> dict = cubeSegment.getDictionary(col);
+                    if (dict == null) {
+                        throw new IllegalArgumentException("Dictionary for " + col + " was not found.");
+                    }
+                    logger.info("Dictionary for " + col + " was put into dictionary map.");
+                    dictionaryMap.put(col, dict);
+                }
+            }
+        }
+
+        // Alternative intermediate tables used during development:
+        // final String tableName = createIntermediateTable(desc, kylinConfig, null);
+        // "kylin_intermediate_test_kylin_cube_without_slr_desc_19700101000000_20130112000000_a24dec89_efbd_425f_9a5f_8b78dd1412af"; // has 3089 records;
+        // "kylin_intermediate_test_kylin_cube_without_slr_desc_19700101000000_20130112000000_a5e1eb5d_da6b_475d_9807_be0b61f03215"; // only 20 rows;
+        // "kylin_intermediate_test_kylin_cube_without_slr_left_join_desc_19700101000000_20150302000000_0a183367_f245_43d1_8850_1c138c8514c3";
+        // "kylin_intermediate_test_kylin_cube_without_slr_left_join_desc_19700101000000_20150301000000_ce061464_7962_4642_bd7d_7c3d8fbe9389";
+        final String tableName = "kylin_intermediate_test_kylin_cube_without_slr_left_join_desc_19700101000000_20150401000000_fb7ae579_d987_4900_a3b7_c60c731cd269"; // 2 million records
+        logger.info("intermediate table name:" + tableName);
+
+        ArrayBlockingQueue queue = new ArrayBlockingQueue<List<String>>(10000);
+
+        InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(queue, cube, dictionaryMap, new ConsoleGTRecordWriter());
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        try {
+            Future<?> future = executorService.submit(cubeBuilder);
+
+            final Configuration conf = new Configuration();
+            HCatInputFormat.setInput(conf, "default", tableName);
+            final HCatSchema tableSchema = HCatInputFormat.getTableSchema(conf);
+            logger.info(StringUtils.join(tableSchema.getFieldNames(), "\n"));
+
+            HiveTableReader reader = new HiveTableReader("default", tableName);
+            try {
+                int counter = 0;
+                while (reader.next()) {
+                    List<String> row = reader.getRowAsList();
+                    queue.put(row);
+                    counter++;
+                    if (counter == 200000) {
+                        break;
+                    }
+                }
+                queue.put(new ArrayList<String>(0));
+            } finally {
+                // Close the reader even when reading or queueing fails.
+                reader.close();
+            }
+
+            try {
+                future.get();
+            } catch (Exception e) {
+                logger.error("stream build failed", e);
+                throw new IOException("Failed to build cube ", e);
+            }
+
+            logger.info("stream build finished");
+        } finally {
+            // Stop the builder thread on every path; after a failure it would
+            // otherwise stay blocked on the queue and keep the JVM alive.
+            executorService.shutdownNow();
+        }
+    }
+
+
+ private void buildDictionary(List<List<String>> table, CubeDesc desc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
+ SetMultimap<TblColRef, String> valueMap = HashMultimap.create();
+
+ List<TblColRef> dimColumns = desc.listDimensionColumnsExcludingDerived();
+ for (List<String> row : table) {
+ for (int i = 0; i < dimColumns.size(); i++) {
+ String cell = row.get(i);
+ valueMap.put(dimColumns.get(i), cell);
+ }
+ }
+
+ for (DimensionDesc dim : desc.getDimensions()) {
+ // dictionary
+ for (TblColRef col : dim.getColumnRefs()) {
+ if (desc.getRowkey().isUseDictionary(col)) {
+ Dictionary dict = DictionaryGenerator.buildDictionaryFromValueList(col.getType(), Collections2.transform(valueMap.get(col), new Function<String, byte[]>() {
+ @Nullable
+ @Override
+ public byte[] apply(String input) {
+ if (input == null)
+ return null;
+ return input.getBytes();
+ }
+ }));
+
+ logger.info("Building dictionary for " + col);
+ dictionaryMap.put(col, dict);
+ }
+ }
+ }
+
+ }
+
+
+ class ConsoleGTRecordWriter implements IGTRecordWriter {
+
+ boolean verbose = false;
+
+ @Override
+ public void write(Long cuboidId, GTRecord record) throws IOException {
+ if (verbose)
+ System.out.println(record.toString());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b6e6e2e/job/src/test/java/org/apache/kylin/job/ITBuildIIWithEngineTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/ITBuildIIWithEngineTest.java b/job/src/test/java/org/apache/kylin/job/ITBuildIIWithEngineTest.java
new file mode 100644
index 0000000..23f97d0
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/ITBuildIIWithEngineTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractKylinTestCase;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.hadoop.cube.StorageCleanupJob;
+import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
+import org.apache.kylin.job.invertedindex.IIJob;
+import org.apache.kylin.job.invertedindex.IIJobBuilder;
+import org.apache.kylin.job.manager.ExecutableManager;
+import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.junit.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.text.SimpleDateFormat;
+import java.util.List;
+import java.util.TimeZone;
+import java.util.concurrent.*;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * @author shaoshi
+ */
+public class ITBuildIIWithEngineTest {
+
+ private JobEngineConfig jobEngineConfig;
+ private IIManager iiManager;
+
+ private DefaultScheduler scheduler;
+ protected ExecutableManager jobService;
+
+ protected static final String[] TEST_II_INSTANCES = new String[] { "test_kylin_ii_inner_join", "test_kylin_ii_left_join" };
+
+ private static final Log logger = LogFactory.getLog(ITBuildIIWithEngineTest.class);
+
+ protected void waitForJob(String jobId) {
+ while (true) {
+ AbstractExecutable job = jobService.getJob(jobId);
+ if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR) {
+ break;
+ } else {
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ /**
+  * Adds the sandbox test-data directory to the classpath and sets the {@code hdp.version} system property.
+  */
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+     final String sandboxPath = new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath();
+     logger.info("Adding to classpath: " + sandboxPath);
+     ClassUtil.addClasspath(sandboxPath);
+     System.setProperty("hdp.version", "2.2.0.0-2041"); // mapred-site.xml ref this
+ }
+
+ /**
+  * Per-test setup: loads the sandbox test metadata, initialises the job scheduler, deletes every existing
+  * {@link IIJob} and switches each II instance under test to {@code DISABLED}.
+  */
+ @Before
+ public void before() throws Exception {
+     HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+
+     // DeployUtil.initCliWorkDir() and DeployUtil.deployMetadata() are left disabled here.
+     DeployUtil.overrideJobJarLocations();
+
+     final KylinConfig config = KylinConfig.getInstanceFromEnv();
+     jobService = ExecutableManager.getInstance(config);
+     scheduler = DefaultScheduler.getInstance();
+     scheduler.init(new JobEngineConfig(config));
+     if (!scheduler.hasStarted()) {
+         throw new RuntimeException("scheduler has not been started");
+     }
+     jobEngineConfig = new JobEngineConfig(config);
+
+     // drop any II job already known to the job service
+     for (String id : jobService.getAllJobIds()) {
+         if (!(jobService.getJob(id) instanceof IIJob)) {
+             continue;
+         }
+         jobService.deleteJob(id);
+     }
+
+     iiManager = IIManager.getInstance(config);
+     for (String name : TEST_II_INSTANCES) {
+         final IIInstance ii = iiManager.getII(name);
+         if (ii.getStatus() == RealizationStatusEnum.DISABLED) {
+             continue;
+         }
+         ii.setStatus(RealizationStatusEnum.DISABLED);
+         iiManager.updateII(ii, true);
+     }
+ }
+
+ /**
+  * Per-test teardown: switches each II instance under test to {@code READY}, then runs {@code backup()}.
+  */
+ @After
+ public void after() throws Exception {
+     for (String name : TEST_II_INSTANCES) {
+         final IIInstance ii = iiManager.getII(name);
+         if (ii.getStatus() == RealizationStatusEnum.READY) {
+             continue;
+         }
+         ii.setStatus(RealizationStatusEnum.READY);
+         iiManager.updateII(ii, true);
+     }
+     backup();
+ }
+
+ /**
+  * Runs the II build methods concurrently, one worker thread per method, and asserts that every job
+  * they return finished in state {@code SUCCEED}.
+  * <p>
+  * The build methods are looked up by name via reflection (see {@code TestCallable}).
+  */
+ @Test
+ @Ignore
+ public void testBuildII() throws Exception {
+     String[] testCase = new String[] { "buildIIInnerJoin", "buildIILeftJoin" };
+     ExecutorService executorService = Executors.newFixedThreadPool(testCase.length);
+     try {
+         final CountDownLatch countDownLatch = new CountDownLatch(testCase.length);
+         List<Future<List<String>>> tasks = Lists.newArrayListWithExpectedSize(testCase.length);
+         for (String methodName : testCase) {
+             tasks.add(executorService.submit(new TestCallable(methodName, countDownLatch)));
+         }
+         countDownLatch.await();
+         for (Future<List<String>> task : tasks) {
+             for (String jobId : task.get()) {
+                 assertJobSucceed(jobId);
+             }
+         }
+     } finally {
+         // release the pool's worker threads even when a build or an assertion fails
+         executorService.shutdown();
+     }
+ }
+
+ private void assertJobSucceed(String jobId) {
+ assertEquals(ExecutableState.SUCCEED, jobService.getOutput(jobId).getState());
+ }
+
+ /**
+  * Callable that reflectively invokes a no-argument method of the enclosing test, selected by name, and
+  * counts the latch down once the invocation has ended, whether it returned or threw.
+  */
+ private class TestCallable implements Callable<List<String>> {
+
+     private final String methodName;
+     private final CountDownLatch countDownLatch;
+
+     public TestCallable(String methodName, CountDownLatch countDownLatch) {
+         this.methodName = methodName;
+         this.countDownLatch = countDownLatch;
+     }
+
+     @SuppressWarnings("unchecked")
+     @Override
+     public List<String> call() throws Exception {
+         try {
+             final Method target = ITBuildIIWithEngineTest.class.getDeclaredMethod(methodName);
+             target.setAccessible(true);
+             final Object jobIds = target.invoke(ITBuildIIWithEngineTest.this);
+             return (List<String>) jobIds;
+         } finally {
+             countDownLatch.countDown();
+         }
+     }
+ }
+
+ /**
+  * Builds the first entry of {@code TEST_II_INSTANCES} and returns the ids of the jobs that were run.
+  * <p>
+  * Looked up by name through reflection in {@code testBuildII()}, so the method name must stay in sync
+  * with the string used there.
+  */
+ protected List<String> buildIIInnerJoin() throws Exception {
+ return buildII(TEST_II_INSTANCES[0]);
+ }
+
+ /**
+  * Builds the second entry of {@code TEST_II_INSTANCES} and returns the ids of the jobs that were run.
+  * <p>
+  * Looked up by name through reflection in {@code testBuildII()}, so the method name must stay in sync
+  * with the string used there.
+  */
+ protected List<String> buildIILeftJoin() throws Exception {
+ return buildII(TEST_II_INSTANCES[1]);
+ }
+
+ protected List<String> buildII(String iiName) throws Exception {
+ clearSegment(iiName);
+
+ SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
+ f.setTimeZone(TimeZone.getTimeZone("GMT"));
+
+ long date1 = 0;
+ long date2 = f.parse("2015-01-01").getTime();
+
+ List<String> result = Lists.newArrayList();
+ result.add(buildSegment(iiName, date1, date2));
+ return result;
+ }
+
+ /**
+  * Removes all segments from the named II instance and saves the change through the II manager.
+  *
+  * @param iiName name of the II instance whose segments are cleared
+  */
+ private void clearSegment(String iiName) throws Exception {
+ IIInstance ii = iiManager.getII(iiName);
+ ii.getSegments().clear();
+ iiManager.updateII(ii,true);
+ }
+
+ private String buildSegment(String iiName, long startDate, long endDate) throws Exception {
+ IIInstance iiInstance = iiManager.getII(iiName);
+ IISegment segment = iiManager.buildSegment(iiInstance, startDate, endDate);
+ iiInstance.getSegments().add(segment);
+ iiManager.updateII(iiInstance, true);
+ IIJobBuilder iiJobBuilder = new IIJobBuilder(jobEngineConfig);
+ IIJob job = iiJobBuilder.buildJob(segment);
+ jobService.addJob(job);
+ waitForJob(job.getId());
+ return job.getId();
+ }
+
+ /**
+  * Runs {@link StorageCleanupJob} with {@code --delete true}.
+  *
+  * @return the exit code reported by {@code ToolRunner}
+  */
+ private int cleanupOldStorage() throws Exception {
+     return ToolRunner.run(new StorageCleanupJob(), new String[] { "--delete", "true" });
+ }
+
+ /** Cleans up old storage and, only when that exits with code 0, exports the HBase data. */
+ private void backup() throws Exception {
+     if (cleanupOldStorage() != 0) {
+         return;
+     }
+     exportHBaseData();
+ }
+
+ /** Exports the HBase tables by delegating to {@code ExportHBaseData.exportTables()}. */
+ private void exportHBaseData() throws IOException {
+     new ExportHBaseData().exportTables();
+ }
+
+ /**
+  * Runs the test outside of JUnit, going through the same lifecycle: class setup, per-test setup,
+  * the test itself, then teardown.
+  *
+  * @param args ignored
+  */
+ public static void main(String[] args) throws Exception {
+     ITBuildIIWithEngineTest instance = new ITBuildIIWithEngineTest();
+
+     ITBuildIIWithEngineTest.beforeClass();
+     instance.before();
+     try {
+         instance.testBuildII();
+     } finally {
+         // run the teardown even when the test throws, as JUnit does for @After
+         instance.after();
+     }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b6e6e2e/job/src/test/java/org/apache/kylin/job/ITBuildIIWithStreamTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/ITBuildIIWithStreamTest.java b/job/src/test/java/org/apache/kylin/job/ITBuildIIWithStreamTest.java
new file mode 100644
index 0000000..cfb7bdb
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/ITBuildIIWithStreamTest.java
@@ -0,0 +1,270 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.job;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hive.hcatalog.data.schema.HCatSchema;
+import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractKylinTestCase;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.dict.lookup.HiveTableReader;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
+import org.apache.kylin.job.common.ShellExecutable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.hadoop.cube.StorageCleanupJob;
+import org.apache.kylin.job.hadoop.invertedindex.IICreateHTableJob;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.apache.kylin.streaming.StreamMessage;
+import org.apache.kylin.streaming.invertedindex.IIStreamBuilder;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import static org.junit.Assert.fail;
+
+/**
+ */
+public class ITBuildIIWithStreamTest {
+
+ private static final Logger logger = LoggerFactory.getLogger(ITBuildIIWithStreamTest.class);
+
+ private static final String[] II_NAME = new String[] { "test_kylin_ii_left_join", "test_kylin_ii_inner_join" };
+ private IIManager iiManager;
+ private KylinConfig kylinConfig;
+
+ /**
+  * Adds the sandbox test-data directory to the classpath and sets the {@code hdp.version} system property.
+  */
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+     final String sandboxPath = new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath();
+     logger.info("Adding to classpath: " + sandboxPath);
+     ClassUtil.addClasspath(sandboxPath);
+     System.setProperty("hdp.version", "2.2.0.0-2041"); // mapred-site.xml ref this
+ }
+
+ /**
+  * Per-test setup: loads the sandbox test metadata, resolves the Kylin config and the II manager, and
+  * switches each II instance under test to {@code DISABLED}.
+  */
+ @Before
+ public void before() throws Exception {
+     HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+     DeployUtil.overrideJobJarLocations();
+
+     kylinConfig = KylinConfig.getInstanceFromEnv();
+     // the original assigned iiManager twice with the same expression; once is enough
+     iiManager = IIManager.getInstance(kylinConfig);
+     for (String iiInstance : II_NAME) {
+         IIInstance ii = iiManager.getII(iiInstance);
+         if (ii.getStatus() != RealizationStatusEnum.DISABLED) {
+             ii.setStatus(RealizationStatusEnum.DISABLED);
+             iiManager.updateII(ii, true);
+         }
+     }
+ }
+
+ /**
+  * Runs once after all tests of this class and delegates to {@code backup()}, which cleans up old
+  * storage and exports the HBase data when the cleanup succeeds.
+  */
+ @AfterClass
+ public static void afterClass() throws Exception {
+ backup();
+ }
+
+ /**
+  * Runs {@link StorageCleanupJob} with {@code --delete true}.
+  *
+  * @return the exit code reported by {@code ToolRunner}
+  */
+ private static int cleanupOldStorage() throws Exception {
+     return ToolRunner.run(new StorageCleanupJob(), new String[] { "--delete", "true" });
+ }
+
+ /** Cleans up old storage and, only when that exits with code 0, exports the HBase data. */
+ private static void backup() throws Exception {
+     if (cleanupOldStorage() != 0) {
+         return;
+     }
+     exportHBaseData();
+ }
+
+ /** Exports the HBase tables by delegating to {@code ExportHBaseData.exportTables()}. */
+ private static void exportHBaseData() throws IOException {
+     new ExportHBaseData().exportTables();
+ }
+
+ /**
+  * Creates and populates the intermediate flat Hive table for the given II descriptor by running a single
+  * {@code hive -e} command made of the drop, create and insert statements.
+  *
+  * @param desc        II descriptor the flat table is derived from
+  * @param kylinConfig config supplying the job engine settings and the CLI command executor
+  * @return the name of the intermediate table for the randomly generated uuid
+  * @throws IOException      if executing the Hive command fails
+  * @throws RuntimeException if the insert statement cannot be generated; the original exception is the cause
+  */
+ private String createIntermediateTable(IIDesc desc, KylinConfig kylinConfig) throws IOException {
+     IIJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(desc);
+     JobEngineConfig jobEngineConfig = new JobEngineConfig(kylinConfig);
+     final String uuid = UUID.randomUUID().toString();
+     final String dropTableHql = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc, uuid);
+     final String createTableHql = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, jobEngineConfig.getHdfsWorkingDirectory() + "/kylin-" + uuid, uuid);
+     String insertDataHqls;
+     try {
+         insertDataHqls = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, uuid, jobEngineConfig);
+     } catch (IOException e) {
+         // keep the root failure attached instead of printing it to stderr and discarding it
+         throw new RuntimeException("Failed to generate insert data SQL for intermediate table.", e);
+     }
+
+     // single-threaded string assembly, so the unsynchronised builder is sufficient
+     final StringBuilder buf = new StringBuilder();
+     buf.append("hive -e \"");
+     buf.append(dropTableHql).append("\n");
+     buf.append(createTableHql).append("\n");
+     buf.append(insertDataHqls).append("\n");
+     buf.append("\"");
+
+     ShellExecutable step = new ShellExecutable();
+     step.setCmd(buf.toString());
+     logger.info(step.getCmd());
+     step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
+     kylinConfig.getCliCommandExecutor().execute(step.getCmd(), null);
+     return intermediateTableDesc.getTableName(uuid);
+ }
+
+ /**
+  * Removes all segments from the named II instance and saves the change through the II manager.
+  *
+  * @param iiName name of the II instance whose segments are cleared
+  */
+ private void clearSegment(String iiName) throws Exception {
+ IIInstance ii = iiManager.getII(iiName);
+ ii.getSegments().clear();
+ iiManager.updateII(ii,true);
+ }
+
+ private IISegment createSegment(String iiName) throws Exception {
+ clearSegment(iiName);
+ SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
+ f.setTimeZone(TimeZone.getTimeZone("GMT"));
+
+ long date1 = 0;
+ long date2 = f.parse("2015-01-01").getTime();
+ return buildSegment(iiName, date1, date2);
+ }
+
+ private IISegment buildSegment(String iiName, long startDate, long endDate) throws Exception {
+ IIInstance iiInstance = iiManager.getII(iiName);
+ IISegment segment = iiManager.buildSegment(iiInstance, startDate, endDate);
+ iiInstance.getSegments().add(segment);
+ iiManager.updateII(iiInstance,true);
+ return segment;
+ }
+
+ /**
+  * Builds the named II through the streaming builder.
+  * <p>
+  * Steps: create the intermediate Hive table, create a fresh segment and its HTable, read all rows of the
+  * intermediate table sorted by the timestamp column, push them followed by {@code StreamMessage.EOF}
+  * into a queue, then run an {@link IIStreamBuilder} on that queue and wait for it to finish.
+  * The test fails if the builder throws.
+  *
+  * @param iiName name of the II instance to build
+  */
+ private void buildII(String iiName) throws Exception {
+     final IIDesc desc = iiManager.getII(iiName).getDescriptor();
+     final String tableName = createIntermediateTable(desc, kylinConfig);
+     logger.info("intermediate table name:" + tableName);
+     final Configuration conf = new Configuration();
+     HCatInputFormat.setInput(conf, "default", tableName);
+     final HCatSchema tableSchema = HCatInputFormat.getTableSchema(conf);
+     logger.info(StringUtils.join(tableSchema.getFieldNames(), "\n"));
+
+     // NOTE(review): the labels look off ("matrix" for metrics columns, "measure" for the rest) — confirm intent
+     for (TblColRef tblColRef : desc.listAllColumns()) {
+         if (desc.isMetricsCol(tblColRef)) {
+             logger.info("matrix:" + tblColRef.getName());
+         } else {
+             logger.info("measure:" + tblColRef.getName());
+         }
+     }
+
+     LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<StreamMessage>();
+     final IISegment segment = createSegment(iiName);
+     String[] args = new String[] { "-iiname", iiName, "-htablename", segment.getStorageLocationIdentifier() };
+     ToolRunner.run(new IICreateHTableJob(), args);
+
+     final IIStreamBuilder streamBuilder = new IIStreamBuilder(queue, iiName, segment.getStorageLocationIdentifier(), segment.getIIDesc(), 0);
+
+     // The reader is opened right before it is used and closed in finally, so it no longer leaks
+     // when reading or enqueueing a row throws.
+     final int count;
+     HiveTableReader reader = new HiveTableReader("default", tableName);
+     try {
+         List<String[]> sorted = getSortedRows(reader, desc.getTimestampColumn());
+         count = sorted.size();
+         for (String[] row : sorted) {
+             logger.info("another row: " + StringUtils.join(row, ","));
+             queue.put(parse(row));
+         }
+     } finally {
+         reader.close();
+     }
+
+     logger.info("total record count:" + count + " htable:" + segment.getStorageLocationIdentifier());
+     queue.put(StreamMessage.EOF);
+
+     ExecutorService executorService = Executors.newSingleThreadExecutor();
+     try {
+         final Future<?> future = executorService.submit(streamBuilder);
+         try {
+             future.get();
+         } catch (Exception e) {
+             logger.error("stream build failed", e);
+             fail("stream build failed");
+         }
+     } finally {
+         // release the worker thread whether the build succeeded or not
+         executorService.shutdown();
+     }
+
+     logger.info("stream build finished, htable name:" + segment.getStorageLocationIdentifier());
+ }
+
+ /**
+  * Builds every II listed in {@code II_NAME} through the streaming path and afterwards switches the
+  * instance to {@code READY} if it is not already.
+  */
+ @Test
+ public void test() throws Exception {
+     for (String name : II_NAME) {
+         buildII(name);
+         final IIInstance built = iiManager.getII(name);
+         if (built.getStatus() == RealizationStatusEnum.READY) {
+             continue;
+         }
+         built.setStatus(RealizationStatusEnum.READY);
+         iiManager.updateII(built, true);
+     }
+ }
+
+ /**
+  * Wraps a row into a {@link StreamMessage}: the cells are joined with commas and converted to bytes, and
+  * {@code System.currentTimeMillis()} is passed as the first constructor argument (presumably the message
+  * timestamp — confirm against {@code StreamMessage}).
+  *
+  * @param row cells of one table row
+  * @return the message carrying the comma-joined row
+  */
+ private StreamMessage parse(String[] row) {
+ // NOTE(review): getBytes() uses the platform default charset — confirm the consumer decodes with the same one
+ return new StreamMessage(System.currentTimeMillis(), StringUtils.join(row, ",").getBytes());
+ }
+
+ private List<String[]> getSortedRows(HiveTableReader reader, final int tsCol) throws IOException {
+ List<String[]> unsorted = Lists.newArrayList();
+ while (reader.next()) {
+ unsorted.add(reader.getRow());
+ }
+ Collections.sort(unsorted, new Comparator<String[]>() {
+ @Override
+ public int compare(String[] o1, String[] o2) {
+ long t1 = DateFormat.stringToMillis(o1[tsCol]);
+ long t2 = DateFormat.stringToMillis(o2[tsCol]);
+ return Long.compare(t1, t2);
+ }
+ });
+ return unsorted;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b6e6e2e/job/src/test/java/org/apache/kylin/job/ITIIStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/ITIIStreamBuilderTest.java b/job/src/test/java/org/apache/kylin/job/ITIIStreamBuilderTest.java
new file mode 100644
index 0000000..2edf457
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/ITIIStreamBuilderTest.java
@@ -0,0 +1,108 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.job;
+
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractKylinTestCase;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.job.hadoop.cube.StorageCleanupJob;
+import org.apache.kylin.job.streaming.StreamingBootstrap;
+import org.junit.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ */
+public class ITIIStreamBuilderTest {
+
+ private static final Logger logger = LoggerFactory.getLogger(ITIIStreamBuilderTest.class);
+
+ private KylinConfig kylinConfig;
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+ System.setProperty("hdp.version", "2.2.0.0-2041"); // mapred-site.xml ref this
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+// backup();
+ }
+
+ private static void backup() throws Exception {
+ int exitCode = cleanupOldStorage();
+ if (exitCode == 0) {
+ exportHBaseData();
+ }
+ }
+
+ private static int cleanupOldStorage() throws Exception {
+ String[] args = {"--delete", "true"};
+
+ int exitCode = ToolRunner.run(new StorageCleanupJob(), args);
+ return exitCode;
+ }
+
+ private static void exportHBaseData() throws IOException {
+ ExportHBaseData export = new ExportHBaseData();
+ export.exportTables();
+ }
+
+ @Before
+ public void before() throws Exception {
+ HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+
+ kylinConfig = KylinConfig.getInstanceFromEnv();
+ DeployUtil.initCliWorkDir();
+ DeployUtil.deployMetadata();
+ DeployUtil.overrideJobJarLocations();
+ }
+
+ @Test
+ public void test() throws Exception {
+ final StreamingBootstrap bootstrap = StreamingBootstrap.getInstance(kylinConfig);
+ bootstrap.start("eagle", 0);
+ Thread.sleep(30 * 60 * 1000);
+ logger.info("time is up, stop streaming");
+ bootstrap.stop();
+ Thread.sleep(5 * 1000);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b6e6e2e/job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java b/job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
deleted file mode 100644
index 22afb1d..0000000
--- a/job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.impl.threadpool;
-
-import static org.junit.Assert.*;
-
-import org.apache.kylin.job.SelfStopExecutable;
-import org.junit.Test;
-
-import org.apache.kylin.job.BaseTestExecutable;
-import org.apache.kylin.job.ErrorTestExecutable;
-import org.apache.kylin.job.FailedTestExecutable;
-import org.apache.kylin.job.SucceedTestExecutable;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.job.execution.ExecutableState;
-
-/**
- */
-public class DefaultSchedulerTest extends BaseSchedulerTest {
-
- @Test
- public void testSingleTaskJob() throws Exception {
- DefaultChainedExecutable job = new DefaultChainedExecutable();
- BaseTestExecutable task1 = new SucceedTestExecutable();
- job.addTask(task1);
- jobService.addJob(job);
- waitForJobFinish(job.getId());
- assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
- assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
- }
-
- @Test
- public void testSucceed() throws Exception {
- DefaultChainedExecutable job = new DefaultChainedExecutable();
- BaseTestExecutable task1 = new SucceedTestExecutable();
- BaseTestExecutable task2 = new SucceedTestExecutable();
- job.addTask(task1);
- job.addTask(task2);
- jobService.addJob(job);
- waitForJobFinish(job.getId());
- assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
- assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
- assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task2.getId()).getState());
- }
- @Test
- public void testSucceedAndFailed() throws Exception {
- DefaultChainedExecutable job = new DefaultChainedExecutable();
- BaseTestExecutable task1 = new SucceedTestExecutable();
- BaseTestExecutable task2 = new FailedTestExecutable();
- job.addTask(task1);
- job.addTask(task2);
- jobService.addJob(job);
- waitForJobFinish(job.getId());
- assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState());
- assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
- assertEquals(ExecutableState.ERROR, jobService.getOutput(task2.getId()).getState());
- }
- @Test
- public void testSucceedAndError() throws Exception {
- DefaultChainedExecutable job = new DefaultChainedExecutable();
- BaseTestExecutable task1 = new ErrorTestExecutable();
- BaseTestExecutable task2 = new SucceedTestExecutable();
- job.addTask(task1);
- job.addTask(task2);
- jobService.addJob(job);
- waitForJobFinish(job.getId());
- assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState());
- assertEquals(ExecutableState.ERROR, jobService.getOutput(task1.getId()).getState());
- assertEquals(ExecutableState.READY, jobService.getOutput(task2.getId()).getState());
- }
-
- @Test
- public void testDiscard() throws Exception {
- DefaultChainedExecutable job = new DefaultChainedExecutable();
- BaseTestExecutable task1 = new SelfStopExecutable();
- job.addTask(task1);
- jobService.addJob(job);
- waitForJobStatus(job.getId(), ExecutableState.RUNNING, 500);
- jobService.discardJob(job.getId());
- waitForJobFinish(job.getId());
- assertEquals(ExecutableState.DISCARDED, jobService.getOutput(job.getId()).getState());
- assertEquals(ExecutableState.DISCARDED, jobService.getOutput(task1.getId()).getState());
- Thread.sleep(5000);
- System.out.println(job);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b6e6e2e/job/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java b/job/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java
new file mode 100644
index 0000000..bf4a1dc
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.impl.threadpool;
+
+import org.apache.kylin.job.*;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ */
+public class ITDefaultSchedulerTest extends BaseSchedulerTest {
+
+ @Test
+ public void testSingleTaskJob() throws Exception {
+ DefaultChainedExecutable job = new DefaultChainedExecutable();
+ BaseTestExecutable task1 = new SucceedTestExecutable();
+ job.addTask(task1);
+ jobService.addJob(job);
+ waitForJobFinish(job.getId());
+ assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
+ assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
+ }
+
+ @Test
+ public void testSucceed() throws Exception {
+ DefaultChainedExecutable job = new DefaultChainedExecutable();
+ BaseTestExecutable task1 = new SucceedTestExecutable();
+ BaseTestExecutable task2 = new SucceedTestExecutable();
+ job.addTask(task1);
+ job.addTask(task2);
+ jobService.addJob(job);
+ waitForJobFinish(job.getId());
+ assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
+ assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
+ assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task2.getId()).getState());
+ }
+ @Test
+ public void testSucceedAndFailed() throws Exception {
+ DefaultChainedExecutable job = new DefaultChainedExecutable();
+ BaseTestExecutable task1 = new SucceedTestExecutable();
+ BaseTestExecutable task2 = new FailedTestExecutable();
+ job.addTask(task1);
+ job.addTask(task2);
+ jobService.addJob(job);
+ waitForJobFinish(job.getId());
+ assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState());
+ assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
+ assertEquals(ExecutableState.ERROR, jobService.getOutput(task2.getId()).getState());
+ }
+ @Test
+ public void testSucceedAndError() throws Exception {
+ DefaultChainedExecutable job = new DefaultChainedExecutable();
+ BaseTestExecutable task1 = new ErrorTestExecutable();
+ BaseTestExecutable task2 = new SucceedTestExecutable();
+ job.addTask(task1);
+ job.addTask(task2);
+ jobService.addJob(job);
+ waitForJobFinish(job.getId());
+ assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState());
+ assertEquals(ExecutableState.ERROR, jobService.getOutput(task1.getId()).getState());
+ assertEquals(ExecutableState.READY, jobService.getOutput(task2.getId()).getState());
+ }
+
+ @Test
+ public void testDiscard() throws Exception {
+ DefaultChainedExecutable job = new DefaultChainedExecutable();
+ BaseTestExecutable task1 = new SelfStopExecutable();
+ job.addTask(task1);
+ jobService.addJob(job);
+ waitForJobStatus(job.getId(), ExecutableState.RUNNING, 500);
+ jobService.discardJob(job.getId());
+ waitForJobFinish(job.getId());
+ assertEquals(ExecutableState.DISCARDED, jobService.getOutput(job.getId()).getState());
+ assertEquals(ExecutableState.DISCARDED, jobService.getOutput(task1.getId()).getState());
+ Thread.sleep(5000);
+ System.out.println(job);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b6e6e2e/metadata/src/test/java/org/apache/kylin/metadata/tool/HiveSourceTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/metadata/src/test/java/org/apache/kylin/metadata/tool/HiveSourceTableLoaderTest.java b/metadata/src/test/java/org/apache/kylin/metadata/tool/HiveSourceTableLoaderTest.java
deleted file mode 100644
index 0534f0f..0000000
--- a/metadata/src/test/java/org/apache/kylin/metadata/tool/HiveSourceTableLoaderTest.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metadata.tool;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.Set;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.metadata.util.HiveSourceTableLoader;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class HiveSourceTableLoaderTest extends HBaseMetadataTestCase {
-
- @Before
- public void setup() throws Exception {
- super.createTestMetadata();
- }
-
- @After
- public void after() throws Exception {
- super.cleanupTestMetadata();
- }
-
- @Test
- public void test() throws IOException {
- if (!useSandbox())
- return;
-
- KylinConfig config = getTestConfig();
- String[] toLoad = new String[] { "DEFAULT.TEST_KYLIN_FACT", "EDW.TEST_CAL_DT" };
- Set<String> loaded = HiveSourceTableLoader.reloadHiveTables(toLoad, config);
-
- assertTrue(loaded.size() == toLoad.length);
- for (String str : toLoad)
- assertTrue(loaded.contains(str));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b6e6e2e/metadata/src/test/java/org/apache/kylin/metadata/tool/ITHiveSourceTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/metadata/src/test/java/org/apache/kylin/metadata/tool/ITHiveSourceTableLoaderTest.java b/metadata/src/test/java/org/apache/kylin/metadata/tool/ITHiveSourceTableLoaderTest.java
new file mode 100644
index 0000000..f357855
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kylin/metadata/tool/ITHiveSourceTableLoaderTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.tool;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.metadata.util.HiveSourceTableLoader;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Integration test for {@link HiveSourceTableLoader#reloadHiveTables}: reloads two Hive
+ * tables and checks that exactly those tables are reported back as loaded.
+ * <p>
+ * Test metadata is created before and cleaned up after each test. The test body only
+ * runs when {@code useSandbox()} returns true; otherwise the test is reported as skipped.
+ */
+public class ITHiveSourceTableLoaderTest extends HBaseMetadataTestCase {
+
+ @Before
+ public void setup() throws Exception {
+ super.createTestMetadata();
+ }
+
+ @After
+ public void after() throws Exception {
+ super.cleanupTestMetadata();
+ }
+
+ /**
+ * Reloads {@code DEFAULT.TEST_KYLIN_FACT} and {@code EDW.TEST_CAL_DT} and verifies that
+ * the returned set has the same size as the request and contains every requested name.
+ *
+ * @throws IOException declared for the call to {@code reloadHiveTables}
+ */
+ @Test
+ public void test() throws IOException {
+ // Mark the test as skipped instead of silently passing when no sandbox is available,
+ // so reports do not count an un-run integration test as a success.
+ Assume.assumeTrue(useSandbox());
+
+ KylinConfig config = getTestConfig();
+ String[] toLoad = new String[] { "DEFAULT.TEST_KYLIN_FACT", "EDW.TEST_CAL_DT" };
+ Set<String> loaded = HiveSourceTableLoader.reloadHiveTables(toLoad, config);
+
+ // assertEquals reports expected vs. actual counts on failure; assertTrue(a == b) does not.
+ assertEquals(toLoad.length, loaded.size());
+ for (String str : toLoad) {
+ assertTrue("table not reported as loaded: " + str, loaded.contains(str));
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b6e6e2e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2e9b4f9..a72b59d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -562,19 +562,13 @@
<configuration>
<reportsDirectory>${project.basedir}/../target/surefire-reports</reportsDirectory>
<excludes>
+ <exclude>**/IT*.java</exclude>
<!--Build cube/II need to be run separately-->
- <exclude>**/BuildCubeWithEngineTest.java</exclude>
- <exclude>**/BuildIIWithEngineTest.java</exclude>
- <exclude>**/BuildIIWithStreamTest.java</exclude>
- <exclude>**/IIStreamBuilderTest.java</exclude>
<!--minicluster does not have hive-->
- <exclude>**/SnapshotManagerTest.java</exclude>
- <exclude>**/HiveTableReaderTest.java</exclude>
- <exclude>**/TableControllerTest.java</exclude>
-
- <exclude>**/Kafka*Test.java</exclude>
- <exclude>**/RequesterTest.java</exclude>
+ <exclude>**/ITSnapshotManagerTest.java</exclude>
+ <exclude>**/ITHiveTableReaderTest.java</exclude>
+ <exclude>**/ITTableControllerTest.java</exclude>
</excludes>
<systemProperties>
<property>
@@ -608,15 +602,7 @@
<configuration>
<reportsDirectory>${project.basedir}/../target/surefire-reports</reportsDirectory>
<excludes>
- <!--Build cube/II need to be run separately-->
- <exclude>**/BuildCubeWithEngineTest.java</exclude>
- <exclude>**/BuildIIWithEngineTest.java</exclude>
- <exclude>**/BuildIIWithStreamTest.java</exclude>
- <exclude>**/IIStreamBuilderTest.java</exclude>
-
- <exclude>**/Kafka*Test.java</exclude>
- <exclude>**/RequesterTest.java</exclude>
- <exclude>**/InMemCubeBuilderBenchmarkTest.java</exclude>
+ <exclude>**/IT*.java</exclude>
</excludes>
<systemProperties>
<property>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b6e6e2e/query/src/test/java/org/apache/kylin/query/test/CombinationTest.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/CombinationTest.java b/query/src/test/java/org/apache/kylin/query/test/CombinationTest.java
deleted file mode 100644
index 9e8e92b..0000000
--- a/query/src/test/java/org/apache/kylin/query/test/CombinationTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.query.test;
-
-import org.apache.kylin.storage.hbase.coprocessor.observer.ObserverEnabler;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.sql.SQLException;
-import java.util.Arrays;
-import java.util.Collection;
-
-/**
- */
-@RunWith(Parameterized.class)
-public class CombinationTest extends KylinQueryTest {
-
- @BeforeClass
- public static void setUp() throws SQLException {
- System.out.println("setUp in CombinationTest");
- }
-
- @AfterClass
- public static void tearDown() {
- clean();
- }
-
- /**
- * return all config combinations, where first setting specifies join type
- * (inner or left), and the second setting specifies whether to force using
- * coprocessors(on, off or unset).
- */
- @Parameterized.Parameters
- public static Collection<Object[]> configs() {
-// return Arrays.asList(new Object[][] { { "inner", "unset" }, { "left", "unset" }, { "inner", "off" }, { "left", "off" }, { "inner", "on" }, { "left", "on" }, });
- return Arrays.asList(new Object[][]{{"inner", "on"}, {"left", "on"}});
- }
-
- public CombinationTest(String joinType, String coprocessorToggle) throws Exception {
-
- KylinQueryTest.clean();
-
- KylinQueryTest.joinType = joinType;
- KylinQueryTest.setupAll();
-
- if (coprocessorToggle.equals("on")) {
- ObserverEnabler.forceCoprocessorOn();
- } else if (coprocessorToggle.equals("off")) {
- ObserverEnabler.forceCoprocessorOff();
- } else if (coprocessorToggle.equals("unset")) {
- // unset
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b6e6e2e/query/src/test/java/org/apache/kylin/query/test/IIQueryTest.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/IIQueryTest.java b/query/src/test/java/org/apache/kylin/query/test/IIQueryTest.java
deleted file mode 100644
index 185b22e..0000000
--- a/query/src/test/java/org/apache/kylin/query/test/IIQueryTest.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.query.test;
-
-import com.google.common.collect.Maps;
-import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.query.routing.RoutingRules.RealizationPriorityRule;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Map;
-
-/**
- */
-@RunWith(Parameterized.class)
-public class IIQueryTest extends KylinQueryTest {
- @BeforeClass
- public static void setUp() throws Exception {
-
- // give II higher priority than other realizations
- Map<RealizationType, Integer> priorities = Maps.newHashMap();
- priorities.put(RealizationType.INVERTED_INDEX, 0);
- priorities.put(RealizationType.CUBE, 1);
- priorities.put(RealizationType.HYBRID, 1);
- RealizationPriorityRule.setPriorities(priorities);
-
- }
-
- @AfterClass
- public static void tearDown() throws Exception {
- KylinQueryTest.tearDown();//invoke super class
-
- Map<RealizationType, Integer> priorities = Maps.newHashMap();
- priorities.put(RealizationType.INVERTED_INDEX, 1);
- priorities.put(RealizationType.CUBE, 0);
- priorities.put(RealizationType.HYBRID, 0);
- RealizationPriorityRule.setPriorities(priorities);
- }
-
- @Parameterized.Parameters
- public static Collection<Object[]> configs() {
- return Arrays.asList(new Object[][] { { "inner" }, { "left" } });
- }
-
- public IIQueryTest(String joinType) throws Exception {
-
- KylinQueryTest.clean();
-
- KylinQueryTest.joinType = joinType;
- KylinQueryTest.setupAll();
-
- }
-
- @Test
- public void testSingleRunQuery() throws Exception {
- super.testSingleRunQuery();
- }
-
- @Test
- public void testDetailedQuery() throws Exception {
- execAndCompQuery("src/test/resources/query/sql_ii", null, true);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b6e6e2e/query/src/test/java/org/apache/kylin/query/test/ITCombinationTest.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/ITCombinationTest.java b/query/src/test/java/org/apache/kylin/query/test/ITCombinationTest.java
new file mode 100644
index 0000000..a38e85c
--- /dev/null
+++ b/query/src/test/java/org/apache/kylin/query/test/ITCombinationTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.query.test;
+
+import org.apache.kylin.storage.hbase.coprocessor.observer.ObserverEnabler;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ */
+@RunWith(Parameterized.class)
+public class ITCombinationTest extends ITKylinQueryTest {
+
+ @BeforeClass
+ public static void setUp() throws SQLException {
+ System.out.println("setUp in ITCombinationTest");
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ clean();
+ }
+
+ /**
+ * return all config combinations, where first setting specifies join type
+ * (inner or left), and the second setting specifies whether to force using
+ * coprocessors(on, off or unset).
+ */
+ @Parameterized.Parameters
+ public static Collection<Object[]> configs() {
+// return Arrays.asList(new Object[][] { { "inner", "unset" }, { "left", "unset" }, { "inner", "off" }, { "left", "off" }, { "inner", "on" }, { "left", "on" }, });
+ return Arrays.asList(new Object[][]{{"inner", "on"}, {"left", "on"}});
+ }
+
+ public ITCombinationTest(String joinType, String coprocessorToggle) throws Exception {
+
+ ITKylinQueryTest.clean();
+
+ ITKylinQueryTest.joinType = joinType;
+ ITKylinQueryTest.setupAll();
+
+ if (coprocessorToggle.equals("on")) {
+ ObserverEnabler.forceCoprocessorOn();
+ } else if (coprocessorToggle.equals("off")) {
+ ObserverEnabler.forceCoprocessorOff();
+ } else if (coprocessorToggle.equals("unset")) {
+ // unset
+ }
+ }
+}