You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/09/27 02:19:55 UTC
[23/28] kylin git commit: Revert "Revert "KYLIN-1726 Scalable
streaming cubing""
http://git-wip-us.apache.org/repos/asf/kylin/blob/8431af45/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
new file mode 100644
index 0000000..decfb60
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.source.kafka.hadoop;
+
+import org.apache.kylin.source.kafka.util.KafkaClient;
+import org.apache.kylin.source.kafka.util.KafkaOffsetMapping;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.source.kafka.KafkaConfigManager;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Run a Hadoop Job to process the stream data in kafka;
+ * Modified from the kafka-hadoop-loader in https://github.com/amient/kafka-hadoop-loader
+ */
+public class KafkaFlatTableJob extends AbstractHadoopJob {
+ protected static final Logger logger = LoggerFactory.getLogger(KafkaFlatTableJob.class);
+
+ public static final String CONFIG_KAFKA_PARITION_MIN = "kafka.partition.min";
+ public static final String CONFIG_KAFKA_PARITION_MAX = "kafka.partition.max";
+ public static final String CONFIG_KAFKA_PARITION_START = "kafka.partition.start.";
+ public static final String CONFIG_KAFKA_PARITION_END = "kafka.partition.end.";
+
+ public static final String CONFIG_KAFKA_BROKERS = "kafka.brokers";
+ public static final String CONFIG_KAFKA_TOPIC = "kafka.topic";
+ public static final String CONFIG_KAFKA_TIMEOUT = "kafka.connect.timeout";
+ public static final String CONFIG_KAFKA_BUFFER_SIZE = "kafka.connect.buffer.size";
+ public static final String CONFIG_KAFKA_CONSUMER_GROUP = "kafka.consumer.group";
+ public static final String CONFIG_KAFKA_INPUT_FORMAT = "input.format";
+ public static final String CONFIG_KAFKA_PARSER_NAME = "kafka.parser.name";
+ @Override
+ public int run(String[] args) throws Exception {
+ Options options = new Options();
+
+ try {
+ options.addOption(OPTION_JOB_NAME);
+ options.addOption(OPTION_CUBE_NAME);
+ options.addOption(OPTION_OUTPUT_PATH);
+ options.addOption(OPTION_SEGMENT_NAME);
+ parseOptions(options, args);
+
+ job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+ String cubeName = getOptionValue(OPTION_CUBE_NAME);
+ Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+
+ String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
+
+ // ----------------------------------------------------------------------------
+ // add metadata to distributed cache
+ CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+ CubeInstance cube = cubeMgr.getCube(cubeName);
+
+ job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+ job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
+ logger.info("Starting: " + job.getJobName());
+
+ setJobClasspath(job, cube.getConfig());
+
+ KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv());
+ KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(cube.getFactTable());
+ String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
+ String topic = kafkaConfig.getTopic();
+
+ if (brokers == null || brokers.length() == 0 || topic == null) {
+ throw new IllegalArgumentException("Invalid Kafka information, brokers " + brokers + ", topic " + topic);
+ }
+
+ job.getConfiguration().set(CONFIG_KAFKA_BROKERS, brokers);
+ job.getConfiguration().set(CONFIG_KAFKA_TOPIC, topic);
+ job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout()));
+ job.getConfiguration().set(CONFIG_KAFKA_BUFFER_SIZE, String.valueOf(kafkaConfig.getBufferSize()));
+ job.getConfiguration().set(CONFIG_KAFKA_INPUT_FORMAT, "json");
+ job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+ job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
+ job.getConfiguration().set(CONFIG_KAFKA_PARSER_NAME, kafkaConfig.getParserName());
+ job.getConfiguration().set(CONFIG_KAFKA_CONSUMER_GROUP, cubeName); // use cubeName as consumer group name
+ setupMapper(cube.getSegment(segmentName, SegmentStatusEnum.NEW));
+ job.setNumReduceTasks(0);
+ FileOutputFormat.setOutputPath(job, output);
+ FileOutputFormat.setCompressOutput(job, true);
+ org.apache.log4j.Logger.getRootLogger().info("Output hdfs location: " + output);
+ org.apache.log4j.Logger.getRootLogger().info("Output hdfs compression: " + true);
+ job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());
+
+ deletePath(job.getConfiguration(), output);
+
+ attachKylinPropsAndMetadata(cube, job.getConfiguration());
+
+ return waitForCompletion(job);
+
+ } catch (Exception e) {
+ logger.error("error in KafkaFlatTableJob", e);
+ printUsage(options);
+ throw e;
+ } finally {
+ if (job != null)
+ cleanupTempConfFile(job.getConfiguration());
+ }
+
+ }
+
+ private void setupMapper(CubeSegment cubeSeg) throws IOException {
+ // set the segment's offset info to job conf
+ Map<Integer, Long> offsetStart = KafkaOffsetMapping.parseOffsetStart(cubeSeg);
+ Map<Integer, Long> offsetEnd = KafkaOffsetMapping.parseOffsetEnd(cubeSeg);
+
+ Integer minPartition = Collections.min(offsetStart.keySet());
+ Integer maxPartition = Collections.max(offsetStart.keySet());
+ job.getConfiguration().set(CONFIG_KAFKA_PARITION_MIN, minPartition.toString());
+ job.getConfiguration().set(CONFIG_KAFKA_PARITION_MAX, maxPartition.toString());
+
+ for(Integer partition: offsetStart.keySet()) {
+ job.getConfiguration().set(CONFIG_KAFKA_PARITION_START + partition, offsetStart.get(partition).toString());
+ job.getConfiguration().set(CONFIG_KAFKA_PARITION_END + partition, offsetEnd.get(partition).toString());
+ }
+
+ job.setMapperClass(KafkaFlatTableMapper.class);
+ job.setInputFormatClass(KafkaInputFormat.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ job.setNumReduceTasks(0);
+ }
+
+ public static void main(String[] args) throws Exception {
+ KafkaFlatTableJob job = new KafkaFlatTableJob();
+ int exitCode = ToolRunner.run(job, args);
+ System.exit(exitCode);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/8431af45/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java
new file mode 100644
index 0000000..995b2d4
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.source.kafka.hadoop;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.engine.mr.KylinMapper;
+
+public class KafkaFlatTableMapper extends KylinMapper<LongWritable, BytesWritable, Text, Text> {
+
+ private Text outKey = new Text();
+ private Text outValue = new Text();
+
+ @Override
+ protected void setup(Context context) throws IOException {
+ Configuration conf = context.getConfiguration();
+ bindCurrentConfiguration(conf);
+ }
+
+ @Override
+ public void map(LongWritable key, BytesWritable value, Context context) throws IOException {
+ try {
+ outKey.set(Bytes.toBytes(key.get()));
+ outValue.set(value.getBytes(), 0, value.getLength());
+ context.write(outKey, outValue);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/8431af45/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
new file mode 100644
index 0000000..81f6bac
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.source.kafka.hadoop;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import org.apache.kylin.source.kafka.util.KafkaClient;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+
+/**
+ * Convert Kafka topic to Hadoop InputFormat
+ * Modified from the kafka-hadoop-loader in https://github.com/amient/kafka-hadoop-loader
+ */
+public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
+
+ @Override
+ public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+
+ String brokers = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_BROKERS);
+ String inputTopic = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_TOPIC);
+ String consumerGroup = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_GROUP);
+ Integer partitionMin = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MIN));
+ Integer partitionMax = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MAX));
+
+ Map<Integer, Long> startOffsetMap = Maps.newHashMap();
+ Map<Integer, Long> endOffsetMap = Maps.newHashMap();
+ for (int i = partitionMin; i <= partitionMax; i++) {
+ String start = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_START + i);
+ String end = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_END + i);
+ if (start != null && end != null) {
+ startOffsetMap.put(i, Long.valueOf(start));
+ endOffsetMap.put(i, Long.valueOf(end));
+ }
+ }
+
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+ try (KafkaConsumer<String, String> consumer = KafkaClient.getKafkaConsumer(brokers, consumerGroup, null)) {
+ List<PartitionInfo> partitionInfos = consumer.partitionsFor(inputTopic);
+ Preconditions.checkArgument(partitionInfos.size() == startOffsetMap.size(), "partition number mismatch with server side");
+ for (int i = 0; i < partitionInfos.size(); i++) {
+ PartitionInfo partition = partitionInfos.get(i);
+ int partitionId = partition.partition();
+ if (startOffsetMap.containsKey(partitionId) == false) {
+ throw new IllegalStateException("Partition '" + partitionId + "' not exists.");
+ }
+
+ if (endOffsetMap.get(partitionId) > startOffsetMap.get(partitionId)) {
+ InputSplit split = new KafkaInputSplit(
+ brokers, inputTopic,
+ partitionId,
+ startOffsetMap.get(partitionId), endOffsetMap.get(partitionId)
+ );
+ splits.add(split);
+ }
+ }
+ }
+ return splits;
+ }
+
+ @Override
+ public RecordReader<LongWritable, BytesWritable> createRecordReader(
+ InputSplit arg0, TaskAttemptContext arg1) throws IOException,
+ InterruptedException {
+ return new KafkaInputRecordReader();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/8431af45/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
new file mode 100644
index 0000000..f67fef5
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.source.kafka.hadoop;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kylin.common.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Convert Kafka topic to Hadoop InputFormat
+ * Modified from the kafka-hadoop-loader in https://github.com/amient/kafka-hadoop-loader
+ */
+public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWritable> {
+
+ static Logger log = LoggerFactory.getLogger(KafkaInputRecordReader.class);
+
+ private Configuration conf;
+
+ private KafkaInputSplit split;
+ private Consumer consumer;
+ private String brokers;
+ private String topic;
+
+ private int partition;
+ private long earliestOffset;
+ private long watermark;
+ private long latestOffset;
+
+ private ConsumerRecords<String, String> messages;
+ private Iterator<ConsumerRecord<String, String>> iterator;
+ private LongWritable key;
+ private BytesWritable value;
+
+ private long timeOut = 60000;
+ private long bufferSize = 65536;
+
+ private long numProcessedMessages = 0L;
+
+ @Override
+ public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+ initialize(split, context.getConfiguration());
+ }
+
+ public void initialize(InputSplit split, Configuration conf) throws IOException, InterruptedException {
+ this.conf = conf;
+ this.split = (KafkaInputSplit) split;
+ brokers = this.split.getBrokers();
+ topic = this.split.getTopic();
+ partition = this.split.getPartition();
+ watermark = this.split.getOffsetStart();
+
+ if (conf.get(KafkaFlatTableJob.CONFIG_KAFKA_TIMEOUT) != null) {
+ timeOut = Long.parseLong(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_TIMEOUT));
+ }
+ if (conf.get(KafkaFlatTableJob.CONFIG_KAFKA_BUFFER_SIZE) != null) {
+ bufferSize = Long.parseLong(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_BUFFER_SIZE));
+ }
+
+ String consumerGroup = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_GROUP);
+ consumer = org.apache.kylin.source.kafka.util.KafkaClient.getKafkaConsumer(brokers, consumerGroup, null);
+
+ earliestOffset = this.split.getOffsetStart();
+ latestOffset = this.split.getOffsetEnd();
+ TopicPartition topicPartition = new TopicPartition(topic, partition);
+ consumer.assign(Arrays.asList(topicPartition));
+ log.info("Split {} Topic: {} Broker: {} Partition: {} Start: {} End: {}", new Object[] { this.split, topic, this.split.getBrokers(), partition, earliestOffset, latestOffset });
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ if (key == null) {
+ key = new LongWritable();
+ }
+ if (value == null) {
+ value = new BytesWritable();
+ }
+
+ if (messages == null) {
+ log.info("{} fetching offset {} ", topic + ":" + split.getBrokers() + ":" + partition, watermark);
+ TopicPartition topicPartition = new TopicPartition(topic, partition);
+ consumer.seek(topicPartition, watermark);
+ messages = consumer.poll(timeOut);
+ iterator = messages.iterator();
+ if (!iterator.hasNext()) {
+ log.info("No more messages, stop");
+ throw new IOException(String.format("Unexpected ending of stream, expected ending offset %d, but end at %d", latestOffset, watermark));
+ }
+ }
+
+ if (iterator.hasNext()) {
+ ConsumerRecord<String, String> message = iterator.next();
+ if (message.offset() >= latestOffset) {
+ log.info("Reach the end offset, stop reading.");
+ return false;
+ }
+ key.set(message.offset());
+ byte[] valuebytes = Bytes.toBytes(message.value());
+ value.set(valuebytes, 0, valuebytes.length);
+ watermark = message.offset() + 1;
+ numProcessedMessages++;
+ if (!iterator.hasNext()) {
+ messages = null;
+ iterator = null;
+ }
+ return true;
+ }
+
+ log.error("Unexpected iterator end.");
+ throw new IOException(String.format("Unexpected ending of stream, expected ending offset %d, but end at %d", latestOffset, watermark));
+ }
+
+ @Override
+ public LongWritable getCurrentKey() throws IOException, InterruptedException {
+ return key;
+ }
+
+ @Override
+ public BytesWritable getCurrentValue() throws IOException, InterruptedException {
+ return value;
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ if (watermark >= latestOffset || earliestOffset == latestOffset) {
+ return 1.0f;
+ }
+ return Math.min(1.0f, (watermark - earliestOffset) / (float) (latestOffset - earliestOffset));
+ }
+
+ @Override
+ public void close() throws IOException {
+ log.info("{} num. processed messages {} ", topic + ":" + split.getBrokers() + ":" + partition, numProcessedMessages);
+ consumer.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/8431af45/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputSplit.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputSplit.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputSplit.java
new file mode 100644
index 0000000..3261399
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputSplit.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.source.kafka.hadoop;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * Convert Kafka topic to Hadoop InputFormat
+ * Modified from the kafka-hadoop-loader in https://github.com/amient/kafka-hadoop-loader
+ */
+public class KafkaInputSplit extends InputSplit implements Writable {
+
+ private String brokers;
+ private String topic;
+ private int partition;
+ private long offsetStart;
+ private long offsetEnd;
+
+ public KafkaInputSplit() {
+ }
+
+ public KafkaInputSplit(String brokers, String topic, int partition, long offsetStart, long offsetEnd) {
+ this.brokers = brokers;
+ this.topic = topic;
+ this.partition = partition;
+ this.offsetStart = offsetStart;
+ this.offsetEnd = offsetEnd;
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ brokers = Text.readString(in);
+ topic = Text.readString(in);
+ partition = in.readInt();
+ offsetStart = in.readLong();
+ offsetEnd = in.readLong();
+ }
+
+ public void write(DataOutput out) throws IOException {
+ Text.writeString(out, brokers);
+ Text.writeString(out, topic);
+ out.writeInt(partition);
+ out.writeLong(offsetStart);
+ out.writeLong(offsetEnd);
+ }
+
+ @Override
+ public long getLength() throws IOException, InterruptedException {
+ return Long.MAX_VALUE;
+ }
+
+ @Override
+ public String[] getLocations() throws IOException, InterruptedException {
+ return new String[]{brokers};
+ }
+
+ public int getPartition() {
+ return partition;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public String getBrokers() {
+ return brokers;
+ }
+
+ public long getOffsetStart() {
+ return offsetStart;
+ }
+
+ public long getOffsetEnd() {
+ return offsetEnd;
+ }
+
+ @Override
+ public String toString() {
+ return brokers + "-" + topic + "-" + partition + "-" + offsetStart + "-" + offsetEnd;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/8431af45/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
new file mode 100644
index 0000000..640cc53
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.kafka.util;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kylin.source.kafka.config.BrokerConfig;
+import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ */
+public class KafkaClient {
+
+ public static KafkaConsumer getKafkaConsumer(String brokers, String consumerGroup, Properties properties) {
+ Properties props = constructDefaultKafkaConsumerProperties(brokers, consumerGroup, properties);
+ KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+ return consumer;
+ }
+
+ public static KafkaProducer getKafkaProducer(String brokers, Properties properties) {
+ Properties props = constructDefaultKafkaProducerProperties(brokers, properties);
+ KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
+ return producer;
+ }
+
+ private static Properties constructDefaultKafkaProducerProperties(String brokers, Properties properties){
+ Properties props = new Properties();
+ props.put("bootstrap.servers", brokers);
+ props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ props.put("acks", "1");
+ props.put("buffer.memory", 33554432);
+ props.put("retries", 0);
+ props.put("batch.size", 16384);
+ props.put("linger.ms", 50);
+ props.put("timeout.ms", "30000");
+ if (properties != null) {
+ for (Map.Entry entry : properties.entrySet()) {
+ props.put(entry.getKey(), entry.getValue());
+ }
+ }
+ return props;
+ }
+
+ private static Properties constructDefaultKafkaConsumerProperties(String brokers, String consumerGroup, Properties properties) {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", brokers);
+ props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put("group.id", consumerGroup);
+ props.put("session.timeout.ms", "30000");
+ props.put("enable.auto.commit", "false");
+ if (properties != null) {
+ for (Map.Entry entry : properties.entrySet()) {
+ props.put(entry.getKey(), entry.getValue());
+ }
+ }
+ return props;
+ }
+
+ public static String getKafkaBrokers(KafkaConfig kafkaConfig) {
+ String brokers = null;
+ for (KafkaClusterConfig clusterConfig : kafkaConfig.getKafkaClusterConfigs()) {
+ for (BrokerConfig brokerConfig : clusterConfig.getBrokerConfigs()) {
+ if (brokers == null) {
+ brokers = brokerConfig.getHost() + ":" + brokerConfig.getPort();
+ } else {
+ brokers = brokers + "," + brokerConfig.getHost() + ":" + brokerConfig.getPort();
+ }
+ }
+ }
+ return brokers;
+ }
+
+ public static long getEarliestOffset(KafkaConsumer consumer, String topic, int partitionId) {
+
+ TopicPartition topicPartition = new TopicPartition(topic, partitionId);
+ consumer.assign(Arrays.asList(topicPartition));
+ consumer.seekToBeginning(Arrays.asList(topicPartition));
+
+ return consumer.position(topicPartition);
+ }
+
+ public static long getLatestOffset(KafkaConsumer consumer, String topic, int partitionId) {
+
+ TopicPartition topicPartition = new TopicPartition(topic, partitionId);
+ consumer.assign(Arrays.asList(topicPartition));
+ consumer.seekToEnd(Arrays.asList(topicPartition));
+
+ return consumer.position(topicPartition);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/8431af45/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaOffsetMapping.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaOffsetMapping.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaOffsetMapping.java
new file mode 100644
index 0000000..b46e57f
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaOffsetMapping.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.kafka.util;
+
+import com.google.common.collect.Maps;
+import org.apache.kylin.cube.CubeSegment;
+
+import java.util.Map;
+
+/**
+ * Stores and retrieves per-partition Kafka offsets in a {@link CubeSegment}'s additional info.
+ * Each offset is kept as a string under the key {@code prefix + partitionId}, where the prefix
+ * is {@link #OFFSET_START} or {@link #OFFSET_END}.
+ */
+public class KafkaOffsetMapping {
+
+    public static final String OFFSET_START = "kafka.offset.start.";
+    public static final String OFFSET_END = "kafka.offset.end.";
+
+    /**
+     * Get the start offsets for each partition from a segment.
+     *
+     * @param segment segment whose additional info is read
+     * @return map from partition id to start offset; empty if none are recorded
+     */
+    public static Map<Integer, Long> parseOffsetStart(CubeSegment segment) {
+        return parseOffset(segment, OFFSET_START);
+    }
+
+    /**
+     * Get the end offsets for each partition from a segment.
+     *
+     * @param segment segment whose additional info is read
+     * @return map from partition id to end offset; empty if none are recorded
+     */
+    public static Map<Integer, Long> parseOffsetEnd(CubeSegment segment) {
+        return parseOffset(segment, OFFSET_END);
+    }
+
+    /**
+     * Save the partition start offsets to a cube segment, and set the segment's source start
+     * offset to the sum of all given partition start offsets.
+     *
+     * @param segment     segment to update
+     * @param offsetStart map from partition id to start offset
+     */
+    public static void saveOffsetStart(CubeSegment segment, Map<Integer, Long> offsetStart) {
+        segment.setSourceOffsetStart(saveOffset(segment, OFFSET_START, offsetStart));
+    }
+
+    /**
+     * Save the partition end offsets to a cube segment, and set the segment's source end
+     * offset to the sum of all given partition end offsets.
+     *
+     * @param segment   segment to update
+     * @param offsetEnd map from partition id to end offset
+     */
+    public static void saveOffsetEnd(CubeSegment segment, Map<Integer, Long> offsetEnd) {
+        segment.setSourceOffsetEnd(saveOffset(segment, OFFSET_END, offsetEnd));
+    }
+
+    /**
+     * Writes each partition offset into the segment's additional info under
+     * {@code propertyPrefix + partition} and returns the sum of the offsets.
+     * Existing entries for partitions absent from {@code offsets} are left untouched.
+     */
+    private static long saveOffset(CubeSegment segment, String propertyPrefix, Map<Integer, Long> offsets) {
+        long sum = 0;
+        for (Map.Entry<Integer, Long> entry : offsets.entrySet()) {
+            segment.getAdditionalInfo().put(propertyPrefix + entry.getKey(), String.valueOf(entry.getValue()));
+            sum += entry.getValue();
+        }
+        return sum;
+    }
+
+    /**
+     * Collects every additional-info entry whose key starts with {@code propertyPrefix}.
+     * The key suffix is parsed as the partition id and the stored value as the offset.
+     *
+     * @throws NumberFormatException if a matching key's partition suffix or its stored value
+     *                               is not numeric
+     */
+    private static Map<Integer, Long> parseOffset(CubeSegment segment, String propertyPrefix) {
+        final Map<Integer, Long> offsets = Maps.newHashMap();
+        for (String key : segment.getAdditionalInfo().keySet()) {
+            if (key.startsWith(propertyPrefix)) {
+                Integer partition = Integer.valueOf(key.substring(propertyPrefix.length()));
+                Long offset = Long.valueOf(segment.getAdditionalInfo().get(key));
+                offsets.put(partition, offset);
+            }
+        }
+        return offsets;
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/8431af45/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java
index 919db20..bce9bb9 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java
@@ -26,6 +26,8 @@ import java.util.concurrent.ConcurrentMap;
import javax.annotation.Nullable;
+import kafka.cluster.BrokerEndPoint;
+import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kylin.source.kafka.TopicMeta;
import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
import org.slf4j.Logger;
@@ -70,13 +72,14 @@ public final class KafkaRequester {
if (consumerCache.containsKey(key)) {
return consumerCache.get(key);
} else {
- consumerCache.putIfAbsent(key, new SimpleConsumer(broker.host(), broker.port(), timeout, bufferSize, clientId));
+ BrokerEndPoint brokerEndPoint = broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT);
+ consumerCache.putIfAbsent(key, new SimpleConsumer(brokerEndPoint.host(), brokerEndPoint.port(), timeout, bufferSize, clientId));
return consumerCache.get(key);
}
}
private static String createKey(Broker broker, int timeout, int bufferSize, String clientId) {
- return broker.getConnectionString() + "_" + timeout + "_" + bufferSize + "_" + clientId;
+ return broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).connectionString() + "_" + timeout + "_" + bufferSize + "_" + clientId;
}
public static TopicMeta getKafkaTopicMeta(KafkaClusterConfig kafkaClusterConfig) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/8431af45/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
index 24eaa05..ee5bb20 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Map;
+import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.StreamingMessage;
import org.apache.kylin.source.kafka.StreamingParser;
@@ -55,7 +56,7 @@ public final class KafkaUtils {
if (partitionMetadata.errorCode() != 0) {
logger.warn("PartitionMetadata errorCode: " + partitionMetadata.errorCode());
}
- return partitionMetadata.leader();
+ return new Broker(partitionMetadata.leader(), SecurityProtocol.PLAINTEXT);
} else {
return null;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/8431af45/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index c7de287..f285153 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -39,7 +39,7 @@ import org.apache.kylin.common.util.CompressionUtils;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.LoggableCachedThreadPool;
import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.ISegment;
+import org.apache.kylin.metadata.model.ISegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTScanRequest;
http://git-wip-us.apache.org/repos/asf/kylin/blob/8431af45/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
index c318cba..da087c9 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
@@ -31,7 +31,7 @@ import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.ISegment;
+import org.apache.kylin.metadata.model.ISegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.kv.FuzzyKeyEncoder;
import org.apache.kylin.cube.kv.FuzzyMaskEncoder;
http://git-wip-us.apache.org/repos/asf/kylin/blob/8431af45/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
index f1e5dab..5692000 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.ShardingHash;
-import org.apache.kylin.cube.ISegment;
+import org.apache.kylin.metadata.model.ISegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.dimension.DimensionEncoding;