You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/07/07 09:13:01 UTC
[1/2] kylin git commit: KYLIN-1854 Allow deleting cube instance when its underlying cubedesc went wrong
Repository: kylin
Updated Branches:
refs/heads/master 4cd733a14 -> db5f89bae
KYLIN-1854 Allow deleting cube instance when its underlying cubedesc went wrong
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/edfb37d0
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/edfb37d0
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/edfb37d0
Branch: refs/heads/master
Commit: edfb37d08c94294801307c396b94df103b8706c8
Parents: 4cd733a
Author: Hongbin Ma <ma...@apache.org>
Authored: Thu Jul 7 15:31:56 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Thu Jul 7 15:31:56 2016 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/job/DeployUtil.java | 6 ++--
.../java/org/apache/kylin/cube/CubeManager.java | 15 ++++++---
.../apache/kylin/rest/service/CubeService.java | 32 ++++++++++++++------
3 files changed, 35 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/edfb37d0/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index 22fe48a..da97df3 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -32,9 +32,9 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.ResourceTool;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeDescManager;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeUpdate;
import org.apache.kylin.job.dataGen.FactTableGenerator;
import org.apache.kylin.job.streaming.KafkaDataLoader;
import org.apache.kylin.job.streaming.StreamingTableDataGenerator;
@@ -71,9 +71,7 @@ public class DeployUtil {
// update cube desc signature.
for (CubeInstance cube : CubeManager.getInstance(config()).listAllCubes()) {
- cube.getDescriptor().setSignature(cube.getDescriptor().calculateSignature());
- CubeUpdate cubeBuilder = new CubeUpdate(cube);
- CubeManager.getInstance(config()).updateCube(cubeBuilder);
+ CubeDescManager.getInstance(config()).updateCubeDesc(cube.getDescriptor());//enforce signature updating
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/edfb37d0/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index a200f5d..0941d56 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -810,13 +810,20 @@ public class CubeManager implements IRealizationProvider {
ResourceStore store = getStore();
List<String> paths = store.collectResourceRecursively(ResourceStore.CUBE_RESOURCE_ROOT, ".json");
- logger.debug("Loading Cube from folder " + store.getReadableResourcePath(ResourceStore.CUBE_RESOURCE_ROOT));
+ logger.info("Loading Cube from folder " + store.getReadableResourcePath(ResourceStore.CUBE_RESOURCE_ROOT));
+ int succeed = 0;
+ int fail = 0;
for (String path : paths) {
- reloadCubeLocalAt(path);
+ CubeInstance cube = reloadCubeLocalAt(path);
+ if (cube == null) {
+ fail++;
+ } else {
+ succeed++;
+ }
}
- logger.debug("Loaded " + paths.size() + " Cube(s)");
+ logger.info("Loaded " + succeed + " cubes, fail on " + fail + " cubes");
}
private synchronized CubeInstance reloadCubeLocalAt(String path) {
@@ -855,7 +862,7 @@ public class CubeManager implements IRealizationProvider {
return cubeInstance;
} catch (Exception e) {
- logger.error("Error during load cube instance " + path, e);
+ logger.error("Error during load cube instance, skipping : " + path, e);
return null;
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/edfb37d0/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index a9d4bfc..72942e8 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -282,8 +282,15 @@ public class CubeService extends BasicService {
throw new JobException("The cube " + cube.getName() + " has running job, please discard it and try again.");
}
- this.releaseAllSegments(cube);
- getCubeManager().dropCube(cube.getName(), true);
+ try {
+ this.releaseAllJobs(cube);
+ } catch (Exception e) {
+ logger.error("error when releasing all jobs", e);
+ //ignore the exception
+ }
+
+ int cubeNum = getCubeManager().getCubesByDesc(cube.getDescriptor().getName()).size();
+ getCubeManager().dropCube(cube.getName(), cubeNum == 1);//only delete cube desc when no other cube is using it
accessService.clean(cube, true);
}
@@ -550,13 +557,7 @@ public class CubeService extends BasicService {
return CubeManager.getInstance(getConfig()).updateCube(update);
}
- /**
- * purge the cube
- *
- * @throws IOException
- * @throws JobException
- */
- private CubeInstance releaseAllSegments(CubeInstance cube) throws IOException, JobException {
+ private void releaseAllJobs(CubeInstance cube) {
final List<CubingJob> cubingJobs = listAllCubingJobs(cube.getName(), null);
for (CubingJob cubingJob : cubingJobs) {
final ExecutableState status = cubingJob.getStatus();
@@ -564,9 +565,20 @@ public class CubeService extends BasicService {
getExecutableManager().discardJob(cubingJob.getId());
}
}
+ }
+
+ /**
+ * purge the cube
+ *
+ * @throws IOException
+ * @throws JobException
+ */
+ private void releaseAllSegments(CubeInstance cube) throws IOException, JobException {
+ releaseAllJobs(cube);
+
CubeUpdate update = new CubeUpdate(cube);
update.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()]));
- return CubeManager.getInstance(getConfig()).updateCube(update);
+ CubeManager.getInstance(getConfig()).updateCube(update);
}
@PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN)
[2/2] kylin git commit: minor, remove build ii step
Posted by ma...@apache.org.
minor, remove build ii step
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/db5f89ba
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/db5f89ba
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/db5f89ba
Branch: refs/heads/master
Commit: db5f89bae53c9fa84c96c5c4693a142a136ef1fe
Parents: edfb37d
Author: Hongbin Ma <ma...@apache.org>
Authored: Thu Jul 7 17:06:40 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Thu Jul 7 17:06:40 2016 +0800
----------------------------------------------------------------------
kylin-it/pom.xml | 38 +--
.../kylin/provision/BuildCubeWithStream.java | 14 +-
.../kylin/provision/BuildIIWithEngine.java | 253 ----------------
.../kylin/provision/BuildIIWithStream.java | 300 -------------------
4 files changed, 23 insertions(+), 582 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/db5f89ba/kylin-it/pom.xml
----------------------------------------------------------------------
diff --git a/kylin-it/pom.xml b/kylin-it/pom.xml
index eeb74fe..271bae8 100644
--- a/kylin-it/pom.xml
+++ b/kylin-it/pom.xml
@@ -18,7 +18,8 @@
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>kylin</artifactId>
<groupId>org.apache.kylin</groupId>
@@ -30,8 +31,8 @@
<name>Kylin:IT</name>
<properties>
- <hdp.version />
- <fastBuildMode />
+ <hdp.version/>
+ <fastBuildMode/>
</properties>
<!-- Dependencies. -->
@@ -283,9 +284,13 @@
<executions>
<execution>
<id>integration-tests</id>
- <phase>integration-test</phase>
<goals>
<goal>integration-test</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>verify</id>
+ <goals>
<goal>verify</goal>
</goals>
</execution>
@@ -323,7 +328,7 @@
<argument>-Dhdp.version=${hdp.version}</argument>
<argument>-DfastBuildMode=${fastBuildMode}</argument>
<argument>-classpath</argument>
- <classpath />
+ <classpath/>
<argument>org.apache.kylin.provision.BuildCubeWithEngine</argument>
</arguments>
<workingDirectory>${project.basedir}</workingDirectory>
@@ -344,33 +349,12 @@
<argument>-Dhdp.version=${hdp.version}</argument>
<argument>-DfastBuildMode=${fastBuildMode}</argument>
<argument>-classpath</argument>
- <classpath />
+ <classpath/>
<argument>org.apache.kylin.provision.BuildCubeWithStream</argument>
</arguments>
<workingDirectory>${project.basedir}</workingDirectory>
</configuration>
</execution>
- <execution>
- <id>build_ii_with_stream</id>
- <goals>
- <goal>exec</goal>
- </goals>
- <phase>pre-integration-test</phase>
- <configuration>
- <skip>${skipTests}</skip>
- <classpathScope>test</classpathScope>
- <executable>java</executable>
- <arguments>
- <argument>-DuseSandbox=true</argument>
- <argument>-Dhdp.version=${hdp.version}</argument>
- <argument>-DfastBuildMode=${fastBuildMode}</argument>
- <argument>-classpath</argument>
- <classpath />
- <argument>org.apache.kylin.provision.BuildIIWithStream</argument>
- </arguments>
- <workingDirectory>${project.basedir}</workingDirectory>
- </configuration>
- </execution>
</executions>
</plugin>
http://git-wip-us.apache.org/repos/asf/kylin/blob/db5f89ba/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index c426ea4..1655a17 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.DateFormat;
@@ -36,6 +37,7 @@ import org.apache.kylin.job.DeployUtil;
import org.apache.kylin.metadata.realization.RealizationType;
import org.apache.kylin.source.kafka.KafkaConfigManager;
import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.apache.kylin.storage.hbase.util.StorageCleanupJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,7 +63,7 @@ public class BuildCubeWithStream {
buildCubeWithStream.before();
buildCubeWithStream.build();
logger.info("Build is done");
- afterClass();
+ buildCubeWithStream.cleanup();
logger.info("Going to exit");
System.exit(0);
} catch (Exception e) {
@@ -101,10 +103,18 @@ public class BuildCubeWithStream {
DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, cubeName, streamingConfig);
}
- public static void afterClass() throws Exception {
+ public void cleanup() throws Exception {
+ cleanupOldStorage();
HBaseMetadataTestCase.staticCleanupTestMetadata();
}
+ private static int cleanupOldStorage() throws Exception {
+ String[] args = { "--delete", "true" };
+
+ int exitCode = ToolRunner.run(new StorageCleanupJob(), args);
+ return exitCode;
+ }
+
public void build() throws Exception {
logger.info("start time:" + startTime + " end time:" + endTime + " batch interval:" + batchInterval + " batch count:" + ((endTime - startTime) / batchInterval));
for (long start = startTime; start < endTime; start += batchInterval) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/db5f89ba/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java
deleted file mode 100644
index c947d9d..0000000
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.provision;
-
-import java.io.File;
-import java.lang.reflect.Method;
-import java.text.SimpleDateFormat;
-import java.util.List;
-import java.util.TimeZone;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.engine.mr.invertedindex.BatchIIJobBuilder;
-import org.apache.kylin.engine.mr.invertedindex.IIJob;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.job.DeployUtil;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
-import org.apache.kylin.job.manager.ExecutableManager;
-import org.apache.kylin.metadata.realization.RealizationStatusEnum;
-import org.apache.kylin.storage.hbase.util.StorageCleanupJob;
-import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-//TODO: convert it to a normal class rather than a test case, like in BuildCubeWithEngine
-@Ignore
-public class BuildIIWithEngine {
-
- private JobEngineConfig jobEngineConfig;
- private IIManager iiManager;
-
- private DefaultScheduler scheduler;
- protected ExecutableManager jobService;
-
- protected static final String[] TEST_II_INSTANCES = new String[] { "test_kylin_ii_inner_join", "test_kylin_ii_left_join" };
-
- private static final Log logger = LogFactory.getLog(BuildIIWithEngine.class);
-
- protected void waitForJob(String jobId) {
- while (true) {
- AbstractExecutable job = jobService.getJob(jobId);
- if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR) {
- break;
- } else {
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
-
- @BeforeClass
- public static void beforeClass() throws Exception {
- logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
- ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
- System.setProperty(KylinConfig.KYLIN_CONF, HBaseMetadataTestCase.SANDBOX_TEST_DATA);
- if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
- throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
- }
- }
-
- @Before
- public void before() throws Exception {
- HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
-
- //DeployUtil.initCliWorkDir();
- // DeployUtil.deployMetadata();
- DeployUtil.overrideJobJarLocations();
-
- final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- jobService = ExecutableManager.getInstance(kylinConfig);
- scheduler = DefaultScheduler.createInstance();
- scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock());
- if (!scheduler.hasStarted()) {
- throw new RuntimeException("scheduler has not been started");
- }
- jobEngineConfig = new JobEngineConfig(kylinConfig);
- for (String jobId : jobService.getAllJobIds()) {
- if (jobService.getJob(jobId) instanceof IIJob) {
- jobService.deleteJob(jobId);
- }
- }
-
- iiManager = IIManager.getInstance(kylinConfig);
- for (String iiInstance : TEST_II_INSTANCES) {
-
- IIInstance ii = iiManager.getII(iiInstance);
- if (ii.getStatus() != RealizationStatusEnum.DISABLED) {
- ii.setStatus(RealizationStatusEnum.DISABLED);
- iiManager.updateII(ii);
- }
- }
- }
-
- @After
- public void after() throws Exception {
-
- for (String iiInstance : TEST_II_INSTANCES) {
- IIInstance ii = iiManager.getII(iiInstance);
- if (ii.getStatus() != RealizationStatusEnum.READY) {
- ii.setStatus(RealizationStatusEnum.READY);
- iiManager.updateII(ii);
- }
- }
- }
-
- @Test
- public void testBuildII() throws Exception {
-
- String[] testCase = new String[] { "buildIIInnerJoin", "buildIILeftJoin" };
- ExecutorService executorService = Executors.newFixedThreadPool(testCase.length);
- final CountDownLatch countDownLatch = new CountDownLatch(testCase.length);
- List<Future<List<String>>> tasks = Lists.newArrayListWithExpectedSize(testCase.length);
- for (int i = 0; i < testCase.length; i++) {
- tasks.add(executorService.submit(new TestCallable(testCase[i], countDownLatch)));
- }
- countDownLatch.await();
- for (int i = 0; i < tasks.size(); ++i) {
- Future<List<String>> task = tasks.get(i);
- final List<String> jobIds = task.get();
- for (String jobId : jobIds) {
- assertJobSucceed(jobId);
- }
- }
-
- }
-
- private void assertJobSucceed(String jobId) {
- if (jobService.getOutput(jobId).getState() != ExecutableState.SUCCEED) {
- throw new RuntimeException("The job '" + jobId + "' is failed.");
- }
- }
-
- private class TestCallable implements Callable<List<String>> {
-
- private final String methodName;
- private final CountDownLatch countDownLatch;
-
- public TestCallable(String methodName, CountDownLatch countDownLatch) {
- this.methodName = methodName;
- this.countDownLatch = countDownLatch;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public List<String> call() throws Exception {
- try {
- final Method method = BuildIIWithEngine.class.getDeclaredMethod(methodName);
- method.setAccessible(true);
- return (List<String>) method.invoke(BuildIIWithEngine.this);
- } finally {
- countDownLatch.countDown();
- }
- }
- }
-
- protected List<String> buildIIInnerJoin() throws Exception {
- return buildII(TEST_II_INSTANCES[0]);
- }
-
- protected List<String> buildIILeftJoin() throws Exception {
- return buildII(TEST_II_INSTANCES[1]);
- }
-
- protected List<String> buildII(String iiName) throws Exception {
- clearSegment(iiName);
-
- SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
- f.setTimeZone(TimeZone.getTimeZone("GMT"));
-
- long date1 = 0;
- long date2 = f.parse("2015-01-01").getTime();
-
- List<String> result = Lists.newArrayList();
- result.add(buildSegment(iiName, date1, date2));
- return result;
- }
-
- private void clearSegment(String iiName) throws Exception {
- IIInstance ii = iiManager.getII(iiName);
- ii.getSegments().clear();
- iiManager.updateII(ii);
- }
-
- private String buildSegment(String iiName, long startDate, long endDate) throws Exception {
- IIInstance iiInstance = iiManager.getII(iiName);
- IISegment segment = iiManager.buildSegment(iiInstance, startDate, endDate);
- iiInstance.getSegments().add(segment);
- iiManager.updateII(iiInstance);
-
- BatchIIJobBuilder batchIIJobBuilder = new BatchIIJobBuilder(segment, "SYSTEM");
- IIJob job = batchIIJobBuilder.build();
- jobService.addJob(job);
- waitForJob(job.getId());
- return job.getId();
- }
-
- private int cleanupOldStorage() throws Exception {
- String[] args = { "--delete", "true" };
-
- int exitCode = ToolRunner.run(new StorageCleanupJob(), args);
- return exitCode;
- }
-
- public static void main(String[] args) throws Exception {
- BuildIIWithEngine instance = new BuildIIWithEngine();
-
- BuildIIWithEngine.beforeClass();
- instance.before();
- instance.testBuildII();
- instance.after();
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/db5f89ba/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java
deleted file mode 100644
index 1a335c6..0000000
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java
+++ /dev/null
@@ -1,300 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.provision;
-
-import java.io.File;
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.TimeZone;
-import java.util.UUID;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.common.util.StreamingBatch;
-import org.apache.kylin.common.util.StreamingMessage;
-import org.apache.kylin.engine.mr.JobBuilderSupport;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.invertedindex.index.Slice;
-import org.apache.kylin.invertedindex.index.SliceBuilder;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
-import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
-import org.apache.kylin.invertedindex.model.IIRow;
-import org.apache.kylin.job.DeployUtil;
-import org.apache.kylin.job.JoinedFlatTable;
-import org.apache.kylin.job.common.ShellExecutable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.RealizationStatusEnum;
-import org.apache.kylin.source.hive.HiveCmdBuilder;
-import org.apache.kylin.source.hive.HiveTableReader;
-import org.apache.kylin.storage.hbase.HBaseConnection;
-import org.apache.kylin.storage.hbase.ii.IICreateHTableJob;
-import org.apache.kylin.storage.hbase.util.StorageCleanupJob;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-public class BuildIIWithStream {
-
- private static final Logger logger = LoggerFactory.getLogger(BuildIIWithStream.class);
-
- private static final String[] II_NAME = new String[] { "test_kylin_ii_left_join", "test_kylin_ii_inner_join" };
- private IIManager iiManager;
- private KylinConfig kylinConfig;
-
- public static void main(String[] args) throws Exception {
- try {
- beforeClass();
- BuildIIWithStream buildCubeWithEngine = new BuildIIWithStream();
- buildCubeWithEngine.before();
- buildCubeWithEngine.build();
- logger.info("Build is done");
- afterClass();
- logger.info("Going to exit");
- System.exit(0);
- } catch (Exception e) {
- logger.error("error", e);
- System.exit(1);
- }
- }
-
- public static void beforeClass() throws Exception {
- logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
- ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
- if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
- throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
- }
- HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
- }
-
- protected void deployEnv() throws Exception {
- DeployUtil.overrideJobJarLocations();
- }
-
- public void before() throws Exception {
- deployEnv();
-
- kylinConfig = KylinConfig.getInstanceFromEnv();
- iiManager = IIManager.getInstance(kylinConfig);
- for (String iiInstance : II_NAME) {
-
- IIInstance ii = iiManager.getII(iiInstance);
- if (ii.getStatus() != RealizationStatusEnum.DISABLED) {
- ii.setStatus(RealizationStatusEnum.DISABLED);
- iiManager.updateII(ii);
- }
- }
- }
-
- public static void afterClass() throws Exception {
- cleanupOldStorage();
- HBaseMetadataTestCase.staticCleanupTestMetadata();
- }
-
- private String createIntermediateTable(IIDesc desc, KylinConfig kylinConfig) throws IOException {
- IIJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(desc);
- JobEngineConfig jobEngineConfig = new JobEngineConfig(kylinConfig);
- final String uuid = UUID.randomUUID().toString();
- final String useDatabaseHql = "USE " + kylinConfig.getHiveDatabaseForIntermediateTable() + ";";
- final String dropTableHql = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc);
- final String createTableHql = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, JobBuilderSupport.getJobWorkingDir(jobEngineConfig, uuid));
- String insertDataHqls;
- insertDataHqls = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, jobEngineConfig);
-
- ShellExecutable step = new ShellExecutable();
- HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
- hiveCmdBuilder.addStatement(useDatabaseHql);
- hiveCmdBuilder.addStatement(dropTableHql);
- hiveCmdBuilder.addStatement(createTableHql);
- hiveCmdBuilder.addStatement(insertDataHqls);
-
- step.setCmd(hiveCmdBuilder.build());
- logger.info(step.getCmd());
- step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
- kylinConfig.getCliCommandExecutor().execute(step.getCmd(), null);
- return intermediateTableDesc.getTableName();
- }
-
- private void clearSegment(String iiName) throws Exception {
- IIInstance ii = iiManager.getII(iiName);
- ii.getSegments().clear();
- iiManager.updateII(ii);
- }
-
- private IISegment createSegment(String iiName) throws Exception {
- clearSegment(iiName);
- SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
- f.setTimeZone(TimeZone.getTimeZone("GMT"));
-
- long date1 = 0;
- long date2 = f.parse("2015-01-01").getTime();
- return buildSegment(iiName, date1, date2);
- }
-
- private IISegment buildSegment(String iiName, long startDate, long endDate) throws Exception {
- IIInstance iiInstance = iiManager.getII(iiName);
- IISegment segment = iiManager.buildSegment(iiInstance, startDate, endDate);
- iiInstance.getSegments().add(segment);
- iiManager.updateII(iiInstance);
- return segment;
- }
-
- private void buildII(String iiName) throws Exception {
- final IIDesc desc = iiManager.getII(iiName).getDescriptor();
- final String tableName = createIntermediateTable(desc, kylinConfig);
- logger.info("intermediate table name:" + tableName);
-
- HiveTableReader reader = new HiveTableReader("default", tableName);
- final List<TblColRef> tblColRefs = desc.listAllColumns();
- for (TblColRef tblColRef : tblColRefs) {
- if (desc.isMetricsCol(tblColRef)) {
- logger.info("matrix:" + tblColRef.getName());
- } else {
- logger.info("measure:" + tblColRef.getName());
- }
- }
- final IISegment segment = createSegment(iiName);
- final HTableInterface htable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(segment.getStorageLocationIdentifier());
- String[] args = new String[] { "-iiname", iiName, "-htablename", segment.getStorageLocationIdentifier() };
- ToolRunner.run(new IICreateHTableJob(), args);
-
- final IIDesc iiDesc = segment.getIIDesc();
- final SliceBuilder sliceBuilder = new SliceBuilder(desc, (short) 0);
-
- List<String[]> sorted = getSortedRows(reader, desc.getTimestampColumn());
- int count = sorted.size();
- ArrayList<StreamingMessage> messages = Lists.newArrayList();
- for (String[] row : sorted) {
- messages.add((parse(row)));
- if (messages.size() >= iiDesc.getSliceSize()) {
- build(sliceBuilder, new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())), htable);
- messages.clear();
- }
- }
-
- if (!messages.isEmpty()) {
- build(sliceBuilder, new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())), htable);
- }
-
- reader.close();
- logger.info("total record count:" + count + " htable:" + segment.getStorageLocationIdentifier());
- logger.info("stream build finished, htable name:" + segment.getStorageLocationIdentifier());
- }
-
- public void build() throws Exception {
- for (String iiName : II_NAME) {
- buildII(iiName);
- IIInstance ii = iiManager.getII(iiName);
- if (ii.getStatus() != RealizationStatusEnum.READY) {
- ii.setStatus(RealizationStatusEnum.READY);
- iiManager.updateII(ii);
- }
- }
- }
-
- private void build(SliceBuilder sliceBuilder, StreamingBatch batch, HTableInterface htable) throws IOException {
- final Slice slice = sliceBuilder.buildSlice(batch);
- try {
- loadToHBase(htable, slice, new IIKeyValueCodec(slice.getInfo()));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- private void loadToHBase(HTableInterface hTable, Slice slice, IIKeyValueCodec codec) throws IOException {
- List<Put> data = Lists.newArrayList();
- for (IIRow row : codec.encodeKeyValue(slice)) {
- final byte[] key = row.getKey().get();
- final byte[] value = row.getValue().get();
- Put put = new Put(key);
- put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES, value);
- final ImmutableBytesWritable dictionary = row.getDictionary();
- final byte[] dictBytes = dictionary.get();
- if (dictionary.getOffset() == 0 && dictionary.getLength() == dictBytes.length) {
- put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_DICTIONARY_BYTES, dictBytes);
- } else {
- throw new RuntimeException("dict offset should be 0, and dict length should be " + dictBytes.length + " but they are" + dictionary.getOffset() + " " + dictionary.getLength());
- }
- data.add(put);
- }
- hTable.put(data);
- //omit hTable.flushCommits(), because htable is auto flush
- }
-
- private StreamingMessage parse(String[] row) {
- return new StreamingMessage(Lists.newArrayList(row), System.currentTimeMillis(), System.currentTimeMillis(), Collections.<String, Object> emptyMap());
- }
-
- private List<String[]> getSortedRows(HiveTableReader reader, final int tsCol) throws IOException {
- List<String[]> unsorted = Lists.newArrayList();
- while (reader.next()) {
- unsorted.add(reader.getRow());
- }
- Collections.sort(unsorted, new Comparator<String[]>() {
- @Override
- public int compare(String[] o1, String[] o2) {
- long t1 = DateFormat.stringToMillis(o1[tsCol]);
- long t2 = DateFormat.stringToMillis(o2[tsCol]);
- return Long.compare(t1, t2);
- }
- });
- return unsorted;
- }
-
- private static int cleanupOldStorage() throws Exception {
- String[] args = { "--delete", "true" };
-
- int exitCode = ToolRunner.run(new StorageCleanupJob(), args);
- return exitCode;
- }
-
-}