You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/09/19 16:00:24 UTC

[12/13] kylin git commit: Revert "KYLIN-1726 Scalable streaming cubing"

http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index ab8b161..20c57a9 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -1,20 +1,37 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
 package org.apache.kylin.source.kafka;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/source-kafka/src/main/java/org/apache/kylin/source/kafka/TopicMeta.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TopicMeta.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TopicMeta.java
index d84d3db..4145ef6 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TopicMeta.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TopicMeta.java
@@ -1,20 +1,37 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
 package org.apache.kylin.source.kafka;
 
 import java.util.Collections;

http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java
deleted file mode 100644
index bb64bf9..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.source.kafka;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.time.FastDateFormat;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.metadata.datatype.DataType;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class UpdateTimeRangeStep extends AbstractExecutable {
-
-    private static final Logger logger = LoggerFactory.getLogger(UpdateTimeRangeStep.class);
-
-    public UpdateTimeRangeStep() {
-        super();
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
-        final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
-        final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
-        final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
-        final String outputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
-        final Path outputFile = new Path(outputPath, partitionCol.getName());
-
-        String minValue = null, maxValue = null, currentValue = null;
-        try (FileSystem fs = HadoopUtil.getFileSystem(outputPath); FSDataInputStream inputStream = fs.open(outputFile); BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream))) {
-            minValue = currentValue = bufferedReader.readLine();
-            while (currentValue != null) {
-                maxValue = currentValue;
-                currentValue = bufferedReader.readLine();
-            }
-        } catch (IOException e) {
-            logger.error("fail to read file " + outputFile, e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        }
-
-        final DataType partitionColType = partitionCol.getType();
-        FastDateFormat dateFormat;
-        if (partitionColType.isDate()) {
-            dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATE_PATTERN);
-        } else if (partitionColType.isDatetime() || partitionColType.isTimestamp()) {
-            dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
-        } else if (partitionColType.isStringFamily()) {
-            String partitionDateFormat = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateFormat();
-            if (StringUtils.isEmpty(partitionDateFormat)) {
-                partitionDateFormat = DateFormat.DEFAULT_DATE_PATTERN;
-            }
-            dateFormat = DateFormat.getDateFormat(partitionDateFormat);
-        } else {
-            return new ExecuteResult(ExecuteResult.State.ERROR, "Type " + partitionColType + " is not valid partition column type");
-        }
-
-        try {
-            long startTime = dateFormat.parse(minValue).getTime();
-            long endTime = dateFormat.parse(maxValue).getTime();
-            CubeUpdate cubeBuilder = new CubeUpdate(cube);
-            segment.setDateRangeStart(startTime);
-            segment.setDateRangeEnd(endTime);
-            cubeBuilder.setToUpdateSegs(segment);
-            cubeManager.updateCube(cubeBuilder);
-            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
-        } catch (Exception e) {
-            logger.error("fail to update cube segment offset", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java
index 95349c2..04a66f6 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java
@@ -22,7 +22,6 @@ import java.util.List;
 
 import javax.annotation.Nullable;
 
-import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
 import org.apache.kylin.common.persistence.Serializer;
@@ -68,7 +67,7 @@ public class KafkaClusterConfig extends RootPersistentEntity {
             @Nullable
             @Override
             public Broker apply(BrokerConfig input) {
-                return new Broker(input.getId(), input.getHost(), input.getPort(), SecurityProtocol.PLAINTEXT);
+                return new Broker(input.getId(), input.getHost(), input.getPort());
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
deleted file mode 100644
index decfb60..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.source.kafka.hadoop;
-
-import org.apache.kylin.source.kafka.util.KafkaClient;
-import org.apache.kylin.source.kafka.util.KafkaOffsetMapping;
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.source.kafka.KafkaConfigManager;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-
-/**
- * Run a Hadoop Job to process the stream data in kafka;
- * Modified from the kafka-hadoop-loader in https://github.com/amient/kafka-hadoop-loader
- */
-public class KafkaFlatTableJob extends AbstractHadoopJob {
-    protected static final Logger logger = LoggerFactory.getLogger(KafkaFlatTableJob.class);
-
-    public static final String CONFIG_KAFKA_PARITION_MIN = "kafka.partition.min";
-    public static final String CONFIG_KAFKA_PARITION_MAX = "kafka.partition.max";
-    public static final String CONFIG_KAFKA_PARITION_START = "kafka.partition.start.";
-    public static final String CONFIG_KAFKA_PARITION_END = "kafka.partition.end.";
-
-    public static final String CONFIG_KAFKA_BROKERS = "kafka.brokers";
-    public static final String CONFIG_KAFKA_TOPIC = "kafka.topic";
-    public static final String CONFIG_KAFKA_TIMEOUT = "kafka.connect.timeout";
-    public static final String CONFIG_KAFKA_BUFFER_SIZE = "kafka.connect.buffer.size";
-    public static final String CONFIG_KAFKA_CONSUMER_GROUP = "kafka.consumer.group";
-    public static final String CONFIG_KAFKA_INPUT_FORMAT = "input.format";
-    public static final String CONFIG_KAFKA_PARSER_NAME = "kafka.parser.name";
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-
-        try {
-            options.addOption(OPTION_JOB_NAME);
-            options.addOption(OPTION_CUBE_NAME);
-            options.addOption(OPTION_OUTPUT_PATH);
-            options.addOption(OPTION_SEGMENT_NAME);
-            parseOptions(options, args);
-
-            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
-            String cubeName = getOptionValue(OPTION_CUBE_NAME);
-            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-
-            String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
-
-            // ----------------------------------------------------------------------------
-            // add metadata to distributed cache
-            CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-            CubeInstance cube = cubeMgr.getCube(cubeName);
-
-            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
-            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
-            logger.info("Starting: " + job.getJobName());
-
-            setJobClasspath(job, cube.getConfig());
-
-            KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv());
-            KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(cube.getFactTable());
-            String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
-            String topic = kafkaConfig.getTopic();
-
-            if (brokers == null || brokers.length() == 0 || topic == null) {
-                throw new IllegalArgumentException("Invalid Kafka information, brokers " + brokers + ", topic " + topic);
-            }
-
-            job.getConfiguration().set(CONFIG_KAFKA_BROKERS, brokers);
-            job.getConfiguration().set(CONFIG_KAFKA_TOPIC, topic);
-            job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout()));
-            job.getConfiguration().set(CONFIG_KAFKA_BUFFER_SIZE, String.valueOf(kafkaConfig.getBufferSize()));
-            job.getConfiguration().set(CONFIG_KAFKA_INPUT_FORMAT, "json");
-            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
-            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
-            job.getConfiguration().set(CONFIG_KAFKA_PARSER_NAME, kafkaConfig.getParserName());
-            job.getConfiguration().set(CONFIG_KAFKA_CONSUMER_GROUP, cubeName); // use cubeName as consumer group name
-            setupMapper(cube.getSegment(segmentName, SegmentStatusEnum.NEW));
-            job.setNumReduceTasks(0);
-            FileOutputFormat.setOutputPath(job, output);
-            FileOutputFormat.setCompressOutput(job, true);
-            org.apache.log4j.Logger.getRootLogger().info("Output hdfs location: " + output);
-            org.apache.log4j.Logger.getRootLogger().info("Output hdfs compression: " + true);
-            job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());
-
-            deletePath(job.getConfiguration(), output);
-
-            attachKylinPropsAndMetadata(cube, job.getConfiguration());
-
-            return waitForCompletion(job);
-
-        } catch (Exception e) {
-            logger.error("error in KafkaFlatTableJob", e);
-            printUsage(options);
-            throw e;
-        } finally {
-            if (job != null)
-                cleanupTempConfFile(job.getConfiguration());
-        }
-
-    }
-
-    private void setupMapper(CubeSegment cubeSeg) throws IOException {
-        // set the segment's offset info to job conf
-        Map<Integer, Long> offsetStart = KafkaOffsetMapping.parseOffsetStart(cubeSeg);
-        Map<Integer, Long> offsetEnd = KafkaOffsetMapping.parseOffsetEnd(cubeSeg);
-
-        Integer minPartition = Collections.min(offsetStart.keySet());
-        Integer maxPartition = Collections.max(offsetStart.keySet());
-        job.getConfiguration().set(CONFIG_KAFKA_PARITION_MIN, minPartition.toString());
-        job.getConfiguration().set(CONFIG_KAFKA_PARITION_MAX, maxPartition.toString());
-
-        for(Integer partition: offsetStart.keySet()) {
-            job.getConfiguration().set(CONFIG_KAFKA_PARITION_START + partition, offsetStart.get(partition).toString());
-            job.getConfiguration().set(CONFIG_KAFKA_PARITION_END + partition, offsetEnd.get(partition).toString());
-        }
-
-        job.setMapperClass(KafkaFlatTableMapper.class);
-        job.setInputFormatClass(KafkaInputFormat.class);
-        job.setOutputKeyClass(Text.class);
-        job.setOutputValueClass(Text.class);
-        job.setOutputFormatClass(SequenceFileOutputFormat.class);
-        job.setNumReduceTasks(0);
-    }
-
-    public static void main(String[] args) throws Exception {
-        KafkaFlatTableJob job = new KafkaFlatTableJob();
-        int exitCode = ToolRunner.run(job, args);
-        System.exit(exitCode);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java
deleted file mode 100644
index 995b2d4..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.source.kafka.hadoop;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.engine.mr.KylinMapper;
-
-public class KafkaFlatTableMapper extends KylinMapper<LongWritable, BytesWritable, Text, Text> {
-
-    private Text outKey = new Text();
-    private Text outValue = new Text();
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        Configuration conf = context.getConfiguration();
-        bindCurrentConfiguration(conf);
-    }
-
-    @Override
-    public void map(LongWritable key, BytesWritable value, Context context) throws IOException {
-        try {
-            outKey.set(Bytes.toBytes(key.get()));
-            outValue.set(value.getBytes(), 0, value.getLength());
-            context.write(outKey, outValue);
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
deleted file mode 100644
index 81f6bac..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.source.kafka.hadoop;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import org.apache.kylin.source.kafka.util.KafkaClient;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.PartitionInfo;
-
-/**
- * Convert Kafka topic to Hadoop InputFormat
- * Modified from the kafka-hadoop-loader in https://github.com/amient/kafka-hadoop-loader
- */
-public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
-
-    @Override
-    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
-        Configuration conf = context.getConfiguration();
-
-        String brokers = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_BROKERS);
-        String inputTopic = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_TOPIC);
-        String consumerGroup = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_GROUP);
-        Integer partitionMin = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MIN));
-        Integer partitionMax = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MAX));
-
-        Map<Integer, Long> startOffsetMap = Maps.newHashMap();
-        Map<Integer, Long> endOffsetMap = Maps.newHashMap();
-        for (int i = partitionMin; i <= partitionMax; i++) {
-            String start = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_START + i);
-            String end = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_END + i);
-            if (start != null && end != null) {
-                startOffsetMap.put(i, Long.valueOf(start));
-                endOffsetMap.put(i, Long.valueOf(end));
-            }
-        }
-
-        List<InputSplit> splits = new ArrayList<InputSplit>();
-        try (KafkaConsumer<String, String> consumer = KafkaClient.getKafkaConsumer(brokers, consumerGroup, null)) {
-            List<PartitionInfo> partitionInfos = consumer.partitionsFor(inputTopic);
-            Preconditions.checkArgument(partitionInfos.size() == startOffsetMap.size(), "partition number mismatch with server side");
-            for (int i = 0; i < partitionInfos.size(); i++) {
-                PartitionInfo partition = partitionInfos.get(i);
-                int partitionId = partition.partition();
-                if (startOffsetMap.containsKey(partitionId) == false) {
-                    throw new IllegalStateException("Partition '" + partitionId + "' not exists.");
-                }
-
-                if (endOffsetMap.get(partitionId) >  startOffsetMap.get(partitionId)) {
-                    InputSplit split = new KafkaInputSplit(
-                            brokers, inputTopic,
-                            partitionId,
-                            startOffsetMap.get(partitionId), endOffsetMap.get(partitionId)
-                    );
-                    splits.add(split);
-                }
-            }
-        }
-        return splits;
-    }
-
-    @Override
-    public RecordReader<LongWritable, BytesWritable> createRecordReader(
-            InputSplit arg0, TaskAttemptContext arg1) throws IOException,
-            InterruptedException {
-        return new KafkaInputRecordReader();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
deleted file mode 100644
index f67fef5..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.source.kafka.hadoop;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Iterator;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kylin.common.util.Bytes;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Convert Kafka topic to Hadoop InputFormat
- * Modified from the kafka-hadoop-loader in https://github.com/amient/kafka-hadoop-loader
- */
-public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWritable> {
-
-    static Logger log = LoggerFactory.getLogger(KafkaInputRecordReader.class);
-
-    private Configuration conf;
-
-    private KafkaInputSplit split;
-    private Consumer consumer;
-    private String brokers;
-    private String topic;
-
-    private int partition;
-    private long earliestOffset;
-    private long watermark;
-    private long latestOffset;
-
-    private ConsumerRecords<String, String> messages;
-    private Iterator<ConsumerRecord<String, String>> iterator;
-    private LongWritable key;
-    private BytesWritable value;
-
-    private long timeOut = 60000;
-    private long bufferSize = 65536;
-
-    private long numProcessedMessages = 0L;
-
-    @Override
-    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
-        initialize(split, context.getConfiguration());
-    }
-
-    public void initialize(InputSplit split, Configuration conf) throws IOException, InterruptedException {
-        this.conf = conf;
-        this.split = (KafkaInputSplit) split;
-        brokers = this.split.getBrokers();
-        topic = this.split.getTopic();
-        partition = this.split.getPartition();
-        watermark = this.split.getOffsetStart();
-
-        if (conf.get(KafkaFlatTableJob.CONFIG_KAFKA_TIMEOUT) != null) {
-            timeOut = Long.parseLong(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_TIMEOUT));
-        }
-        if (conf.get(KafkaFlatTableJob.CONFIG_KAFKA_BUFFER_SIZE) != null) {
-            bufferSize = Long.parseLong(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_BUFFER_SIZE));
-        }
-
-        String consumerGroup = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_GROUP);
-        consumer = org.apache.kylin.source.kafka.util.KafkaClient.getKafkaConsumer(brokers, consumerGroup, null);
-
-        earliestOffset = this.split.getOffsetStart();
-        latestOffset = this.split.getOffsetEnd();
-        TopicPartition topicPartition = new TopicPartition(topic, partition);
-        consumer.assign(Arrays.asList(topicPartition));
-        log.info("Split {} Topic: {} Broker: {} Partition: {} Start: {} End: {}", new Object[] { this.split, topic, this.split.getBrokers(), partition, earliestOffset, latestOffset });
-    }
-
-    @Override
-    public boolean nextKeyValue() throws IOException, InterruptedException {
-        if (key == null) {
-            key = new LongWritable();
-        }
-        if (value == null) {
-            value = new BytesWritable();
-        }
-
-        if (messages == null) {
-            log.info("{} fetching offset {} ", topic + ":" + split.getBrokers() + ":" + partition, watermark);
-            TopicPartition topicPartition = new TopicPartition(topic, partition);
-            consumer.seek(topicPartition, watermark);
-            messages = consumer.poll(timeOut);
-            iterator = messages.iterator();
-            if (!iterator.hasNext()) {
-                log.info("No more messages, stop");
-                throw new IOException(String.format("Unexpected ending of stream, expected ending offset %d, but end at %d", latestOffset, watermark));
-            }
-        }
-
-        if (iterator.hasNext()) {
-            ConsumerRecord<String, String> message = iterator.next();
-            if (message.offset() >= latestOffset) {
-                log.info("Reach the end offset, stop reading.");
-                return false;
-            }
-            key.set(message.offset());
-            byte[] valuebytes = Bytes.toBytes(message.value());
-            value.set(valuebytes, 0, valuebytes.length);
-            watermark = message.offset() + 1;
-            numProcessedMessages++;
-            if (!iterator.hasNext()) {
-                messages = null;
-                iterator = null;
-            }
-            return true;
-        }
-
-        log.error("Unexpected iterator end.");
-        throw new IOException(String.format("Unexpected ending of stream, expected ending offset %d, but end at %d", latestOffset, watermark));
-    }
-
-    @Override
-    public LongWritable getCurrentKey() throws IOException, InterruptedException {
-        return key;
-    }
-
-    @Override
-    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
-        return value;
-    }
-
-    @Override
-    public float getProgress() throws IOException, InterruptedException {
-        if (watermark >= latestOffset || earliestOffset == latestOffset) {
-            return 1.0f;
-        }
-        return Math.min(1.0f, (watermark - earliestOffset) / (float) (latestOffset - earliestOffset));
-    }
-
-    @Override
-    public void close() throws IOException {
-        log.info("{} num. processed messages {} ", topic + ":" + split.getBrokers() + ":" + partition, numProcessedMessages);
-        consumer.close();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputSplit.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputSplit.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputSplit.java
deleted file mode 100644
index 3261399..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputSplit.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.source.kafka.hadoop;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputSplit;
-
-/**
- * Convert Kafka topic to Hadoop InputFormat
- * Modified from the kafka-hadoop-loader in https://github.com/amient/kafka-hadoop-loader
- */
-public class KafkaInputSplit extends InputSplit implements Writable {
-
-    private String brokers;
-    private String topic;
-    private int partition;
-    private long offsetStart;
-    private long offsetEnd;
-
-    public KafkaInputSplit() {
-    }
-
-    public KafkaInputSplit(String brokers, String topic, int partition, long offsetStart, long offsetEnd) {
-        this.brokers = brokers;
-        this.topic = topic;
-        this.partition = partition;
-        this.offsetStart = offsetStart;
-        this.offsetEnd = offsetEnd;
-    }
-
-    public void readFields(DataInput in) throws IOException {
-        brokers = Text.readString(in);
-        topic = Text.readString(in);
-        partition = in.readInt();
-        offsetStart = in.readLong();
-        offsetEnd = in.readLong();
-    }
-
-    public void write(DataOutput out) throws IOException {
-        Text.writeString(out, brokers);
-        Text.writeString(out, topic);
-        out.writeInt(partition);
-        out.writeLong(offsetStart);
-        out.writeLong(offsetEnd);
-    }
-
-    @Override
-    public long getLength() throws IOException, InterruptedException {
-        return Long.MAX_VALUE;
-    }
-
-    @Override
-    public String[] getLocations() throws IOException, InterruptedException {
-        return new String[]{brokers};
-    }
-
-    public int getPartition() {
-        return partition;
-    }
-
-    public String getTopic() {
-        return topic;
-    }
-
-    public String getBrokers() {
-        return brokers;
-    }
-
-    public long getOffsetStart() {
-        return offsetStart;
-    }
-
-    public long getOffsetEnd() {
-        return offsetEnd;
-    }
-
-    @Override
-    public String toString() {
-        return brokers + "-" + topic + "-" + partition + "-" + offsetStart + "-" + offsetEnd;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
deleted file mode 100644
index 640cc53..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.source.kafka.util;
-
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kylin.source.kafka.config.BrokerConfig;
-import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
-
-import java.util.Arrays;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- */
-public class KafkaClient {
-
-    public static KafkaConsumer getKafkaConsumer(String brokers, String consumerGroup, Properties properties) {
-        Properties props = constructDefaultKafkaConsumerProperties(brokers, consumerGroup, properties);
-        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
-        return consumer;
-    }
-
-    public static KafkaProducer getKafkaProducer(String brokers, Properties properties) {
-        Properties props = constructDefaultKafkaProducerProperties(brokers, properties);
-        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
-        return producer;
-    }
-
-    private static Properties constructDefaultKafkaProducerProperties(String brokers, Properties properties){
-        Properties props = new Properties();
-        props.put("bootstrap.servers", brokers);
-        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        props.put("acks", "1");
-        props.put("buffer.memory", 33554432);
-        props.put("retries", 0);
-        props.put("batch.size", 16384);
-        props.put("linger.ms", 50);
-        props.put("timeout.ms", "30000");
-        if (properties != null) {
-            for (Map.Entry entry : properties.entrySet()) {
-                props.put(entry.getKey(), entry.getValue());
-            }
-        }
-        return props;
-    }
-
-    private static Properties constructDefaultKafkaConsumerProperties(String brokers, String consumerGroup, Properties properties) {
-        Properties props = new Properties();
-        props.put("bootstrap.servers", brokers);
-        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-        props.put("group.id", consumerGroup);
-        props.put("session.timeout.ms", "30000");
-        props.put("enable.auto.commit", "false");
-        if (properties != null) {
-            for (Map.Entry entry : properties.entrySet()) {
-                props.put(entry.getKey(), entry.getValue());
-            }
-        }
-        return props;
-    }
-
-    public static String getKafkaBrokers(KafkaConfig kafkaConfig) {
-        String brokers = null;
-        for (KafkaClusterConfig clusterConfig : kafkaConfig.getKafkaClusterConfigs()) {
-            for (BrokerConfig brokerConfig : clusterConfig.getBrokerConfigs()) {
-                if (brokers == null) {
-                    brokers = brokerConfig.getHost() + ":" + brokerConfig.getPort();
-                } else {
-                    brokers = brokers + "," + brokerConfig.getHost() + ":" + brokerConfig.getPort();
-                }
-            }
-        }
-        return brokers;
-    }
-
-    public static long getEarliestOffset(KafkaConsumer consumer, String topic, int partitionId) {
-
-        TopicPartition topicPartition = new TopicPartition(topic, partitionId);
-        consumer.assign(Arrays.asList(topicPartition));
-        consumer.seekToBeginning(Arrays.asList(topicPartition));
-
-        return consumer.position(topicPartition);
-    }
-
-    public static long getLatestOffset(KafkaConsumer consumer, String topic, int partitionId) {
-
-        TopicPartition topicPartition = new TopicPartition(topic, partitionId);
-        consumer.assign(Arrays.asList(topicPartition));
-        consumer.seekToEnd(Arrays.asList(topicPartition));
-
-        return consumer.position(topicPartition);
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaOffsetMapping.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaOffsetMapping.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaOffsetMapping.java
deleted file mode 100644
index b46e57f..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaOffsetMapping.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.source.kafka.util;
-
-import com.google.common.collect.Maps;
-import org.apache.kylin.cube.CubeSegment;
-
-import java.util.Map;
-
-/**
- */
-public class KafkaOffsetMapping {
-
-    public static final String OFFSET_START = "kafka.offset.start.";
-    public static final String OFFSET_END = "kafka.offset.end.";
-
-    /**
-     * Get the start offsets for each partition from a segment
-     *
-     * @param segment
-     * @return
-     */
-    public static Map<Integer, Long> parseOffsetStart(CubeSegment segment) {
-        return parseOffset(segment, OFFSET_START);
-    }
-
-    /**
-     * Get the end offsets for each partition from a segment
-     *
-     * @param segment
-     * @return
-     */
-    public static Map<Integer, Long> parseOffsetEnd(CubeSegment segment) {
-        return parseOffset(segment, OFFSET_END);
-    }
-
-    /**
-     * Save the partition start offset to cube segment
-     *
-     * @param segment
-     * @param offsetStart
-     */
-    public static void saveOffsetStart(CubeSegment segment, Map<Integer, Long> offsetStart) {
-        long sourceOffsetStart = 0;
-        for (Integer partition : offsetStart.keySet()) {
-            segment.getAdditionalInfo().put(OFFSET_START + partition, String.valueOf(offsetStart.get(partition)));
-            sourceOffsetStart += offsetStart.get(partition);
-        }
-
-        segment.setSourceOffsetStart(sourceOffsetStart);
-    }
-
-    /**
-     * Save the partition end offset to cube segment
-     *
-     * @param segment
-     * @param offsetEnd
-     */
-    public static void saveOffsetEnd(CubeSegment segment, Map<Integer, Long> offsetEnd) {
-        long sourceOffsetEnd = 0;
-        for (Integer partition : offsetEnd.keySet()) {
-            segment.getAdditionalInfo().put(OFFSET_END + partition, String.valueOf(offsetEnd.get(partition)));
-            sourceOffsetEnd += offsetEnd.get(partition);
-        }
-
-        segment.setSourceOffsetEnd(sourceOffsetEnd);
-    }
-
-    private static Map<Integer, Long> parseOffset(CubeSegment segment, String propertyPrefix) {
-        final Map<Integer, Long> offsetStartMap = Maps.newHashMap();
-        for (String key : segment.getAdditionalInfo().keySet()) {
-            if (key.startsWith(propertyPrefix)) {
-                Integer partition = Integer.valueOf(key.substring(propertyPrefix.length()));
-                Long offset = Long.valueOf(segment.getAdditionalInfo().get(key));
-                offsetStartMap.put(partition, offset);
-            }
-        }
-
-
-        return offsetStartMap;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java
index ddc2eb7..58cba7d 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java
@@ -1,20 +1,37 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
 package org.apache.kylin.source.kafka.util;
 
 import java.util.Collections;
@@ -25,8 +42,6 @@ import java.util.concurrent.ConcurrentMap;
 
 import javax.annotation.Nullable;
 
-import kafka.cluster.BrokerEndPoint;
-import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kylin.source.kafka.TopicMeta;
 import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
 import org.slf4j.Logger;
@@ -71,14 +86,13 @@ public final class KafkaRequester {
         if (consumerCache.containsKey(key)) {
             return consumerCache.get(key);
         } else {
-            BrokerEndPoint brokerEndPoint = broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT);
-            consumerCache.putIfAbsent(key, new SimpleConsumer(brokerEndPoint.host(), brokerEndPoint.port(), timeout, bufferSize, clientId));
+            consumerCache.putIfAbsent(key, new SimpleConsumer(broker.host(), broker.port(), timeout, bufferSize, clientId));
             return consumerCache.get(key);
         }
     }
 
     private static String createKey(Broker broker, int timeout, int bufferSize, String clientId) {
-        return broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).connectionString() + "_" + timeout + "_" + bufferSize + "_" + clientId;
+        return broker.getConnectionString() + "_" + timeout + "_" + bufferSize + "_" + clientId;
     }
 
     public static TopicMeta getKafkaTopicMeta(KafkaClusterConfig kafkaClusterConfig) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
index ee5bb20..24eaa05 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
@@ -22,7 +22,6 @@ import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.Map;
 
-import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.StreamingMessage;
 import org.apache.kylin.source.kafka.StreamingParser;
@@ -56,7 +55,7 @@ public final class KafkaUtils {
             if (partitionMetadata.errorCode() != 0) {
                 logger.warn("PartitionMetadata errorCode: " + partitionMetadata.errorCode());
             }
-            return new Broker(partitionMetadata.leader(), SecurityProtocol.PLAINTEXT);
+            return partitionMetadata.leader();
         } else {
             return null;
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index f285153..c7de287 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -39,7 +39,7 @@ import org.apache.kylin.common.util.CompressionUtils;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.LoggableCachedThreadPool;
 import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.cube.ISegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTScanRequest;

http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
index da087c9..c318cba 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
@@ -31,7 +31,7 @@ import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.cube.ISegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.kv.FuzzyKeyEncoder;
 import org.apache.kylin.cube.kv.FuzzyMaskEncoder;

http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
index 5692000..f1e5dab 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.ShardingHash;
-import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.cube.ISegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.dimension.DimensionEncoding;