You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/09/27 00:26:01 UTC
[08/50] [abbrv] kylin git commit: Revert "KYLIN-1726 update to kafka
0.10"
Revert "KYLIN-1726 update to kafka 0.10"
This reverts commit 1b1b2e37fdcba7ad67f0fa3f2369aa65431f13bc.
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/355e58ba
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/355e58ba
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/355e58ba
Branch: refs/heads/1.5.x-HBase1.x
Commit: 355e58ba4209ddf945663228688d550bf654c387
Parents: da5ba27
Author: Hongbin Ma <ma...@apache.org>
Authored: Mon Sep 19 23:50:26 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Tue Sep 20 11:43:08 2016 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/cube/CubeManager.java | 30 ++++------------
.../kylin/rest/controller/CubeController.java | 8 ++---
.../apache/kylin/rest/service/JobService.java | 4 +--
.../source/kafka/util/KafkaSampleProducer.java | 38 ++++++++------------
4 files changed, 27 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/355e58ba/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 11eabce..fc68798 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -679,28 +679,12 @@ public class CubeManager implements IRealizationProvider {
return null;
}
- List<CubeSegment> buildingSegs = cube.getBuildingSegments();
- if (buildingSegs.size() > 0) {
- logger.debug("Cube " + cube.getName() + " has " + buildingSegs.size() + " building segments");
- }
-
- List<CubeSegment> readySegs = cube.getSegments(SegmentStatusEnum.READY);
-
- List<CubeSegment> mergingSegs = Lists.newArrayList();
- if (buildingSegs.size() > 0) {
-
- for (CubeSegment building : buildingSegs) {
- // exclude those under-merging segs
- for (CubeSegment ready : readySegs) {
- if (ready.getSourceOffsetStart() >= building.getSourceOffsetStart() && ready.getSourceOffsetEnd() <= building.getSourceOffsetEnd()) {
- mergingSegs.add(ready);
- }
- }
- }
+ if (cube.getBuildingSegments().size() > 0) {
+ logger.debug("Cube " + cube.getName() + " has bulding segment, will not trigger merge at this moment");
+ return null;
}
- // exclude those already under merging segments
- readySegs.removeAll(mergingSegs);
+ List<CubeSegment> ready = cube.getSegments(SegmentStatusEnum.READY);
long[] timeRanges = cube.getDescriptor().getAutoMergeTimeRanges();
Arrays.sort(timeRanges);
@@ -708,9 +692,9 @@ public class CubeManager implements IRealizationProvider {
for (int i = timeRanges.length - 1; i >= 0; i--) {
long toMergeRange = timeRanges[i];
- for (int s = 0; s < readySegs.size(); s++) {
- CubeSegment seg = readySegs.get(s);
- Pair<CubeSegment, CubeSegment> p = findMergeOffsetsByDateRange(readySegs.subList(s, readySegs.size()), //
+ for (int s = 0; s < ready.size(); s++) {
+ CubeSegment seg = ready.get(s);
+ Pair<CubeSegment, CubeSegment> p = findMergeOffsetsByDateRange(ready.subList(s, ready.size()), //
seg.getDateRangeStart(), seg.getDateRangeStart() + toMergeRange, toMergeRange);
if (p != null && p.getSecond().getDateRangeEnd() - p.getFirst().getDateRangeStart() >= toMergeRange)
return Pair.newPair(p.getFirst().getSourceOffsetStart(), p.getSecond().getSourceOffsetEnd());
http://git-wip-us.apache.org/repos/asf/kylin/blob/355e58ba/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index 669f53e..42b117c 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -272,7 +272,7 @@ public class CubeController extends BasicController {
@RequestMapping(value = "/{cubeName}/rebuild", method = { RequestMethod.PUT })
@ResponseBody
public JobInstance rebuild(@PathVariable String cubeName, @RequestBody JobBuildRequest req) {
- return buildInternal(cubeName, req.getStartTime(), req.getEndTime(), 0, 0, req.getBuildType(), true, req.isForce() || req.isForceMergeEmptySegment());
+ return buildInternal(cubeName, req.getStartTime(), req.getEndTime(), 0, 0, req.getBuildType(), req.isForce() || req.isForceMergeEmptySegment());
}
/** Build/Rebuild a cube segment by source offset */
@@ -286,16 +286,16 @@ public class CubeController extends BasicController {
@RequestMapping(value = "/{cubeName}/rebuild2", method = { RequestMethod.PUT })
@ResponseBody
public JobInstance rebuild(@PathVariable String cubeName, @RequestBody JobBuildRequest2 req) {
- return buildInternal(cubeName, 0, 0, req.getStartSourceOffset(), req.getEndSourceOffset(), req.getBuildType(), false, req.isForce());
+ return buildInternal(cubeName, 0, 0, req.getStartSourceOffset(), req.getEndSourceOffset(), req.getBuildType(), req.isForce());
}
private JobInstance buildInternal(String cubeName, long startTime, long endTime, //
- long startOffset, long endOffset, String buildType, boolean strictCheck, boolean force) {
+ long startOffset, long endOffset, String buildType, boolean force) {
try {
String submitter = SecurityContextHolder.getContext().getAuthentication().getName();
CubeInstance cube = jobService.getCubeManager().getCube(cubeName);
return jobService.submitJob(cube, startTime, endTime, startOffset, endOffset, //
- CubeBuildTypeEnum.valueOf(buildType), strictCheck, force, submitter);
+ CubeBuildTypeEnum.valueOf(buildType), force, submitter);
} catch (Exception e) {
logger.error(e.getLocalizedMessage(), e);
throw new InternalErrorException(e.getLocalizedMessage());
http://git-wip-us.apache.org/repos/asf/kylin/blob/355e58ba/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 8929bf1..5c704ba 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -199,7 +199,7 @@ public class JobService extends BasicService {
@PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')")
public JobInstance submitJob(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset, //
- CubeBuildTypeEnum buildType, boolean strictCheck, boolean force, String submitter) throws IOException, JobException {
+ CubeBuildTypeEnum buildType, boolean force, String submitter) throws IOException, JobException {
if (cube.getStatus() == RealizationStatusEnum.DESCBROKEN) {
throw new BadRequestException("Broken cube " + cube.getName() + " can't be built");
@@ -211,7 +211,7 @@ public class JobService extends BasicService {
DefaultChainedExecutable job;
if (buildType == CubeBuildTypeEnum.BUILD) {
- CubeSegment newSeg = getCubeManager().appendSegment(cube, startDate, endDate, startOffset, endOffset, strictCheck);
+ CubeSegment newSeg = getCubeManager().appendSegment(cube, startDate, endDate, startOffset, endOffset);
job = EngineFactory.createBatchCubingJob(newSeg, submitter);
} else if (buildType == CubeBuildTypeEnum.MERGE) {
CubeSegment newSeg = getCubeManager().mergeSegments(cube, startDate, endDate, startOffset, endOffset, force);
http://git-wip-us.apache.org/repos/asf/kylin/blob/355e58ba/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
index 3d26d3d..2a86a98 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
@@ -30,15 +30,16 @@ import java.util.Random;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kylin.common.util.OptionsHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+
/**
* A sample producer which will create sample data to kafka topic
*/
@@ -48,8 +49,7 @@ public class KafkaSampleProducer {
@SuppressWarnings("static-access")
private static final Option OPTION_TOPIC = OptionBuilder.withArgName("topic").hasArg().isRequired(true).withDescription("Kafka topic").create("topic");
private static final Option OPTION_BROKER = OptionBuilder.withArgName("broker").hasArg().isRequired(true).withDescription("Kafka broker").create("broker");
- private static final Option OPTION_DELAY = OptionBuilder.withArgName("delay").hasArg().isRequired(false).withDescription("Simulated message delay in mili-seconds, default 0").create("delay");
- private static final Option OPTION_INTERVAL = OptionBuilder.withArgName("interval").hasArg().isRequired(false).withDescription("Simulated message interval in mili-seconds, default 1000").create("interval");
+ private static final Option OPTION_DELAY = OptionBuilder.withArgName("delay").hasArg().isRequired(false).withDescription("Simulated message delay").create("delay");
private static final ObjectMapper mapper = new ObjectMapper();
@@ -61,7 +61,6 @@ public class KafkaSampleProducer {
options.addOption(OPTION_TOPIC);
options.addOption(OPTION_BROKER);
options.addOption(OPTION_DELAY);
- options.addOption(OPTION_INTERVAL);
optionsHelper.parseOptions(options, args);
logger.info("options: '" + optionsHelper.getOptionsAsString() + "'");
@@ -71,13 +70,7 @@ public class KafkaSampleProducer {
long delay = 0;
String delayString = optionsHelper.getOptionValue(OPTION_DELAY);
if (delayString != null) {
- delay = Long.parseLong(delayString);
- }
-
- long interval = 1000;
- String intervalString = optionsHelper.getOptionValue(OPTION_INTERVAL);
- if (intervalString != null) {
- interval = Long.parseLong(intervalString);
+ delay = Long.parseLong(optionsHelper.getOptionValue(OPTION_DELAY));
}
List<String> countries = new ArrayList();
@@ -102,16 +95,13 @@ public class KafkaSampleProducer {
devices.add("Other");
Properties props = new Properties();
- props.put("bootstrap.servers", broker);
- props.put("acks", "all");
- props.put("retries", 0);
- props.put("batch.size", 16384);
- props.put("linger.ms", 1);
- props.put("buffer.memory", 33554432);
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ props.put("metadata.broker.list", broker);
+ props.put("serializer.class", "kafka.serializer.StringEncoder");
+ props.put("request.required.acks", "1");
+
+ ProducerConfig config = new ProducerConfig(props);
- Producer<String, String> producer = new KafkaProducer<>(props);
+ Producer<String, String> producer = new Producer<String, String>(config);
boolean alive = true;
Random rnd = new Random();
@@ -124,10 +114,10 @@ public class KafkaSampleProducer {
record.put("qty", rnd.nextInt(10));
record.put("currency", "USD");
record.put("amount", rnd.nextDouble() * 100);
- ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, System.currentTimeMillis() + "", mapper.writeValueAsString(record));
+ KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, System.currentTimeMillis() + "", mapper.writeValueAsString(record));
System.out.println("Sending 1 message");
producer.send(data);
- Thread.sleep(interval);
+ Thread.sleep(2000);
}
producer.close();
}