Posted to commits@kylin.apache.org by sh...@apache.org on 2016/10/10 05:37:59 UTC

[2/6] kylin git commit: KYLIN-2072 Cleanup old streaming code

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/TimeHistogram.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/TimeHistogram.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/TimeHistogram.java
deleted file mode 100644
index 1c579c6..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/TimeHistogram.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.source.kafka.diagnose;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-public class TimeHistogram {
-    private long[] bucketsBoundary;
-    private AtomicLong[] counters;
-    private String id;
-
-    private static Object printLock = new Object();
-
-    /**
-     * example: [10,20] will generate three  buckets: (-∞,10), [10,20),[20,+∞)
-     * unit: second
-     */
-    public TimeHistogram(long[] bucketsBoundary, String id) {
-        this.bucketsBoundary = bucketsBoundary;
-        this.counters = new AtomicLong[this.bucketsBoundary.length + 1];
-        for (int i = 0; i < counters.length; i++) {
-            this.counters[i] = new AtomicLong();
-        }
-        this.id = id;
-    }
-
-    /**
-     * @param second in seconds
-     */
-    public void process(long second) {
-        for (int i = 0; i < bucketsBoundary.length; ++i) {
-            if (second < bucketsBoundary[i]) {
-                counters[i].incrementAndGet();
-                return;
-            }
-        }
-
-        counters[bucketsBoundary.length].incrementAndGet();
-    }
-
-    /**
-     * @param millis in milli seconds
-     */
-    public void processMillis(long millis) {
-        process(millis / 1000);
-    }
-
-    public void printStatus() {
-        long[] countersSnapshot = new long[counters.length];
-        for (int i = 0; i < countersSnapshot.length; i++) {
-            countersSnapshot[i] = counters[i].get();
-        }
-
-        long sum = 0;
-        for (long counter : countersSnapshot) {
-            sum += counter;
-        }
-
-        synchronized (printLock) {
-            System.out.println("============== status of TimeHistogram " + id + " =================");
-
-            for (int i = 0; i < countersSnapshot.length; ++i) {
-                System.out.println(String.format("bucket: %d , count: %d ,percentage: %.4f", i, countersSnapshot[i], 1.0 * countersSnapshot[i] / (sum == 0 ? 1 : sum)));
-            }
-
-        }
-    }
-
-}
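
For reference, a minimal usage sketch of the removed diagnostics helper, grounded only in the constructor and methods visible in the deleted file above (bucket boundaries are in seconds):

    // Three buckets: (-inf, 10s), [10s, 20s), [20s, +inf)
    TimeHistogram fetchLatency = new TimeHistogram(new long[] { 10, 20 }, "kafka-fetch");
    fetchLatency.processMillis(3500); // 3.5s  -> first bucket
    fetchLatency.process(15);         // 15s   -> second bucket
    fetchLatency.process(42);         // 42s   -> last bucket
    fetchLatency.printStatus();       // prints per-bucket counts and percentages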

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java
new file mode 100644
index 0000000..7a42598
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.kafka.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ */
+public class ByteBufferBackedInputStream extends InputStream {
+
+    private ByteBuffer buf;
+
+    public ByteBufferBackedInputStream(ByteBuffer buf) {
+        this.buf = buf;
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (!buf.hasRemaining()) {
+            return -1;
+        }
+        return buf.get() & 0xFF;
+    }
+
+    @Override
+    public int read(byte[] bytes, int off, int len) throws IOException {
+        if (!buf.hasRemaining()) {
+            return -1;
+        }
+
+        len = Math.min(len, buf.remaining());
+        buf.get(bytes, off, len);
+        return len;
+    }
+}
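
The new adapter lets a ByteBuffer (for example a Kafka message payload) be read through the InputStream API without copying it into a byte[] first. A hedged usage sketch, assuming Jackson's ObjectMapper as the consumer; the calling parser is not shown in this hunk and the class name below is hypothetical:

    import java.io.InputStream;
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.Map;

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kylin.source.kafka.util.ByteBufferBackedInputStream;

    public class PayloadParseSketch {
        public static void main(String[] args) throws Exception {
            // Stand-in for a message payload pulled from Kafka.
            ByteBuffer payload = ByteBuffer.wrap("{\"ts\":1476072000000}".getBytes(StandardCharsets.UTF_8));
            InputStream in = new ByteBufferBackedInputStream(payload);
            Map<?, ?> fields = new ObjectMapper().readValue(in, Map.class);
            System.out.println(fields.get("ts"));
        }
    }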

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java
deleted file mode 100644
index bce9bb9..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source.kafka.util;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-
-import javax.annotation.Nullable;
-
-import kafka.cluster.BrokerEndPoint;
-import org.apache.kafka.common.protocol.SecurityProtocol;
-import org.apache.kylin.source.kafka.TopicMeta;
-import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import kafka.api.FetchRequestBuilder;
-import kafka.api.PartitionOffsetRequestInfo;
-import kafka.cluster.Broker;
-import kafka.common.TopicAndPartition;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.OffsetRequest;
-import kafka.javaapi.OffsetResponse;
-import kafka.javaapi.PartitionMetadata;
-import kafka.javaapi.TopicMetadata;
-import kafka.javaapi.TopicMetadataRequest;
-import kafka.javaapi.TopicMetadataResponse;
-import kafka.javaapi.consumer.SimpleConsumer;
-
-/**
- */
-public final class KafkaRequester {
-
-    private static final Logger logger = LoggerFactory.getLogger(KafkaRequester.class);
-
-    private static ConcurrentMap<String, SimpleConsumer> consumerCache = Maps.newConcurrentMap();
-
-    static {
-        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
-            @Override
-            public void run() {
-                KafkaRequester.shutdown();
-            }
-        }));
-    }
-
-    private static SimpleConsumer getSimpleConsumer(Broker broker, int timeout, int bufferSize, String clientId) {
-        String key = createKey(broker, timeout, bufferSize, clientId);
-        if (consumerCache.containsKey(key)) {
-            return consumerCache.get(key);
-        } else {
-            BrokerEndPoint brokerEndPoint = broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT);
-            consumerCache.putIfAbsent(key, new SimpleConsumer(brokerEndPoint.host(), brokerEndPoint.port(), timeout, bufferSize, clientId));
-            return consumerCache.get(key);
-        }
-    }
-
-    private static String createKey(Broker broker, int timeout, int bufferSize, String clientId) {
-        return broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).connectionString() + "_" + timeout + "_" + bufferSize + "_" + clientId;
-    }
-
-    public static TopicMeta getKafkaTopicMeta(KafkaClusterConfig kafkaClusterConfig) {
-        SimpleConsumer consumer;
-        for (Broker broker : kafkaClusterConfig.getBrokers()) {
-            consumer = getSimpleConsumer(broker, kafkaClusterConfig.getTimeout(), kafkaClusterConfig.getBufferSize(), "topic_meta_lookup");
-            List<String> topics = Collections.singletonList(kafkaClusterConfig.getTopic());
-            TopicMetadataRequest req = new TopicMetadataRequest(topics);
-            TopicMetadataResponse resp;
-            try {
-                resp = consumer.send(req);
-            } catch (Exception e) {
-                logger.warn("cannot send TopicMetadataRequest successfully: " + e);
-                continue;
-            }
-            final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
-            if (topicMetadatas.size() != 1) {
-                break;
-            }
-            final TopicMetadata topicMetadata = topicMetadatas.get(0);
-            if (topicMetadata.errorCode() != 0) {
-                break;
-            }
-            List<Integer> partitionIds = Lists.transform(topicMetadata.partitionsMetadata(), new Function<PartitionMetadata, Integer>() {
-                @Nullable
-                @Override
-                public Integer apply(PartitionMetadata partitionMetadata) {
-                    return partitionMetadata.partitionId();
-                }
-            });
-            return new TopicMeta(kafkaClusterConfig.getTopic(), partitionIds);
-        }
-        logger.debug("cannot find topic:" + kafkaClusterConfig.getTopic());
-        return null;
-    }
-
-    public static PartitionMetadata getPartitionMetadata(String topic, int partitionId, List<Broker> brokers, KafkaClusterConfig kafkaClusterConfig) {
-        logger.debug("Brokers: " + brokers.toString());
-        SimpleConsumer consumer;
-        for (Broker broker : brokers) {
-            consumer = getSimpleConsumer(broker, kafkaClusterConfig.getTimeout(), kafkaClusterConfig.getBufferSize(), "topic_meta_lookup");
-            List<String> topics = Collections.singletonList(topic);
-            TopicMetadataRequest req = new TopicMetadataRequest(topics);
-            TopicMetadataResponse resp;
-            try {
-                resp = consumer.send(req);
-            } catch (Exception e) {
-                logger.warn("cannot send TopicMetadataRequest successfully: " + e);
-                continue;
-            }
-            final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
-            if (topicMetadatas.size() != 1) {
-                logger.warn("invalid topicMetadata size:" + topicMetadatas.size());
-                break;
-            }
-            final TopicMetadata topicMetadata = topicMetadatas.get(0);
-            if (topicMetadata.errorCode() != 0) {
-                logger.warn("fetching topicMetadata with errorCode:" + topicMetadata.errorCode());
-                break;
-            }
-            for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
-                StringBuffer logText = new StringBuffer();
-                logText.append("PartitionMetadata debug errorCode: " + partitionMetadata.errorCode());
-                logText.append("PartitionMetadata debug partitionId: " + partitionMetadata.partitionId());
-                logText.append("PartitionMetadata debug leader: " + partitionMetadata.leader());
-                logText.append("PartitionMetadata debug ISR: " + partitionMetadata.isr());
-                logText.append("PartitionMetadata debug replica: " + partitionMetadata.replicas());
-                logger.info(logText.toString());
-                if (partitionMetadata.partitionId() == partitionId) {
-                    return partitionMetadata;
-                }
-            }
-        }
-        logger.debug("cannot find PartitionMetadata, topic:" + topic + " partitionId:" + partitionId);
-        return null;
-    }
-
-    public static FetchResponse fetchResponse(String topic, int partitionId, long offset, Broker broker, KafkaClusterConfig kafkaClusterConfig) {
-        final String clientName = "client_" + topic + "_" + partitionId;
-        SimpleConsumer consumer = getSimpleConsumer(broker, kafkaClusterConfig.getTimeout(), kafkaClusterConfig.getBufferSize(), clientName);
-        kafka.api.FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(topic, partitionId, offset, 1048576) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka, 1048576 is the default value on shell
-                .build();
-        return consumer.fetch(req);
-    }
-
-    public static long getLastOffset(String topic, int partitionId, long whichTime, Broker broker, KafkaClusterConfig kafkaClusterConfig) {
-        String clientName = "client_" + topic + "_" + partitionId;
-        SimpleConsumer consumer = getSimpleConsumer(broker, kafkaClusterConfig.getTimeout(), kafkaClusterConfig.getBufferSize(), clientName);
-        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId);
-        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
-        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
-        OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
-        OffsetResponse response = consumer.getOffsetsBefore(request);
-
-        if (response.hasError()) {
-            logger.error("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partitionId));
-            return 0;
-        }
-        long[] offsets = response.offsets(topic, partitionId);
-        return offsets[0];
-    }
-
-    public static void shutdown() {
-        for (SimpleConsumer simpleConsumer : consumerCache.values()) {
-            simpleConsumer.close();
-        }
-    }
-
-}
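
The deleted KafkaRequester wrapped the old SimpleConsumer API for topic metadata, fetch requests, and earliest/latest offset lookups. This hunk does not show its replacement; as a hedged sketch only, an equivalent earliest/latest offset lookup with the newer consumer API (assuming kafka-clients 0.10+; broker and topic names are placeholders) looks roughly like:

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class OffsetLookupSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092"); // placeholder
            props.put("key.deserializer", ByteArrayDeserializer.class.getName());
            props.put("value.deserializer", ByteArrayDeserializer.class.getName());

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                TopicPartition tp = new TopicPartition("my_topic", 0); // placeholder
                consumer.assign(Collections.singletonList(tp));
                consumer.seekToBeginning(Collections.singletonList(tp));
                long earliest = consumer.position(tp);
                consumer.seekToEnd(Collections.singletonList(tp));
                long latest = consumer.position(tp);
                System.out.println("earliest=" + earliest + ", latest=" + latest);
            }
        }
    }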

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
deleted file mode 100644
index ee5bb20..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.source.kafka.util;
-
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.kafka.common.protocol.SecurityProtocol;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.common.util.StreamingMessage;
-import org.apache.kylin.source.kafka.StreamingParser;
-import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-
-import kafka.api.OffsetRequest;
-import kafka.cluster.Broker;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.PartitionMetadata;
-import kafka.message.MessageAndOffset;
-
-/**
- */
-public final class KafkaUtils {
-
-    private static final Logger logger = LoggerFactory.getLogger(KafkaUtils.class);
-
-    private static final int MAX_RETRY_TIMES = 6;
-
-    private KafkaUtils() {
-    }
-
-    public static Broker getLeadBroker(KafkaClusterConfig kafkaClusterConfig, int partitionId) {
-        final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaClusterConfig.getTopic(), partitionId, kafkaClusterConfig.getBrokers(), kafkaClusterConfig);
-        if (partitionMetadata != null) {
-            if (partitionMetadata.errorCode() != 0) {
-                logger.warn("PartitionMetadata errorCode: " + partitionMetadata.errorCode());
-            }
-            return new Broker(partitionMetadata.leader(), SecurityProtocol.PLAINTEXT);
-        } else {
-            return null;
-        }
-    }
-
-    private static void sleep(int retryTimes) {
-        int seconds = (int) Math.pow(2, retryTimes);
-        logger.info("retry times:" + retryTimes + " sleep:" + seconds + " seconds");
-        try {
-            Thread.sleep(seconds * 1000);
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private static MessageAndOffset getKafkaMessage(KafkaClusterConfig kafkaClusterConfig, int partitionId, long offset) {
-        final String topic = kafkaClusterConfig.getTopic();
-        int retry = 0;
-        while (retry < MAX_RETRY_TIMES) {//max sleep time 63 seconds
-            final Broker leadBroker = getLeadBroker(kafkaClusterConfig, partitionId);
-            if (leadBroker == null) {
-                logger.warn("unable to find leadBroker with config:" + kafkaClusterConfig + " partitionId:" + partitionId);
-                sleep(retry++);
-                continue;
-            }
-            final FetchResponse response = KafkaRequester.fetchResponse(topic, partitionId, offset, leadBroker, kafkaClusterConfig);
-            if (response.errorCode(topic, partitionId) != 0) {
-                logger.warn("errorCode of FetchResponse is:" + response.errorCode(topic, partitionId));
-                sleep(retry++);
-                continue;
-            }
-            final Iterator<MessageAndOffset> iterator = response.messageSet(topic, partitionId).iterator();
-            if (!iterator.hasNext()) {
-                logger.warn("messageSet is empty");
-                sleep(retry++);
-                continue;
-            }
-            return iterator.next();
-        }
-        throw new IllegalStateException(String.format("try to get timestamp of topic: %s, partitionId: %d, offset: %d, failed to get StreamMessage from kafka", topic, partitionId, offset));
-    }
-
-    public static long findClosestOffsetWithDataTimestamp(KafkaClusterConfig kafkaClusterConfig, int partitionId, long timestamp, StreamingParser streamingParser) {
-        Pair<Long, Long> firstAndLast = getFirstAndLastOffset(kafkaClusterConfig, partitionId);
-        final String topic = kafkaClusterConfig.getTopic();
-
-        logger.info(String.format("topic: %s, partitionId: %d, try to find closest offset with timestamp: %d between offset {%d, %d}", topic, partitionId, timestamp, firstAndLast.getFirst(), firstAndLast.getSecond()));
-        final long result = binarySearch(kafkaClusterConfig, partitionId, firstAndLast.getFirst(), firstAndLast.getSecond(), timestamp, streamingParser);
-        logger.info(String.format("topic: %s, partitionId: %d, found offset: %d", topic, partitionId, result));
-        return result;
-    }
-
-    public static Pair<Long, Long> getFirstAndLastOffset(KafkaClusterConfig kafkaClusterConfig, int partitionId) {
-        final String topic = kafkaClusterConfig.getTopic();
-        final Broker leadBroker = Preconditions.checkNotNull(getLeadBroker(kafkaClusterConfig, partitionId), "unable to find leadBroker with config:" + kafkaClusterConfig + " partitionId:" + partitionId);
-        final long earliestOffset = KafkaRequester.getLastOffset(topic, partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaClusterConfig);
-        final long latestOffset = KafkaRequester.getLastOffset(topic, partitionId, OffsetRequest.LatestTime(), leadBroker, kafkaClusterConfig) - 1;
-        return Pair.newPair(earliestOffset, latestOffset);
-    }
-
-    private static long binarySearch(KafkaClusterConfig kafkaClusterConfig, int partitionId, long startOffset, long endOffset, long targetTimestamp, StreamingParser streamingParser) {
-        Map<Long, Long> cache = Maps.newHashMap();
-
-        while (startOffset < endOffset) {
-            long midOffset = startOffset + ((endOffset - startOffset) >> 1);
-            long startTimestamp = getDataTimestamp(kafkaClusterConfig, partitionId, startOffset, streamingParser, cache);
-            long endTimestamp = getDataTimestamp(kafkaClusterConfig, partitionId, endOffset, streamingParser, cache);
-            long midTimestamp = getDataTimestamp(kafkaClusterConfig, partitionId, midOffset, streamingParser, cache);
-            // hard to ensure these 2 conditions
-            //            Preconditions.checkArgument(startTimestamp <= midTimestamp);
-            //            Preconditions.checkArgument(midTimestamp <= endTimestamp);
-            if (startTimestamp >= targetTimestamp) {
-                return startOffset;
-            }
-            if (endTimestamp <= targetTimestamp) {
-                return endOffset;
-            }
-            if (targetTimestamp == midTimestamp) {
-                return midOffset;
-            } else if (targetTimestamp < midTimestamp) {
-                endOffset = midOffset - 1;
-                continue;
-            } else {
-                startOffset = midOffset + 1;
-                continue;
-            }
-        }
-        return startOffset;
-    }
-
-    private static long getDataTimestamp(KafkaClusterConfig kafkaClusterConfig, int partitionId, long offset, StreamingParser streamingParser, Map<Long, Long> cache) {
-        if (cache.containsKey(offset)) {
-            return cache.get(offset);
-        } else {
-            long t = getDataTimestamp(kafkaClusterConfig, partitionId, offset, streamingParser);
-            cache.put(offset, t);
-            return t;
-        }
-    }
-
-    public static long getDataTimestamp(KafkaClusterConfig kafkaClusterConfig, int partitionId, long offset, StreamingParser streamingParser) {
-        final String topic = kafkaClusterConfig.getTopic();
-        final MessageAndOffset messageAndOffset = getKafkaMessage(kafkaClusterConfig, partitionId, offset);
-        final ByteBuffer payload = messageAndOffset.message().payload();
-        byte[] bytes = new byte[payload.limit()];
-        payload.get(bytes);
-        final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset.message().payload());
-        streamingMessage.setOffset(messageAndOffset.offset());
-        logger.debug(String.format("The timestamp of topic: %s, partitionId: %d, offset: %d is: %d", topic, partitionId, offset, streamingMessage.getTimestamp()));
-        return streamingMessage.getTimestamp();
-
-    }
-
-}
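
The deleted KafkaUtils binary-searched message timestamps (via getDataTimestamp) to map a target time to an offset. Newer clients expose that lookup directly; a hedged fragment assuming kafka-clients 0.10.1+ (where offsetsForTimes is available) and a consumer configured as in the previous sketch:

    import java.util.Collections;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.TopicPartition;

    // 'consumer' is a KafkaConsumer<byte[], byte[]> configured as in the sketch above.
    TopicPartition tp = new TopicPartition("my_topic", 0);  // placeholder
    long targetTs = 1476072000000L;                         // epoch millis to search for
    Map<TopicPartition, OffsetAndTimestamp> result =
            consumer.offsetsForTimes(Collections.singletonMap(tp, targetTs));
    OffsetAndTimestamp hit = result.get(tp);
    long offset = (hit == null) ? -1L : hit.offset();       // null means no message at/after targetTs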

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/test/java/TimedJsonStreamParserTest.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/test/java/TimedJsonStreamParserTest.java b/source-kafka/src/test/java/TimedJsonStreamParserTest.java
index fb33059..5a52b61 100644
--- a/source-kafka/src/test/java/TimedJsonStreamParserTest.java
+++ b/source-kafka/src/test/java/TimedJsonStreamParserTest.java
@@ -20,7 +20,8 @@ import com.fasterxml.jackson.databind.JavaType;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
-import java.io.*;
+import java.io.File;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import com.fasterxml.jackson.databind.type.MapType;
@@ -32,7 +33,6 @@ import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.kafka.TimedJsonStreamParser;
 
-
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/storage-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/storage-hbase/pom.xml b/storage-hbase/pom.xml
index 1d33071..23e7239 100644
--- a/storage-hbase/pom.xml
+++ b/storage-hbase/pom.xml
@@ -41,10 +41,6 @@
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-engine-mr</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-engine-streaming</artifactId>
-        </dependency>
 
         <!-- Env & Test -->
         <dependency>

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
deleted file mode 100644
index 9adaf24..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.storage.hbase.steps;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.inmemcubing.CompoundCuboidWriter;
-import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.engine.mr.common.CuboidStatsUtil;
-import org.apache.kylin.engine.streaming.IStreamingOutput;
-import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
-import org.apache.kylin.metadata.model.IBuildable;
-import org.apache.kylin.storage.hbase.HBaseConnection;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- */
-public class HBaseStreamingOutput implements IStreamingOutput {
-
-    private static final Logger logger = LoggerFactory.getLogger(HBaseStreamingOutput.class);
-
-    @Override
-    public ICuboidWriter getCuboidWriter(IBuildable buildable) {
-        try {
-            CubeSegment cubeSegment = (CubeSegment) buildable;
-
-            final HTableInterface hTable;
-            hTable = createHTable(cubeSegment);
-            List<ICuboidWriter> cuboidWriters = Lists.newArrayList();
-            cuboidWriters.add(new HBaseCuboidWriter(cubeSegment, hTable));
-            cuboidWriters.add(new SequenceFileCuboidWriter(cubeSegment.getCubeDesc(), cubeSegment));
-            return new CompoundCuboidWriter(cuboidWriters);
-        } catch (IOException e) {
-            throw new RuntimeException("failed to get ICuboidWriter", e);
-        }
-    }
-
-    @Override
-    public void output(IBuildable buildable, Map<Long, HyperLogLogPlusCounter> samplingResult) {
-        try {
-            CubeSegment cubeSegment = (CubeSegment) buildable;
-            KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-            final Configuration conf = HadoopUtil.getCurrentConfiguration();
-            final Path outputPath = new Path("file://" + BatchConstants.CFG_STATISTICS_LOCAL_DIR + UUID.randomUUID().toString());
-            CuboidStatsUtil.writeCuboidStatistics(conf, outputPath, samplingResult, 100);
-            FSDataInputStream inputStream = null;
-            try {
-                inputStream = FileSystem.getLocal(conf).open(new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME));
-                ResourceStore.getStore(kylinConfig).putResource(cubeSegment.getStatisticsResourcePath(), inputStream, System.currentTimeMillis());
-            } finally {
-                IOUtils.closeQuietly(inputStream);
-                FileSystem.getLocal(conf).delete(outputPath, true);
-            }
-        } catch (IOException e) {
-            throw new RuntimeException("failed to write sampling result", e);
-        }
-    }
-
-    private HTableInterface createHTable(final CubeSegment cubeSegment) throws IOException {
-        final String hTableName = cubeSegment.getStorageLocationIdentifier();
-        CubeHTableUtil.createHTable(cubeSegment, null);
-        final HTableInterface hTable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(hTableName);
-        logger.info("hTable:" + hTableName + " for segment:" + cubeSegment.getName() + " created!");
-        return hTable;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java b/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java
index f4fb308..9cb135a 100644
--- a/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java
+++ b/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java
@@ -35,8 +35,8 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.engine.streaming.StreamingConfig;
-import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.metadata.streaming.StreamingConfig;
+import org.apache.kylin.metadata.streaming.StreamingManager;
 import org.apache.kylin.job.dao.ExecutableDao;
 import org.apache.kylin.job.dao.ExecutablePO;
 import org.apache.kylin.job.exception.PersistentException;