You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/09/25 13:42:01 UTC

[05/13] kylin git commit: Revert "Revert "KYLIN-1726 update to kafka 0.10""

Revert "Revert "KYLIN-1726 update to kafka 0.10""

This reverts commit 355e58ba4209ddf945663228688d550bf654c387.


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/01c0a585
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/01c0a585
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/01c0a585

Branch: refs/heads/KYLIN-1726-2
Commit: 01c0a585a120535a48ebeee5b761ddd93d37ec51
Parents: 1037fd4
Author: shaofengshi <sh...@apache.org>
Authored: Sat Sep 24 14:56:47 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Sat Sep 24 14:56:47 2016 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/cube/CubeManager.java | 30 ++++++++++++----
 .../kylin/rest/controller/CubeController.java   |  8 ++---
 .../apache/kylin/rest/service/JobService.java   |  4 +--
 .../source/kafka/util/KafkaSampleProducer.java  | 38 ++++++++++++--------
 4 files changed, 53 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/01c0a585/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index cc2baa5..5a4b07c 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -714,12 +714,28 @@ public class CubeManager implements IRealizationProvider {
             return null;
         }
 
-        if (cube.getBuildingSegments().size() > 0) {
-            logger.debug("Cube " + cube.getName() + " has bulding segment, will not trigger merge at this moment");
-            return null;
+        List<CubeSegment> buildingSegs = cube.getBuildingSegments();
+        if (buildingSegs.size() > 0) {
+            logger.debug("Cube " + cube.getName() + " has " + buildingSegs.size() + " building segments");
+        }
+
+        List<CubeSegment> readySegs = cube.getSegments(SegmentStatusEnum.READY);
+
+        List<CubeSegment> mergingSegs = Lists.newArrayList();
+        if (buildingSegs.size() > 0) {
+            
+            for (CubeSegment building : buildingSegs) {
+                // exclude those under-merging segs
+                for (CubeSegment ready : readySegs) {
+                    if (ready.getSourceOffsetStart() >= building.getSourceOffsetStart() && ready.getSourceOffsetEnd() <= building.getSourceOffsetEnd()) {
+                        mergingSegs.add(ready);
+                    }
+                }
+            }
         }
 
-        List<CubeSegment> ready = cube.getSegments(SegmentStatusEnum.READY);
+        // exclude those already under merging segments
+        readySegs.removeAll(mergingSegs);
 
         long[] timeRanges = cube.getDescriptor().getAutoMergeTimeRanges();
         Arrays.sort(timeRanges);
@@ -727,9 +743,9 @@ public class CubeManager implements IRealizationProvider {
         for (int i = timeRanges.length - 1; i >= 0; i--) {
             long toMergeRange = timeRanges[i];
 
-            for (int s = 0; s < ready.size(); s++) {
-                CubeSegment seg = ready.get(s);
-                Pair<CubeSegment, CubeSegment> p = findMergeOffsetsByDateRange(ready.subList(s, ready.size()), //
+            for (int s = 0; s < readySegs.size(); s++) {
+                CubeSegment seg = readySegs.get(s);
+                Pair<CubeSegment, CubeSegment> p = findMergeOffsetsByDateRange(readySegs.subList(s, readySegs.size()), //
                         seg.getDateRangeStart(), seg.getDateRangeStart() + toMergeRange, toMergeRange);
                 if (p != null && p.getSecond().getDateRangeEnd() - p.getFirst().getDateRangeStart() >= toMergeRange)
                     return Pair.newPair(p.getFirst().getSourceOffsetStart(), p.getSecond().getSourceOffsetEnd());

http://git-wip-us.apache.org/repos/asf/kylin/blob/01c0a585/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index 42b117c..669f53e 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -272,7 +272,7 @@ public class CubeController extends BasicController {
     @RequestMapping(value = "/{cubeName}/rebuild", method = { RequestMethod.PUT })
     @ResponseBody
     public JobInstance rebuild(@PathVariable String cubeName, @RequestBody JobBuildRequest req) {
-        return buildInternal(cubeName, req.getStartTime(), req.getEndTime(), 0, 0, req.getBuildType(), req.isForce() || req.isForceMergeEmptySegment());
+        return buildInternal(cubeName, req.getStartTime(), req.getEndTime(), 0, 0, req.getBuildType(), true, req.isForce() || req.isForceMergeEmptySegment());
     }
 
     /** Build/Rebuild a cube segment by source offset */
@@ -286,16 +286,16 @@ public class CubeController extends BasicController {
     @RequestMapping(value = "/{cubeName}/rebuild2", method = { RequestMethod.PUT })
     @ResponseBody
     public JobInstance rebuild(@PathVariable String cubeName, @RequestBody JobBuildRequest2 req) {
-        return buildInternal(cubeName, 0, 0, req.getStartSourceOffset(), req.getEndSourceOffset(), req.getBuildType(), req.isForce());
+        return buildInternal(cubeName, 0, 0, req.getStartSourceOffset(), req.getEndSourceOffset(), req.getBuildType(), false, req.isForce());
     }
 
     private JobInstance buildInternal(String cubeName, long startTime, long endTime, //
-            long startOffset, long endOffset, String buildType, boolean force) {
+            long startOffset, long endOffset, String buildType, boolean strictCheck, boolean force) {
         try {
             String submitter = SecurityContextHolder.getContext().getAuthentication().getName();
             CubeInstance cube = jobService.getCubeManager().getCube(cubeName);
             return jobService.submitJob(cube, startTime, endTime, startOffset, endOffset, //
-                    CubeBuildTypeEnum.valueOf(buildType), force, submitter);
+                    CubeBuildTypeEnum.valueOf(buildType), strictCheck, force, submitter);
         } catch (Exception e) {
             logger.error(e.getLocalizedMessage(), e);
             throw new InternalErrorException(e.getLocalizedMessage());

http://git-wip-us.apache.org/repos/asf/kylin/blob/01c0a585/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 5c704ba..8929bf1 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -199,7 +199,7 @@ public class JobService extends BasicService {
 
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')")
     public JobInstance submitJob(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset, //
-            CubeBuildTypeEnum buildType, boolean force, String submitter) throws IOException, JobException {
+            CubeBuildTypeEnum buildType, boolean strictCheck, boolean force, String submitter) throws IOException, JobException {
 
         if (cube.getStatus() == RealizationStatusEnum.DESCBROKEN) {
             throw new BadRequestException("Broken cube " + cube.getName() + " can't be built");
@@ -211,7 +211,7 @@ public class JobService extends BasicService {
         DefaultChainedExecutable job;
 
         if (buildType == CubeBuildTypeEnum.BUILD) {
-            CubeSegment newSeg = getCubeManager().appendSegment(cube, startDate, endDate, startOffset, endOffset);
+            CubeSegment newSeg = getCubeManager().appendSegment(cube, startDate, endDate, startOffset, endOffset, strictCheck);
             job = EngineFactory.createBatchCubingJob(newSeg, submitter);
         } else if (buildType == CubeBuildTypeEnum.MERGE) {
             CubeSegment newSeg = getCubeManager().mergeSegments(cube, startDate, endDate, startOffset, endOffset, force);

http://git-wip-us.apache.org/repos/asf/kylin/blob/01c0a585/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
index 2a86a98..3d26d3d 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
@@ -30,16 +30,15 @@ import java.util.Random;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kylin.common.util.OptionsHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-
 /**
  * A sample producer which will create sample data to kafka topic
  */
@@ -49,7 +48,8 @@ public class KafkaSampleProducer {
     @SuppressWarnings("static-access")
     private static final Option OPTION_TOPIC = OptionBuilder.withArgName("topic").hasArg().isRequired(true).withDescription("Kafka topic").create("topic");
     private static final Option OPTION_BROKER = OptionBuilder.withArgName("broker").hasArg().isRequired(true).withDescription("Kafka broker").create("broker");
-    private static final Option OPTION_DELAY = OptionBuilder.withArgName("delay").hasArg().isRequired(false).withDescription("Simulated message delay").create("delay");
+    private static final Option OPTION_DELAY = OptionBuilder.withArgName("delay").hasArg().isRequired(false).withDescription("Simulated message delay in mili-seconds, default 0").create("delay");
+    private static final Option OPTION_INTERVAL = OptionBuilder.withArgName("interval").hasArg().isRequired(false).withDescription("Simulated message interval in mili-seconds, default 1000").create("interval");
 
     private static final ObjectMapper mapper = new ObjectMapper();
 
@@ -61,6 +61,7 @@ public class KafkaSampleProducer {
         options.addOption(OPTION_TOPIC);
         options.addOption(OPTION_BROKER);
         options.addOption(OPTION_DELAY);
+        options.addOption(OPTION_INTERVAL);
         optionsHelper.parseOptions(options, args);
 
         logger.info("options: '" + optionsHelper.getOptionsAsString() + "'");
@@ -70,7 +71,13 @@ public class KafkaSampleProducer {
         long delay = 0;
         String delayString = optionsHelper.getOptionValue(OPTION_DELAY);
         if (delayString != null) {
-            delay = Long.parseLong(optionsHelper.getOptionValue(OPTION_DELAY));
+            delay = Long.parseLong(delayString);
+        }
+
+        long interval = 1000;
+        String intervalString = optionsHelper.getOptionValue(OPTION_INTERVAL);
+        if (intervalString != null) {
+            interval = Long.parseLong(intervalString);
         }
 
         List<String> countries = new ArrayList();
@@ -95,13 +102,16 @@ public class KafkaSampleProducer {
         devices.add("Other");
 
         Properties props = new Properties();
-        props.put("metadata.broker.list", broker);
-        props.put("serializer.class", "kafka.serializer.StringEncoder");
-        props.put("request.required.acks", "1");
-
-        ProducerConfig config = new ProducerConfig(props);
+        props.put("bootstrap.servers", broker);
+        props.put("acks", "all");
+        props.put("retries", 0);
+        props.put("batch.size", 16384);
+        props.put("linger.ms", 1);
+        props.put("buffer.memory", 33554432);
+        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 
-        Producer<String, String> producer = new Producer<String, String>(config);
+        Producer<String, String> producer = new KafkaProducer<>(props);
 
         boolean alive = true;
         Random rnd = new Random();
@@ -114,10 +124,10 @@ public class KafkaSampleProducer {
             record.put("qty", rnd.nextInt(10));
             record.put("currency", "USD");
             record.put("amount", rnd.nextDouble() * 100);
-            KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, System.currentTimeMillis() + "", mapper.writeValueAsString(record));
+            ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, System.currentTimeMillis() + "", mapper.writeValueAsString(record));
             System.out.println("Sending 1 message");
             producer.send(data);
-            Thread.sleep(2000);
+            Thread.sleep(interval);
         }
         producer.close();
     }