You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/09/02 11:26:33 UTC
[1/5] kylin git commit: KYLIN-1726 Scalable streaming cubing
Repository: kylin
Updated Branches:
refs/heads/KYLIN-1726 [created] 21167756e
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
index 8888d67..666297f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
@@ -1,37 +1,20 @@
/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
package org.apache.kylin.source.kafka;
import java.nio.ByteBuffer;
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index d4308db..7db8285 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -1,37 +1,20 @@
/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
package org.apache.kylin.source.kafka;
import java.io.IOException;
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/source-kafka/src/main/java/org/apache/kylin/source/kafka/TopicMeta.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TopicMeta.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TopicMeta.java
index 4145ef6..d84d3db 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TopicMeta.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TopicMeta.java
@@ -1,37 +1,20 @@
/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
package org.apache.kylin.source.kafka;
import java.util.Collections;
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java
new file mode 100644
index 0000000..bb64bf9
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.kafka;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Job step that derives a cube segment's date range from partition column values
+ * stored in a file and persists that range on the segment.
+ *
+ * <p>The values are read from the file named after the partition date column under
+ * the path configured as {@code BatchConstants.CFG_OUTPUT_PATH} (presumably produced
+ * by an earlier step of the same job). The first line is taken as the minimum value
+ * and the last line as the maximum value.
+ */
+public class UpdateTimeRangeStep extends AbstractExecutable {
+
+    private static final Logger logger = LoggerFactory.getLogger(UpdateTimeRangeStep.class);
+
+    public UpdateTimeRangeStep() {
+        super();
+    }
+
+    /**
+     * Reads the min/max partition values, parses them with the date format that
+     * matches the partition column type, and updates the segment's date range.
+     *
+     * @param context execution context, used to obtain the Kylin configuration
+     * @return SUCCEED when the segment was updated; ERROR (with a message) when the
+     *         value file cannot be read, is empty, the partition column type is not
+     *         a date/time/string type, or the values cannot be parsed or saved
+     */
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
+        final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+        final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
+        final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
+        final String outputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
+        final Path outputFile = new Path(outputPath, partitionCol.getName());
+
+        String minValue = null, maxValue = null, currentValue = null;
+        try {
+            // The FileSystem is deliberately NOT a try-with-resources resource: Hadoop
+            // normally hands out cached, shared FileSystem instances, and closing one
+            // here could break other users in the same JVM. Only the streams are closed.
+            // NOTE(review): assumes HadoopUtil.getFileSystem returns such a shared instance - confirm.
+            final FileSystem fs = HadoopUtil.getFileSystem(outputPath);
+            try (FSDataInputStream inputStream = fs.open(outputFile); BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream))) {
+                // first line -> min, last line -> max
+                minValue = currentValue = bufferedReader.readLine();
+                while (currentValue != null) {
+                    maxValue = currentValue;
+                    currentValue = bufferedReader.readLine();
+                }
+            }
+        } catch (IOException e) {
+            logger.error("fail to read file " + outputFile, e);
+            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+        }
+
+        final DataType partitionColType = partitionCol.getType();
+        FastDateFormat dateFormat;
+        if (partitionColType.isDate()) {
+            dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATE_PATTERN);
+        } else if (partitionColType.isDatetime() || partitionColType.isTimestamp()) {
+            dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
+        } else if (partitionColType.isStringFamily()) {
+            // string columns use the model's declared date format, falling back to the default date pattern
+            String partitionDateFormat = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateFormat();
+            if (StringUtils.isEmpty(partitionDateFormat)) {
+                partitionDateFormat = DateFormat.DEFAULT_DATE_PATTERN;
+            }
+            dateFormat = DateFormat.getDateFormat(partitionDateFormat);
+        } else {
+            return new ExecuteResult(ExecuteResult.State.ERROR, "Type " + partitionColType + " is not valid partition column type");
+        }
+
+        // An empty file leaves both values null; report that explicitly instead of
+        // failing inside the date parser with no usable message.
+        if (minValue == null) {
+            return new ExecuteResult(ExecuteResult.State.ERROR, "No partition column value found in file " + outputFile);
+        }
+
+        try {
+            long startTime = dateFormat.parse(minValue).getTime();
+            long endTime = dateFormat.parse(maxValue).getTime();
+            CubeUpdate cubeBuilder = new CubeUpdate(cube);
+            segment.setDateRangeStart(startTime);
+            segment.setDateRangeEnd(endTime);
+            cubeBuilder.setToUpdateSegs(segment);
+            cubeManager.updateCube(cubeBuilder);
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+        } catch (Exception e) {
+            logger.error("fail to update cube segment time range", e);
+            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java
index 04a66f6..95349c2 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java
@@ -22,6 +22,7 @@ import java.util.List;
import javax.annotation.Nullable;
+import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kylin.common.persistence.JsonSerializer;
import org.apache.kylin.common.persistence.RootPersistentEntity;
import org.apache.kylin.common.persistence.Serializer;
@@ -67,7 +68,7 @@ public class KafkaClusterConfig extends RootPersistentEntity {
@Nullable
@Override
public Broker apply(BrokerConfig input) {
- return new Broker(input.getId(), input.getHost(), input.getPort());
+ return new Broker(input.getId(), input.getHost(), input.getPort(), SecurityProtocol.PLAINTEXT);
}
});
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
new file mode 100644
index 0000000..decfb60
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.source.kafka.hadoop;
+
+import org.apache.kylin.source.kafka.util.KafkaClient;
+import org.apache.kylin.source.kafka.util.KafkaOffsetMapping;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.source.kafka.KafkaConfigManager;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Run a Hadoop Job to process the stream data in kafka;
+ * Modified from the kafka-hadoop-loader in https://github.com/amient/kafka-hadoop-loader
+ *
+ * <p>The job is map-only: it copies the Kafka connection settings and the segment's
+ * per-partition offset range into the job configuration, then runs
+ * {@link KafkaFlatTableMapper} over {@link KafkaInputFormat} and writes a compressed
+ * sequence file to the given output path.
+ */
+public class KafkaFlatTableJob extends AbstractHadoopJob {
+    protected static final Logger logger = LoggerFactory.getLogger(KafkaFlatTableJob.class);
+
+    // NOTE: "PARITION" is a misspelling of "PARTITION". The constant names are public
+    // and referenced from KafkaInputFormat, so they are kept as-is for compatibility.
+    public static final String CONFIG_KAFKA_PARITION_MIN = "kafka.partition.min";
+    public static final String CONFIG_KAFKA_PARITION_MAX = "kafka.partition.max";
+    // prefixes: the partition id is appended to form the actual configuration key
+    public static final String CONFIG_KAFKA_PARITION_START = "kafka.partition.start.";
+    public static final String CONFIG_KAFKA_PARITION_END = "kafka.partition.end.";
+
+    public static final String CONFIG_KAFKA_BROKERS = "kafka.brokers";
+    public static final String CONFIG_KAFKA_TOPIC = "kafka.topic";
+    public static final String CONFIG_KAFKA_TIMEOUT = "kafka.connect.timeout";
+    public static final String CONFIG_KAFKA_BUFFER_SIZE = "kafka.connect.buffer.size";
+    public static final String CONFIG_KAFKA_CONSUMER_GROUP = "kafka.consumer.group";
+    public static final String CONFIG_KAFKA_INPUT_FORMAT = "input.format";
+    public static final String CONFIG_KAFKA_PARSER_NAME = "kafka.parser.name";
+
+    /**
+     * Configures and runs the job.
+     *
+     * @param args command line arguments; job name, cube name, output path and segment name are read from them
+     * @return the result of {@code waitForCompletion(job)}
+     * @throws Exception any failure during setup or execution; it is logged and the usage is printed before rethrowing
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_OUTPUT_PATH);
+            options.addOption(OPTION_SEGMENT_NAME);
+            parseOptions(options, args);
+
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            String cubeName = getOptionValue(OPTION_CUBE_NAME);
+            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+
+            String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
+
+            // ----------------------------------------------------------------------------
+            // add metadata to distributed cache
+            CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+            CubeInstance cube = cubeMgr.getCube(cubeName);
+            // guard in case the lookup yields nothing, so the failure names the cube instead of being a bare NPE
+            if (cube == null) {
+                throw new IllegalArgumentException("Cannot find cube " + cubeName);
+            }
+
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
+            logger.info("Starting: " + job.getJobName());
+
+            setJobClasspath(job, cube.getConfig());
+
+            KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv());
+            KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(cube.getFactTable());
+            String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
+            String topic = kafkaConfig.getTopic();
+
+            if (brokers == null || brokers.length() == 0 || topic == null) {
+                throw new IllegalArgumentException("Invalid Kafka information, brokers " + brokers + ", topic " + topic);
+            }
+
+            job.getConfiguration().set(CONFIG_KAFKA_BROKERS, brokers);
+            job.getConfiguration().set(CONFIG_KAFKA_TOPIC, topic);
+            job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout()));
+            job.getConfiguration().set(CONFIG_KAFKA_BUFFER_SIZE, String.valueOf(kafkaConfig.getBufferSize()));
+            job.getConfiguration().set(CONFIG_KAFKA_INPUT_FORMAT, "json");
+            job.getConfiguration().set(CONFIG_KAFKA_PARSER_NAME, kafkaConfig.getParserName());
+            job.getConfiguration().set(CONFIG_KAFKA_CONSUMER_GROUP, cubeName); // use cubeName as consumer group name
+
+            CubeSegment segment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+            // guard in case no matching segment is returned
+            if (segment == null) {
+                throw new IllegalArgumentException("Cannot find NEW segment " + segmentName + " in cube " + cubeName);
+            }
+            setupMapper(segment);
+
+            FileOutputFormat.setOutputPath(job, output);
+            FileOutputFormat.setCompressOutput(job, true);
+            logger.info("Output hdfs location: " + output);
+            logger.info("Output hdfs compression: " + true);
+            job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());
+
+            // remove output of a previous run before submitting
+            deletePath(job.getConfiguration(), output);
+
+            attachKylinPropsAndMetadata(cube, job.getConfiguration());
+
+            return waitForCompletion(job);
+
+        } catch (Exception e) {
+            logger.error("error in KafkaFlatTableJob", e);
+            printUsage(options);
+            throw e;
+        } finally {
+            if (job != null)
+                cleanupTempConfFile(job.getConfiguration());
+        }
+
+    }
+
+    /**
+     * Writes the segment's per-partition start/end offsets into the job configuration
+     * and wires up the mapper, input format and output classes of the map-only job.
+     *
+     * @param cubeSeg segment whose Kafka offsets define the range to read
+     * @throws IllegalArgumentException if the segment has no start offsets, or a partition has no end offset
+     */
+    private void setupMapper(CubeSegment cubeSeg) throws IOException {
+        // set the segment's offset info to job conf
+        Map<Integer, Long> offsetStart = KafkaOffsetMapping.parseOffsetStart(cubeSeg);
+        Map<Integer, Long> offsetEnd = KafkaOffsetMapping.parseOffsetEnd(cubeSeg);
+
+        // Collections.min/max throw NoSuchElementException without context on an empty set
+        if (offsetStart.isEmpty()) {
+            throw new IllegalArgumentException("No Kafka start offset found in segment " + cubeSeg);
+        }
+
+        Integer minPartition = Collections.min(offsetStart.keySet());
+        Integer maxPartition = Collections.max(offsetStart.keySet());
+        job.getConfiguration().set(CONFIG_KAFKA_PARITION_MIN, minPartition.toString());
+        job.getConfiguration().set(CONFIG_KAFKA_PARITION_MAX, maxPartition.toString());
+
+        for (Map.Entry<Integer, Long> startEntry : offsetStart.entrySet()) {
+            Integer partition = startEntry.getKey();
+            Long end = offsetEnd.get(partition);
+            if (end == null) {
+                throw new IllegalArgumentException("No Kafka end offset found for partition " + partition + " in segment " + cubeSeg);
+            }
+            job.getConfiguration().set(CONFIG_KAFKA_PARITION_START + partition, startEntry.getValue().toString());
+            job.getConfiguration().set(CONFIG_KAFKA_PARITION_END + partition, end.toString());
+        }
+
+        job.setMapperClass(KafkaFlatTableMapper.class);
+        job.setInputFormatClass(KafkaInputFormat.class);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(Text.class);
+        job.setOutputFormatClass(SequenceFileOutputFormat.class);
+        // map-only job
+        job.setNumReduceTasks(0);
+    }
+
+    public static void main(String[] args) throws Exception {
+        KafkaFlatTableJob job = new KafkaFlatTableJob();
+        int exitCode = ToolRunner.run(job, args);
+        System.exit(exitCode);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java
new file mode 100644
index 0000000..995b2d4
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.source.kafka.hadoop;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.engine.mr.KylinMapper;
+
+/**
+ * Mapper of {@link KafkaFlatTableJob}: re-emits every input record as a pair of
+ * {@code Text} values, using the 8 bytes of the long input key as the output key
+ * and the input value's bytes as the output value.
+ */
+public class KafkaFlatTableMapper extends KylinMapper<LongWritable, BytesWritable, Text, Text> {
+
+    // reused across map() calls to avoid allocating per record
+    private final Text outKey = new Text();
+    private final Text outValue = new Text();
+
+    @Override
+    protected void setup(Context context) throws IOException {
+        Configuration conf = context.getConfiguration();
+        bindCurrentConfiguration(conf);
+    }
+
+    /**
+     * Writes one input record to the output.
+     *
+     * @param key     input key; its long value is serialized to bytes for the output key
+     * @param value   input payload, copied into the output value
+     * @param context task context the record is written to
+     * @throws IOException if the write fails, or if the thread is interrupted while writing
+     */
+    @Override
+    public void map(LongWritable key, BytesWritable value, Context context) throws IOException {
+        outKey.set(Bytes.toBytes(key.get()));
+        // the backing array may be longer than the payload; only getLength() bytes are valid
+        outValue.set(value.getBytes(), 0, value.getLength());
+        try {
+            context.write(outKey, outValue);
+        } catch (InterruptedException e) {
+            // Do not swallow the interrupt: the record would be dropped silently.
+            // Restore the flag and fail the call through the declared exception type.
+            Thread.currentThread().interrupt();
+            throw new IOException("Interrupted while writing record with key " + key.get(), e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
new file mode 100644
index 0000000..81f6bac
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.source.kafka.hadoop;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import org.apache.kylin.source.kafka.util.KafkaClient;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+
+/**
+ * Convert Kafka topic to Hadoop InputFormat
+ * Modified from the kafka-hadoop-loader in https://github.com/amient/kafka-hadoop-loader
+ *
+ * <p>The broker list, topic, consumer group and per-partition offset ranges are read
+ * from the job configuration using the keys defined in {@link KafkaFlatTableJob}.
+ */
+public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
+
+    /**
+     * Builds one {@link KafkaInputSplit} per topic partition whose configured end
+     * offset is greater than its start offset; partitions with nothing to read yield no split.
+     *
+     * @param context job context carrying the Kafka settings
+     * @return the splits, possibly empty
+     * @throws IllegalArgumentException if the topic's partitions cannot be obtained, or their
+     *         number differs from the number of configured offset ranges
+     * @throws IllegalStateException if the topic has a partition with no configured offsets
+     */
+    @Override
+    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
+        Configuration conf = context.getConfiguration();
+
+        String brokers = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_BROKERS);
+        String inputTopic = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_TOPIC);
+        String consumerGroup = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_GROUP);
+        int partitionMin = Integer.parseInt(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MIN));
+        int partitionMax = Integer.parseInt(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MAX));
+
+        // collect the offset range of every partition id in [min, max] that has both bounds configured
+        Map<Integer, Long> startOffsetMap = Maps.newHashMap();
+        Map<Integer, Long> endOffsetMap = Maps.newHashMap();
+        for (int i = partitionMin; i <= partitionMax; i++) {
+            String start = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_START + i);
+            String end = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_END + i);
+            if (start != null && end != null) {
+                startOffsetMap.put(i, Long.valueOf(start));
+                endOffsetMap.put(i, Long.valueOf(end));
+            }
+        }
+
+        List<InputSplit> splits = new ArrayList<InputSplit>();
+        try (KafkaConsumer<String, String> consumer = KafkaClient.getKafkaConsumer(brokers, consumerGroup, null)) {
+            List<PartitionInfo> partitionInfos = consumer.partitionsFor(inputTopic);
+            // NOTE(review): some Kafka client versions appear to return null for an unknown topic;
+            // fail with a descriptive message rather than a bare NPE.
+            Preconditions.checkArgument(partitionInfos != null, "cannot get partitions of topic '%s' from brokers '%s'", inputTopic, brokers);
+            Preconditions.checkArgument(partitionInfos.size() == startOffsetMap.size(), "partition number mismatch with server side");
+            for (PartitionInfo partition : partitionInfos) {
+                int partitionId = partition.partition();
+                if (!startOffsetMap.containsKey(partitionId)) {
+                    throw new IllegalStateException("Partition '" + partitionId + "' not exists.");
+                }
+
+                // start and end are always stored together above, so end is present whenever start is
+                Long startOffset = startOffsetMap.get(partitionId);
+                Long endOffset = endOffsetMap.get(partitionId);
+                if (endOffset > startOffset) {
+                    InputSplit split = new KafkaInputSplit(
+                            brokers, inputTopic,
+                            partitionId,
+                            startOffset, endOffset
+                    );
+                    splits.add(split);
+                }
+            }
+        }
+        return splits;
+    }
+
+    /**
+     * Creates a new, uninitialized {@link KafkaInputRecordReader}; neither argument is used here.
+     */
+    @Override
+    public RecordReader<LongWritable, BytesWritable> createRecordReader(
+            InputSplit split, TaskAttemptContext context) throws IOException,
+            InterruptedException {
+        return new KafkaInputRecordReader();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
new file mode 100644
index 0000000..f67fef5
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.source.kafka.hadoop;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kylin.common.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Hadoop {@link RecordReader} over a single {@link KafkaInputSplit}: reads the messages of one
+ * Kafka topic partition from the split's start offset (inclusive) up to its end offset (exclusive).
+ * Each record's key is the message offset; its value is the message value converted to bytes
+ * with {@code Bytes.toBytes}.
+ * Modified from the kafka-hadoop-loader in https://github.com/amient/kafka-hadoop-loader
+ */
+public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWritable> {
+
+    static final Logger log = LoggerFactory.getLogger(KafkaInputRecordReader.class);
+
+    private Configuration conf;
+
+    private KafkaInputSplit split;
+    private Consumer<String, String> consumer;
+    private String brokers;
+    private String topic;
+
+    private int partition;
+    // first offset of the split (inclusive)
+    private long earliestOffset;
+    // offset of the next message to read
+    private long watermark;
+    // end offset of the split (exclusive)
+    private long latestOffset;
+
+    // batch returned by the last poll; null means a new poll is required
+    private ConsumerRecords<String, String> messages;
+    private Iterator<ConsumerRecord<String, String>> iterator;
+    private LongWritable key;
+    private BytesWritable value;
+
+    // timeout handed to Consumer.poll; overridable through CONFIG_KAFKA_TIMEOUT
+    private long timeOut = 60000;
+    // read from CONFIG_KAFKA_BUFFER_SIZE; not used anywhere else in this class
+    private long bufferSize = 65536;
+
+    private long numProcessedMessages = 0L;
+
+    /**
+     * Initializes the reader from the task's configuration.
+     *
+     * @param split   the split to read; must be a {@link KafkaInputSplit}
+     * @param context task context supplying the job configuration
+     */
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+        initialize(split, context.getConfiguration());
+    }
+
+    /**
+     * Initializes the reader: copies the split's coordinates, applies optional timeout and
+     * buffer-size overrides from the configuration, creates the consumer and assigns it to
+     * the split's topic partition.
+     *
+     * @param split the split to read; must be a {@link KafkaInputSplit}
+     * @param conf  job configuration
+     */
+    public void initialize(InputSplit split, Configuration conf) throws IOException, InterruptedException {
+        this.conf = conf;
+        this.split = (KafkaInputSplit) split;
+        brokers = this.split.getBrokers();
+        topic = this.split.getTopic();
+        partition = this.split.getPartition();
+        watermark = this.split.getOffsetStart();
+
+        if (conf.get(KafkaFlatTableJob.CONFIG_KAFKA_TIMEOUT) != null) {
+            timeOut = Long.parseLong(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_TIMEOUT));
+        }
+        if (conf.get(KafkaFlatTableJob.CONFIG_KAFKA_BUFFER_SIZE) != null) {
+            bufferSize = Long.parseLong(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_BUFFER_SIZE));
+        }
+
+        String consumerGroup = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_GROUP);
+        consumer = org.apache.kylin.source.kafka.util.KafkaClient.getKafkaConsumer(brokers, consumerGroup, null);
+
+        earliestOffset = this.split.getOffsetStart();
+        latestOffset = this.split.getOffsetEnd();
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+        consumer.assign(Arrays.asList(topicPartition));
+        log.info("Split {} Topic: {} Broker: {} Partition: {} Start: {} End: {}", new Object[] { this.split, topic, this.split.getBrokers(), partition, earliestOffset, latestOffset });
+    }
+
+    /**
+     * Advances to the next message of the split.
+     *
+     * @return true if a message was loaded into the current key/value; false once the end
+     *         offset of the split has been reached
+     * @throws IOException if a poll returns no messages before the end offset is reached
+     */
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+        if (key == null) {
+            key = new LongWritable();
+        }
+        if (value == null) {
+            value = new BytesWritable();
+        }
+
+        // The whole range has been consumed. Without this check, a batch that ends exactly at
+        // latestOffset - 1 would trigger another poll at latestOffset and, when the topic holds
+        // no newer messages, fail with "Unexpected ending of stream" despite a complete read.
+        if (watermark >= latestOffset) {
+            log.info("Reach the end offset, stop reading.");
+            return false;
+        }
+
+        if (messages == null) {
+            log.info("{} fetching offset {} ", topic + ":" + split.getBrokers() + ":" + partition, watermark);
+            TopicPartition topicPartition = new TopicPartition(topic, partition);
+            consumer.seek(topicPartition, watermark);
+            messages = consumer.poll(timeOut);
+            iterator = messages.iterator();
+            if (!iterator.hasNext()) {
+                log.info("No more messages, stop");
+                throw new IOException(String.format("Unexpected ending of stream, expected ending offset %d, but end at %d", latestOffset, watermark));
+            }
+        }
+
+        if (iterator.hasNext()) {
+            ConsumerRecord<String, String> message = iterator.next();
+            if (message.offset() >= latestOffset) {
+                log.info("Reach the end offset, stop reading.");
+                return false;
+            }
+            key.set(message.offset());
+            byte[] valuebytes = Bytes.toBytes(message.value());
+            value.set(valuebytes, 0, valuebytes.length);
+            watermark = message.offset() + 1;
+            numProcessedMessages++;
+            if (!iterator.hasNext()) {
+                // batch exhausted: force a fresh seek/poll on the next call
+                messages = null;
+                iterator = null;
+            }
+            return true;
+        }
+
+        log.error("Unexpected iterator end.");
+        throw new IOException(String.format("Unexpected ending of stream, expected ending offset %d, but end at %d", latestOffset, watermark));
+    }
+
+    /** @return the offset of the most recently read message, or null before the first read */
+    @Override
+    public LongWritable getCurrentKey() throws IOException, InterruptedException {
+        return key;
+    }
+
+    /** @return the bytes of the most recently read message, or null before the first read */
+    @Override
+    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
+        return value;
+    }
+
+    /** @return the fraction of the split's offset range consumed so far, in [0, 1] */
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+        if (watermark >= latestOffset || earliestOffset == latestOffset) {
+            return 1.0f;
+        }
+        return Math.min(1.0f, (watermark - earliestOffset) / (float) (latestOffset - earliestOffset));
+    }
+
+    /** Logs the number of processed messages and closes the consumer, if one was created. */
+    @Override
+    public void close() throws IOException {
+        // use the cached brokers field so close() is safe even when initialize() never completed
+        log.info("{} num. processed messages {} ", topic + ":" + brokers + ":" + partition, numProcessedMessages);
+        if (consumer != null) {
+            consumer.close();
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputSplit.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputSplit.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputSplit.java
new file mode 100644
index 0000000..3261399
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputSplit.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.source.kafka.hadoop;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * Hadoop input split describing an offset range of a single Kafka topic partition,
+ * together with the broker list used to reach it.
+ * Modified from the kafka-hadoop-loader in https://github.com/amient/kafka-hadoop-loader
+ */
+public class KafkaInputSplit extends InputSplit implements Writable {
+
+    private String brokers;
+    private String topic;
+    private int partition;
+    private long offsetStart;
+    private long offsetEnd;
+
+    /** No-arg constructor; fields are populated later through {@link #readFields(DataInput)}. */
+    public KafkaInputSplit() {
+    }
+
+    /**
+     * @param brokers     broker list string
+     * @param topic       topic name
+     * @param partition   partition id
+     * @param offsetStart start offset of the range
+     * @param offsetEnd   end offset of the range
+     */
+    public KafkaInputSplit(String brokers, String topic, int partition, long offsetStart, long offsetEnd) {
+        this.brokers = brokers;
+        this.topic = topic;
+        this.partition = partition;
+        this.offsetStart = offsetStart;
+        this.offsetEnd = offsetEnd;
+    }
+
+    /** Restores all fields, in the same order in which {@link #write(DataOutput)} emits them. */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        this.brokers = Text.readString(in);
+        this.topic = Text.readString(in);
+        this.partition = in.readInt();
+        this.offsetStart = in.readLong();
+        this.offsetEnd = in.readLong();
+    }
+
+    /** Serializes brokers, topic, partition, start offset and end offset, in that order. */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        Text.writeString(out, this.brokers);
+        Text.writeString(out, this.topic);
+        out.writeInt(this.partition);
+        out.writeLong(this.offsetStart);
+        out.writeLong(this.offsetEnd);
+    }
+
+    /** Always reports {@code Long.MAX_VALUE}; no real size is computed for the split. */
+    @Override
+    public long getLength() throws IOException, InterruptedException {
+        return Long.MAX_VALUE;
+    }
+
+    /** Returns a one-element array holding the broker list string. */
+    @Override
+    public String[] getLocations() throws IOException, InterruptedException {
+        final String[] locations = new String[1];
+        locations[0] = this.brokers;
+        return locations;
+    }
+
+    public int getPartition() {
+        return this.partition;
+    }
+
+    public String getTopic() {
+        return this.topic;
+    }
+
+    public String getBrokers() {
+        return this.brokers;
+    }
+
+    public long getOffsetStart() {
+        return this.offsetStart;
+    }
+
+    public long getOffsetEnd() {
+        return this.offsetEnd;
+    }
+
+    /** Formats the split as {@code brokers-topic-partition-offsetStart-offsetEnd}. */
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder();
+        text.append(this.brokers).append("-");
+        text.append(this.topic).append("-");
+        text.append(this.partition).append("-");
+        text.append(this.offsetStart).append("-");
+        text.append(this.offsetEnd);
+        return text.toString();
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
new file mode 100644
index 0000000..640cc53
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.kafka.util;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kylin.source.kafka.config.BrokerConfig;
+import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ */
+public class KafkaClient {
+
+ public static KafkaConsumer getKafkaConsumer(String brokers, String consumerGroup, Properties properties) {
+ Properties props = constructDefaultKafkaConsumerProperties(brokers, consumerGroup, properties);
+ KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+ return consumer;
+ }
+
+ public static KafkaProducer getKafkaProducer(String brokers, Properties properties) {
+ Properties props = constructDefaultKafkaProducerProperties(brokers, properties);
+ KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
+ return producer;
+ }
+
+ private static Properties constructDefaultKafkaProducerProperties(String brokers, Properties properties){
+ Properties props = new Properties();
+ props.put("bootstrap.servers", brokers);
+ props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ props.put("acks", "1");
+ props.put("buffer.memory", 33554432);
+ props.put("retries", 0);
+ props.put("batch.size", 16384);
+ props.put("linger.ms", 50);
+ props.put("timeout.ms", "30000");
+ if (properties != null) {
+ for (Map.Entry entry : properties.entrySet()) {
+ props.put(entry.getKey(), entry.getValue());
+ }
+ }
+ return props;
+ }
+
+ private static Properties constructDefaultKafkaConsumerProperties(String brokers, String consumerGroup, Properties properties) {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", brokers);
+ props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put("group.id", consumerGroup);
+ props.put("session.timeout.ms", "30000");
+ props.put("enable.auto.commit", "false");
+ if (properties != null) {
+ for (Map.Entry entry : properties.entrySet()) {
+ props.put(entry.getKey(), entry.getValue());
+ }
+ }
+ return props;
+ }
+
+ public static String getKafkaBrokers(KafkaConfig kafkaConfig) {
+ String brokers = null;
+ for (KafkaClusterConfig clusterConfig : kafkaConfig.getKafkaClusterConfigs()) {
+ for (BrokerConfig brokerConfig : clusterConfig.getBrokerConfigs()) {
+ if (brokers == null) {
+ brokers = brokerConfig.getHost() + ":" + brokerConfig.getPort();
+ } else {
+ brokers = brokers + "," + brokerConfig.getHost() + ":" + brokerConfig.getPort();
+ }
+ }
+ }
+ return brokers;
+ }
+
+ public static long getEarliestOffset(KafkaConsumer consumer, String topic, int partitionId) {
+
+ TopicPartition topicPartition = new TopicPartition(topic, partitionId);
+ consumer.assign(Arrays.asList(topicPartition));
+ consumer.seekToBeginning(Arrays.asList(topicPartition));
+
+ return consumer.position(topicPartition);
+ }
+
+ public static long getLatestOffset(KafkaConsumer consumer, String topic, int partitionId) {
+
+ TopicPartition topicPartition = new TopicPartition(topic, partitionId);
+ consumer.assign(Arrays.asList(topicPartition));
+ consumer.seekToEnd(Arrays.asList(topicPartition));
+
+ return consumer.position(topicPartition);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaOffsetMapping.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaOffsetMapping.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaOffsetMapping.java
new file mode 100644
index 0000000..b46e57f
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaOffsetMapping.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.kafka.util;
+
+import com.google.common.collect.Maps;
+import org.apache.kylin.cube.CubeSegment;
+
+import java.util.Map;
+
+/**
+ * Stores per-partition Kafka offsets in, and reads them back from, a cube segment's
+ * additional-info map, using one entry per partition keyed by a prefix plus the partition id.
+ */
+public class KafkaOffsetMapping {
+
+    public static final String OFFSET_START = "kafka.offset.start.";
+    public static final String OFFSET_END = "kafka.offset.end.";
+
+    /**
+     * Reads the start offset of every partition recorded on a segment.
+     *
+     * @param segment the segment to read from
+     * @return map from partition id to start offset
+     */
+    public static Map<Integer, Long> parseOffsetStart(CubeSegment segment) {
+        return parseOffset(segment, OFFSET_START);
+    }
+
+    /**
+     * Reads the end offset of every partition recorded on a segment.
+     *
+     * @param segment the segment to read from
+     * @return map from partition id to end offset
+     */
+    public static Map<Integer, Long> parseOffsetEnd(CubeSegment segment) {
+        return parseOffset(segment, OFFSET_END);
+    }
+
+    /**
+     * Records each partition's start offset on the segment and sets the segment's source
+     * start offset to the sum of all partition start offsets.
+     *
+     * @param segment     the segment to update
+     * @param offsetStart map from partition id to start offset
+     */
+    public static void saveOffsetStart(CubeSegment segment, Map<Integer, Long> offsetStart) {
+        long total = 0;
+        for (Map.Entry<Integer, Long> entry : offsetStart.entrySet()) {
+            final String infoKey = OFFSET_START + entry.getKey();
+            segment.getAdditionalInfo().put(infoKey, String.valueOf(entry.getValue()));
+            total += entry.getValue();
+        }
+
+        segment.setSourceOffsetStart(total);
+    }
+
+    /**
+     * Records each partition's end offset on the segment and sets the segment's source
+     * end offset to the sum of all partition end offsets.
+     *
+     * @param segment   the segment to update
+     * @param offsetEnd map from partition id to end offset
+     */
+    public static void saveOffsetEnd(CubeSegment segment, Map<Integer, Long> offsetEnd) {
+        long total = 0;
+        for (Map.Entry<Integer, Long> entry : offsetEnd.entrySet()) {
+            final String infoKey = OFFSET_END + entry.getKey();
+            segment.getAdditionalInfo().put(infoKey, String.valueOf(entry.getValue()));
+            total += entry.getValue();
+        }
+
+        segment.setSourceOffsetEnd(total);
+    }
+
+    // Collects every additional-info entry whose key starts with the prefix; the remainder
+    // of the key is parsed as the partition id and the value as the offset.
+    private static Map<Integer, Long> parseOffset(CubeSegment segment, String propertyPrefix) {
+        final Map<Integer, Long> offsets = Maps.newHashMap();
+        final int prefixLength = propertyPrefix.length();
+        for (Map.Entry<String, String> info : segment.getAdditionalInfo().entrySet()) {
+            final String infoKey = info.getKey();
+            if (!infoKey.startsWith(propertyPrefix)) {
+                continue;
+            }
+            final Integer partition = Integer.valueOf(infoKey.substring(prefixLength));
+            final Long offset = Long.valueOf(info.getValue());
+            offsets.put(partition, offset);
+        }
+
+        return offsets;
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java
index 58cba7d..ddc2eb7 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java
@@ -1,37 +1,20 @@
/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
package org.apache.kylin.source.kafka.util;
import java.util.Collections;
@@ -42,6 +25,8 @@ import java.util.concurrent.ConcurrentMap;
import javax.annotation.Nullable;
+import kafka.cluster.BrokerEndPoint;
+import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kylin.source.kafka.TopicMeta;
import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
import org.slf4j.Logger;
@@ -86,13 +71,14 @@ public final class KafkaRequester {
if (consumerCache.containsKey(key)) {
return consumerCache.get(key);
} else {
- consumerCache.putIfAbsent(key, new SimpleConsumer(broker.host(), broker.port(), timeout, bufferSize, clientId));
+ BrokerEndPoint brokerEndPoint = broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT);
+ consumerCache.putIfAbsent(key, new SimpleConsumer(brokerEndPoint.host(), brokerEndPoint.port(), timeout, bufferSize, clientId));
return consumerCache.get(key);
}
}
private static String createKey(Broker broker, int timeout, int bufferSize, String clientId) {
- return broker.getConnectionString() + "_" + timeout + "_" + bufferSize + "_" + clientId;
+ return broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).connectionString() + "_" + timeout + "_" + bufferSize + "_" + clientId;
}
public static TopicMeta getKafkaTopicMeta(KafkaClusterConfig kafkaClusterConfig) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
index 24eaa05..ee5bb20 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Map;
+import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.StreamingMessage;
import org.apache.kylin.source.kafka.StreamingParser;
@@ -55,7 +56,7 @@ public final class KafkaUtils {
if (partitionMetadata.errorCode() != 0) {
logger.warn("PartitionMetadata errorCode: " + partitionMetadata.errorCode());
}
- return partitionMetadata.leader();
+ return new Broker(partitionMetadata.leader(), SecurityProtocol.PLAINTEXT);
} else {
return null;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 07a3cc3..5f435ed 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -40,7 +40,7 @@ import org.apache.kylin.common.util.CompressionUtils;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.LoggableCachedThreadPool;
import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.ISegment;
+import org.apache.kylin.metadata.model.ISegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTScanRange;
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
index c318cba..da087c9 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
@@ -31,7 +31,7 @@ import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.ISegment;
+import org.apache.kylin.metadata.model.ISegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.kv.FuzzyKeyEncoder;
import org.apache.kylin.cube.kv.FuzzyMaskEncoder;
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
index a359d19..7240383 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.ShardingHash;
-import org.apache.kylin.cube.ISegment;
+import org.apache.kylin.metadata.model.ISegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.dimension.DimensionEncoding;
[4/5] kylin git commit: KYLIN-1818 change kafka dependency to provided
Posted by sh...@apache.org.
KYLIN-1818 change kafka dependency to provided
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/a2db1f64
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/a2db1f64
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/a2db1f64
Branch: refs/heads/KYLIN-1726
Commit: a2db1f64148c0e99ec71391173ee7900f697366b
Parents: c4e8655
Author: shaofengshi <sh...@apache.org>
Authored: Fri Sep 2 18:58:11 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri Sep 2 18:58:11 2016 +0800
----------------------------------------------------------------------
build/bin/find-kafka-dependency.sh | 12 ++++++------
build/bin/kylin.sh | 2 ++
.../kylin/engine/mr/common/AbstractHadoopJob.java | 4 ++--
3 files changed, 10 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/a2db1f64/build/bin/find-kafka-dependency.sh
----------------------------------------------------------------------
diff --git a/build/bin/find-kafka-dependency.sh b/build/bin/find-kafka-dependency.sh
index c6b9c24..7349360 100644
--- a/build/bin/find-kafka-dependency.sh
+++ b/build/bin/find-kafka-dependency.sh
@@ -32,20 +32,20 @@ then
fi
# works for kafka 9+
-kafka_client=`find -L "$(dirname $kafka_home)" -name 'kafka-clients-[a-z0-9A-Z\.-]*.jar' ! -name '*doc*' ! -name '*test*' ! -name '*sources*' ''-printf '%p:' | sed 's/:$//'`
-if [ -z "$kafka_client" ]
+kafka_dependency=`find -L $kafka_home -name 'kafka-clients-[a-z0-9A-Z\.-]*.jar' ! -name '*doc*' ! -name '*test*' ! -name '*sources*' ''-printf '%p:' | sed 's/:$//'`
+if [ -z "$kafka_dependency" ]
then
# works for kafka 8
- kafka_broker=`find -L "$(dirname $kafka_home)" -name 'kafka_[a-z0-9A-Z\.-]*.jar' ! -name '*doc*' ! -name '*test*' ! -name '*sources*' ''-printf '%p:' | sed 's/:$//'`
- if [ -z "$kafka_broker" ]
+ kafka_dependency=`find -L $kafka_home -name 'kafka_[a-z0-9A-Z\.-]*.jar' ! -name '*doc*' ! -name '*test*' ! -name '*sources*' ''-printf '%p:' | sed 's/:$//'`
+ if [ -z "$kafka_dependency" ]
then
echo "kafka client lib not found"
exit 1
else
- echo "kafka dependency: $kafka_broker"
+ echo "kafka dependency: $kafka_dependency"
export kafka_dependency
fi
else
- echo "kafka dependency: $kafka_client"
+ echo "kafka dependency: $kafka_dependency"
export kafka_dependency
fi
http://git-wip-us.apache.org/repos/asf/kylin/blob/a2db1f64/build/bin/kylin.sh
----------------------------------------------------------------------
diff --git a/build/bin/kylin.sh b/build/bin/kylin.sh
index 7bc0493..336e9b8 100644
--- a/build/bin/kylin.sh
+++ b/build/bin/kylin.sh
@@ -31,6 +31,7 @@ function retrieveDependency() {
#retrive $hive_dependency and $hbase_dependency
source ${dir}/find-hive-dependency.sh
source ${dir}/find-hbase-dependency.sh
+ source ${dir}/find-kafka-dependency.sh
#retrive $KYLIN_EXTRA_START_OPTS
if [ -f "${dir}/setenv.sh" ]
@@ -106,6 +107,7 @@ then
-Djava.io.tmpdir=${tomcat_root}/temp \
-Dkylin.hive.dependency=${hive_dependency} \
-Dkylin.hbase.dependency=${hbase_dependency} \
+ -Dkylin.kafka.dependency=${kafka_dependency} \
-Dkylin.rest.address=${kylin_rest_address} \
-Dspring.profiles.active=${spring_profile} \
org.apache.hadoop.util.RunJar ${tomcat_root}/bin/bootstrap.jar org.apache.catalina.startup.Bootstrap start >> ${KYLIN_HOME}/logs/kylin.out 2>&1 & echo $! > ${KYLIN_HOME}/pid &
http://git-wip-us.apache.org/repos/asf/kylin/blob/a2db1f64/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index af2ed9f..a138eec 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -226,11 +226,11 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
}
}
- // for hive dependencies
+ // for kafka dependencies
if (kylinKafkaDependency != null) {
kylinKafkaDependency = kylinKafkaDependency.replace(":", ",");
- logger.info("Kafka Dependencies Before Filtered: " + kylinHiveDependency);
+ logger.info("Kafka Dependencies Before Filtered: " + kylinKafkaDependency);
if (kylinDependency.length() > 0)
kylinDependency.append(",");
[2/5] kylin git commit: KYLIN-1726 Scalable streaming cubing
Posted by sh...@apache.org.
KYLIN-1726 Scalable streaming cubing
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/9123d96f
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/9123d96f
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/9123d96f
Branch: refs/heads/KYLIN-1726
Commit: 9123d96f29a90b8c820cc6a3fdd99e5cb883400a
Parents: 1d04642
Author: shaofengshi <sh...@apache.org>
Authored: Tue Aug 30 14:44:47 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Aug 30 14:44:47 2016 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/job/DeployUtil.java | 35 ++-
.../kylin/job/streaming/Kafka10DataLoader.java | 80 +++++++
build/bin/kylin.sh | 2 +-
.../apache/kylin/common/KylinConfigBase.java | 1 +
.../java/org/apache/kylin/cube/CubeSegment.java | 1 +
.../java/org/apache/kylin/cube/ISegment.java | 39 ----
.../cube/gridtable/SegmentGTStartAndEnd.java | 2 +-
.../cube/model/CubeJoinedFlatTableDesc.java | 6 +
.../cube/model/CubeJoinedFlatTableEnrich.java | 6 +
.../apache/kylin/gridtable/ScannerWorker.java | 2 +-
.../metadata/model/IJoinedFlatTableDesc.java | 2 +
.../apache/kylin/metadata/model/ISegment.java | 36 +++
.../kylin/engine/mr/BatchMergeJobBuilder2.java | 3 +
.../org/apache/kylin/engine/mr/IMRInput.java | 10 +
.../java/org/apache/kylin/engine/mr/MRUtil.java | 4 +
.../test_streaming_table_model_desc.json | 6 +-
.../kylin/provision/BuildCubeWithStream.java | 227 ++++++++++++++-----
.../org/apache/kylin/provision/MockKafka.java | 191 ++++++++++++++++
.../apache/kylin/provision/NetworkUtils.java | 52 +++++
pom.xml | 2 +-
.../apache/kylin/source/hive/HiveMRInput.java | 11 +
source-kafka/pom.xml | 13 +-
.../kylin/source/kafka/KafkaConfigManager.java | 46 ++--
.../apache/kylin/source/kafka/KafkaMRInput.java | 221 ++++++++++++++++++
.../apache/kylin/source/kafka/KafkaSource.java | 57 +++++
.../kylin/source/kafka/KafkaStreamingInput.java | 65 +++---
.../kylin/source/kafka/MergeOffsetStep.java | 89 ++++++++
.../kylin/source/kafka/SeekOffsetStep.java | 119 ++++++++++
.../kylin/source/kafka/StreamingParser.java | 49 ++--
.../source/kafka/StringStreamingParser.java | 49 ++--
.../source/kafka/TimedJsonStreamParser.java | 49 ++--
.../apache/kylin/source/kafka/TopicMeta.java | 49 ++--
.../kylin/source/kafka/UpdateTimeRangeStep.java | 108 +++++++++
.../source/kafka/config/KafkaClusterConfig.java | 3 +-
.../source/kafka/hadoop/KafkaFlatTableJob.java | 165 ++++++++++++++
.../kafka/hadoop/KafkaFlatTableMapper.java | 51 +++++
.../source/kafka/hadoop/KafkaInputFormat.java | 98 ++++++++
.../kafka/hadoop/KafkaInputRecordReader.java | 166 ++++++++++++++
.../source/kafka/hadoop/KafkaInputSplit.java | 102 +++++++++
.../kylin/source/kafka/util/KafkaClient.java | 115 ++++++++++
.../source/kafka/util/KafkaOffsetMapping.java | 97 ++++++++
.../kylin/source/kafka/util/KafkaRequester.java | 56 ++---
.../kylin/source/kafka/util/KafkaUtils.java | 3 +-
.../hbase/cube/v2/CubeHBaseEndpointRPC.java | 2 +-
.../storage/hbase/cube/v2/CubeHBaseRPC.java | 2 +-
.../storage/hbase/cube/v2/CubeHBaseScanRPC.java | 2 +-
46 files changed, 2141 insertions(+), 353 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index 8c64f91..9b282e3 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -143,14 +143,12 @@ public class DeployUtil {
deployHiveTables();
}
- public static void prepareTestDataForStreamingCube(long startTime, long endTime, String cubeName, StreamDataLoader streamDataLoader) throws IOException {
+ public static void prepareTestDataForStreamingCube(long startTime, long endTime, int numberOfRecords, String cubeName, StreamDataLoader streamDataLoader) throws IOException {
CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
- List<String> data = StreamingTableDataGenerator.generate(10000, startTime, endTime, cubeInstance.getFactTable());
- List<String> data2 = StreamingTableDataGenerator.generate(10, endTime, endTime + 300000, cubeInstance.getFactTable());
+ List<String> data = StreamingTableDataGenerator.generate(numberOfRecords, startTime, endTime, cubeInstance.getFactTable());
TableDesc tableDesc = cubeInstance.getFactTableDesc();
//load into kafka
streamDataLoader.loadIntoKafka(data);
- streamDataLoader.loadIntoKafka(data2);
logger.info("Write {} messages into {}", data.size(), streamDataLoader.toString());
//csv data for H2 use
@@ -165,7 +163,7 @@ public class DeployUtil {
sb.append(StringUtils.join(rowColumns, ","));
sb.append(System.getProperty("line.separator"));
}
- overrideFactTableData(sb.toString(), cubeInstance.getFactTable());
+ appendFactTableData(sb.toString(), cubeInstance.getFactTable());
}
public static void overrideFactTableData(String factTableContent, String factTableName) throws IOException {
@@ -179,6 +177,33 @@ public class DeployUtil {
in.close();
}
+    /**
+     * Appends {@code factTableContent} (written as UTF-8) to the resource {@code /data/<factTableName>.csv}.
+     * Existing and new content are merged in a local temp file that then replaces the stored resource.
+     */
+    public static void appendFactTableData(String factTableContent, String factTableName) throws IOException {
+        ResourceStore store = ResourceStore.getStore(config());
+        String factTablePath = "/data/" + factTableName + ".csv";
+        File tmpFile = File.createTempFile(factTableName, ".csv");
+        InputStream in = null, oldContent = null;
+        FileOutputStream out = new FileOutputStream(tmpFile);
+        try {
+            if (store.exists(factTablePath)) {
+                oldContent = store.getResource(factTablePath).inputStream;
+                IOUtils.copy(oldContent, out);
+            }
+            out.write(factTableContent.getBytes("UTF-8"));
+            out.close(); // finish the temp file before it is read back below
+            store.deleteResource(factTablePath);
+            in = new FileInputStream(tmpFile);
+            store.putResource(factTablePath, in, System.currentTimeMillis());
+        } finally {
+            IOUtils.closeQuietly(oldContent); // previously leaked: the store's stream was never closed
+            IOUtils.closeQuietly(out);
+            IOUtils.closeQuietly(in);
+            tmpFile.delete(); // best effort; do not leave temp files behind after each call
+        }
+    }
private static void deployHiveTables() throws Exception {
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
new file mode 100644
index 0000000..a5132af
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.streaming;
+
+import java.util.List;
+import java.util.Properties;
+
+import javax.annotation.Nullable;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kylin.source.kafka.config.BrokerConfig;
+import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+
+import org.apache.kylin.source.kafka.util.KafkaClient;
+
+/**
+ * Load prepared data into kafka(for test use)
+ */
+public class Kafka10DataLoader extends StreamDataLoader {
+    private static final Logger logger = LoggerFactory.getLogger(Kafka10DataLoader.class);
+    List<KafkaClusterConfig> kafkaClusterConfigs;
+
+    public Kafka10DataLoader(KafkaConfig kafkaConfig) {
+        super(kafkaConfig);
+        this.kafkaClusterConfigs = kafkaConfig.getKafkaClusterConfigs();
+    }
+
+    /** Sends every message, keyed by its list index, to the topic of the first configured cluster. */
+    public void loadIntoKafka(List<String> messages) {
+        KafkaClusterConfig clusterConfig = kafkaClusterConfigs.get(0);
+        String brokerList = StringUtils.join(Collections2.transform(clusterConfig.getBrokerConfigs(), new Function<BrokerConfig, String>() {
+            @Nullable
+            @Override
+            public String apply(BrokerConfig brokerConfig) {
+                return brokerConfig.getHost() + ":" + brokerConfig.getPort();
+            }
+        }), ",");
+        Properties props = new Properties();
+        props.put("acks", "1");
+        props.put("retry.backoff.ms", "1000");
+        KafkaProducer producer = KafkaClient.getKafkaProducer(brokerList, props);
+        int boundary = Math.max(1, messages.size() / 10); // clamp: fewer than 10 messages must not yield "% 0"
+        try {
+            for (int i = 0; i < messages.size(); ++i) {
+                ProducerRecord<String, String> keyedMessage = new ProducerRecord<String, String>(clusterConfig.getTopic(), String.valueOf(i), messages.get(i));
+                producer.send(keyedMessage);
+                if (i % boundary == 0) { // progress log roughly every tenth of the batch
+                    logger.info("sending " + i + " messages to " + this.toString());
+                }
+            }
+            logger.info("sent " + messages.size() + " messages to " + this.toString());
+        } finally {
+            producer.close(); // release the producer even when a send throws
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/build/bin/kylin.sh
----------------------------------------------------------------------
diff --git a/build/bin/kylin.sh b/build/bin/kylin.sh
index 695b7f6..7bc0493 100644
--- a/build/bin/kylin.sh
+++ b/build/bin/kylin.sh
@@ -213,7 +213,7 @@ then
retrieveDependency
#retrieve $KYLIN_EXTRA_START_OPTS from a separate file called setenv-tool.sh
- reset KYLIN_EXTRA_START_OPTS # reset the global server setenv config first
+ unset KYLIN_EXTRA_START_OPTS # reset the global server setenv config first
if [ -f "${dir}/setenv-tool.sh" ]
then source ${dir}/setenv-tool.sh
fi
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index de6b977..86402e9 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -726,6 +726,7 @@ abstract public class KylinConfigBase implements Serializable {
Map<Integer, String> r = convertKeyToInteger(getPropertiesByPrefix("kylin.source.engine."));
// ref constants in ISourceAware
r.put(0, "org.apache.kylin.source.hive.HiveSource");
+ r.put(1, "org.apache.kylin.source.kafka.KafkaSource");
return r;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index 79397c3..afb0d28 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -37,6 +37,7 @@ import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.metadata.model.DataModelDesc;
import org.apache.kylin.metadata.model.IBuildable;
+import org.apache.kylin.metadata.model.ISegment;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.IRealization;
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/core-cube/src/main/java/org/apache/kylin/cube/ISegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/ISegment.java b/core-cube/src/main/java/org/apache/kylin/cube/ISegment.java
deleted file mode 100644
index 2e1f214..0000000
--- a/core-cube/src/main/java/org/apache/kylin/cube/ISegment.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.cube;
-
-import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-
-public interface ISegment {
-
- public String getName();
-
- public long getDateRangeStart();
-
- public long getDateRangeEnd();
-
- public long getSourceOffsetStart();
-
- public long getSourceOffsetEnd();
-
- public DataModelDesc getModel();
-
- public SegmentStatusEnum getStatus();
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/core-cube/src/main/java/org/apache/kylin/cube/gridtable/SegmentGTStartAndEnd.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/SegmentGTStartAndEnd.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/SegmentGTStartAndEnd.java
index e31111d..889a0b2 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/SegmentGTStartAndEnd.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/SegmentGTStartAndEnd.java
@@ -24,7 +24,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.ISegment;
+import org.apache.kylin.metadata.model.ISegment;
import org.apache.kylin.dimension.AbstractDateDimEnc;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.metadata.datatype.DataType;
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
index 6aeb617..6ca89c8 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
@@ -26,6 +26,7 @@ import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.metadata.model.DataModelDesc;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.ISegment;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
@@ -162,4 +163,9 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
return cubeDesc.getDistributedByColumn();
}
+ @Override
+ public ISegment getSegment() {
+ return cubeSegment;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
index 5212859..8af2297 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
@@ -25,6 +25,7 @@ import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.metadata.model.DataModelDesc;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.ISegment;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
@@ -137,4 +138,9 @@ public class CubeJoinedFlatTableEnrich implements IJoinedFlatTableDesc {
return flatDesc.getDistributedBy();
}
+ @Override
+ public ISegment getSegment() {
+ return flatDesc.getSegment();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java b/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java
index bb7503a..4213cf3 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java
@@ -24,7 +24,7 @@ import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.Iterator;
-import org.apache.kylin.cube.ISegment;
+import org.apache.kylin.metadata.model.ISegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
index f3a4107..ffa2680 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
@@ -37,4 +37,6 @@ public interface IJoinedFlatTableDesc {
long getSourceOffsetEnd();
TblColRef getDistributedBy();
+
+ ISegment getSegment();
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java
new file mode 100644
index 0000000..f69ae3f
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.model;
+
+public interface ISegment {
+ /** Name of this segment. */
+ public String getName();
+ /** Start of the date range of this segment (NOTE(review): presumably epoch millis; confirm against implementations). */
+ public long getDateRangeStart();
+ /** End of the date range of this segment (NOTE(review): inclusive vs. exclusive is not visible here; confirm). */
+ public long getDateRangeEnd();
+ /** Start of the source offset range of this segment (NOTE(review): offset semantics depend on the source; confirm). */
+ public long getSourceOffsetStart();
+ /** End of the source offset range of this segment (NOTE(review): inclusive vs. exclusive is not visible here; confirm). */
+ public long getSourceOffsetEnd();
+ /** Data model associated with this segment. */
+ public DataModelDesc getModel();
+ /** Status of this segment. */
+ public SegmentStatusEnum getStatus();
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
index 289cd48..b504dbf 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
@@ -34,10 +34,12 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
private static final Logger logger = LoggerFactory.getLogger(BatchMergeJobBuilder2.class);
private final IMROutput2.IMRBatchMergeOutputSide2 outputSide;
+ private final IMRInput.IMRBatchMergeInputSide inputSide;
public BatchMergeJobBuilder2(CubeSegment mergeSegment, String submitter) {
super(mergeSegment, submitter);
this.outputSide = MRUtil.getBatchMergeOutputSide2(seg);
+ this.inputSide = MRUtil.getBatchMergeInputSide(seg);
}
public CubingJob build() {
@@ -55,6 +57,7 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
}
// Phase 1: Merge Dictionary
+ inputSide.addStepPhase1_MergeDictionary(result);
result.addTask(createMergeDictionaryStep(mergingSegmentIds));
result.addTask(createMergeStatisticsStep(cubeSegment, mergingSegmentIds, getStatisticsPath(jobId)));
outputSide.addStepPhase1_MergeDictionary(result);
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
index 582052f..62cede9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
@@ -21,6 +21,7 @@ package org.apache.kylin.engine.mr;
import org.apache.hadoop.mapreduce.Job;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.ISegment;
import org.apache.kylin.metadata.model.TableDesc;
/**
@@ -34,6 +35,9 @@ public interface IMRInput {
/** Return an InputFormat that reads from specified table. */
public IMRTableInputFormat getTableInputFormat(TableDesc table);
+ /** Return a helper to participate in batch cubing merge job flow. */
+ public IMRBatchMergeInputSide getBatchMergeInputSide(ISegment seg);
+
/**
* Utility that configures mapper to read from a table.
*/
@@ -67,4 +71,10 @@ public interface IMRInput {
public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
}
+ public interface IMRBatchMergeInputSide {
+
+ /** Add step that executes before merge dictionary and before merge cube. */
+ public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
+
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
index 2c3b77f..67eef5e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -71,6 +71,10 @@ public class MRUtil {
return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchMergeOutputSide(seg);
}
+ public static IMRInput.IMRBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg) {
+ return SourceFactory.createEngineAdapter(seg, IMRInput.class).getBatchMergeInputSide(seg);
+ }
+
// use this method instead of ToolRunner.run() because ToolRunner.run() is not thread-safe
// Refer to: http://stackoverflow.com/questions/22462665/is-hadoops-toorunner-thread-safe
public static int runMRJob(Tool tool, String[] args) throws Exception {
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json b/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json
index cfb889a..e6977e1 100644
--- a/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json
+++ b/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json
@@ -4,7 +4,7 @@
"name": "test_streaming_table_model_desc",
"dimensions": [
{
- "table": "default.streaming_table",
+ "table": "DEFAULT.STREAMING_TABLE",
"columns": [
"minute_start",
"hour_start",
@@ -20,10 +20,10 @@
"item_count"
],
"last_modified": 0,
- "fact_table": "default.streaming_table",
+ "fact_table": "DEFAULT.STREAMING_TABLE",
"filter_condition": null,
"partition_desc": {
- "partition_date_column": "default.streaming_table.minute_start",
+ "partition_date_column": "DEFAULT.STREAMING_TABLE.minute_start",
"partition_date_start": 0,
"partition_type": "APPEND"
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index 9490560..cfa9b45 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,59 +18,164 @@
package org.apache.kylin.provision;
-import java.io.File;
-import java.io.IOException;
-import java.util.UUID;
-
+import org.I0Itec.zkclient.ZkConnection;
import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.engine.streaming.OneOffStreamingBuilder;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.engine.streaming.StreamingConfig;
import org.apache.kylin.engine.streaming.StreamingManager;
import org.apache.kylin.job.DeployUtil;
-import org.apache.kylin.job.streaming.KafkaDataLoader;
-import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
+import org.apache.kylin.job.manager.ExecutableManager;
+import org.apache.kylin.job.streaming.Kafka10DataLoader;
import org.apache.kylin.source.kafka.KafkaConfigManager;
+import org.apache.kylin.source.kafka.config.BrokerConfig;
import org.apache.kylin.source.kafka.config.KafkaConfig;
-import org.apache.kylin.storage.hbase.util.StorageCleanupJob;
+import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
+import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.TimeZone;
+import java.util.UUID;
+
/**
* for streaming cubing case "test_streaming_table"
*/
public class BuildCubeWithStream {
- private static final Logger logger = LoggerFactory.getLogger(BuildCubeWithStream.class);
- private static final String cubeName = "test_streaming_table_cube";
- private static final long startTime = DateFormat.stringToMillis("2015-01-01 00:00:00");
- private static final long endTime = DateFormat.stringToMillis("2015-01-03 00:00:00");
- private static final long batchInterval = 16 * 60 * 60 * 1000;//16 hours
+ private static final Logger logger = LoggerFactory.getLogger(org.apache.kylin.provision.BuildCubeWithStream.class);
- private KylinConfig kylinConfig;
+ private CubeManager cubeManager;
+ private DefaultScheduler scheduler;
+ protected ExecutableManager jobService;
+ private static final String cubeName = "test_streaming_table_cube";
- public static void main(String[] args) throws Exception {
+ private KafkaConfig kafkaConfig;
+ private MockKafka kafkaServer;
- try {
- beforeClass();
+ /**
+ * Prepares one streaming build run: calls {@link #deployEnv()}, initialises the job scheduler,
+ * points the cube's Kafka config at a randomly named topic on the local host and starts the
+ * embedded Kafka broker.
+ *
+ * @throws Exception if deployment fails; a RuntimeException is thrown when the scheduler has not started
+ */
+ public void before() throws Exception {
+ deployEnv();
- BuildCubeWithStream buildCubeWithStream = new BuildCubeWithStream();
- buildCubeWithStream.before();
- buildCubeWithStream.build();
- logger.info("Build is done");
- buildCubeWithStream.cleanup();
- logger.info("Going to exit");
- System.exit(0);
- } catch (Exception e) {
- logger.error("error", e);
- System.exit(1);
+ final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ jobService = ExecutableManager.getInstance(kylinConfig);
+ scheduler = DefaultScheduler.createInstance();
+ scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock());
+ if (!scheduler.hasStarted()) {
+ throw new RuntimeException("scheduler has not been started");
}
+ cubeManager = CubeManager.getInstance(kylinConfig);
+
+ // look up the Kafka config registered for the cube's fact table
+ final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+ final String factTable = cubeInstance.getFactTable();
+ final StreamingManager streamingManager = StreamingManager.getInstance(kylinConfig);
+ final StreamingConfig streamingConfig = streamingManager.getStreamingConfig(factTable);
+ kafkaConfig = KafkaConfigManager.getInstance(kylinConfig).getKafkaConfig(streamingConfig.getName());
+
+ // use a random topic and the local IP for the first configured broker, then persist the change
+ String topicName = UUID.randomUUID().toString();
+ String localIp = NetworkUtils.getLocalIp();
+ BrokerConfig brokerConfig = kafkaConfig.getKafkaClusterConfigs().get(0).getBrokerConfigs().get(0);
+ brokerConfig.setHost(localIp);
+ kafkaConfig.setTopic(topicName);
+ KafkaConfigManager.getInstance(kylinConfig).saveKafkaConfig(kafkaConfig);
+
+ startEmbeddedKafka(topicName, brokerConfig);
+ }
+
+ /**
+ * Starts the embedded Kafka broker against the sandbox ZooKeeper, creates the test topic
+ * and verifies that the topic metadata can be read back.
+ *
+ * @param topicName    topic to create (3 partitions, replication factor 1)
+ * @param brokerConfig supplies the port and broker id used by the embedded broker
+ */
+ private void startEmbeddedKafka(String topicName, BrokerConfig brokerConfig) {
+     // Start mock Kafka. The MockKafka(ZkConnection, int, int) constructor already starts the
+     // broker, so no explicit start() call is made here.
+     String zkConnectionStr = "sandbox:2181";
+     ZkConnection zkConnection = new ZkConnection(zkConnectionStr);
+     kafkaServer = new MockKafka(zkConnection, brokerConfig.getPort(), brokerConfig.getId());
+
+     kafkaServer.createTopic(topicName, 3, 1);
+     kafkaServer.waitTopicUntilReady(topicName);
+
+     MetadataResponse.TopicMetadata topicMetadata = kafkaServer.fetchTopicMeta(topicName);
+     Assert.assertEquals(topicName, topicMetadata.topic());
+ }
+
+
+ /**
+ * Sends generated test records for the streaming cube to Kafka by delegating to
+ * {@code DeployUtil.prepareTestDataForStreamingCube} with a loader built from {@code kafkaConfig}.
+ *
+ * @param startTime       start of the time range passed to the generator
+ * @param endTime         end of the time range passed to the generator
+ * @param numberOfRecords number of records requested from the generator
+ * @throws IOException propagated from the data preparation
+ */
+ private void generateStreamData(long startTime, long endTime, int numberOfRecords) throws IOException {
+     final Kafka10DataLoader kafkaLoader = new Kafka10DataLoader(kafkaConfig);
+     DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, numberOfRecords, cubeName, kafkaLoader);
+     logger.info("Test data inserted into Kafka");
+ }
+
+ /**
+ * Removes every existing segment from the named cube.
+ *
+ * @param cubeName name of the cube to clear
+ * @throws Exception propagated from the cube update
+ */
+ private void clearSegment(String cubeName) throws Exception {
+     final CubeInstance targetCube = cubeManager.getCube(cubeName);
+     final CubeUpdate removal = new CubeUpdate(targetCube);
+     // schedule all current segments of the cube for removal
+     final CubeSegment[] existingSegments = targetCube.getSegments().toArray(new CubeSegment[targetCube.getSegments().size()]);
+     removal.setToRemoveSegs(existingSegments);
+     cubeManager.updateCube(removal);
+ }
+
+ /**
+ * Runs the streaming build scenario: clears the cube, loads two batches of generated
+ * messages into Kafka with a segment build after each one, then merges the segments.
+ *
+ * @throws Exception if date parsing, data generation or any job submission fails
+ */
+ public void build() throws Exception {
+     clearSegment(cubeName);
+
+     final SimpleDateFormat dayFormat = new SimpleDateFormat("yyyy-MM-dd");
+     dayFormat.setTimeZone(TimeZone.getTimeZone("GMT"));
+
+     // first batch: 10000 records between the epoch and 2013-01-01 (GMT)
+     final long epochStart = 0;
+     final long firstBatchEnd = dayFormat.parse("2013-01-01").getTime();
+     final int firstBatchSize = 10000;
+     generateStreamData(epochStart, firstBatchEnd, firstBatchSize);
+     buildSegment(cubeName, 0, Long.MAX_VALUE);
+
+     // second batch: 5000 records between 2013-01-01 and 2013-04-01 (GMT)
+     final long secondBatchEnd = dayFormat.parse("2013-04-01").getTime();
+     final int secondBatchSize = 5000;
+     generateStreamData(firstBatchEnd, secondBatchEnd, secondBatchSize);
+     buildSegment(cubeName, 0, Long.MAX_VALUE);
+
+     // merge; NOTE(review): 15000 presumably equals the total number of records sent - confirm
+     mergeSegment(cubeName, 0, 15000);
+ }
+
+ /**
+ * Creates a merge segment for the given offset range, submits a batch merge job for it
+ * and blocks until {@link #waitForJob(String)} returns.
+ *
+ * @param cubeName    cube whose segments are merged
+ * @param startOffset start offset of the merged range
+ * @param endOffset   end offset of the merged range
+ * @return id of the submitted job
+ */
+ private String mergeSegment(String cubeName, long startOffset, long endOffset) throws Exception {
+     final CubeInstance cube = cubeManager.getCube(cubeName);
+     final CubeSegment mergedSegment = cubeManager.mergeSegments(cube, 0, 0, startOffset, endOffset, true);
+     final DefaultChainedExecutable mergeJob = EngineFactory.createBatchMergeJob(mergedSegment, "TEST");
+     jobService.addJob(mergeJob);
+     final String jobId = mergeJob.getId();
+     waitForJob(jobId);
+     return jobId;
+ }
+
+ /**
+ * Creates a refresh segment for the given offset range, submits a batch cubing job for it
+ * and blocks until {@link #waitForJob(String)} returns.
+ *
+ * @param cubeName    cube whose segment is refreshed
+ * @param startOffset start offset of the refreshed range
+ * @param endOffset   end offset of the refreshed range
+ * @return id of the submitted job
+ */
+ private String refreshSegment(String cubeName, long startOffset, long endOffset) throws Exception {
+     final CubeInstance cube = cubeManager.getCube(cubeName);
+     final CubeSegment refreshedSegment = cubeManager.refreshSegment(cube, 0, 0, startOffset, endOffset);
+     final DefaultChainedExecutable cubingJob = EngineFactory.createBatchCubingJob(refreshedSegment, "TEST");
+     jobService.addJob(cubingJob);
+     final String jobId = cubingJob.getId();
+     waitForJob(jobId);
+     return jobId;
+ }
+
+ /**
+ * Appends a new segment for the given offset range, submits a batch cubing job for it
+ * and blocks until {@link #waitForJob(String)} returns.
+ *
+ * @param cubeName    cube to append the segment to
+ * @param startOffset start offset of the new segment
+ * @param endOffset   end offset of the new segment
+ * @return id of the submitted job
+ */
+ private String buildSegment(String cubeName, long startOffset, long endOffset) throws Exception {
+     final CubeInstance cube = cubeManager.getCube(cubeName);
+     final CubeSegment appendedSegment = cubeManager.appendSegment(cube, 0, 0, startOffset, endOffset);
+     final DefaultChainedExecutable cubingJob = EngineFactory.createBatchCubingJob(appendedSegment, "TEST");
+     jobService.addJob(cubingJob);
+     final String jobId = cubingJob.getId();
+     waitForJob(jobId);
+     return jobId;
+ }
+
+ /**
+ * Prepares the test environment by calling, in order, {@code DeployUtil.overrideJobJarLocations()},
+ * {@code DeployUtil.initCliWorkDir()} and {@code DeployUtil.deployMetadata()}.
+ *
+ * @throws IOException propagated from the deployment helpers
+ */
+ protected void deployEnv() throws IOException {
+ DeployUtil.overrideJobJarLocations();
+ DeployUtil.initCliWorkDir();
+ DeployUtil.deployMetadata();
}
public static void beforeClass() throws Exception {
@@ -83,44 +188,54 @@ public class BuildCubeWithStream {
HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
}
- protected void deployEnv() throws IOException {
- DeployUtil.overrideJobJarLocations();
+ /** Cleans up the test metadata via {@code HBaseMetadataTestCase.staticCleanupTestMetadata()}. */
+ public static void afterClass() throws Exception {
+ HBaseMetadataTestCase.staticCleanupTestMetadata();
}
- public void before() throws Exception {
- deployEnv();
+ /**
+ * Stops the embedded Kafka broker, if one was created by {@link #before()}.
+ */
+ public void after() {
+     // before() can fail before the broker is created; do not hide that failure behind an NPE
+     if (kafkaServer != null) {
+         kafkaServer.stop();
+     }
+ }
- kylinConfig = KylinConfig.getInstanceFromEnv();
- final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
- final String factTable = cubeInstance.getFactTable();
- final StreamingConfig config = StreamingManager.getInstance(kylinConfig).getStreamingConfig(factTable);
+ /**
+ * Blocks until the given job reaches {@code SUCCEED} or {@code ERROR}, polling every five seconds.
+ *
+ * @param jobId id of the job to wait for
+ * @throws IllegalStateException if the waiting thread is interrupted
+ */
+ protected void waitForJob(String jobId) {
+     while (true) {
+         AbstractExecutable job = jobService.getJob(jobId);
+         // read the status once per poll so both comparisons see the same value
+         ExecutableState status = job.getStatus();
+         if (status == ExecutableState.SUCCEED || status == ExecutableState.ERROR) {
+             return;
+         }
+         try {
+             Thread.sleep(5000);
+         } catch (InterruptedException e) {
+             // restore the interrupt flag and stop waiting instead of silently ignoring the request
+             Thread.currentThread().interrupt();
+             throw new IllegalStateException("Interrupted while waiting for job " + jobId, e);
+         }
+     }
+ }
- //Use a random topic for kafka data stream
- KafkaConfig streamingConfig = KafkaConfigManager.getInstance(kylinConfig).getKafkaConfig(config.getName());
- streamingConfig.setTopic(UUID.randomUUID().toString());
- KafkaConfigManager.getInstance(kylinConfig).saveKafkaConfig(streamingConfig);
+ /**
+ * Command-line entry point: creates the test metadata, runs before/build/after on a fresh
+ * instance, cleans up the metadata and exits with status 0. Any exception is logged and the
+ * JVM exits with status 1.
+ */
+ public static void main(String[] args) throws Exception {
+ try {
+ beforeClass();
- DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, cubeName, new KafkaDataLoader(streamingConfig));
- }
+ BuildCubeWithStream buildCubeWithStream = new BuildCubeWithStream();
+ buildCubeWithStream.before();
+ buildCubeWithStream.build();
+ logger.info("Build is done");
+ buildCubeWithStream.after();
+ afterClass();
+ logger.info("Going to exit");
+ System.exit(0);
+ } catch (Exception e) {
+ // NOTE(review): after()/afterClass() are not called on this path; the process exits instead
+ logger.error("error", e);
+ System.exit(1);
+ }
- public void cleanup() throws Exception {
- cleanupOldStorage();
- HBaseMetadataTestCase.staticCleanupTestMetadata();
}
+/**
+ * Currently a no-op that always returns 0: the storage cleanup call is commented out,
+ * so {@code args} is built but never used.
+ */
protected int cleanupOldStorage() throws Exception {
String[] args = { "--delete", "true" };
- StorageCleanupJob cli = new StorageCleanupJob();
- cli.execute(args);
+// KapStorageCleanupCLI cli = new KapStorageCleanupCLI();
+// cli.execute(args);
return 0;
}
- public void build() throws Exception {
- logger.info("start time:" + startTime + " end time:" + endTime + " batch interval:" + batchInterval + " batch count:" + ((endTime - startTime) / batchInterval));
- for (long start = startTime; start < endTime; start += batchInterval) {
- logger.info(String.format("build batch:{%d, %d}", start, start + batchInterval));
- new OneOffStreamingBuilder(RealizationType.CUBE, cubeName, start, start + batchInterval).build().run();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/kylin-it/src/test/java/org/apache/kylin/provision/MockKafka.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/MockKafka.java b/kylin-it/src/test/java/org/apache/kylin/provision/MockKafka.java
new file mode 100644
index 0000000..3f47923
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/MockKafka.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.provision;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.UUID;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.kafka.common.requests.MetadataResponse;
+
+import kafka.admin.AdminUtils;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServerStartable;
+import kafka.utils.ZkUtils;
+
+/**
+ * Embedded Kafka broker for integration tests. The broker registers itself in the ZooKeeper
+ * ensemble described by the supplied {@link ZkConnection}; the public constructors start it
+ * immediately.
+ */
+public class MockKafka {
+    /**
+     * Builds the broker properties. The listener addresses use the IP reported by
+     * {@link NetworkUtils#getLocalIp()}.
+     *
+     * @throws IllegalStateException if no local IPv4 address can be determined
+     */
+    private static Properties createProperties(ZkConnection zkServerConnection, String logDir, String port, String brokerId) {
+        String ip = NetworkUtils.getLocalIp();
+        if (ip == null) {
+            // fail early instead of configuring a "PLAINTEXT://null:<port>" listener
+            throw new IllegalStateException("Cannot determine a local IPv4 address for the embedded Kafka listener");
+        }
+
+        Properties properties = new Properties();
+        properties.put("port", port);
+        properties.put("broker.id", brokerId);
+        properties.put("log.dirs", logDir);
+        properties.put("host.name", "localhost");
+        properties.put("offsets.topic.replication.factor", "1");
+        properties.put("delete.topic.enable", "true");
+        properties.put("zookeeper.connect", zkServerConnection.getServers());
+        properties.put("listeners", "PLAINTEXT://" + ip + ":" + port);
+        properties.put("advertised.listeners", "PLAINTEXT://" + ip + ":" + port);
+        return properties;
+    }
+
+    private KafkaServerStartable kafkaServer;
+
+    private ZkConnection zkConnection;
+
+    /**
+     * Creates and starts a broker on port 9092 with broker id 1 and a random log directory
+     * under {@code java.io.tmpdir}.
+     */
+    public MockKafka(ZkConnection zkServerConnection) {
+        this(zkServerConnection, System.getProperty("java.io.tmpdir") + "/" + UUID.randomUUID().toString(), "9092", "1");
+        start();
+    }
+
+    private MockKafka(Properties properties) {
+        KafkaConfig kafkaConfig = new KafkaConfig(properties);
+        kafkaServer = new KafkaServerStartable(kafkaConfig);
+    }
+
+    /**
+     * Creates and starts a broker on the given port with the given broker id and a random
+     * log directory under {@code java.io.tmpdir}.
+     */
+    public MockKafka(ZkConnection zkServerConnection, int port, int brokerId) {
+        this(zkServerConnection, System.getProperty("java.io.tmpdir") + "/" + UUID.randomUUID().toString(), String.valueOf(port), String.valueOf(brokerId));
+        start();
+    }
+
+    private MockKafka(ZkConnection zkServerConnection, String logDir, String port, String brokerId) {
+        this(createProperties(zkServerConnection, logDir, port, brokerId));
+        this.zkConnection = zkServerConnection;
+        System.out.println(String.format("Kafka %s:%s dir:%s", kafkaServer.serverConfig().brokerId(), kafkaServer.serverConfig().port(), kafkaServer.serverConfig().logDirs()));
+    }
+
+    /**
+     * Creates a topic through ZooKeeper.
+     *
+     * @param topic       topic name
+     * @param partition   number of partitions
+     * @param replication replication factor
+     */
+    public void createTopic(String topic, int partition, int replication) {
+        ZkClient zkClient = new ZkClient(zkConnection);
+        try {
+            ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);
+            zkClient.setZkSerializer(new ZKStringSerializer());
+            AdminUtils.createTopic(zkUtils, topic, partition, replication, new Properties(), null);
+        } finally {
+            // always release the client, even when topic creation fails
+            zkClient.close();
+        }
+    }
+
+    /** Creates a topic with one partition and replication factor 1. */
+    public void createTopic(String topic) {
+        this.createTopic(topic, 1, 1);
+    }
+
+    /**
+     * Reads the metadata of a topic from ZooKeeper.
+     *
+     * @param topic topic name
+     * @return the metadata returned by {@code AdminUtils.fetchTopicMetadataFromZk}
+     */
+    public MetadataResponse.TopicMetadata fetchTopicMeta(String topic) {
+        ZkClient zkClient = new ZkClient(zkConnection);
+        try {
+            ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);
+            zkClient.setZkSerializer(new ZKStringSerializer());
+            return AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils);
+        } finally {
+            zkClient.close();
+        }
+    }
+
+    /**
+     * Delete may not work
+     *
+     * @param topic topic name
+     */
+    public void deleteTopic(String topic) {
+        ZkClient zkClient = new ZkClient(zkConnection);
+        try {
+            ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);
+            zkClient.setZkSerializer(new ZKStringSerializer());
+            AdminUtils.deleteTopic(zkUtils, topic);
+        } finally {
+            zkClient.close();
+        }
+    }
+
+    /** Returns {@code host:port} built from the broker's configured host name and port. */
+    public String getConnectionString() {
+        return String.format("%s:%d", kafkaServer.serverConfig().hostName(), kafkaServer.serverConfig().port());
+    }
+
+    public void start() {
+        kafkaServer.startup();
+        System.out.println("embedded kafka is up");
+    }
+
+    public void stop() {
+        kafkaServer.shutdown();
+        System.out.println("embedded kafka down");
+    }
+
+    /**
+     * Polls the topic metadata once per second until every partition reports a leader and
+     * at least one replica.
+     *
+     * @param topic topic name
+     * @return the metadata observed when the topic became ready
+     * @throws IllegalStateException if the waiting thread is interrupted
+     */
+    public MetadataResponse.TopicMetadata waitTopicUntilReady(String topic) {
+        while (true) {
+            MetadataResponse.TopicMetadata topicMeta = this.fetchTopicMeta(topic);
+            if (isTopicReady(topicMeta)) {
+                return topicMeta;
+            }
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                // restore the interrupt flag and stop waiting instead of swallowing the request
+                Thread.currentThread().interrupt();
+                throw new IllegalStateException("Interrupted while waiting for topic " + topic, e);
+            }
+        }
+    }
+
+    /** Returns true when no partition of the topic lacks a leader or a replica. */
+    private static boolean isTopicReady(MetadataResponse.TopicMetadata topicMeta) {
+        for (MetadataResponse.PartitionMetadata partitionMeta : topicMeta.partitionMetadata()) {
+            if (partitionMeta.leader().isEmpty()) {
+                System.out.println("Partition leader is not ready, wait 1s.");
+                return false;
+            }
+            if (partitionMeta.replicas().isEmpty()) {
+                System.out.println("Partition replica is not ready, wait 1s.");
+                return false;
+            }
+        }
+        return true;
+    }
+
+    public String getZookeeperConnection() {
+        return this.zkConnection.getServers();
+    }
+}
+
+/**
+ * {@link ZkSerializer} that stores values as the UTF-8 bytes of their {@code toString()} form
+ * and reads them back as strings.
+ */
+class ZKStringSerializer implements ZkSerializer {
+
+    /** Encodes {@code data.toString()} as UTF-8. */
+    @Override
+    public byte[] serialize(Object data) throws ZkMarshallingError {
+        try {
+            return data.toString().getBytes("UTF-8");
+        } catch (UnsupportedEncodingException e) {
+            throw new ZkMarshallingError(e);
+        }
+    }
+
+    /** Decodes UTF-8 bytes into a string; a {@code null} input yields {@code null}. */
+    @Override
+    public Object deserialize(byte[] bytes) throws ZkMarshallingError {
+        if (bytes == null) {
+            return null;
+        }
+        try {
+            return new String(bytes, "UTF-8");
+        } catch (UnsupportedEncodingException e) {
+            throw new ZkMarshallingError(e);
+        }
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/kylin-it/src/test/java/org/apache/kylin/provision/NetworkUtils.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/NetworkUtils.java b/kylin-it/src/test/java/org/apache/kylin/provision/NetworkUtils.java
new file mode 100644
index 0000000..98f6d04
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/NetworkUtils.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.provision;
+
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.Enumeration;
+
+/** Network helpers for the integration tests. */
+public class NetworkUtils {
+
+    /**
+     * Returns the first IPv4 address found on an interface that is up and is neither loopback,
+     * virtual, point-to-point nor named {@code vboxnet*}.
+     *
+     * @return the address in textual form, or {@code null} when no such address exists
+     * @throws RuntimeException wrapping the {@link SocketException} raised while inspecting interfaces
+     */
+    public static String getLocalIp() {
+        try {
+            for (Enumeration<NetworkInterface> nics = NetworkInterface.getNetworkInterfaces(); nics.hasMoreElements();) {
+                final NetworkInterface nic = nics.nextElement();
+
+                final boolean usable = !nic.isLoopback() && nic.isUp() && !nic.isVirtual() && !nic.isPointToPoint();
+                if (!usable || nic.getName().startsWith("vboxnet")) {
+                    continue;
+                }
+
+                for (Enumeration<InetAddress> candidates = nic.getInetAddresses(); candidates.hasMoreElements();) {
+                    final InetAddress candidate = candidates.nextElement();
+                    if (candidate instanceof Inet4Address) {
+                        return candidate.getHostAddress();
+                    }
+                }
+            }
+            return null;
+        } catch (SocketException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ea29958..8b7e05b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,7 +55,7 @@
<!-- HBase versions -->
<hbase-hadoop2.version>0.98.8-hadoop2</hbase-hadoop2.version>
- <kafka.version>0.8.1</kafka.version>
+ <kafka.version>0.10.0.0</kafka.version>
<!-- Hadoop deps, keep compatible with hadoop2.version -->
<zookeeper.version>3.4.6</zookeeper.version>
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index e3d7879..444779a 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -43,6 +43,7 @@ import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.ISegment;
import org.apache.kylin.metadata.model.LookupDesc;
import org.apache.kylin.metadata.model.TableDesc;
import org.slf4j.Logger;
@@ -62,6 +63,16 @@ public class HiveMRInput implements IMRInput {
return new HiveTableInputFormat(table.getIdentity());
}
+ /**
+ * Returns a merge input side whose dictionary-merge phase adds no step to the job flow.
+ */
+ @Override
+ public IMRBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
+     final IMRBatchMergeInputSide noOpInputSide = new IMRBatchMergeInputSide() {
+         @Override
+         public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
+             // doing nothing
+         }
+     };
+     return noOpInputSide;
+ }
+
public static class HiveTableInputFormat implements IMRTableInputFormat {
final String dbName;
final String tableName;
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/source-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/source-kafka/pom.xml b/source-kafka/pom.xml
index 9393216..7f2a2e9 100644
--- a/source-kafka/pom.xml
+++ b/source-kafka/pom.xml
@@ -32,10 +32,11 @@
</parent>
- <properties>
- </properties>
-
<dependencies>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-engine-mr</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.kylin</groupId>
@@ -60,16 +61,10 @@
<scope>provided</scope>
</dependency>
<dependency>
- <groupId>org.apache.hive.hcatalog</groupId>
- <artifactId>hive-hcatalog-core</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
-
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
index d594873..cfdf316 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
@@ -1,35 +1,19 @@
/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.kylin.source.kafka;
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
new file mode 100644
index 0000000..cfce137
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.kafka;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.source.kafka.hadoop.KafkaFlatTableJob;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.StreamingMessage;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * {@link IMRInput} implementation for Kafka sources. Cubing jobs get steps that seek the
+ * segment offsets and save the Kafka data to the job working directory; merge jobs get a
+ * step that merges the offsets.
+ */
+public class KafkaMRInput implements IMRInput {
+
+    // segment of the most recent getBatchCubingInputSide() call; null until that call happens
+    CubeSegment cubeSegment;
+
+    @Override
+    public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+        this.cubeSegment = (CubeSegment) flatDesc.getSegment();
+        return new BatchCubingInputSide(cubeSegment);
+    }
+
+    @Override
+    public IMRTableInputFormat getTableInputFormat(TableDesc table) {
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(kylinConfig);
+        KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(table.getIdentity());
+        List<TblColRef> columns = Lists.transform(Arrays.asList(table.getColumns()), new Function<ColumnDesc, TblColRef>() {
+            @Nullable
+            @Override
+            public TblColRef apply(ColumnDesc input) {
+                return input.getRef();
+            }
+        });
+
+        // pass a real job config (as getFlatTableInputFormat does): configureJob() hands it to
+        // JobBuilderSupport.getJobWorkingDir, so a null value cannot work there
+        return new KafkaTableInputFormat(cubeSegment, columns, kafkaConfig, new JobEngineConfig(kylinConfig));
+    }
+
+    @Override
+    public IMRBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
+        return new KafkaMRBatchMergeInputSide((CubeSegment) seg);
+    }
+
+    /**
+     * Table input format that reads the sequence files written by the "Save data from Kafka"
+     * step and parses each record with the streaming parser configured for the topic.
+     */
+    public static class KafkaTableInputFormat implements IMRTableInputFormat {
+        private final CubeSegment cubeSegment;
+        private List<TblColRef> columns;
+        private StreamingParser streamingParser;
+        private KafkaConfig kafkaConfig;
+        private final JobEngineConfig conf;
+
+        public KafkaTableInputFormat(CubeSegment cubeSegment, List<TblColRef> columns, KafkaConfig kafkaConfig, JobEngineConfig conf) {
+            this.cubeSegment = cubeSegment;
+            this.columns = columns;
+            this.kafkaConfig = kafkaConfig;
+            this.conf = conf;
+        }
+
+        /**
+         * Configures the job to read sequence files from the flat-table directory of the
+         * cubing job whose id is stored in the job configuration.
+         */
+        @Override
+        public void configureJob(Job job) {
+            job.setInputFormatClass(SequenceFileInputFormat.class);
+            job.setMapOutputValueClass(Text.class);
+            String jobId = job.getConfiguration().get(BatchConstants.ARG_CUBING_JOB_ID);
+            IJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(cubeSegment);
+            String inputPath = JoinedFlatTable.getTableDir(flatHiveTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId));
+            try {
+                FileInputFormat.addInputPath(job, new Path(inputPath));
+            } catch (IOException e) {
+                throw new IllegalStateException(e);
+            }
+        }
+
+        /**
+         * Parses one mapper input record into column values.
+         *
+         * @param mapperInput a {@link Text} holding the raw message bytes
+         * @return the parsed values as an array
+         * @throws IllegalArgumentException if the configured streaming parser cannot be created
+         */
+        @Override
+        public String[] parseMapperInput(Object mapperInput) {
+            if (streamingParser == null) {
+                // created lazily on the first record
+                try {
+                    streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getParserProperties(), columns);
+                } catch (ReflectiveOperationException e) {
+                    // keep the cause so the reason for the reflective failure is not lost
+                    throw new IllegalArgumentException("Failed to create streaming parser " + kafkaConfig.getParserName(), e);
+                }
+            }
+            Text text = (Text) mapperInput;
+            // Text's backing array may be longer than its content, so only wrap getLength() bytes
+            ByteBuffer buffer = ByteBuffer.wrap(text.getBytes(), 0, text.getLength()).slice();
+            StreamingMessage streamingMessage = streamingParser.parse(buffer);
+            return streamingMessage.getData().toArray(new String[streamingMessage.getData().size()]);
+        }
+
+    }
+
+    /** Input side of a cubing job for one Kafka-backed segment. */
+    public static class BatchCubingInputSide implements IMRBatchCubingInputSide {
+
+        final JobEngineConfig conf;
+        final CubeSegment seg;
+        private String outputPath;
+
+        public BatchCubingInputSide(CubeSegment seg) {
+            this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
+            this.seg = seg;
+        }
+
+        /** Adds the offset-seeking step followed by the step that saves the Kafka data. */
+        @Override
+        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
+            jobFlow.addTask(createUpdateSegmentOffsetStep(jobFlow.getId()));
+            jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId()));
+        }
+
+        /** Creates the {@link SeekOffsetStep} parameterised with cube name, segment id and job id. */
+        public SeekOffsetStep createUpdateSegmentOffsetStep(String jobId) {
+            final SeekOffsetStep result = new SeekOffsetStep();
+            result.setName("Seek and update offset step");
+
+            CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
+            CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
+            CubingExecutableUtil.setCubingJobId(jobId, result.getParams());
+
+            return result;
+        }
+
+        /** Creates the MapReduce step that runs {@link KafkaFlatTableJob} into the flat-table directory. */
+        private MapReduceExecutable createSaveKafkaDataStep(String jobId) {
+            MapReduceExecutable result = new MapReduceExecutable();
+
+            IJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(seg);
+            outputPath = JoinedFlatTable.getTableDir(flatHiveTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId));
+            result.setName("Save data from Kafka");
+            result.setMapReduceJobClass(KafkaFlatTableJob.class);
+            JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(seg, "system");
+            StringBuilder cmd = new StringBuilder();
+            jobBuilderSupport.appendMapReduceParameters(cmd);
+            JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+            JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
+            JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+            JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Save_Kafka_Data_" + seg.getRealization().getName() + "_Step");
+
+            result.setMapReduceParams(cmd.toString());
+            return result;
+        }
+
+        /** Adds the step that updates the segment time range, fed with the fact-distinct-columns path. */
+        @Override
+        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+            final UpdateTimeRangeStep result = new UpdateTimeRangeStep();
+            result.setName("Update Segment Time Range");
+            CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
+            CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
+            CubingExecutableUtil.setCubingJobId(jobFlow.getId(), result.getParams());
+            JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(seg, "SYSTEM");
+            result.getParams().put(BatchConstants.CFG_OUTPUT_PATH, jobBuilderSupport.getFactDistinctColumnsPath(jobFlow.getId()));
+            jobFlow.addTask(result);
+
+        }
+
+        @Override
+        public IMRTableInputFormat getFlatTableInputFormat() {
+            KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv());
+            KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(seg.getRealization().getFactTable());
+            List<TblColRef> columns = new CubeJoinedFlatTableDesc(seg).getAllColumns();
+
+            return new KafkaTableInputFormat(seg, columns, kafkaConfig, conf);
+
+        }
+
+    }
+
+    /**
+     * Input side of a merge job; static because it uses no state of the enclosing
+     * {@link KafkaMRInput} instance.
+     */
+    static class KafkaMRBatchMergeInputSide implements IMRBatchMergeInputSide {
+
+        private final CubeSegment cubeSegment;
+
+        KafkaMRBatchMergeInputSide(CubeSegment cubeSegment) {
+            this.cubeSegment = cubeSegment;
+        }
+
+        /** Adds the {@link MergeOffsetStep} parameterised with cube name, segment id and job id. */
+        @Override
+        public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
+
+            final MergeOffsetStep result = new MergeOffsetStep();
+            result.setName("Merge offset step");
+
+            CubingExecutableUtil.setCubeName(cubeSegment.getRealization().getName(), result.getParams());
+            CubingExecutableUtil.setSegmentId(cubeSegment.getUuid(), result.getParams());
+            CubingExecutableUtil.setCubingJobId(jobFlow.getId(), result.getParams());
+            jobFlow.addTask(result);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
new file mode 100644
index 0000000..d039583
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.kafka;
+
+import com.google.common.collect.Lists;
+import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.ISource;
+import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
+
+import java.util.List;
+
+//used by reflection
+/**
+ * {@link ISource} implementation for Kafka backed tables. It can only be
+ * adapted to the MR build engine and does not offer a readable table.
+ */
+public class KafkaSource implements ISource {
+
+    /**
+     * Adapts this source to the requested build engine interface.
+     *
+     * @param engineInterface the engine input interface asked for
+     * @return a new {@link KafkaMRInput} when {@code IMRInput.class} is requested
+     * @throws RuntimeException for any other interface
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public <I> I adaptToBuildEngine(Class<I> engineInterface) {
+        if (engineInterface != IMRInput.class) {
+            throw new RuntimeException("Cannot adapt to " + engineInterface);
+        }
+        return (I) new KafkaMRInput();
+    }
+
+    /**
+     * Not supported by this source.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public ReadableTable createReadableTable(TableDesc tableDesc) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Lists the resource paths an MR job needs for the given table: the Kafka
+     * config path followed by the streaming config path, both derived from the
+     * table identity.
+     */
+    @Override
+    public List<String> getMRDependentResources(TableDesc table) {
+        List<String> resourcePaths = Lists.newArrayList();
+        String kafkaConfigPath = KafkaConfig.concatResourcePath(table.getIdentity());
+        resourcePaths.add(kafkaConfigPath);
+        String streamingConfigPath = StreamingConfig.concatResourcePath(table.getIdentity());
+        resourcePaths.add(streamingConfigPath);
+        return resourcePaths;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
index c3bdb75..de42689 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
@@ -1,36 +1,20 @@
/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
package org.apache.kylin.source.kafka;
import java.util.List;
@@ -40,6 +24,9 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import com.google.common.base.Function;
+import kafka.cluster.BrokerEndPoint;
+import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.StreamingBatch;
@@ -66,6 +53,8 @@ import kafka.javaapi.FetchResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.message.MessageAndOffset;
+import javax.annotation.Nullable;
+
@SuppressWarnings("unused")
public class KafkaStreamingInput implements IStreamingInput {
@@ -151,8 +140,16 @@ public class KafkaStreamingInput implements IStreamingInput {
if (partitionMetadata.errorCode() != 0) {
logger.warn("PartitionMetadata errorCode: " + partitionMetadata.errorCode());
}
- replicaBrokers = partitionMetadata.replicas();
- return partitionMetadata.leader();
+ replicaBrokers = Lists.transform(partitionMetadata.replicas(), new Function<BrokerEndPoint, Broker>() {
+ @Nullable
+ @Override
+ public Broker apply(@Nullable BrokerEndPoint brokerEndPoint) {
+ return new Broker(brokerEndPoint, SecurityProtocol.PLAINTEXT);
+ }
+ });
+ BrokerEndPoint leaderEndpoint = partitionMetadata.leader();
+
+ return new Broker(leaderEndpoint, SecurityProtocol.PLAINTEXT);
} else {
return null;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java
new file mode 100644
index 0000000..a21b980
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.kafka;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kylin.source.kafka.util.KafkaOffsetMapping;
+
+/**
+ */
+/**
+ * Job step that computes the Kafka offset range and the date range of a merged
+ * segment from the segments being merged into it, then saves the segment.
+ * <p>
+ * Per partition the merged start offset is the smallest start offset and the
+ * merged end offset is the largest end offset found in the merging segments.
+ * The date range is the union of the merging segments' date ranges.
+ */
+public class MergeOffsetStep extends AbstractExecutable {
+
+    private static final Logger logger = LoggerFactory.getLogger(MergeOffsetStep.class);
+
+    public MergeOffsetStep() {
+        super();
+    }
+
+    /**
+     * Resolves cube and segment from the step parameters, merges offsets and
+     * date range, and updates the cube.
+     *
+     * @return SUCCEED when the cube was updated; ERROR when there is nothing to
+     *         merge or the cube update fails with an {@link IOException}
+     */
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
+        final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+        final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
+
+        final List<CubeSegment> mergingSegs = cube.getMergingSegments(segment);
+        if (mergingSegs == null || mergingSegs.isEmpty()) {
+            // Without input segments the loop below would leave Long.MAX_VALUE / 0 as the
+            // date range and persist them; fail the step instead of corrupting the segment.
+            final String msg = "no merging segments found for segment " + segment.getUuid();
+            logger.error(msg);
+            return new ExecuteResult(ExecuteResult.State.ERROR, msg);
+        }
+
+        final Map<Integer, Long> mergedStartOffsets = Maps.newHashMap();
+        final Map<Integer, Long> mergedEndOffsets = Maps.newHashMap();
+
+        long dateRangeStart = Long.MAX_VALUE, dateRangeEnd = 0;
+        for (CubeSegment seg : mergingSegs) {
+            // Start and end maps are folded independently: a partition missing from one of
+            // them must neither cause an unboxing NullPointerException nor be dropped.
+            mergeSmallest(mergedStartOffsets, KafkaOffsetMapping.parseOffsetStart(seg));
+            mergeLargest(mergedEndOffsets, KafkaOffsetMapping.parseOffsetEnd(seg));
+
+            dateRangeStart = Math.min(dateRangeStart, seg.getDateRangeStart());
+            dateRangeEnd = Math.max(dateRangeEnd, seg.getDateRangeEnd());
+        }
+
+        KafkaOffsetMapping.saveOffsetStart(segment, mergedStartOffsets);
+        KafkaOffsetMapping.saveOffsetEnd(segment, mergedEndOffsets);
+        segment.setDateRangeStart(dateRangeStart);
+        segment.setDateRangeEnd(dateRangeEnd);
+
+        CubeUpdate cubeBuilder = new CubeUpdate(cube);
+        cubeBuilder.setToUpdateSegs(segment);
+        try {
+            cubeManager.updateCube(cubeBuilder);
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+        } catch (IOException e) {
+            logger.error("fail to update cube segment offset", e);
+            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+        }
+    }
+
+    /** Folds {@code offsets} into {@code merged}, keeping the smallest offset per partition. */
+    private static void mergeSmallest(Map<Integer, Long> merged, Map<Integer, Long> offsets) {
+        for (Map.Entry<Integer, Long> entry : offsets.entrySet()) {
+            final Long candidate = entry.getValue();
+            if (candidate == null) {
+                continue;
+            }
+            final Long current = merged.get(entry.getKey());
+            merged.put(entry.getKey(), Math.min(current != null ? current : Long.MAX_VALUE, candidate));
+        }
+    }
+
+    /** Folds {@code offsets} into {@code merged}, keeping the largest offset per partition (never below 0). */
+    private static void mergeLargest(Map<Integer, Long> merged, Map<Integer, Long> offsets) {
+        for (Map.Entry<Integer, Long> entry : offsets.entrySet()) {
+            final Long candidate = entry.getValue();
+            if (candidate == null) {
+                continue;
+            }
+            final Long current = merged.get(entry.getKey());
+            merged.put(entry.getKey(), Math.max(current != null ? current : 0L, candidate));
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
new file mode 100644
index 0000000..5dca93f
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.kafka;
+
+import org.apache.kylin.source.kafka.util.KafkaClient;
+import org.apache.kylin.source.kafka.util.KafkaOffsetMapping;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ */
+/**
+ * Job step that fills in the Kafka start/end offsets of a segment when the
+ * caller did not provide them, renames the segment after its source offset
+ * range and saves it.
+ * <ul>
+ * <li>Missing start offsets: per partition, the largest end offset recorded on
+ * the cube's other segments; partitions with no recorded offset start at the
+ * earliest offset available in Kafka.</li>
+ * <li>Missing end offsets: the latest offset of every partition in Kafka.</li>
+ * </ul>
+ */
+public class SeekOffsetStep extends AbstractExecutable {
+
+    private static final Logger logger = LoggerFactory.getLogger(SeekOffsetStep.class);
+
+    public SeekOffsetStep() {
+        super();
+    }
+
+    /**
+     * @return SUCCEED when the offsets were already provided or were resolved and
+     *         saved; ERROR when the topic has no partitions or the cube update
+     *         fails with an {@link IOException}
+     */
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
+        final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+        final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
+
+        Map<Integer, Long> startOffsets = KafkaOffsetMapping.parseOffsetStart(segment);
+        Map<Integer, Long> endOffsets = KafkaOffsetMapping.parseOffsetEnd(segment);
+
+        if (startOffsets.size() > 0 && endOffsets.size() > 0 && startOffsets.size() == endOffsets.size()) {
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, "skipped, as the offset is provided.");
+        }
+
+        final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(context.getConfig()).getKafkaConfig(cube.getFactTable());
+        final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
+        final String topic = kafkaConfig.getTopic();
+        try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) {
+            final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+            if (partitionInfos == null || partitionInfos.isEmpty()) {
+                // NOTE(review): the consumer appears to return null for an unknown topic - confirm
+                // against the client version in use. Either way there is nothing to seek.
+                final String msg = "no partition found for topic " + topic;
+                logger.error(msg);
+                return new ExecuteResult(ExecuteResult.State.ERROR, msg);
+            }
+
+            if (startOffsets.isEmpty()) {
+                seekStartOffsets(cube, segment, consumer, topic, partitionInfos, startOffsets);
+                logger.info("Get start offset for segment {}: {}", segment.getName(), startOffsets);
+            }
+
+            if (endOffsets.isEmpty()) {
+                // user didn't specify end offset, use latest offset in kafka
+                for (PartitionInfo partitionInfo : partitionInfos) {
+                    long latest = KafkaClient.getLatestOffset(consumer, topic, partitionInfo.partition());
+                    endOffsets.put(partitionInfo.partition(), latest);
+                }
+                logger.info("Get end offset for segment {}: {}", segment.getName(), endOffsets);
+            }
+        }
+
+        KafkaOffsetMapping.saveOffsetStart(segment, startOffsets);
+        KafkaOffsetMapping.saveOffsetEnd(segment, endOffsets);
+
+        segment.setName(CubeSegment.makeSegmentName(0, 0, segment.getSourceOffsetStart(), segment.getSourceOffsetEnd()));
+        CubeUpdate cubeBuilder = new CubeUpdate(cube);
+        cubeBuilder.setToUpdateSegs(segment);
+        try {
+            cubeManager.updateCube(cubeBuilder);
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+        } catch (IOException e) {
+            logger.error("fail to update cube segment offset", e);
+            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+        }
+    }
+
+    /**
+     * Fills {@code startOffsets} for every partition of the topic: the biggest end
+     * offset of the other segments where one is recorded, otherwise the earliest
+     * offset reported by Kafka.
+     */
+    private void seekStartOffsets(CubeInstance cube, CubeSegment segment, KafkaConsumer consumer, String topic, List<PartitionInfo> partitionInfos, Map<Integer, Long> startOffsets) {
+        // user didn't specify start offset, use the biggest offset in existing segments as start
+        for (CubeSegment seg : cube.getSegments()) {
+            if (segment.getUuid().equals(seg.getUuid())) {
+                // the segment being built must not feed its own (user supplied) end offset
+                // back in as its start, which would produce an empty offset range
+                continue;
+            }
+            Map<Integer, Long> segEndOffset = KafkaOffsetMapping.parseOffsetEnd(seg);
+            for (PartitionInfo partition : partitionInfos) {
+                int partitionId = partition.partition();
+                if (segEndOffset.containsKey(partitionId)) {
+                    Long known = startOffsets.get(partitionId);
+                    startOffsets.put(partitionId, Math.max(known != null ? known : 0L, segEndOffset.get(partitionId)));
+                }
+            }
+        }
+
+        // Partitions without a recorded offset (new partitions, or a first build) are looked
+        // up by partition id; the list returned by the consumer is not assumed to be ordered
+        // with the already known partitions first.
+        for (PartitionInfo partition : partitionInfos) {
+            int partitionId = partition.partition();
+            if (!startOffsets.containsKey(partitionId)) {
+                long earliest = KafkaClient.getEarliestOffset(consumer, topic, partitionId);
+                startOffsets.put(partitionId, earliest);
+            }
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9123d96f/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
index cb6a72b..6b7d658 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
@@ -1,37 +1,20 @@
/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
package org.apache.kylin.source.kafka;
import java.lang.reflect.Constructor;
[3/5] kylin git commit: KYLIN-1726 use segment uuid instead of name
Posted by sh...@apache.org.
KYLIN-1726 use segment uuid instead of name
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c4e8655b
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c4e8655b
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c4e8655b
Branch: refs/heads/KYLIN-1726
Commit: c4e8655b8fd9e11fc2b5d9b78dc77a561817725a
Parents: 9123d96
Author: shaofengshi <sh...@apache.org>
Authored: Tue Aug 30 20:41:42 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Aug 30 20:41:42 2016 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/cube/CubeManager.java | 5 +++-
.../kylin/provision/BuildCubeWithStream.java | 26 +++++++++++++++++---
.../apache/kylin/source/kafka/KafkaMRInput.java | 2 +-
.../source/kafka/hadoop/KafkaFlatTableJob.java | 11 +++------
.../kafka/hadoop/KafkaInputRecordReader.java | 9 ++++---
5 files changed, 36 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/c4e8655b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 2ebf5d3..70ee176 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -445,8 +445,11 @@ public class CubeManager implements IRealizationProvider {
updateCube(cubeBuilder);
return newSegment;
}
-
public CubeSegment refreshSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset) throws IOException {
+ return refreshSegment(cube, startDate, endDate, startOffset, endOffset, true);
+ }
+
+ public CubeSegment refreshSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset, boolean strictChecking) throws IOException {
checkNoBuildingSegment(cube);
CubeSegment newSegment = newSegment(cube, startDate, endDate, startOffset, endOffset);
http://git-wip-us.apache.org/repos/asf/kylin/blob/c4e8655b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index cfa9b45..2c09f48 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -50,6 +50,8 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
+import java.util.HashMap;
+import java.util.List;
import java.util.TimeZone;
import java.util.UUID;
@@ -146,18 +148,34 @@ public class BuildCubeWithStream {
//merge
mergeSegment(cubeName, 0, 15000);
+ List<CubeSegment> segments = cubeManager.getCube(cubeName).getSegments();
+ Assert.assertTrue(segments.size() == 1);
+
+ CubeSegment toRefreshSeg = segments.get(0);
+ HashMap<String, String> partitionOffsetMap = toRefreshSeg.getAdditionalInfo();
+
+ refreshSegment(cubeName, toRefreshSeg.getSourceOffsetStart(), toRefreshSeg.getSourceOffsetEnd(), partitionOffsetMap);
+ segments = cubeManager.getCube(cubeName).getSegments();
+ Assert.assertTrue(segments.size() == 1);
+
}
private String mergeSegment(String cubeName, long startOffset, long endOffset) throws Exception {
- CubeSegment segment = cubeManager.mergeSegments(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, true);
+ CubeSegment segment = cubeManager.mergeSegments(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, false);
DefaultChainedExecutable job = EngineFactory.createBatchMergeJob(segment, "TEST");
jobService.addJob(job);
waitForJob(job.getId());
return job.getId();
}
- private String refreshSegment(String cubeName, long startOffset, long endOffset) throws Exception {
- CubeSegment segment = cubeManager.refreshSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset);
+ private String refreshSegment(String cubeName, long startOffset, long endOffset, HashMap<String, String> partitionOffsetMap) throws Exception {
+ CubeSegment segment = cubeManager.refreshSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, false);
+ segment.setAdditionalInfo(partitionOffsetMap);
+ CubeInstance cubeInstance = cubeManager.getCube(cubeName);
+ CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance);
+ cubeBuilder.setToUpdateSegs(segment);
+ cubeManager.updateCube(cubeBuilder);
+ segment = cubeManager.getCube(cubeName).getSegmentById(segment.getUuid());
DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST");
jobService.addJob(job);
waitForJob(job.getId());
@@ -165,7 +183,7 @@ public class BuildCubeWithStream {
}
private String buildSegment(String cubeName, long startOffset, long endOffset) throws Exception {
- CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset);
+ CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, false);
DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST");
jobService.addJob(job);
waitForJob(job.getId());
http://git-wip-us.apache.org/repos/asf/kylin/blob/c4e8655b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index cfce137..a5f678f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -165,7 +165,7 @@ public class KafkaMRInput implements IMRInput {
jobBuilderSupport.appendMapReduceParameters(cmd);
JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
- JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+ JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Save_Kafka_Data_" + seg.getRealization().getName() + "_Step");
result.setMapReduceParams(cmd.toString());
http://git-wip-us.apache.org/repos/asf/kylin/blob/c4e8655b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
index decfb60..87d2471 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
@@ -33,7 +33,6 @@ import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.source.kafka.KafkaConfigManager;
import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.slf4j.Logger;
@@ -70,14 +69,14 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
options.addOption(OPTION_JOB_NAME);
options.addOption(OPTION_CUBE_NAME);
options.addOption(OPTION_OUTPUT_PATH);
- options.addOption(OPTION_SEGMENT_NAME);
+ options.addOption(OPTION_SEGMENT_ID);
parseOptions(options, args);
job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
String cubeName = getOptionValue(OPTION_CUBE_NAME);
Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
- String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
+ String segmentId = getOptionValue(OPTION_SEGMENT_ID);
// ----------------------------------------------------------------------------
// add metadata to distributed cache
@@ -85,7 +84,7 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
CubeInstance cube = cubeMgr.getCube(cubeName);
job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
- job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
+ job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentId);
logger.info("Starting: " + job.getJobName());
setJobClasspath(job, cube.getConfig());
@@ -104,11 +103,9 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout()));
job.getConfiguration().set(CONFIG_KAFKA_BUFFER_SIZE, String.valueOf(kafkaConfig.getBufferSize()));
job.getConfiguration().set(CONFIG_KAFKA_INPUT_FORMAT, "json");
- job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
- job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
job.getConfiguration().set(CONFIG_KAFKA_PARSER_NAME, kafkaConfig.getParserName());
job.getConfiguration().set(CONFIG_KAFKA_CONSUMER_GROUP, cubeName); // use cubeName as consumer group name
- setupMapper(cube.getSegment(segmentName, SegmentStatusEnum.NEW));
+ setupMapper(cube.getSegmentById(segmentId));
job.setNumReduceTasks(0);
FileOutputFormat.setOutputPath(job, output);
FileOutputFormat.setCompressOutput(job, true);
http://git-wip-us.apache.org/repos/asf/kylin/blob/c4e8655b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
index f67fef5..6774c9d 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
@@ -105,6 +105,11 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit
value = new BytesWritable();
}
+ if (watermark >= latestOffset) {
+ log.info("Reach the end offset, stop reading.");
+ return false;
+ }
+
if (messages == null) {
log.info("{} fetching offset {} ", topic + ":" + split.getBrokers() + ":" + partition, watermark);
TopicPartition topicPartition = new TopicPartition(topic, partition);
@@ -119,10 +124,6 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit
if (iterator.hasNext()) {
ConsumerRecord<String, String> message = iterator.next();
- if (message.offset() >= latestOffset) {
- log.info("Reach the end offset, stop reading.");
- return false;
- }
key.set(message.offset());
byte[] valuebytes = Bytes.toBytes(message.value());
value.set(valuebytes, 0, valuebytes.length);
[5/5] kylin git commit: KYLIN-1726 update to kafka 0.10
Posted by sh...@apache.org.
KYLIN-1726 update to kafka 0.10
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/21167756
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/21167756
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/21167756
Branch: refs/heads/KYLIN-1726
Commit: 21167756ea14f95498b145b88acb096735b2eb80
Parents: a2db1f6
Author: shaofengshi <sh...@apache.org>
Authored: Fri Sep 2 19:25:57 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri Sep 2 19:25:57 2016 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/cube/CubeManager.java | 30 ++++++++++++----
.../kylin/rest/controller/CubeController.java | 8 ++---
.../apache/kylin/rest/service/JobService.java | 4 +--
.../source/kafka/util/KafkaSampleProducer.java | 38 ++++++++++++--------
4 files changed, 53 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/21167756/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 70ee176..5d2d701 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -680,12 +680,28 @@ public class CubeManager implements IRealizationProvider {
return null;
}
- if (cube.getBuildingSegments().size() > 0) {
- logger.debug("Cube " + cube.getName() + " has bulding segment, will not trigger merge at this moment");
- return null;
+ List<CubeSegment> buildingSegs = cube.getBuildingSegments();
+ if (buildingSegs.size() > 0) {
+ logger.debug("Cube " + cube.getName() + " has " + buildingSegs.size() + " building segments");
+ }
+
+ List<CubeSegment> readySegs = cube.getSegments(SegmentStatusEnum.READY);
+
+ List<CubeSegment> mergingSegs = Lists.newArrayList();
+ if (buildingSegs.size() > 0) {
+
+ for (CubeSegment building : buildingSegs) {
+ // exclude those under-merging segs
+ for (CubeSegment ready : readySegs) {
+ if (ready.getSourceOffsetStart() >= building.getSourceOffsetStart() && ready.getSourceOffsetEnd() <= building.getSourceOffsetEnd()) {
+ mergingSegs.add(ready);
+ }
+ }
+ }
}
- List<CubeSegment> ready = cube.getSegments(SegmentStatusEnum.READY);
+ // exclude those already under merging segments
+ readySegs.removeAll(mergingSegs);
long[] timeRanges = cube.getDescriptor().getAutoMergeTimeRanges();
Arrays.sort(timeRanges);
@@ -693,9 +709,9 @@ public class CubeManager implements IRealizationProvider {
for (int i = timeRanges.length - 1; i >= 0; i--) {
long toMergeRange = timeRanges[i];
- for (int s = 0; s < ready.size(); s++) {
- CubeSegment seg = ready.get(s);
- Pair<CubeSegment, CubeSegment> p = findMergeOffsetsByDateRange(ready.subList(s, ready.size()), //
+ for (int s = 0; s < readySegs.size(); s++) {
+ CubeSegment seg = readySegs.get(s);
+ Pair<CubeSegment, CubeSegment> p = findMergeOffsetsByDateRange(readySegs.subList(s, readySegs.size()), //
seg.getDateRangeStart(), seg.getDateRangeStart() + toMergeRange, toMergeRange);
if (p != null && p.getSecond().getDateRangeEnd() - p.getFirst().getDateRangeStart() >= toMergeRange)
return Pair.newPair(p.getFirst().getSourceOffsetStart(), p.getSecond().getSourceOffsetEnd());
http://git-wip-us.apache.org/repos/asf/kylin/blob/21167756/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index 7081d02..9c8b95f 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -271,7 +271,7 @@ public class CubeController extends BasicController {
@RequestMapping(value = "/{cubeName}/rebuild", method = { RequestMethod.PUT })
@ResponseBody
public JobInstance rebuild(@PathVariable String cubeName, @RequestBody JobBuildRequest req) {
- return buildInternal(cubeName, req.getStartTime(), req.getEndTime(), 0, 0, req.getBuildType(), req.isForce() || req.isForceMergeEmptySegment());
+ return buildInternal(cubeName, req.getStartTime(), req.getEndTime(), 0, 0, req.getBuildType(), true, req.isForce() || req.isForceMergeEmptySegment());
}
/** Build/Rebuild a cube segment by source offset */
@@ -285,16 +285,16 @@ public class CubeController extends BasicController {
@RequestMapping(value = "/{cubeName}/rebuild2", method = { RequestMethod.PUT })
@ResponseBody
public JobInstance rebuild(@PathVariable String cubeName, @RequestBody JobBuildRequest2 req) {
- return buildInternal(cubeName, 0, 0, req.getStartSourceOffset(), req.getEndSourceOffset(), req.getBuildType(), req.isForce());
+ return buildInternal(cubeName, 0, 0, req.getStartSourceOffset(), req.getEndSourceOffset(), req.getBuildType(), false, req.isForce());
}
private JobInstance buildInternal(String cubeName, long startTime, long endTime, //
- long startOffset, long endOffset, String buildType, boolean force) {
+ long startOffset, long endOffset, String buildType, boolean strictCheck, boolean force) {
try {
String submitter = SecurityContextHolder.getContext().getAuthentication().getName();
CubeInstance cube = jobService.getCubeManager().getCube(cubeName);
return jobService.submitJob(cube, startTime, endTime, startOffset, endOffset, //
- CubeBuildTypeEnum.valueOf(buildType), force, submitter);
+ CubeBuildTypeEnum.valueOf(buildType), strictCheck, force, submitter);
} catch (Exception e) {
logger.error(e.getLocalizedMessage(), e);
throw new InternalErrorException(e.getLocalizedMessage());
http://git-wip-us.apache.org/repos/asf/kylin/blob/21167756/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index e4fbc98..ef132f0 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -197,7 +197,7 @@ public class JobService extends BasicService {
@PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')")
public JobInstance submitJob(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset, //
- CubeBuildTypeEnum buildType, boolean force, String submitter) throws IOException, JobException {
+ CubeBuildTypeEnum buildType, boolean strictCheck, boolean force, String submitter) throws IOException, JobException {
checkCubeDescSignature(cube);
checkNoRunningJob(cube);
@@ -205,7 +205,7 @@ public class JobService extends BasicService {
DefaultChainedExecutable job;
if (buildType == CubeBuildTypeEnum.BUILD) {
- CubeSegment newSeg = getCubeManager().appendSegment(cube, startDate, endDate, startOffset, endOffset);
+ CubeSegment newSeg = getCubeManager().appendSegment(cube, startDate, endDate, startOffset, endOffset, strictCheck);
job = EngineFactory.createBatchCubingJob(newSeg, submitter);
} else if (buildType == CubeBuildTypeEnum.MERGE) {
CubeSegment newSeg = getCubeManager().mergeSegments(cube, startDate, endDate, startOffset, endOffset, force);
http://git-wip-us.apache.org/repos/asf/kylin/blob/21167756/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
index 2a86a98..3d26d3d 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
@@ -30,16 +30,15 @@ import java.util.Random;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kylin.common.util.OptionsHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-
/**
* A sample producer which will create sample data to kafka topic
*/
@@ -49,7 +48,8 @@ public class KafkaSampleProducer {
@SuppressWarnings("static-access")
private static final Option OPTION_TOPIC = OptionBuilder.withArgName("topic").hasArg().isRequired(true).withDescription("Kafka topic").create("topic");
private static final Option OPTION_BROKER = OptionBuilder.withArgName("broker").hasArg().isRequired(true).withDescription("Kafka broker").create("broker");
- private static final Option OPTION_DELAY = OptionBuilder.withArgName("delay").hasArg().isRequired(false).withDescription("Simulated message delay").create("delay");
+ private static final Option OPTION_DELAY = OptionBuilder.withArgName("delay").hasArg().isRequired(false).withDescription("Simulated message delay in mili-seconds, default 0").create("delay");
+ private static final Option OPTION_INTERVAL = OptionBuilder.withArgName("interval").hasArg().isRequired(false).withDescription("Simulated message interval in mili-seconds, default 1000").create("interval");
private static final ObjectMapper mapper = new ObjectMapper();
@@ -61,6 +61,7 @@ public class KafkaSampleProducer {
options.addOption(OPTION_TOPIC);
options.addOption(OPTION_BROKER);
options.addOption(OPTION_DELAY);
+ options.addOption(OPTION_INTERVAL);
optionsHelper.parseOptions(options, args);
logger.info("options: '" + optionsHelper.getOptionsAsString() + "'");
@@ -70,7 +71,13 @@ public class KafkaSampleProducer {
long delay = 0;
String delayString = optionsHelper.getOptionValue(OPTION_DELAY);
if (delayString != null) {
- delay = Long.parseLong(optionsHelper.getOptionValue(OPTION_DELAY));
+ delay = Long.parseLong(delayString);
+ }
+
+ long interval = 1000;
+ String intervalString = optionsHelper.getOptionValue(OPTION_INTERVAL);
+ if (intervalString != null) {
+ interval = Long.parseLong(intervalString);
}
List<String> countries = new ArrayList();
@@ -95,13 +102,16 @@ public class KafkaSampleProducer {
devices.add("Other");
Properties props = new Properties();
- props.put("metadata.broker.list", broker);
- props.put("serializer.class", "kafka.serializer.StringEncoder");
- props.put("request.required.acks", "1");
-
- ProducerConfig config = new ProducerConfig(props);
+ props.put("bootstrap.servers", broker);
+ props.put("acks", "all");
+ props.put("retries", 0);
+ props.put("batch.size", 16384);
+ props.put("linger.ms", 1);
+ props.put("buffer.memory", 33554432);
+ props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- Producer<String, String> producer = new Producer<String, String>(config);
+ Producer<String, String> producer = new KafkaProducer<>(props);
boolean alive = true;
Random rnd = new Random();
@@ -114,10 +124,10 @@ public class KafkaSampleProducer {
record.put("qty", rnd.nextInt(10));
record.put("currency", "USD");
record.put("amount", rnd.nextDouble() * 100);
- KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, System.currentTimeMillis() + "", mapper.writeValueAsString(record));
+ ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, System.currentTimeMillis() + "", mapper.writeValueAsString(record));
System.out.println("Sending 1 message");
producer.send(data);
- Thread.sleep(2000);
+ Thread.sleep(interval);
}
producer.close();
}