You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/10/04 08:54:08 UTC
[14/31] kylin git commit: Revert "Revert "refactor
BuildCubeWithStream""
Revert "Revert "refactor BuildCubeWithStream""
This reverts commit 8e9c4550bb562b497442b17eec6485ae96e848d8.
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/be18158d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/be18158d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/be18158d
Branch: refs/heads/master-cdh5.7
Commit: be18158dcc5ce739c272b9345d3b2296c3936ee3
Parents: 8cbffb4
Author: shaofengshi <sh...@apache.org>
Authored: Sat Sep 24 14:58:43 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Sep 27 10:17:40 2016 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/job/DeployUtil.java | 7 +-
.../kylin/provision/BuildCubeWithStream.java | 10 +-
.../kylin/provision/BuildCubeWithStream2.java | 145 +------------------
3 files changed, 12 insertions(+), 150 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/be18158d/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index 9b282e3..9e9df05 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -187,6 +187,7 @@ public class DeployUtil {
File tmpFile = File.createTempFile(factTableName, "csv");
FileOutputStream out = new FileOutputStream(tmpFile);
+ InputStream tempIn = null;
try {
if (store.exists(factTablePath)) {
InputStream oldContent = store.getResource(factTablePath).inputStream;
@@ -194,13 +195,15 @@ public class DeployUtil {
}
IOUtils.copy(in, out);
IOUtils.closeQuietly(in);
+ IOUtils.closeQuietly(out);
store.deleteResource(factTablePath);
- in = new FileInputStream(tmpFile);
- store.putResource(factTablePath, in, System.currentTimeMillis());
+ tempIn = new FileInputStream(tmpFile);
+ store.putResource(factTablePath, tempIn, System.currentTimeMillis());
} finally {
IOUtils.closeQuietly(out);
IOUtils.closeQuietly(in);
+ IOUtils.closeQuietly(tempIn);
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/be18158d/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index 6e5313f..bfe1d0a 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -62,10 +62,10 @@ public class BuildCubeWithStream {
private static final Logger logger = LoggerFactory.getLogger(org.apache.kylin.provision.BuildCubeWithStream.class);
- private CubeManager cubeManager;
+ protected CubeManager cubeManager;
private DefaultScheduler scheduler;
protected ExecutableManager jobService;
- private static final String cubeName = "test_streaming_table_cube";
+ static final String cubeName = "test_streaming_table_cube";
private KafkaConfig kafkaConfig;
private MockKafka kafkaServer;
@@ -114,13 +114,13 @@ public class BuildCubeWithStream {
Assert.assertEquals(topicName, topicMetadata.topic());
}
- private void generateStreamData(long startTime, long endTime, int numberOfRecords) throws IOException {
+ protected void generateStreamData(long startTime, long endTime, int numberOfRecords) throws IOException {
Kafka10DataLoader dataLoader = new Kafka10DataLoader(kafkaConfig);
DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, numberOfRecords, cubeName, dataLoader);
logger.info("Test data inserted into Kafka");
}
- private void clearSegment(String cubeName) throws Exception {
+ protected void clearSegment(String cubeName) throws Exception {
CubeInstance cube = cubeManager.getCube(cubeName);
// remove all existing segments
CubeUpdate cubeBuilder = new CubeUpdate(cube);
@@ -187,7 +187,7 @@ public class BuildCubeWithStream {
return job.getId();
}
- private ExecutableState buildSegment(String cubeName, long startOffset, long endOffset) throws Exception {
+ protected ExecutableState buildSegment(String cubeName, long startOffset, long endOffset) throws Exception {
CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset);
DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST");
jobService.addJob(job);
http://git-wip-us.apache.org/repos/asf/kylin/blob/be18158d/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java
index 2812446..7959701 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java
@@ -18,13 +18,11 @@
package org.apache.kylin.provision;
-import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.List;
import java.util.Random;
import java.util.TimeZone;
-import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -32,32 +30,9 @@ import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.Lists;
-import org.I0Itec.zkclient.ZkConnection;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kafka.common.requests.MetadataResponse;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.engine.EngineFactory;
-import org.apache.kylin.engine.streaming.StreamingConfig;
-import org.apache.kylin.engine.streaming.StreamingManager;
-import org.apache.kylin.job.DeployUtil;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
-import org.apache.kylin.job.manager.ExecutableManager;
-import org.apache.kylin.job.streaming.Kafka10DataLoader;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.source.kafka.KafkaConfigManager;
-import org.apache.kylin.source.kafka.config.BrokerConfig;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
-import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,79 +42,12 @@ import static java.lang.Thread.sleep;
/**
* for streaming cubing case "test_streaming_table", using multiple threads to build it concurrently.
*/
-public class BuildCubeWithStream2 {
+public class BuildCubeWithStream2 extends BuildCubeWithStream {
private static final Logger logger = LoggerFactory.getLogger(BuildCubeWithStream2.class);
-
- private CubeManager cubeManager;
- private DefaultScheduler scheduler;
- protected ExecutableManager jobService;
- private static final String cubeName = "test_streaming_table_cube";
-
- private KafkaConfig kafkaConfig;
- private MockKafka kafkaServer;
private static boolean generateData = true;
- public void before() throws Exception {
- deployEnv();
-
- final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- jobService = ExecutableManager.getInstance(kylinConfig);
- scheduler = DefaultScheduler.createInstance();
- scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock());
- if (!scheduler.hasStarted()) {
- throw new RuntimeException("scheduler has not been started");
- }
- cubeManager = CubeManager.getInstance(kylinConfig);
-
- final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
- final String factTable = cubeInstance.getFactTable();
-
- final StreamingManager streamingManager = StreamingManager.getInstance(kylinConfig);
- final StreamingConfig streamingConfig = streamingManager.getStreamingConfig(factTable);
- kafkaConfig = KafkaConfigManager.getInstance(kylinConfig).getKafkaConfig(streamingConfig.getName());
-
- String topicName = UUID.randomUUID().toString();
- String localIp = NetworkUtils.getLocalIp();
- BrokerConfig brokerConfig = kafkaConfig.getKafkaClusterConfigs().get(0).getBrokerConfigs().get(0);
- brokerConfig.setHost(localIp);
- kafkaConfig.setTopic(topicName);
- KafkaConfigManager.getInstance(kylinConfig).saveKafkaConfig(kafkaConfig);
-
- startEmbeddedKafka(topicName, brokerConfig);
- }
-
- private void startEmbeddedKafka(String topicName, BrokerConfig brokerConfig) {
- //Start mock Kakfa
- String zkConnectionStr = "sandbox:2181";
- ZkConnection zkConnection = new ZkConnection(zkConnectionStr);
- // Assert.assertEquals(ZooKeeper.States.CONNECTED, zkConnection.getZookeeperState());
- kafkaServer = new MockKafka(zkConnection, brokerConfig.getPort(), brokerConfig.getId());
- kafkaServer.start();
-
- kafkaServer.createTopic(topicName, 3, 1);
- kafkaServer.waitTopicUntilReady(topicName);
-
- MetadataResponse.TopicMetadata topicMetadata = kafkaServer.fetchTopicMeta(topicName);
- Assert.assertEquals(topicName, topicMetadata.topic());
- }
-
- private void generateStreamData(long startTime, long endTime, int numberOfRecords) throws IOException {
- if (numberOfRecords <= 0)
- return;
- Kafka10DataLoader dataLoader = new Kafka10DataLoader(kafkaConfig);
- DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, numberOfRecords, cubeName, dataLoader);
- logger.info("Test data inserted into Kafka");
- }
-
- private void clearSegment(String cubeName) throws Exception {
- CubeInstance cube = cubeManager.getCube(cubeName);
- // remove all existing segments
- CubeUpdate cubeBuilder = new CubeUpdate(cube);
- cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()]));
- cubeManager.updateCube(cubeBuilder);
- }
-
+ @Override
public void build() throws Exception {
clearSegment(cubeName);
SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
@@ -204,55 +112,6 @@ public class BuildCubeWithStream2 {
}
-
- private ExecutableState buildSegment(String cubeName, long startOffset, long endOffset) throws Exception {
- CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset);
- DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST");
- jobService.addJob(job);
- waitForJob(job.getId());
- return job.getStatus();
- }
-
- protected void deployEnv() throws IOException {
- DeployUtil.overrideJobJarLocations();
- DeployUtil.initCliWorkDir();
- DeployUtil.deployMetadata();
- }
-
- public static void beforeClass() throws Exception {
- logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
- ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
- System.setProperty(KylinConfig.KYLIN_CONF, HBaseMetadataTestCase.SANDBOX_TEST_DATA);
- if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
- throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
- }
- HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
- }
-
- public static void afterClass() throws Exception {
- HBaseMetadataTestCase.staticCleanupTestMetadata();
- }
-
- public void after() {
- kafkaServer.stop();
- DefaultScheduler.destroyInstance();
- }
-
- protected void waitForJob(String jobId) {
- while (true) {
- AbstractExecutable job = jobService.getJob(jobId);
- if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR || job.getStatus() == ExecutableState.DISCARDED) {
- break;
- } else {
- try {
- sleep(5000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
-
public static void main(String[] args) throws Exception {
try {
beforeClass();