You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/09/02 11:26:37 UTC
[5/5] kylin git commit: KYLIN-1726 update to kafka 0.10
KYLIN-1726 update to kafka 0.10
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/21167756
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/21167756
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/21167756
Branch: refs/heads/KYLIN-1726
Commit: 21167756ea14f95498b145b88acb096735b2eb80
Parents: a2db1f6
Author: shaofengshi <sh...@apache.org>
Authored: Fri Sep 2 19:25:57 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri Sep 2 19:25:57 2016 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/cube/CubeManager.java | 30 ++++++++++++----
.../kylin/rest/controller/CubeController.java | 8 ++---
.../apache/kylin/rest/service/JobService.java | 4 +--
.../source/kafka/util/KafkaSampleProducer.java | 38 ++++++++++++--------
4 files changed, 53 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/21167756/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 70ee176..5d2d701 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -680,12 +680,28 @@ public class CubeManager implements IRealizationProvider {
return null;
}
- if (cube.getBuildingSegments().size() > 0) {
- logger.debug("Cube " + cube.getName() + " has bulding segment, will not trigger merge at this moment");
- return null;
+ List<CubeSegment> buildingSegs = cube.getBuildingSegments();
+ if (buildingSegs.size() > 0) {
+ logger.debug("Cube " + cube.getName() + " has " + buildingSegs.size() + " building segments");
+ }
+
+ List<CubeSegment> readySegs = cube.getSegments(SegmentStatusEnum.READY);
+
+ List<CubeSegment> mergingSegs = Lists.newArrayList();
+ if (buildingSegs.size() > 0) {
+
+ for (CubeSegment building : buildingSegs) {
+ // collect READY segs whose source offset range lies within this building seg (they are being merged)
+ for (CubeSegment ready : readySegs) {
+ if (ready.getSourceOffsetStart() >= building.getSourceOffsetStart() && ready.getSourceOffsetEnd() <= building.getSourceOffsetEnd()) {
+ mergingSegs.add(ready);
+ }
+ }
+ }
}
- List<CubeSegment> ready = cube.getSegments(SegmentStatusEnum.READY);
+ // exclude the segments that are already being merged
+ readySegs.removeAll(mergingSegs);
long[] timeRanges = cube.getDescriptor().getAutoMergeTimeRanges();
Arrays.sort(timeRanges);
@@ -693,9 +709,9 @@ public class CubeManager implements IRealizationProvider {
for (int i = timeRanges.length - 1; i >= 0; i--) {
long toMergeRange = timeRanges[i];
- for (int s = 0; s < ready.size(); s++) {
- CubeSegment seg = ready.get(s);
- Pair<CubeSegment, CubeSegment> p = findMergeOffsetsByDateRange(ready.subList(s, ready.size()), //
+ for (int s = 0; s < readySegs.size(); s++) {
+ CubeSegment seg = readySegs.get(s);
+ Pair<CubeSegment, CubeSegment> p = findMergeOffsetsByDateRange(readySegs.subList(s, readySegs.size()), //
seg.getDateRangeStart(), seg.getDateRangeStart() + toMergeRange, toMergeRange);
if (p != null && p.getSecond().getDateRangeEnd() - p.getFirst().getDateRangeStart() >= toMergeRange)
return Pair.newPair(p.getFirst().getSourceOffsetStart(), p.getSecond().getSourceOffsetEnd());
http://git-wip-us.apache.org/repos/asf/kylin/blob/21167756/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index 7081d02..9c8b95f 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -271,7 +271,7 @@ public class CubeController extends BasicController {
@RequestMapping(value = "/{cubeName}/rebuild", method = { RequestMethod.PUT })
@ResponseBody
public JobInstance rebuild(@PathVariable String cubeName, @RequestBody JobBuildRequest req) {
- return buildInternal(cubeName, req.getStartTime(), req.getEndTime(), 0, 0, req.getBuildType(), req.isForce() || req.isForceMergeEmptySegment());
+ return buildInternal(cubeName, req.getStartTime(), req.getEndTime(), 0, 0, req.getBuildType(), true, req.isForce() || req.isForceMergeEmptySegment());
}
/** Build/Rebuild a cube segment by source offset */
@@ -285,16 +285,16 @@ public class CubeController extends BasicController {
@RequestMapping(value = "/{cubeName}/rebuild2", method = { RequestMethod.PUT })
@ResponseBody
public JobInstance rebuild(@PathVariable String cubeName, @RequestBody JobBuildRequest2 req) {
- return buildInternal(cubeName, 0, 0, req.getStartSourceOffset(), req.getEndSourceOffset(), req.getBuildType(), req.isForce());
+ return buildInternal(cubeName, 0, 0, req.getStartSourceOffset(), req.getEndSourceOffset(), req.getBuildType(), false, req.isForce());
}
private JobInstance buildInternal(String cubeName, long startTime, long endTime, //
- long startOffset, long endOffset, String buildType, boolean force) {
+ long startOffset, long endOffset, String buildType, boolean strictCheck, boolean force) {
try {
String submitter = SecurityContextHolder.getContext().getAuthentication().getName();
CubeInstance cube = jobService.getCubeManager().getCube(cubeName);
return jobService.submitJob(cube, startTime, endTime, startOffset, endOffset, //
- CubeBuildTypeEnum.valueOf(buildType), force, submitter);
+ CubeBuildTypeEnum.valueOf(buildType), strictCheck, force, submitter);
} catch (Exception e) {
logger.error(e.getLocalizedMessage(), e);
throw new InternalErrorException(e.getLocalizedMessage());
http://git-wip-us.apache.org/repos/asf/kylin/blob/21167756/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index e4fbc98..ef132f0 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -197,7 +197,7 @@ public class JobService extends BasicService {
@PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')")
public JobInstance submitJob(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset, //
- CubeBuildTypeEnum buildType, boolean force, String submitter) throws IOException, JobException {
+ CubeBuildTypeEnum buildType, boolean strictCheck, boolean force, String submitter) throws IOException, JobException {
checkCubeDescSignature(cube);
checkNoRunningJob(cube);
@@ -205,7 +205,7 @@ public class JobService extends BasicService {
DefaultChainedExecutable job;
if (buildType == CubeBuildTypeEnum.BUILD) {
- CubeSegment newSeg = getCubeManager().appendSegment(cube, startDate, endDate, startOffset, endOffset);
+ CubeSegment newSeg = getCubeManager().appendSegment(cube, startDate, endDate, startOffset, endOffset, strictCheck);
job = EngineFactory.createBatchCubingJob(newSeg, submitter);
} else if (buildType == CubeBuildTypeEnum.MERGE) {
CubeSegment newSeg = getCubeManager().mergeSegments(cube, startDate, endDate, startOffset, endOffset, force);
http://git-wip-us.apache.org/repos/asf/kylin/blob/21167756/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
index 2a86a98..3d26d3d 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
@@ -30,16 +30,15 @@ import java.util.Random;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kylin.common.util.OptionsHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-
/**
* A sample producer which will create sample data to kafka topic
*/
@@ -49,7 +48,8 @@ public class KafkaSampleProducer {
@SuppressWarnings("static-access")
private static final Option OPTION_TOPIC = OptionBuilder.withArgName("topic").hasArg().isRequired(true).withDescription("Kafka topic").create("topic");
private static final Option OPTION_BROKER = OptionBuilder.withArgName("broker").hasArg().isRequired(true).withDescription("Kafka broker").create("broker");
- private static final Option OPTION_DELAY = OptionBuilder.withArgName("delay").hasArg().isRequired(false).withDescription("Simulated message delay").create("delay");
+ private static final Option OPTION_DELAY = OptionBuilder.withArgName("delay").hasArg().isRequired(false).withDescription("Simulated message delay in mili-seconds, default 0").create("delay");
+ private static final Option OPTION_INTERVAL = OptionBuilder.withArgName("interval").hasArg().isRequired(false).withDescription("Simulated message interval in mili-seconds, default 1000").create("interval");
private static final ObjectMapper mapper = new ObjectMapper();
@@ -61,6 +61,7 @@ public class KafkaSampleProducer {
options.addOption(OPTION_TOPIC);
options.addOption(OPTION_BROKER);
options.addOption(OPTION_DELAY);
+ options.addOption(OPTION_INTERVAL);
optionsHelper.parseOptions(options, args);
logger.info("options: '" + optionsHelper.getOptionsAsString() + "'");
@@ -70,7 +71,13 @@ public class KafkaSampleProducer {
long delay = 0;
String delayString = optionsHelper.getOptionValue(OPTION_DELAY);
if (delayString != null) {
- delay = Long.parseLong(optionsHelper.getOptionValue(OPTION_DELAY));
+ delay = Long.parseLong(delayString);
+ }
+
+ long interval = 1000;
+ String intervalString = optionsHelper.getOptionValue(OPTION_INTERVAL);
+ if (intervalString != null) {
+ interval = Long.parseLong(intervalString);
}
List<String> countries = new ArrayList();
@@ -95,13 +102,16 @@ public class KafkaSampleProducer {
devices.add("Other");
Properties props = new Properties();
- props.put("metadata.broker.list", broker);
- props.put("serializer.class", "kafka.serializer.StringEncoder");
- props.put("request.required.acks", "1");
-
- ProducerConfig config = new ProducerConfig(props);
+ props.put("bootstrap.servers", broker);
+ props.put("acks", "all");
+ props.put("retries", 0);
+ props.put("batch.size", 16384);
+ props.put("linger.ms", 1);
+ props.put("buffer.memory", 33554432);
+ props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- Producer<String, String> producer = new Producer<String, String>(config);
+ Producer<String, String> producer = new KafkaProducer<>(props);
boolean alive = true;
Random rnd = new Random();
@@ -114,10 +124,10 @@ public class KafkaSampleProducer {
record.put("qty", rnd.nextInt(10));
record.put("currency", "USD");
record.put("amount", rnd.nextDouble() * 100);
- KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, System.currentTimeMillis() + "", mapper.writeValueAsString(record));
+ ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, System.currentTimeMillis() + "", mapper.writeValueAsString(record));
System.out.println("Sending 1 message");
producer.send(data);
- Thread.sleep(2000);
+ Thread.sleep(interval);
}
producer.close();
}