You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/09/25 13:42:06 UTC
[10/13] kylin git commit: Revert "Revert "KYLIN-1726 add test case
BuildCubeWithStream2""
Revert "Revert "KYLIN-1726 add test case BuildCubeWithStream2""
This reverts commit 96d5f0e0e639fe4e4fc169f687004d9d9361999b.
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/28525174
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/28525174
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/28525174
Branch: refs/heads/KYLIN-1726-2
Commit: 28525174a0a7a4ac5d26e57e7bd9218dfcf76dcf
Parents: 5af2390
Author: shaofengshi <sh...@apache.org>
Authored: Sat Sep 24 14:58:15 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Sat Sep 24 14:58:15 2016 +0800
----------------------------------------------------------------------
.../kylin/job/streaming/Kafka10DataLoader.java | 4 -
.../apache/kylin/common/KylinConfigBase.java | 4 +
.../java/org/apache/kylin/cube/CubeManager.java | 28 +-
.../org/apache/kylin/job/dao/ExecutableDao.java | 1 +
.../kylin/job/manager/ExecutableManager.java | 2 +-
.../streaming/cube/StreamingCubeBuilder.java | 2 +-
.../test_streaming_table_cube_desc.json | 3 +-
.../kylin/provision/BuildCubeWithStream.java | 32 ++-
.../kylin/provision/BuildCubeWithStream2.java | 274 +++++++++++++++++++
.../kylin/rest/controller/CubeController.java | 8 +-
.../apache/kylin/rest/service/JobService.java | 4 +-
.../kylin/source/kafka/SeekOffsetStep.java | 7 +-
12 files changed, 320 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/28525174/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
index a5132af..2b299cc 100644
--- a/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
+++ b/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
@@ -65,13 +65,9 @@ public class Kafka10DataLoader extends StreamDataLoader {
props.put("retry.backoff.ms", "1000");
KafkaProducer producer = KafkaClient.getKafkaProducer(brokerList, props);
- int boundary = messages.size() / 10;
for (int i = 0; i < messages.size(); ++i) {
ProducerRecord<String, String> keyedMessage = new ProducerRecord<String, String>(clusterConfig.getTopic(), String.valueOf(i), messages.get(i));
producer.send(keyedMessage);
- if (i % boundary == 0) {
- logger.info("sending " + i + " messages to " + this.toString());
- }
}
logger.info("sent " + messages.size() + " messages to " + this.toString());
producer.close();
http://git-wip-us.apache.org/repos/asf/kylin/blob/28525174/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index fafb1fc..3b06ed8 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -809,4 +809,8 @@ abstract public class KylinConfigBase implements Serializable {
public String getCreateFlatHiveTableMethod() {
return getOptional("kylin.hive.create.flat.table.method", "1");
}
+
+ public int getMaxBuildingSegments() {
+ return Integer.parseInt(getOptional("kylin.cube.building.segment.max", "1"));
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/28525174/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 3a327f9..463c8e9 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -435,13 +435,8 @@ public class CubeManager implements IRealizationProvider {
}
public CubeSegment appendSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset) throws IOException {
- return appendSegment(cube, startDate, endDate, startOffset, endOffset, true);
- }
-
- public CubeSegment appendSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset, boolean strictChecking) throws IOException {
- if (strictChecking)
- checkNoBuildingSegment(cube);
+ checkBuildingSegment(cube);
if (cube.getDescriptor().getModel().getPartitionDesc().isPartitioned()) {
// try figure out a reasonable start if missing
@@ -471,12 +466,9 @@ public class CubeManager implements IRealizationProvider {
updateCube(cubeBuilder);
return newSegment;
}
- public CubeSegment refreshSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset) throws IOException {
- return refreshSegment(cube, startDate, endDate, startOffset, endOffset, true);
- }
- public CubeSegment refreshSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset, boolean strictChecking) throws IOException {
- checkNoBuildingSegment(cube);
+ public CubeSegment refreshSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset) throws IOException {
+ checkBuildingSegment(cube);
CubeSegment newSegment = newSegment(cube, startDate, endDate, startOffset, endOffset);
@@ -497,7 +489,7 @@ public class CubeManager implements IRealizationProvider {
if (startDate >= endDate && startOffset >= endOffset)
throw new IllegalArgumentException("Invalid merge range");
- checkNoBuildingSegment(cube);
+ checkBuildingSegment(cube);
checkCubeIsPartitioned(cube);
boolean isOffsetsOn = cube.getSegments().get(0).isSourceOffsetsOn();
@@ -623,9 +615,10 @@ public class CubeManager implements IRealizationProvider {
}
}
- private void checkNoBuildingSegment(CubeInstance cube) {
- if (cube.getBuildingSegments().size() > 0) {
- throw new IllegalStateException("There is already a building segment!");
+ private void checkBuildingSegment(CubeInstance cube) {
+ int maxBuldingSeg = cube.getConfig().getMaxBuildingSegments();
+ if (cube.getBuildingSegments().size() >= maxBuldingSeg) {
+ throw new IllegalStateException("There is already " + cube.getBuildingSegments().size() + " building segment; ");
}
}
@@ -764,8 +757,9 @@ public class CubeManager implements IRealizationProvider {
}
for (CubeSegment seg : tobe) {
- if (isReady(seg) == false)
- throw new IllegalStateException("For cube " + cube + ", segment " + seg + " should be READY but is not");
+ if (isReady(seg) == false) {
+ logger.warn("For cube " + cube + ", segment " + seg + " isn't READY yet.");
+ }
}
List<CubeSegment> toRemoveSegs = Lists.newArrayList();
http://git-wip-us.apache.org/repos/asf/kylin/blob/28525174/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
index 8808a56..5cae5ac 100644
--- a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
+++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
@@ -207,6 +207,7 @@ public class ExecutableDao {
}
public void updateJobOutput(ExecutableOutputPO output) throws PersistentException {
+ logger.debug("updating job output, id: " + output.getUuid());
try {
final long ts = writeJobOutputResource(pathOfJobOutput(output.getUuid()), output);
output.setLastModified(ts);
http://git-wip-us.apache.org/repos/asf/kylin/blob/28525174/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
index 3a19486..d42b924 100644
--- a/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
@@ -278,7 +278,7 @@ public class ExecutableManager {
ExecutableState oldStatus = ExecutableState.valueOf(jobOutput.getStatus());
if (newStatus != null && oldStatus != newStatus) {
if (!ExecutableState.isValidStateTransfer(oldStatus, newStatus)) {
- throw new IllegalStateTranferException("there is no valid state transfer from:" + oldStatus + " to:" + newStatus);
+ throw new IllegalStateTranferException("there is no valid state transfer from:" + oldStatus + " to:" + newStatus + ", job id: " + jobId);
}
jobOutput.setStatus(newStatus.toString());
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/28525174/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
index 180f0b8..a42ec05 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
@@ -119,7 +119,7 @@ public class StreamingCubeBuilder implements StreamingBatchBuilder {
CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
try {
- CubeSegment segment = cubeManager.appendSegment(cubeInstance, streamingBatch.getTimeRange().getFirst(), streamingBatch.getTimeRange().getSecond(), 0, 0, false);
+ CubeSegment segment = cubeManager.appendSegment(cubeInstance, streamingBatch.getTimeRange().getFirst(), streamingBatch.getTimeRange().getSecond(), 0, 0);
segment.setLastBuildJobID(segment.getUuid()); // give a fake job id
segment.setInputRecords(streamingBatch.getMessages().size());
segment.setLastBuildTime(System.currentTimeMillis());
http://git-wip-us.apache.org/repos/asf/kylin/blob/28525174/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json b/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json
index ef10c1e..8279417 100644
--- a/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json
+++ b/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json
@@ -106,7 +106,8 @@
}
} ],
"override_kylin_properties": {
- "kylin.cube.algorithm": "inmem"
+ "kylin.cube.algorithm": "inmem",
+ "kylin.cube.building.segment.max": "3"
},
"notify_list" : [ ],
"status_need_notify" : [ ],
http://git-wip-us.apache.org/repos/asf/kylin/blob/28525174/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index 9e779ab..b7c609e 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -137,15 +137,21 @@ public class BuildCubeWithStream {
int numberOfRecrods1 = 10000;
generateStreamData(date1, date2, numberOfRecrods1);
- buildSegment(cubeName, 0, Long.MAX_VALUE);
-
+ ExecutableState result = buildSegment(cubeName, 0, Long.MAX_VALUE);
+ Assert.assertTrue(result == ExecutableState.SUCCEED);
long date3 = f.parse("2013-04-01").getTime();
- int numberOfRecrods2 = 5000;
- generateStreamData(date2, date3, numberOfRecrods2);
- buildSegment(cubeName, 0, Long.MAX_VALUE);
+ int numberOfRecords2 = 5000;
+ generateStreamData(date2, date3, numberOfRecords2);
+ result = buildSegment(cubeName, 0, Long.MAX_VALUE);
+ Assert.assertTrue(result == ExecutableState.SUCCEED);
+
+ //empty build
+ result = buildSegment(cubeName, 0, Long.MAX_VALUE);
+ Assert.assertTrue(result == ExecutableState.DISCARDED);
//merge
- mergeSegment(cubeName, 0, 15000);
+ result = mergeSegment(cubeName, 0, 15000);
+ Assert.assertTrue(result == ExecutableState.SUCCEED);
List<CubeSegment> segments = cubeManager.getCube(cubeName).getSegments();
Assert.assertTrue(segments.size() == 1);
@@ -159,16 +165,16 @@ public class BuildCubeWithStream {
}
- private String mergeSegment(String cubeName, long startOffset, long endOffset) throws Exception {
+ private ExecutableState mergeSegment(String cubeName, long startOffset, long endOffset) throws Exception {
CubeSegment segment = cubeManager.mergeSegments(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, false);
DefaultChainedExecutable job = EngineFactory.createBatchMergeJob(segment, "TEST");
jobService.addJob(job);
waitForJob(job.getId());
- return job.getId();
+ return job.getStatus();
}
private String refreshSegment(String cubeName, long startOffset, long endOffset, HashMap<String, String> partitionOffsetMap) throws Exception {
- CubeSegment segment = cubeManager.refreshSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, false);
+ CubeSegment segment = cubeManager.refreshSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset);
segment.setAdditionalInfo(partitionOffsetMap);
CubeInstance cubeInstance = cubeManager.getCube(cubeName);
CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance);
@@ -181,12 +187,12 @@ public class BuildCubeWithStream {
return job.getId();
}
- private String buildSegment(String cubeName, long startOffset, long endOffset) throws Exception {
- CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, false);
+ private ExecutableState buildSegment(String cubeName, long startOffset, long endOffset) throws Exception {
+ CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset);
DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST");
jobService.addJob(job);
waitForJob(job.getId());
- return job.getId();
+ return job.getStatus();
}
protected void deployEnv() throws IOException {
@@ -216,7 +222,7 @@ public class BuildCubeWithStream {
protected void waitForJob(String jobId) {
while (true) {
AbstractExecutable job = jobService.getJob(jobId);
- if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR) {
+ if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR || job.getStatus() == ExecutableState.DISCARDED) {
break;
} else {
try {
http://git-wip-us.apache.org/repos/asf/kylin/blob/28525174/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java
new file mode 100644
index 0000000..d48a473
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.provision;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.List;
+import java.util.Random;
+import java.util.TimeZone;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Lists;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.job.DeployUtil;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
+import org.apache.kylin.job.manager.ExecutableManager;
+import org.apache.kylin.job.streaming.Kafka10DataLoader;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.source.kafka.KafkaConfigManager;
+import org.apache.kylin.source.kafka.config.BrokerConfig;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.lang.Thread.sleep;
+
+/**
+ * for streaming cubing case "test_streaming_table", using multiple threads to build it concurrently.
+ */
+public class BuildCubeWithStream2 {
+
+ private static final Logger logger = LoggerFactory.getLogger(BuildCubeWithStream2.class);
+
+ private CubeManager cubeManager;
+ private DefaultScheduler scheduler;
+ protected ExecutableManager jobService;
+ private static final String cubeName = "test_streaming_table_cube";
+
+ private KafkaConfig kafkaConfig;
+ private MockKafka kafkaServer;
+ private static boolean generateData = true;
+
+ public void before() throws Exception {
+ deployEnv();
+
+ final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ jobService = ExecutableManager.getInstance(kylinConfig);
+ scheduler = DefaultScheduler.createInstance();
+ scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock());
+ if (!scheduler.hasStarted()) {
+ throw new RuntimeException("scheduler has not been started");
+ }
+ cubeManager = CubeManager.getInstance(kylinConfig);
+
+ final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+ final String factTable = cubeInstance.getFactTable();
+
+ final StreamingManager streamingManager = StreamingManager.getInstance(kylinConfig);
+ final StreamingConfig streamingConfig = streamingManager.getStreamingConfig(factTable);
+ kafkaConfig = KafkaConfigManager.getInstance(kylinConfig).getKafkaConfig(streamingConfig.getName());
+
+ String topicName = UUID.randomUUID().toString();
+ String localIp = NetworkUtils.getLocalIp();
+ BrokerConfig brokerConfig = kafkaConfig.getKafkaClusterConfigs().get(0).getBrokerConfigs().get(0);
+ brokerConfig.setHost(localIp);
+ kafkaConfig.setTopic(topicName);
+ KafkaConfigManager.getInstance(kylinConfig).saveKafkaConfig(kafkaConfig);
+
+ startEmbeddedKafka(topicName, brokerConfig);
+ }
+
+ private void startEmbeddedKafka(String topicName, BrokerConfig brokerConfig) {
+ // Start mock Kafka
+ String zkConnectionStr = "sandbox:2181";
+ ZkConnection zkConnection = new ZkConnection(zkConnectionStr);
+ // Assert.assertEquals(ZooKeeper.States.CONNECTED, zkConnection.getZookeeperState());
+ kafkaServer = new MockKafka(zkConnection, brokerConfig.getPort(), brokerConfig.getId());
+ kafkaServer.start();
+
+ kafkaServer.createTopic(topicName, 3, 1);
+ kafkaServer.waitTopicUntilReady(topicName);
+
+ MetadataResponse.TopicMetadata topicMetadata = kafkaServer.fetchTopicMeta(topicName);
+ Assert.assertEquals(topicName, topicMetadata.topic());
+ }
+
+ private void generateStreamData(long startTime, long endTime, int numberOfRecords) throws IOException {
+ if (numberOfRecords <= 0)
+ return;
+ Kafka10DataLoader dataLoader = new Kafka10DataLoader(kafkaConfig);
+ DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, numberOfRecords, cubeName, dataLoader);
+ logger.info("Test data inserted into Kafka");
+ }
+
+ private void clearSegment(String cubeName) throws Exception {
+ CubeInstance cube = cubeManager.getCube(cubeName);
+ // remove all existing segments
+ CubeUpdate cubeBuilder = new CubeUpdate(cube);
+ cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()]));
+ cubeManager.updateCube(cubeBuilder);
+ }
+
+ public void build() throws Exception {
+ clearSegment(cubeName);
+ SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
+ f.setTimeZone(TimeZone.getTimeZone("GMT"));
+ final long date1 = 0;
+ final long date2 = f.parse("2013-01-01").getTime();
+
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+
+ Random rand = new Random();
+ while (generateData == true) {
+ try {
+ generateStreamData(date1, date2, rand.nextInt(100));
+ sleep(rand.nextInt(rand.nextInt(100 * 1000))); // wait random time, from 0 to 100 seconds
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }).start();
+ ExecutorService executorService = Executors.newFixedThreadPool(4);
+
+ List<FutureTask<ExecutableState>> futures = Lists.newArrayList();
+ for (int i = 0; i < 5; i++) {
+ FutureTask futureTask = new FutureTask(new Callable<ExecutableState>() {
+ @Override
+ public ExecutableState call() {
+ ExecutableState result = null;
+ try {
+ result = buildSegment(cubeName, 0, Long.MAX_VALUE);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ return result;
+ }
+ });
+
+ executorService.submit(futureTask);
+ futures.add(futureTask);
+ Thread.sleep(2 * 60 * 1000); // sleep 2 minutes
+ }
+
+ generateData = false; // stop generating message to kafka
+ executorService.shutdown();
+ int succeedBuild = 0;
+ for (int i = 0; i < futures.size(); i++) {
+ ExecutableState result = futures.get(i).get(20, TimeUnit.MINUTES);
+ logger.info("Checking building task " + i + " whose state is " + result);
+ Assert.assertTrue(result == null || result == ExecutableState.SUCCEED || result == ExecutableState.DISCARDED );
+ if (result == ExecutableState.SUCCEED)
+ succeedBuild++;
+ }
+
+ logger.info(succeedBuild + " build jobs have been successfully completed.");
+ List<CubeSegment> segments = cubeManager.getCube(cubeName).getSegments(SegmentStatusEnum.READY);
+ Assert.assertTrue(segments.size() == succeedBuild);
+
+ }
+
+
+ private ExecutableState buildSegment(String cubeName, long startOffset, long endOffset) throws Exception {
+ CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset);
+ DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST");
+ jobService.addJob(job);
+ waitForJob(job.getId());
+ return job.getStatus();
+ }
+
+ protected void deployEnv() throws IOException {
+ DeployUtil.overrideJobJarLocations();
+ DeployUtil.initCliWorkDir();
+ DeployUtil.deployMetadata();
+ }
+
+ public static void beforeClass() throws Exception {
+ logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+ ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+ System.setProperty(KylinConfig.KYLIN_CONF, HBaseMetadataTestCase.SANDBOX_TEST_DATA);
+ if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
+ throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
+ }
+ HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
+ }
+
+ public static void afterClass() throws Exception {
+ HBaseMetadataTestCase.staticCleanupTestMetadata();
+ }
+
+ public void after() {
+ kafkaServer.stop();
+ }
+
+ protected void waitForJob(String jobId) {
+ while (true) {
+ AbstractExecutable job = jobService.getJob(jobId);
+ if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR || job.getStatus() == ExecutableState.DISCARDED) {
+ break;
+ } else {
+ try {
+ sleep(5000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ try {
+ beforeClass();
+
+ BuildCubeWithStream2 buildCubeWithStream = new BuildCubeWithStream2();
+ buildCubeWithStream.before();
+ buildCubeWithStream.build();
+ logger.info("Build is done");
+ buildCubeWithStream.after();
+ afterClass();
+ logger.info("Going to exit");
+ System.exit(0);
+ } catch (Exception e) {
+ logger.error("error", e);
+ System.exit(1);
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/28525174/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index 669f53e..42b117c 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -272,7 +272,7 @@ public class CubeController extends BasicController {
@RequestMapping(value = "/{cubeName}/rebuild", method = { RequestMethod.PUT })
@ResponseBody
public JobInstance rebuild(@PathVariable String cubeName, @RequestBody JobBuildRequest req) {
- return buildInternal(cubeName, req.getStartTime(), req.getEndTime(), 0, 0, req.getBuildType(), true, req.isForce() || req.isForceMergeEmptySegment());
+ return buildInternal(cubeName, req.getStartTime(), req.getEndTime(), 0, 0, req.getBuildType(), req.isForce() || req.isForceMergeEmptySegment());
}
/** Build/Rebuild a cube segment by source offset */
@@ -286,16 +286,16 @@ public class CubeController extends BasicController {
@RequestMapping(value = "/{cubeName}/rebuild2", method = { RequestMethod.PUT })
@ResponseBody
public JobInstance rebuild(@PathVariable String cubeName, @RequestBody JobBuildRequest2 req) {
- return buildInternal(cubeName, 0, 0, req.getStartSourceOffset(), req.getEndSourceOffset(), req.getBuildType(), false, req.isForce());
+ return buildInternal(cubeName, 0, 0, req.getStartSourceOffset(), req.getEndSourceOffset(), req.getBuildType(), req.isForce());
}
private JobInstance buildInternal(String cubeName, long startTime, long endTime, //
- long startOffset, long endOffset, String buildType, boolean strictCheck, boolean force) {
+ long startOffset, long endOffset, String buildType, boolean force) {
try {
String submitter = SecurityContextHolder.getContext().getAuthentication().getName();
CubeInstance cube = jobService.getCubeManager().getCube(cubeName);
return jobService.submitJob(cube, startTime, endTime, startOffset, endOffset, //
- CubeBuildTypeEnum.valueOf(buildType), strictCheck, force, submitter);
+ CubeBuildTypeEnum.valueOf(buildType), force, submitter);
} catch (Exception e) {
logger.error(e.getLocalizedMessage(), e);
throw new InternalErrorException(e.getLocalizedMessage());
http://git-wip-us.apache.org/repos/asf/kylin/blob/28525174/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 8929bf1..5c704ba 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -199,7 +199,7 @@ public class JobService extends BasicService {
@PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')")
public JobInstance submitJob(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset, //
- CubeBuildTypeEnum buildType, boolean strictCheck, boolean force, String submitter) throws IOException, JobException {
+ CubeBuildTypeEnum buildType, boolean force, String submitter) throws IOException, JobException {
if (cube.getStatus() == RealizationStatusEnum.DESCBROKEN) {
throw new BadRequestException("Broken cube " + cube.getName() + " can't be built");
@@ -211,7 +211,7 @@ public class JobService extends BasicService {
DefaultChainedExecutable job;
if (buildType == CubeBuildTypeEnum.BUILD) {
- CubeSegment newSeg = getCubeManager().appendSegment(cube, startDate, endDate, startOffset, endOffset, strictCheck);
+ CubeSegment newSeg = getCubeManager().appendSegment(cube, startDate, endDate, startOffset, endOffset);
job = EngineFactory.createBatchCubingJob(newSeg, submitter);
} else if (buildType == CubeBuildTypeEnum.MERGE) {
CubeSegment newSeg = getCubeManager().mergeSegments(cube, startDate, endDate, startOffset, endOffset, force);
http://git-wip-us.apache.org/repos/asf/kylin/blob/28525174/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
index 479f1b8..9369e6f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
@@ -17,10 +17,6 @@
*/
package org.apache.kylin.source.kafka;
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.Maps;
-import org.apache.commons.math3.util.MathUtils;
import org.apache.kylin.source.kafka.util.KafkaClient;
import org.apache.kylin.source.kafka.util.KafkaOffsetMapping;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -38,7 +34,6 @@ import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -125,7 +120,7 @@ public class SeekOffsetStep extends AbstractExecutable {
} catch (IOException e) {
return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
}
- return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed, offset start: " + totalStartOffset + ", offset end: " + totalEndOffset);
} else {
CubeUpdate cubeBuilder = new CubeUpdate(cube);
cubeBuilder.setToRemoveSegs(segment);