You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/09/27 00:26:01 UTC

[08/50] [abbrv] kylin git commit: Revert "KYLIN-1726 update to kafka 0.10"

Revert "KYLIN-1726 update to kafka 0.10"

This reverts commit 1b1b2e37fdcba7ad67f0fa3f2369aa65431f13bc.


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/355e58ba
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/355e58ba
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/355e58ba

Branch: refs/heads/1.5.x-HBase1.x
Commit: 355e58ba4209ddf945663228688d550bf654c387
Parents: da5ba27
Author: Hongbin Ma <ma...@apache.org>
Authored: Mon Sep 19 23:50:26 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Tue Sep 20 11:43:08 2016 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/cube/CubeManager.java | 30 ++++------------
 .../kylin/rest/controller/CubeController.java   |  8 ++---
 .../apache/kylin/rest/service/JobService.java   |  4 +--
 .../source/kafka/util/KafkaSampleProducer.java  | 38 ++++++++------------
 4 files changed, 27 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/355e58ba/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 11eabce..fc68798 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -679,28 +679,12 @@ public class CubeManager implements IRealizationProvider {
             return null;
         }
 
-        List<CubeSegment> buildingSegs = cube.getBuildingSegments();
-        if (buildingSegs.size() > 0) {
-            logger.debug("Cube " + cube.getName() + " has " + buildingSegs.size() + " building segments");
-        }
-
-        List<CubeSegment> readySegs = cube.getSegments(SegmentStatusEnum.READY);
-
-        List<CubeSegment> mergingSegs = Lists.newArrayList();
-        if (buildingSegs.size() > 0) {
-            
-            for (CubeSegment building : buildingSegs) {
-                // exclude those under-merging segs
-                for (CubeSegment ready : readySegs) {
-                    if (ready.getSourceOffsetStart() >= building.getSourceOffsetStart() && ready.getSourceOffsetEnd() <= building.getSourceOffsetEnd()) {
-                        mergingSegs.add(ready);
-                    }
-                }
-            }
+        if (cube.getBuildingSegments().size() > 0) {
+            logger.debug("Cube " + cube.getName() + " has bulding segment, will not trigger merge at this moment");
+            return null;
         }
 
-        // exclude those already under merging segments
-        readySegs.removeAll(mergingSegs);
+        List<CubeSegment> ready = cube.getSegments(SegmentStatusEnum.READY);
 
         long[] timeRanges = cube.getDescriptor().getAutoMergeTimeRanges();
         Arrays.sort(timeRanges);
@@ -708,9 +692,9 @@ public class CubeManager implements IRealizationProvider {
         for (int i = timeRanges.length - 1; i >= 0; i--) {
             long toMergeRange = timeRanges[i];
 
-            for (int s = 0; s < readySegs.size(); s++) {
-                CubeSegment seg = readySegs.get(s);
-                Pair<CubeSegment, CubeSegment> p = findMergeOffsetsByDateRange(readySegs.subList(s, readySegs.size()), //
+            for (int s = 0; s < ready.size(); s++) {
+                CubeSegment seg = ready.get(s);
+                Pair<CubeSegment, CubeSegment> p = findMergeOffsetsByDateRange(ready.subList(s, ready.size()), //
                         seg.getDateRangeStart(), seg.getDateRangeStart() + toMergeRange, toMergeRange);
                 if (p != null && p.getSecond().getDateRangeEnd() - p.getFirst().getDateRangeStart() >= toMergeRange)
                     return Pair.newPair(p.getFirst().getSourceOffsetStart(), p.getSecond().getSourceOffsetEnd());

http://git-wip-us.apache.org/repos/asf/kylin/blob/355e58ba/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index 669f53e..42b117c 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -272,7 +272,7 @@ public class CubeController extends BasicController {
     @RequestMapping(value = "/{cubeName}/rebuild", method = { RequestMethod.PUT })
     @ResponseBody
     public JobInstance rebuild(@PathVariable String cubeName, @RequestBody JobBuildRequest req) {
-        return buildInternal(cubeName, req.getStartTime(), req.getEndTime(), 0, 0, req.getBuildType(), true, req.isForce() || req.isForceMergeEmptySegment());
+        return buildInternal(cubeName, req.getStartTime(), req.getEndTime(), 0, 0, req.getBuildType(), req.isForce() || req.isForceMergeEmptySegment());
     }
 
     /** Build/Rebuild a cube segment by source offset */
@@ -286,16 +286,16 @@ public class CubeController extends BasicController {
     @RequestMapping(value = "/{cubeName}/rebuild2", method = { RequestMethod.PUT })
     @ResponseBody
     public JobInstance rebuild(@PathVariable String cubeName, @RequestBody JobBuildRequest2 req) {
-        return buildInternal(cubeName, 0, 0, req.getStartSourceOffset(), req.getEndSourceOffset(), req.getBuildType(), false, req.isForce());
+        return buildInternal(cubeName, 0, 0, req.getStartSourceOffset(), req.getEndSourceOffset(), req.getBuildType(), req.isForce());
     }
 
     private JobInstance buildInternal(String cubeName, long startTime, long endTime, //
-            long startOffset, long endOffset, String buildType, boolean strictCheck, boolean force) {
+            long startOffset, long endOffset, String buildType, boolean force) {
         try {
             String submitter = SecurityContextHolder.getContext().getAuthentication().getName();
             CubeInstance cube = jobService.getCubeManager().getCube(cubeName);
             return jobService.submitJob(cube, startTime, endTime, startOffset, endOffset, //
-                    CubeBuildTypeEnum.valueOf(buildType), strictCheck, force, submitter);
+                    CubeBuildTypeEnum.valueOf(buildType), force, submitter);
         } catch (Exception e) {
             logger.error(e.getLocalizedMessage(), e);
             throw new InternalErrorException(e.getLocalizedMessage());

http://git-wip-us.apache.org/repos/asf/kylin/blob/355e58ba/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 8929bf1..5c704ba 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -199,7 +199,7 @@ public class JobService extends BasicService {
 
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')")
     public JobInstance submitJob(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset, //
-            CubeBuildTypeEnum buildType, boolean strictCheck, boolean force, String submitter) throws IOException, JobException {
+            CubeBuildTypeEnum buildType, boolean force, String submitter) throws IOException, JobException {
 
         if (cube.getStatus() == RealizationStatusEnum.DESCBROKEN) {
             throw new BadRequestException("Broken cube " + cube.getName() + " can't be built");
@@ -211,7 +211,7 @@ public class JobService extends BasicService {
         DefaultChainedExecutable job;
 
         if (buildType == CubeBuildTypeEnum.BUILD) {
-            CubeSegment newSeg = getCubeManager().appendSegment(cube, startDate, endDate, startOffset, endOffset, strictCheck);
+            CubeSegment newSeg = getCubeManager().appendSegment(cube, startDate, endDate, startOffset, endOffset);
             job = EngineFactory.createBatchCubingJob(newSeg, submitter);
         } else if (buildType == CubeBuildTypeEnum.MERGE) {
             CubeSegment newSeg = getCubeManager().mergeSegments(cube, startDate, endDate, startOffset, endOffset, force);

http://git-wip-us.apache.org/repos/asf/kylin/blob/355e58ba/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
index 3d26d3d..2a86a98 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
@@ -30,15 +30,16 @@ import java.util.Random;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kylin.common.util.OptionsHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+
 /**
  * A sample producer which will create sample data to kafka topic
  */
@@ -48,8 +49,7 @@ public class KafkaSampleProducer {
     @SuppressWarnings("static-access")
     private static final Option OPTION_TOPIC = OptionBuilder.withArgName("topic").hasArg().isRequired(true).withDescription("Kafka topic").create("topic");
     private static final Option OPTION_BROKER = OptionBuilder.withArgName("broker").hasArg().isRequired(true).withDescription("Kafka broker").create("broker");
-    private static final Option OPTION_DELAY = OptionBuilder.withArgName("delay").hasArg().isRequired(false).withDescription("Simulated message delay in mili-seconds, default 0").create("delay");
-    private static final Option OPTION_INTERVAL = OptionBuilder.withArgName("interval").hasArg().isRequired(false).withDescription("Simulated message interval in mili-seconds, default 1000").create("interval");
+    private static final Option OPTION_DELAY = OptionBuilder.withArgName("delay").hasArg().isRequired(false).withDescription("Simulated message delay").create("delay");
 
     private static final ObjectMapper mapper = new ObjectMapper();
 
@@ -61,7 +61,6 @@ public class KafkaSampleProducer {
         options.addOption(OPTION_TOPIC);
         options.addOption(OPTION_BROKER);
         options.addOption(OPTION_DELAY);
-        options.addOption(OPTION_INTERVAL);
         optionsHelper.parseOptions(options, args);
 
         logger.info("options: '" + optionsHelper.getOptionsAsString() + "'");
@@ -71,13 +70,7 @@ public class KafkaSampleProducer {
         long delay = 0;
         String delayString = optionsHelper.getOptionValue(OPTION_DELAY);
         if (delayString != null) {
-            delay = Long.parseLong(delayString);
-        }
-
-        long interval = 1000;
-        String intervalString = optionsHelper.getOptionValue(OPTION_INTERVAL);
-        if (intervalString != null) {
-            interval = Long.parseLong(intervalString);
+            delay = Long.parseLong(optionsHelper.getOptionValue(OPTION_DELAY));
         }
 
         List<String> countries = new ArrayList();
@@ -102,16 +95,13 @@ public class KafkaSampleProducer {
         devices.add("Other");
 
         Properties props = new Properties();
-        props.put("bootstrap.servers", broker);
-        props.put("acks", "all");
-        props.put("retries", 0);
-        props.put("batch.size", 16384);
-        props.put("linger.ms", 1);
-        props.put("buffer.memory", 33554432);
-        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        props.put("metadata.broker.list", broker);
+        props.put("serializer.class", "kafka.serializer.StringEncoder");
+        props.put("request.required.acks", "1");
+
+        ProducerConfig config = new ProducerConfig(props);
 
-        Producer<String, String> producer = new KafkaProducer<>(props);
+        Producer<String, String> producer = new Producer<String, String>(config);
 
         boolean alive = true;
         Random rnd = new Random();
@@ -124,10 +114,10 @@ public class KafkaSampleProducer {
             record.put("qty", rnd.nextInt(10));
             record.put("currency", "USD");
             record.put("amount", rnd.nextDouble() * 100);
-            ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, System.currentTimeMillis() + "", mapper.writeValueAsString(record));
+            KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, System.currentTimeMillis() + "", mapper.writeValueAsString(record));
             System.out.println("Sending 1 message");
             producer.send(data);
-            Thread.sleep(interval);
+            Thread.sleep(2000);
         }
         producer.close();
     }