You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/10/10 05:37:58 UTC

[1/6] kylin git commit: KYLIN-2055 Add an encoder for Boolean type

Repository: kylin
Updated Branches:
  refs/heads/master 71f373507 -> ed643e6b2


KYLIN-2055 Add an encoder for Boolean type


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/cb2b12b3
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/cb2b12b3
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/cb2b12b3

Branch: refs/heads/master
Commit: cb2b12b3b619ac86efbb9c7ca708418882683daf
Parents: 71f3735
Author: shaofengshi <sh...@apache.org>
Authored: Mon Oct 10 13:30:27 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Oct 10 13:30:27 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/dimension/BooleanDimEnc.java   | 196 +++++++++++++++++++
 .../dimension/DimensionEncodingFactory.java     |   1 +
 .../kylin/dimension/BooleanDimEncTest.java      |  95 +++++++++
 .../cubeDesigner/advanced_settings.html         |   2 +-
 4 files changed, 293 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/cb2b12b3/core-metadata/src/main/java/org/apache/kylin/dimension/BooleanDimEnc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/dimension/BooleanDimEnc.java b/core-metadata/src/main/java/org/apache/kylin/dimension/BooleanDimEnc.java
new file mode 100644
index 0000000..f32724c
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/dimension/BooleanDimEnc.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dimension;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
+
+/**
+ * Dimension encoding that stores each accepted boolean literal as its index in {@link #ALLOWED_VALUES}.
+ */
+public class BooleanDimEnc extends DimensionEncoding {
+    private static final long serialVersionUID = 1L;
+
+    public static final String ENCODING_NAME = "boolean";
+
+    // NOTE: the array index is the encoded value, so only append new literals at the tail; DO NOT insert!
+    public static final String[] ALLOWED_VALUES = new String[] { "", "true", "false", "TRUE", "FALSE", "True", "False", "t", "f", "T", "F", "yes", "no", "YES", "NO", "Yes", "No", "y", "n", "Y", "N", "1", "0" };
+    /** Maps each literal in {@link #ALLOWED_VALUES} to its array index; filled by the static initializer below. */
+    public static final Map<String, Integer> map = Maps.newHashMap();
+
+    static {
+        for (int i = 0; i < ALLOWED_VALUES.length; i++) {
+            map.put(ALLOWED_VALUES[i], i);
+        }
+    }
+    /** Factory for this encoding; {@code encodingName} and {@code args} are not used when creating an instance. */
+    public static class Factory extends DimensionEncodingFactory {
+        @Override
+        public String getSupportedEncodingName() {
+            return ENCODING_NAME;
+        }
+
+        @Override
+        public DimensionEncoding createDimensionEncoding(String encodingName, String[] args) {
+            return new BooleanDimEnc();
+        }
+    }
+
+    // ============================================================================
+    // Encoded width in bytes. An instance field, so readExternal() on one encoder cannot change any other encoder.
+    private int fixedLen = 1;
+
+    //no-arg constructor is required for Externalizable
+    public BooleanDimEnc() {
+    }
+
+    @Override
+    public int getLengthOfEncoding() {
+        return fixedLen;
+    }
+    /** Encodes the first {@code valueLen} bytes of {@code value}; a null array fills the output slot with NULL. */
+    @Override
+    public void encode(byte[] value, int valueLen, byte[] output, int outputOffset) {
+        if (value == null) {
+            Arrays.fill(output, outputOffset, outputOffset + fixedLen, NULL);
+            return;
+        }
+
+        encode(Bytes.toString(value, 0, valueLen), output, outputOffset);
+    }
+    /** Writes the index of {@code valueStr} (NULL filler for null); throws IllegalArgumentException if it is not listed. */
+    void encode(String valueStr, byte[] output, int outputOffset) {
+        if (valueStr == null) {
+            Arrays.fill(output, outputOffset, outputOffset + fixedLen, NULL);
+            return;
+        }
+
+        Integer encodeValue = map.get(valueStr);
+        if (encodeValue == null) {
+            throw new IllegalArgumentException("Value '" + valueStr + "' is not a recognized boolean value.");
+        }
+
+        BytesUtil.writeLong(encodeValue, output, outputOffset, fixedLen);
+    }
+    /** Returns the literal for the stored index, or null if isNull() reports a null slot; rejects out-of-range indexes. */
+    @Override
+    public String decode(byte[] bytes, int offset, int len) {
+        if (isNull(bytes, offset, len)) {
+            return null;
+        }
+
+        int x = (int) BytesUtil.readLong(bytes, offset, len);
+        if (x < 0 || x >= ALLOWED_VALUES.length) {
+            throw new IllegalStateException("Encoded value " + x + " is not a valid index into ALLOWED_VALUES.");
+        }
+
+        return ALLOWED_VALUES[x];
+    }
+
+    @Override
+    public DataTypeSerializer<Object> asDataTypeSerializer() {
+        return new BooleanSerializer();
+    }
+    /** Serializer that delegates to this encoder's encode/decode using a per-thread scratch buffer. */
+    private class BooleanSerializer extends DataTypeSerializer<Object> {
+        // be thread-safe and avoid repeated obj creation
+        private ThreadLocal<byte[]> current = new ThreadLocal<byte[]>();
+
+        private byte[] currentBuf() {
+            byte[] buf = current.get();
+            if (buf == null) {
+                buf = new byte[fixedLen];
+                current.set(buf);
+            }
+            return buf;
+        }
+
+        @Override
+        public void serialize(Object value, ByteBuffer out) {
+            byte[] buf = currentBuf();
+            String valueStr = value == null ? null : value.toString();
+            encode(valueStr, buf, 0);
+            out.put(buf);
+        }
+
+        @Override
+        public Object deserialize(ByteBuffer in) {
+            byte[] buf = currentBuf();
+            in.get(buf);
+            return decode(buf, 0, buf.length);
+        }
+
+        @Override
+        public int peekLength(ByteBuffer in) {
+            return fixedLen;
+        }
+
+        @Override
+        public int maxLength() {
+            return fixedLen;
+        }
+
+        @Override
+        public int getStorageBytesEstimate() {
+            return fixedLen;
+        }
+
+        @Override
+        public Object valueOf(String str) {
+            return str;
+        }
+    }
+    /** Writes only the encoded width. */
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeShort(fixedLen);
+    }
+    /** Restores the encoded width written by {@link #writeExternal}. */
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        fixedLen = in.readShort();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        BooleanDimEnc that = (BooleanDimEnc) o;
+
+        return fixedLen == that.fixedLen;
+
+    }
+
+    @Override
+    public int hashCode() {
+        return fixedLen;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/cb2b12b3/core-metadata/src/main/java/org/apache/kylin/dimension/DimensionEncodingFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/dimension/DimensionEncodingFactory.java b/core-metadata/src/main/java/org/apache/kylin/dimension/DimensionEncodingFactory.java
index 27bebd7..aba0c26 100644
--- a/core-metadata/src/main/java/org/apache/kylin/dimension/DimensionEncodingFactory.java
+++ b/core-metadata/src/main/java/org/apache/kylin/dimension/DimensionEncodingFactory.java
@@ -79,6 +79,7 @@ public abstract class DimensionEncodingFactory {
             map.put(FixedLenHexDimEnc.ENCODING_NAME, new FixedLenHexDimEnc.Factory());
             map.put(DateDimEnc.ENCODING_NAME, new DateDimEnc.Factory());
             map.put(TimeDimEnc.ENCODING_NAME, new TimeDimEnc.Factory());
+            map.put(BooleanDimEnc.ENCODING_NAME, new BooleanDimEnc.Factory());
 
             // custom encodings
             String[] clsNames = KylinConfig.getInstanceFromEnv().getCubeDimensionCustomEncodingFactories();

http://git-wip-us.apache.org/repos/asf/kylin/blob/cb2b12b3/core-metadata/src/test/java/org/apache/kylin/dimension/BooleanDimEncTest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/test/java/org/apache/kylin/dimension/BooleanDimEncTest.java b/core-metadata/src/test/java/org/apache/kylin/dimension/BooleanDimEncTest.java
new file mode 100644
index 0000000..c6c1416
--- /dev/null
+++ b/core-metadata/src/test/java/org/apache/kylin/dimension/BooleanDimEncTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dimension;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class BooleanDimEncTest {
+    /** Null input must round-trip as null through both the encoder and its serializer. */
+    @Test
+    public void testNull() {
+        BooleanDimEnc enc = new BooleanDimEnc();
+
+        byte[] buf = new byte[enc.getLengthOfEncoding()];
+        enc.encode(null, 0, buf, 0);
+        Assert.assertTrue(DimensionEncoding.isNull(buf, 0, buf.length));
+        String decode = enc.decode(buf, 0, buf.length);
+        Assert.assertNull(decode);
+
+        buf = new byte[enc.getLengthOfEncoding()];
+        DataTypeSerializer<Object> ser = enc.asDataTypeSerializer();
+        ser.serialize(null, ByteBuffer.wrap(buf));
+        Assert.assertTrue(DimensionEncoding.isNull(buf, 0, buf.length));
+        decode = (String) ser.deserialize(ByteBuffer.wrap(buf));
+        Assert.assertNull(decode);
+    }
+    /** Every allowed literal round-trips through encode/decode; an unlisted spelling is rejected. */
+    @Test
+    public void testEncodeDecode() {
+        BooleanDimEnc enc = new BooleanDimEnc();
+
+        for (String x : BooleanDimEnc.ALLOWED_VALUES) {
+            testEncodeDecode(enc, x);
+        }
+
+        try {
+            testEncodeDecode(enc, "FAlse");
+            Assert.fail();
+        } catch (IllegalArgumentException e) { // narrow catch: must not swallow the AssertionError from Assert.fail()
+            Assert.assertEquals("Value 'FAlse' is not a recognized boolean value.", e.getMessage());
+        }
+    }
+
+    private void testEncodeDecode(BooleanDimEnc enc, String valueStr) {
+        byte[] buf = new byte[enc.getLengthOfEncoding()];
+        byte[] bytes = Bytes.toBytes(valueStr);
+        enc.encode(bytes, bytes.length, buf, 0);
+        String decode = enc.decode(buf, 0, buf.length);
+        Assert.assertEquals(valueStr, decode);
+    }
+    /** Every allowed literal round-trips through the serializer; an unlisted spelling is rejected. */
+    @Test
+    public void testSerDes() {
+        BooleanDimEnc enc = new BooleanDimEnc();
+        for (String x : BooleanDimEnc.ALLOWED_VALUES) {
+            testSerDes(enc, x);
+        }
+
+        try {
+            testSerDes(enc, "FAlse");
+            Assert.fail();
+        } catch (IllegalArgumentException e) { // narrow catch: must not swallow the AssertionError from Assert.fail()
+            Assert.assertEquals("Value 'FAlse' is not a recognized boolean value.", e.getMessage());
+        }
+    }
+
+    private void testSerDes(BooleanDimEnc enc, String valueStr) {
+        DataTypeSerializer<Object> ser = enc.asDataTypeSerializer();
+        byte[] buf = new byte[enc.getLengthOfEncoding()];
+        ser.serialize(valueStr, ByteBuffer.wrap(buf));
+        String decode = (String) ser.deserialize(ByteBuffer.wrap(buf));
+        Assert.assertEquals(valueStr, decode);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/cb2b12b3/webapp/app/partials/cubeDesigner/advanced_settings.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/cubeDesigner/advanced_settings.html b/webapp/app/partials/cubeDesigner/advanced_settings.html
index e8cbf9e..34fd492 100755
--- a/webapp/app/partials/cubeDesigner/advanced_settings.html
+++ b/webapp/app/partials/cubeDesigner/advanced_settings.html
@@ -244,7 +244,7 @@
                       <!--Column Length -->
                       <input type="text" class="form-control" placeholder="Column Length.." ng-if="state.mode=='edit'"
                              tooltip="rowkey column length.." tooltip-trigger="focus"
-                             ng-disabled="rowkey_column.encoding=='dict'||rowkey_column.encoding=='date'||rowkey_column.encoding=='time'"
+                             ng-disabled="rowkey_column.encoding=='dict'||rowkey_column.encoding=='date'||rowkey_column.encoding=='time'||rowkey_column.encoding=='boolean'"
                              ng-change="refreshRowKey(convertedRowkeys,$index,rowkey_column);"
                              ng-model="rowkey_column.valueLength" class="form-control">
 


[2/6] kylin git commit: KYLIN-2072 Cleanup old streaming code

Posted by sh...@apache.org.
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/TimeHistogram.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/TimeHistogram.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/TimeHistogram.java
deleted file mode 100644
index 1c579c6..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/TimeHistogram.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.source.kafka.diagnose;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-public class TimeHistogram {
-    private long[] bucketsBoundary;
-    private AtomicLong[] counters;
-    private String id;
-
-    private static Object printLock = new Object();
-
-    /**
-     * example: [10,20] will generate three  buckets: (-\u221e,10), [10,20),[20,+\u221e)
-     * unit: second
-     */
-    public TimeHistogram(long[] bucketsBoundary, String id) {
-        this.bucketsBoundary = bucketsBoundary;
-        this.counters = new AtomicLong[this.bucketsBoundary.length + 1];
-        for (int i = 0; i < counters.length; i++) {
-            this.counters[i] = new AtomicLong();
-        }
-        this.id = id;
-    }
-
-    /**
-     * @param second in seconds
-     */
-    public void process(long second) {
-        for (int i = 0; i < bucketsBoundary.length; ++i) {
-            if (second < bucketsBoundary[i]) {
-                counters[i].incrementAndGet();
-                return;
-            }
-        }
-
-        counters[bucketsBoundary.length].incrementAndGet();
-    }
-
-    /**
-     * @param millis in milli seconds
-     */
-    public void processMillis(long millis) {
-        process(millis / 1000);
-    }
-
-    public void printStatus() {
-        long[] countersSnapshot = new long[counters.length];
-        for (int i = 0; i < countersSnapshot.length; i++) {
-            countersSnapshot[i] = counters[i].get();
-        }
-
-        long sum = 0;
-        for (long counter : countersSnapshot) {
-            sum += counter;
-        }
-
-        synchronized (printLock) {
-            System.out.println("============== status of TimeHistogram " + id + " =================");
-
-            for (int i = 0; i < countersSnapshot.length; ++i) {
-                System.out.println(String.format("bucket: %d , count: %d ,percentage: %.4f", i, countersSnapshot[i], 1.0 * countersSnapshot[i] / (sum == 0 ? 1 : sum)));
-            }
-
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java
new file mode 100644
index 0000000..7a42598
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.kafka.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * An {@link InputStream} that reads the remaining bytes of a {@link ByteBuffer}, advancing its position.
+ */
+public class ByteBufferBackedInputStream extends InputStream {
+    private final ByteBuffer buf;
+
+    public ByteBufferBackedInputStream(ByteBuffer buf) {
+        this.buf = buf;
+    }
+
+    @Override
+    public int read() throws IOException {
+        return buf.hasRemaining() ? (buf.get() & 0xFF) : -1; // unsigned byte value, or -1 once nothing remains
+    }
+
+    /** Copies up to {@code len} remaining bytes into {@code bytes} at {@code off}; returns the count, or -1 if none remain. */
+    @Override
+    public int read(byte[] bytes, int off, int len) throws IOException {
+        if (len == 0) {
+            return 0; // InputStream contract: a zero-length read returns 0, even when the buffer is exhausted
+        }
+        if (!buf.hasRemaining()) {
+            return -1;
+        }
+        int count = Math.min(len, buf.remaining());
+        buf.get(bytes, off, count);
+        return count;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java
deleted file mode 100644
index bce9bb9..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source.kafka.util;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-
-import javax.annotation.Nullable;
-
-import kafka.cluster.BrokerEndPoint;
-import org.apache.kafka.common.protocol.SecurityProtocol;
-import org.apache.kylin.source.kafka.TopicMeta;
-import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import kafka.api.FetchRequestBuilder;
-import kafka.api.PartitionOffsetRequestInfo;
-import kafka.cluster.Broker;
-import kafka.common.TopicAndPartition;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.OffsetRequest;
-import kafka.javaapi.OffsetResponse;
-import kafka.javaapi.PartitionMetadata;
-import kafka.javaapi.TopicMetadata;
-import kafka.javaapi.TopicMetadataRequest;
-import kafka.javaapi.TopicMetadataResponse;
-import kafka.javaapi.consumer.SimpleConsumer;
-
-/**
- */
-public final class KafkaRequester {
-
-    private static final Logger logger = LoggerFactory.getLogger(KafkaRequester.class);
-
-    private static ConcurrentMap<String, SimpleConsumer> consumerCache = Maps.newConcurrentMap();
-
-    static {
-        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
-            @Override
-            public void run() {
-                KafkaRequester.shutdown();
-            }
-        }));
-    }
-
-    private static SimpleConsumer getSimpleConsumer(Broker broker, int timeout, int bufferSize, String clientId) {
-        String key = createKey(broker, timeout, bufferSize, clientId);
-        if (consumerCache.containsKey(key)) {
-            return consumerCache.get(key);
-        } else {
-            BrokerEndPoint brokerEndPoint = broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT);
-            consumerCache.putIfAbsent(key, new SimpleConsumer(brokerEndPoint.host(), brokerEndPoint.port(), timeout, bufferSize, clientId));
-            return consumerCache.get(key);
-        }
-    }
-
-    private static String createKey(Broker broker, int timeout, int bufferSize, String clientId) {
-        return broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).connectionString() + "_" + timeout + "_" + bufferSize + "_" + clientId;
-    }
-
-    public static TopicMeta getKafkaTopicMeta(KafkaClusterConfig kafkaClusterConfig) {
-        SimpleConsumer consumer;
-        for (Broker broker : kafkaClusterConfig.getBrokers()) {
-            consumer = getSimpleConsumer(broker, kafkaClusterConfig.getTimeout(), kafkaClusterConfig.getBufferSize(), "topic_meta_lookup");
-            List<String> topics = Collections.singletonList(kafkaClusterConfig.getTopic());
-            TopicMetadataRequest req = new TopicMetadataRequest(topics);
-            TopicMetadataResponse resp;
-            try {
-                resp = consumer.send(req);
-            } catch (Exception e) {
-                logger.warn("cannot send TopicMetadataRequest successfully: " + e);
-                continue;
-            }
-            final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
-            if (topicMetadatas.size() != 1) {
-                break;
-            }
-            final TopicMetadata topicMetadata = topicMetadatas.get(0);
-            if (topicMetadata.errorCode() != 0) {
-                break;
-            }
-            List<Integer> partitionIds = Lists.transform(topicMetadata.partitionsMetadata(), new Function<PartitionMetadata, Integer>() {
-                @Nullable
-                @Override
-                public Integer apply(PartitionMetadata partitionMetadata) {
-                    return partitionMetadata.partitionId();
-                }
-            });
-            return new TopicMeta(kafkaClusterConfig.getTopic(), partitionIds);
-        }
-        logger.debug("cannot find topic:" + kafkaClusterConfig.getTopic());
-        return null;
-    }
-
-    public static PartitionMetadata getPartitionMetadata(String topic, int partitionId, List<Broker> brokers, KafkaClusterConfig kafkaClusterConfig) {
-        logger.debug("Brokers: " + brokers.toString());
-        SimpleConsumer consumer;
-        for (Broker broker : brokers) {
-            consumer = getSimpleConsumer(broker, kafkaClusterConfig.getTimeout(), kafkaClusterConfig.getBufferSize(), "topic_meta_lookup");
-            List<String> topics = Collections.singletonList(topic);
-            TopicMetadataRequest req = new TopicMetadataRequest(topics);
-            TopicMetadataResponse resp;
-            try {
-                resp = consumer.send(req);
-            } catch (Exception e) {
-                logger.warn("cannot send TopicMetadataRequest successfully: " + e);
-                continue;
-            }
-            final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
-            if (topicMetadatas.size() != 1) {
-                logger.warn("invalid topicMetadata size:" + topicMetadatas.size());
-                break;
-            }
-            final TopicMetadata topicMetadata = topicMetadatas.get(0);
-            if (topicMetadata.errorCode() != 0) {
-                logger.warn("fetching topicMetadata with errorCode:" + topicMetadata.errorCode());
-                break;
-            }
-            for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
-                StringBuffer logText = new StringBuffer();
-                logText.append("PartitionMetadata debug errorCode: " + partitionMetadata.errorCode());
-                logText.append("PartitionMetadata debug partitionId: " + partitionMetadata.partitionId());
-                logText.append("PartitionMetadata debug leader: " + partitionMetadata.leader());
-                logText.append("PartitionMetadata debug ISR: " + partitionMetadata.isr());
-                logText.append("PartitionMetadata debug replica: " + partitionMetadata.replicas());
-                logger.info(logText.toString());
-                if (partitionMetadata.partitionId() == partitionId) {
-                    return partitionMetadata;
-                }
-            }
-        }
-        logger.debug("cannot find PartitionMetadata, topic:" + topic + " partitionId:" + partitionId);
-        return null;
-    }
-
-    public static FetchResponse fetchResponse(String topic, int partitionId, long offset, Broker broker, KafkaClusterConfig kafkaClusterConfig) {
-        final String clientName = "client_" + topic + "_" + partitionId;
-        SimpleConsumer consumer = getSimpleConsumer(broker, kafkaClusterConfig.getTimeout(), kafkaClusterConfig.getBufferSize(), clientName);
-        kafka.api.FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(topic, partitionId, offset, 1048576) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka, 1048576 is the default value on shell
-                .build();
-        return consumer.fetch(req);
-    }
-
-    public static long getLastOffset(String topic, int partitionId, long whichTime, Broker broker, KafkaClusterConfig kafkaClusterConfig) {
-        String clientName = "client_" + topic + "_" + partitionId;
-        SimpleConsumer consumer = getSimpleConsumer(broker, kafkaClusterConfig.getTimeout(), kafkaClusterConfig.getBufferSize(), clientName);
-        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId);
-        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
-        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
-        OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
-        OffsetResponse response = consumer.getOffsetsBefore(request);
-
-        if (response.hasError()) {
-            logger.error("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partitionId));
-            return 0;
-        }
-        long[] offsets = response.offsets(topic, partitionId);
-        return offsets[0];
-    }
-
-    public static void shutdown() {
-        for (SimpleConsumer simpleConsumer : consumerCache.values()) {
-            simpleConsumer.close();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
deleted file mode 100644
index ee5bb20..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.source.kafka.util;
-
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.kafka.common.protocol.SecurityProtocol;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.common.util.StreamingMessage;
-import org.apache.kylin.source.kafka.StreamingParser;
-import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-
-import kafka.api.OffsetRequest;
-import kafka.cluster.Broker;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.PartitionMetadata;
-import kafka.message.MessageAndOffset;
-
-/**
- */
-public final class KafkaUtils {
-
-    private static final Logger logger = LoggerFactory.getLogger(KafkaUtils.class);
-
-    private static final int MAX_RETRY_TIMES = 6;
-
-    private KafkaUtils() {
-    }
-
-    public static Broker getLeadBroker(KafkaClusterConfig kafkaClusterConfig, int partitionId) {
-        final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaClusterConfig.getTopic(), partitionId, kafkaClusterConfig.getBrokers(), kafkaClusterConfig);
-        if (partitionMetadata != null) {
-            if (partitionMetadata.errorCode() != 0) {
-                logger.warn("PartitionMetadata errorCode: " + partitionMetadata.errorCode());
-            }
-            return new Broker(partitionMetadata.leader(), SecurityProtocol.PLAINTEXT);
-        } else {
-            return null;
-        }
-    }
-
-    private static void sleep(int retryTimes) {
-        int seconds = (int) Math.pow(2, retryTimes);
-        logger.info("retry times:" + retryTimes + " sleep:" + seconds + " seconds");
-        try {
-            Thread.sleep(seconds * 1000);
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private static MessageAndOffset getKafkaMessage(KafkaClusterConfig kafkaClusterConfig, int partitionId, long offset) {
-        final String topic = kafkaClusterConfig.getTopic();
-        int retry = 0;
-        while (retry < MAX_RETRY_TIMES) {//max sleep time 63 seconds
-            final Broker leadBroker = getLeadBroker(kafkaClusterConfig, partitionId);
-            if (leadBroker == null) {
-                logger.warn("unable to find leadBroker with config:" + kafkaClusterConfig + " partitionId:" + partitionId);
-                sleep(retry++);
-                continue;
-            }
-            final FetchResponse response = KafkaRequester.fetchResponse(topic, partitionId, offset, leadBroker, kafkaClusterConfig);
-            if (response.errorCode(topic, partitionId) != 0) {
-                logger.warn("errorCode of FetchResponse is:" + response.errorCode(topic, partitionId));
-                sleep(retry++);
-                continue;
-            }
-            final Iterator<MessageAndOffset> iterator = response.messageSet(topic, partitionId).iterator();
-            if (!iterator.hasNext()) {
-                logger.warn("messageSet is empty");
-                sleep(retry++);
-                continue;
-            }
-            return iterator.next();
-        }
-        throw new IllegalStateException(String.format("try to get timestamp of topic: %s, partitionId: %d, offset: %d, failed to get StreamMessage from kafka", topic, partitionId, offset));
-    }
-
-    public static long findClosestOffsetWithDataTimestamp(KafkaClusterConfig kafkaClusterConfig, int partitionId, long timestamp, StreamingParser streamingParser) {
-        Pair<Long, Long> firstAndLast = getFirstAndLastOffset(kafkaClusterConfig, partitionId);
-        final String topic = kafkaClusterConfig.getTopic();
-
-        logger.info(String.format("topic: %s, partitionId: %d, try to find closest offset with timestamp: %d between offset {%d, %d}", topic, partitionId, timestamp, firstAndLast.getFirst(), firstAndLast.getSecond()));
-        final long result = binarySearch(kafkaClusterConfig, partitionId, firstAndLast.getFirst(), firstAndLast.getSecond(), timestamp, streamingParser);
-        logger.info(String.format("topic: %s, partitionId: %d, found offset: %d", topic, partitionId, result));
-        return result;
-    }
-
-    public static Pair<Long, Long> getFirstAndLastOffset(KafkaClusterConfig kafkaClusterConfig, int partitionId) {
-        final String topic = kafkaClusterConfig.getTopic();
-        final Broker leadBroker = Preconditions.checkNotNull(getLeadBroker(kafkaClusterConfig, partitionId), "unable to find leadBroker with config:" + kafkaClusterConfig + " partitionId:" + partitionId);
-        final long earliestOffset = KafkaRequester.getLastOffset(topic, partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaClusterConfig);
-        final long latestOffset = KafkaRequester.getLastOffset(topic, partitionId, OffsetRequest.LatestTime(), leadBroker, kafkaClusterConfig) - 1;
-        return Pair.newPair(earliestOffset, latestOffset);
-    }
-
-    private static long binarySearch(KafkaClusterConfig kafkaClusterConfig, int partitionId, long startOffset, long endOffset, long targetTimestamp, StreamingParser streamingParser) {
-        Map<Long, Long> cache = Maps.newHashMap();
-
-        while (startOffset < endOffset) {
-            long midOffset = startOffset + ((endOffset - startOffset) >> 1);
-            long startTimestamp = getDataTimestamp(kafkaClusterConfig, partitionId, startOffset, streamingParser, cache);
-            long endTimestamp = getDataTimestamp(kafkaClusterConfig, partitionId, endOffset, streamingParser, cache);
-            long midTimestamp = getDataTimestamp(kafkaClusterConfig, partitionId, midOffset, streamingParser, cache);
-            // hard to ensure these 2 conditions
-            //            Preconditions.checkArgument(startTimestamp <= midTimestamp);
-            //            Preconditions.checkArgument(midTimestamp <= endTimestamp);
-            if (startTimestamp >= targetTimestamp) {
-                return startOffset;
-            }
-            if (endTimestamp <= targetTimestamp) {
-                return endOffset;
-            }
-            if (targetTimestamp == midTimestamp) {
-                return midOffset;
-            } else if (targetTimestamp < midTimestamp) {
-                endOffset = midOffset - 1;
-                continue;
-            } else {
-                startOffset = midOffset + 1;
-                continue;
-            }
-        }
-        return startOffset;
-    }
-
-    private static long getDataTimestamp(KafkaClusterConfig kafkaClusterConfig, int partitionId, long offset, StreamingParser streamingParser, Map<Long, Long> cache) {
-        if (cache.containsKey(offset)) {
-            return cache.get(offset);
-        } else {
-            long t = getDataTimestamp(kafkaClusterConfig, partitionId, offset, streamingParser);
-            cache.put(offset, t);
-            return t;
-        }
-    }
-
-    public static long getDataTimestamp(KafkaClusterConfig kafkaClusterConfig, int partitionId, long offset, StreamingParser streamingParser) {
-        final String topic = kafkaClusterConfig.getTopic();
-        final MessageAndOffset messageAndOffset = getKafkaMessage(kafkaClusterConfig, partitionId, offset);
-        final ByteBuffer payload = messageAndOffset.message().payload();
-        byte[] bytes = new byte[payload.limit()];
-        payload.get(bytes);
-        final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset.message().payload());
-        streamingMessage.setOffset(messageAndOffset.offset());
-        logger.debug(String.format("The timestamp of topic: %s, partitionId: %d, offset: %d is: %d", topic, partitionId, offset, streamingMessage.getTimestamp()));
-        return streamingMessage.getTimestamp();
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/test/java/TimedJsonStreamParserTest.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/test/java/TimedJsonStreamParserTest.java b/source-kafka/src/test/java/TimedJsonStreamParserTest.java
index fb33059..5a52b61 100644
--- a/source-kafka/src/test/java/TimedJsonStreamParserTest.java
+++ b/source-kafka/src/test/java/TimedJsonStreamParserTest.java
@@ -20,7 +20,8 @@ import com.fasterxml.jackson.databind.JavaType;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
-import java.io.*;
+import java.io.File;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import com.fasterxml.jackson.databind.type.MapType;
@@ -32,7 +33,6 @@ import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.kafka.TimedJsonStreamParser;
 
-
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/storage-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/storage-hbase/pom.xml b/storage-hbase/pom.xml
index 1d33071..23e7239 100644
--- a/storage-hbase/pom.xml
+++ b/storage-hbase/pom.xml
@@ -41,10 +41,6 @@
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-engine-mr</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-engine-streaming</artifactId>
-        </dependency>
 
         <!-- Env & Test -->
         <dependency>

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
deleted file mode 100644
index 9adaf24..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.storage.hbase.steps;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.inmemcubing.CompoundCuboidWriter;
-import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.engine.mr.common.CuboidStatsUtil;
-import org.apache.kylin.engine.streaming.IStreamingOutput;
-import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
-import org.apache.kylin.metadata.model.IBuildable;
-import org.apache.kylin.storage.hbase.HBaseConnection;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- */
-public class HBaseStreamingOutput implements IStreamingOutput {
-
-    private static final Logger logger = LoggerFactory.getLogger(HBaseStreamingOutput.class);
-
-    @Override
-    public ICuboidWriter getCuboidWriter(IBuildable buildable) {
-        try {
-            CubeSegment cubeSegment = (CubeSegment) buildable;
-
-            final HTableInterface hTable;
-            hTable = createHTable(cubeSegment);
-            List<ICuboidWriter> cuboidWriters = Lists.newArrayList();
-            cuboidWriters.add(new HBaseCuboidWriter(cubeSegment, hTable));
-            cuboidWriters.add(new SequenceFileCuboidWriter(cubeSegment.getCubeDesc(), cubeSegment));
-            return new CompoundCuboidWriter(cuboidWriters);
-        } catch (IOException e) {
-            throw new RuntimeException("failed to get ICuboidWriter", e);
-        }
-    }
-
-    @Override
-    public void output(IBuildable buildable, Map<Long, HyperLogLogPlusCounter> samplingResult) {
-        try {
-            CubeSegment cubeSegment = (CubeSegment) buildable;
-            KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-            final Configuration conf = HadoopUtil.getCurrentConfiguration();
-            final Path outputPath = new Path("file://" + BatchConstants.CFG_STATISTICS_LOCAL_DIR + UUID.randomUUID().toString());
-            CuboidStatsUtil.writeCuboidStatistics(conf, outputPath, samplingResult, 100);
-            FSDataInputStream inputStream = null;
-            try {
-                inputStream = FileSystem.getLocal(conf).open(new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME));
-                ResourceStore.getStore(kylinConfig).putResource(cubeSegment.getStatisticsResourcePath(), inputStream, System.currentTimeMillis());
-            } finally {
-                IOUtils.closeQuietly(inputStream);
-                FileSystem.getLocal(conf).delete(outputPath, true);
-            }
-        } catch (IOException e) {
-            throw new RuntimeException("failed to write sampling result", e);
-        }
-    }
-
-    private HTableInterface createHTable(final CubeSegment cubeSegment) throws IOException {
-        final String hTableName = cubeSegment.getStorageLocationIdentifier();
-        CubeHTableUtil.createHTable(cubeSegment, null);
-        final HTableInterface hTable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(hTableName);
-        logger.info("hTable:" + hTableName + " for segment:" + cubeSegment.getName() + " created!");
-        return hTable;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java b/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java
index f4fb308..9cb135a 100644
--- a/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java
+++ b/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java
@@ -35,8 +35,8 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.engine.streaming.StreamingConfig;
-import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.metadata.streaming.StreamingConfig;
+import org.apache.kylin.metadata.streaming.StreamingManager;
 import org.apache.kylin.job.dao.ExecutableDao;
 import org.apache.kylin.job.dao.ExecutablePO;
 import org.apache.kylin.job.exception.PersistentException;


[3/6] kylin git commit: KYLIN-2072 Cleanup old streaming code

Posted by sh...@apache.org.
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
deleted file mode 100644
index 271bf41..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.streaming;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.JsonSerializer;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.persistence.Serializer;
-import org.apache.kylin.metadata.MetadataConstants;
-import org.apache.kylin.metadata.cachesync.Broadcaster;
-import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
-import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class StreamingManager {
-
-    private static final Logger logger = LoggerFactory.getLogger(StreamingManager.class);
-
-    // static cached instances
-    private static final ConcurrentHashMap<KylinConfig, StreamingManager> CACHE = new ConcurrentHashMap<KylinConfig, StreamingManager>();
-
-    public static final Serializer<StreamingConfig> STREAMING_SERIALIZER = new JsonSerializer<StreamingConfig>(StreamingConfig.class);
-
-    private KylinConfig config;
-
-    // name ==> StreamingConfig
-    private CaseInsensitiveStringCache<StreamingConfig> streamingMap;
-
-    public static void clearCache() {
-        CACHE.clear();
-    }
-
-    private StreamingManager(KylinConfig config) throws IOException {
-        this.config = config;
-        this.streamingMap = new CaseInsensitiveStringCache<StreamingConfig>(config, "streaming");
-        
-        // touch lower level metadata before registering my listener
-        reloadAllStreaming();
-        Broadcaster.getInstance(config).registerListener(new StreamingSyncListener(), "streaming");
-    }
-
-    private class StreamingSyncListener extends Broadcaster.Listener {
-        @Override
-        public void onClearAll(Broadcaster broadcaster) throws IOException {
-            clearCache();
-        }
-
-        @Override
-        public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException {
-            if (event == Event.DROP)
-                removeStreamingLocal(cacheKey);
-            else
-                reloadStreamingConfigLocal(cacheKey);
-        }
-    }
-
-    private ResourceStore getStore() {
-        return ResourceStore.getStore(this.config);
-    }
-
-    public static StreamingManager getInstance(KylinConfig config) {
-        StreamingManager r = CACHE.get(config);
-        if (r != null) {
-            return r;
-        }
-
-        synchronized (StreamingManager.class) {
-            r = CACHE.get(config);
-            if (r != null) {
-                return r;
-            }
-            try {
-                r = new StreamingManager(config);
-                CACHE.put(config, r);
-                if (CACHE.size() > 1) {
-                    logger.warn("More than one singleton exist");
-                }
-                return r;
-            } catch (IOException e) {
-                throw new IllegalStateException("Failed to init StreamingManager from " + config, e);
-            }
-        }
-    }
-
-    private static String formatStreamingConfigPath(String name) {
-        return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + name + ".json";
-    }
-
-    private static String formatStreamingOutputPath(String streaming, int partition) {
-        return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + partition + ".json";
-    }
-
-    private static String formatStreamingOutputPath(String streaming, List<Integer> partitions) {
-        return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + StringUtils.join(partitions, "_") + ".json";
-    }
-
-    public StreamingConfig getStreamingConfig(String name) {
-        return streamingMap.get(name);
-    }
-
-    public List<StreamingConfig> listAllStreaming() {
-        return new ArrayList<>(streamingMap.values());
-    }
-
-    /**
-     * Reload StreamingConfig from resource store It will be triggered by an desc
-     * update event.
-     *
-     * @param name
-     * @throws IOException
-     */
-    public StreamingConfig reloadStreamingConfigLocal(String name) throws IOException {
-
-        // Save Source
-        String path = StreamingConfig.concatResourcePath(name);
-
-        // Reload the StreamingConfig
-        StreamingConfig ndesc = loadStreamingConfigAt(path);
-
-        // Here replace the old one
-        streamingMap.putLocal(ndesc.getName(), ndesc);
-        return ndesc;
-    }
-
-    // remove streamingConfig
-    public void removeStreamingConfig(StreamingConfig streamingConfig) throws IOException {
-        String path = streamingConfig.getResourcePath();
-        getStore().deleteResource(path);
-        streamingMap.remove(streamingConfig.getName());
-    }
-
-    public StreamingConfig getConfig(String name) {
-        name = name.toUpperCase();
-        return streamingMap.get(name);
-    }
-
-    public void removeStreamingLocal(String streamingName) {
-        streamingMap.removeLocal(streamingName);
-    }
-
-    /**
-     * Update CubeDesc with the input. Broadcast the event into cluster
-     *
-     * @param desc
-     * @return
-     * @throws IOException
-     */
-    public StreamingConfig updateStreamingConfig(StreamingConfig desc) throws IOException {
-        // Validate CubeDesc
-        if (desc.getUuid() == null || desc.getName() == null) {
-            throw new IllegalArgumentException("SteamingConfig Illegal.");
-        }
-        String name = desc.getName();
-        if (!streamingMap.containsKey(name)) {
-            throw new IllegalArgumentException("StreamingConfig '" + name + "' does not exist.");
-        }
-
-        // Save Source
-        String path = desc.getResourcePath();
-        getStore().putResource(path, desc, STREAMING_SERIALIZER);
-
-        // Reload the StreamingConfig
-        StreamingConfig ndesc = loadStreamingConfigAt(path);
-        // Here replace the old one
-        streamingMap.put(ndesc.getName(), desc);
-
-        return ndesc;
-    }
-
-    public StreamingConfig saveStreamingConfig(StreamingConfig streamingConfig) throws IOException {
-        if (streamingConfig == null || StringUtils.isEmpty(streamingConfig.getName())) {
-            throw new IllegalArgumentException();
-        }
-
-        if (streamingMap.containsKey(streamingConfig.getName()))
-            throw new IllegalArgumentException("StreamingConfig '" + streamingConfig.getName() + "' already exists");
-
-        String path = StreamingConfig.concatResourcePath(streamingConfig.getName());
-        getStore().putResource(path, streamingConfig, StreamingConfig.SERIALIZER);
-        streamingMap.put(streamingConfig.getName(), streamingConfig);
-        return streamingConfig;
-    }
-
-    private StreamingConfig loadStreamingConfigAt(String path) throws IOException {
-        ResourceStore store = getStore();
-        StreamingConfig streamingDesc = store.getResource(path, StreamingConfig.class, STREAMING_SERIALIZER);
-
-        if (StringUtils.isBlank(streamingDesc.getName())) {
-            throw new IllegalStateException("StreamingConfig name must not be blank");
-        }
-        return streamingDesc;
-    }
-
-    private void reloadAllStreaming() throws IOException {
-        ResourceStore store = getStore();
-        logger.info("Reloading Streaming Metadata from folder " + store.getReadableResourcePath(ResourceStore.STREAMING_RESOURCE_ROOT));
-
-        streamingMap.clear();
-
-        List<String> paths = store.collectResourceRecursively(ResourceStore.STREAMING_RESOURCE_ROOT, MetadataConstants.FILE_SURFIX);
-        for (String path : paths) {
-            StreamingConfig streamingConfig;
-            try {
-                streamingConfig = loadStreamingConfigAt(path);
-            } catch (Exception e) {
-                logger.error("Error loading streaming desc " + path, e);
-                continue;
-            }
-            if (path.equals(streamingConfig.getResourcePath()) == false) {
-                logger.error("Skip suspicious desc at " + path + ", " + streamingConfig + " should be at " + streamingConfig.getResourcePath());
-                continue;
-            }
-            if (streamingMap.containsKey(streamingConfig.getName())) {
-                logger.error("Dup StreamingConfig name '" + streamingConfig.getName() + "' on path " + path);
-                continue;
-            }
-
-            streamingMap.putLocal(streamingConfig.getName(), streamingConfig);
-        }
-
-        logger.debug("Loaded " + streamingMap.size() + " StreamingConfig(s)");
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/MonitorCLI.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/MonitorCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/MonitorCLI.java
deleted file mode 100644
index 32030ad..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/MonitorCLI.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.streaming.cli;
-
-import java.util.List;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.engine.streaming.monitor.StreamingMonitor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- */
-public class MonitorCLI {
-
-    private static final Logger logger = LoggerFactory.getLogger(MonitorCLI.class);
-
-    public static void main(String[] args) {
-        Preconditions.checkArgument(args[0].equals("monitor"));
-
-        int i = 1;
-        List<String> receivers = null;
-        String host = null;
-        String tableName = null;
-        String authorization = null;
-        String cubeName = null;
-        String projectName = "default";
-        while (i < args.length) {
-            String argName = args[i];
-            switch (argName) {
-            case "-receivers":
-                receivers = Lists.newArrayList(StringUtils.split(args[++i], ";"));
-                break;
-            case "-host":
-                host = args[++i];
-                break;
-            case "-tableName":
-                tableName = args[++i];
-                break;
-            case "-authorization":
-                authorization = args[++i];
-                break;
-            case "-cubeName":
-                cubeName = args[++i];
-                break;
-            case "-projectName":
-                projectName = args[++i];
-                break;
-            default:
-                throw new RuntimeException("invalid argName:" + argName);
-            }
-            i++;
-        }
-        Preconditions.checkArgument(receivers != null && receivers.size() > 0);
-        final StreamingMonitor streamingMonitor = new StreamingMonitor();
-        if (tableName != null) {
-            logger.info(String.format("check query tableName:%s host:%s receivers:%s", tableName, host, StringUtils.join(receivers, ";")));
-            Preconditions.checkNotNull(host);
-            Preconditions.checkNotNull(authorization);
-            Preconditions.checkNotNull(tableName);
-            streamingMonitor.checkCountAll(receivers, host, authorization, projectName, tableName);
-        }
-        if (cubeName != null) {
-            logger.info(String.format("check cube cubeName:%s receivers:%s", cubeName, StringUtils.join(receivers, ";")));
-            streamingMonitor.checkCube(receivers, cubeName, host);
-        }
-        System.exit(0);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
deleted file mode 100644
index 1d66b41..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.streaming.cli;
-
-import java.util.List;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.engine.streaming.BootstrapConfig;
-import org.apache.kylin.engine.streaming.OneOffStreamingBuilder;
-import org.apache.kylin.engine.streaming.monitor.StreamingMonitor;
-import org.apache.kylin.metadata.realization.RealizationType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-public class StreamingCLI {
-
-    private static final Logger logger = LoggerFactory.getLogger(StreamingCLI.class);
-
-    public static void main(String[] args) {
-        try {
-            Preconditions.checkArgument(args[0].equals("streaming"));
-            Preconditions.checkArgument(args[1].equals("start"));
-
-            int i = 2;
-            BootstrapConfig bootstrapConfig = new BootstrapConfig();
-            while (i < args.length) {
-                String argName = args[i];
-                switch (argName) {
-                case "-start":
-                    bootstrapConfig.setStart(Long.parseLong(args[++i]));
-                    break;
-                case "-end":
-                    bootstrapConfig.setEnd(Long.parseLong(args[++i]));
-                    break;
-                case "-cube":
-                    bootstrapConfig.setCubeName(args[++i]);
-                    break;
-                case "-fillGap":
-                    bootstrapConfig.setFillGap(Boolean.parseBoolean(args[++i]));
-                    break;
-                case "-maxFillGapRange":
-                    bootstrapConfig.setMaxFillGapRange(Long.parseLong(args[++i]));
-                    break;
-                default:
-                    logger.warn("ignore this arg:" + argName);
-                }
-                i++;
-            }
-            if (bootstrapConfig.isFillGap()) {
-                final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(bootstrapConfig.getCubeName());
-                logger.info("all gaps:" + StringUtils.join(gaps, ","));
-                for (Pair<Long, Long> gap : gaps) {
-                    List<Pair<Long, Long>> splitGaps = splitGap(gap, bootstrapConfig.getMaxFillGapRange());
-                    for (Pair<Long, Long> splitGap : splitGaps) {
-                        logger.info("start filling the gap from " + splitGap.getFirst() + " to " + splitGap.getSecond());
-                        startOneOffCubeStreaming(bootstrapConfig.getCubeName(), splitGap.getFirst(), splitGap.getSecond());
-                        logger.info("finish filling the gap from " + splitGap.getFirst() + " to " + splitGap.getSecond());
-                    }
-                }
-            } else {
-                startOneOffCubeStreaming(bootstrapConfig.getCubeName(), bootstrapConfig.getStart(), bootstrapConfig.getEnd());
-                logger.info("streaming process finished, exit with 0");
-                System.exit(0);
-            }
-        } catch (Exception e) {
-            printArgsError(args);
-            logger.error("error start streaming", e);
-            System.exit(-1);
-        }
-    }
-
-    private static List<Pair<Long, Long>> splitGap(Pair<Long, Long> gap, long maxFillGapRange) {
-        List<Pair<Long, Long>> gaps = Lists.newArrayList();
-        Long startTime = gap.getFirst();
-
-        while (startTime < gap.getSecond()) {
-            Long endTime = gap.getSecond() <= startTime + maxFillGapRange ? gap.getSecond() : startTime + maxFillGapRange;
-            gaps.add(Pair.newPair(startTime, endTime));
-            startTime = endTime;
-        }
-
-        return gaps;
-    }
-
-    private static void startOneOffCubeStreaming(String cubeName, long start, long end) {
-        final Runnable runnable = new OneOffStreamingBuilder(RealizationType.CUBE, cubeName, start, end).build();
-        runnable.run();
-    }
-
-    private static void printArgsError(String[] args) {
-        logger.warn("invalid args:" + StringUtils.join(args, " "));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
deleted file mode 100644
index 350a5f8..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.streaming.cube;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import javax.annotation.Nullable;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.common.util.StreamingBatch;
-import org.apache.kylin.common.util.StreamingMessage;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
-import org.apache.kylin.cube.inmemcubing.InMemCubeBuilder;
-import org.apache.kylin.cube.util.CubingUtils;
-import org.apache.kylin.engine.EngineFactory;
-import org.apache.kylin.engine.streaming.StreamingBatchBuilder;
-import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
-import org.apache.kylin.metadata.model.IBuildable;
-import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-
-/**
- */
-public class StreamingCubeBuilder implements StreamingBatchBuilder {
-
-    private static final Logger logger = LoggerFactory.getLogger(StreamingCubeBuilder.class);
-
-    private final String cubeName;
-    private int processedRowCount = 0;
-
-    public StreamingCubeBuilder(String cubeName) {
-        this.cubeName = cubeName;
-    }
-
-    @Override
-    public void build(StreamingBatch streamingBatch, Map<TblColRef, Dictionary<String>> dictionaryMap, ICuboidWriter cuboidWriter) {
-        try {
-            CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-            final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
-            final IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cubeInstance.getDescriptor());
-            
-            LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue<List<String>>();
-            InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(cubeInstance.getDescriptor(), flatDesc, dictionaryMap);
-            final Future<?> future = Executors.newCachedThreadPool().submit(inMemCubeBuilder.buildAsRunnable(blockingQueue, cuboidWriter));
-            processedRowCount = streamingBatch.getMessages().size();
-            for (StreamingMessage streamingMessage : streamingBatch.getMessages()) {
-                blockingQueue.put(streamingMessage.getData());
-            }
-            blockingQueue.put(Collections.<String> emptyList());
-            future.get();
-            cuboidWriter.flush();
-
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
-        } catch (ExecutionException e) {
-            throw new RuntimeException("error build cube from StreamingBatch", e.getCause());
-        } catch (IOException e) {
-            throw new RuntimeException("error build cube from StreamingBatch", e.getCause());
-        } finally {
-            try {
-                cuboidWriter.close();
-            } catch (IOException e) {
-                throw new RuntimeException("error build cube from StreamingBatch", e.getCause());
-            }
-        }
-    }
-
-    @Override
-    public IBuildable createBuildable(StreamingBatch streamingBatch) {
-        CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-        final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
-        try {
-            CubeSegment segment = cubeManager.appendSegment(cubeInstance, streamingBatch.getTimeRange().getFirst(), streamingBatch.getTimeRange().getSecond());
-            segment.setLastBuildJobID(segment.getUuid()); // give a fake job id
-            segment.setInputRecords(streamingBatch.getMessages().size());
-            segment.setLastBuildTime(System.currentTimeMillis());
-            return segment;
-        } catch (IOException e) {
-            throw new RuntimeException("failed to create IBuildable", e);
-        }
-    }
-
-    @Override
-    public Map<Long, HyperLogLogPlusCounter> sampling(StreamingBatch streamingBatch) {
-        final CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-        final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
-        final IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cubeInstance.getDescriptor());
-        long start = System.currentTimeMillis();
-
-        final Map<Long, HyperLogLogPlusCounter> samplingResult = CubingUtils.sampling(cubeInstance.getDescriptor(), flatDesc, Lists.transform(streamingBatch.getMessages(), new Function<StreamingMessage, List<String>>() {
-            @Nullable
-            @Override
-            public List<String> apply(@Nullable StreamingMessage input) {
-                return input.getData();
-            }
-        }));
-        logger.info(String.format("sampling of %d messages cost %d ms", streamingBatch.getMessages().size(), (System.currentTimeMillis() - start)));
-        return samplingResult;
-    }
-
-    @Override
-    public Map<TblColRef, Dictionary<String>> buildDictionary(StreamingBatch streamingBatch, IBuildable buildable) {
-        final CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-        final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
-        final Map<TblColRef, Dictionary<String>> dictionaryMap;
-        try {
-            dictionaryMap = CubingUtils.buildDictionary(cubeInstance, Lists.transform(streamingBatch.getMessages(), new Function<StreamingMessage, List<String>>() {
-                @Nullable
-                @Override
-                public List<String> apply(@Nullable StreamingMessage input) {
-                    return input.getData();
-                }
-            }));
-            Map<TblColRef, Dictionary<String>> realDictMap = CubingUtils.writeDictionary((CubeSegment) buildable, dictionaryMap, streamingBatch.getTimeRange().getFirst(), streamingBatch.getTimeRange().getSecond());
-            return realDictMap;
-        } catch (IOException e) {
-            throw new RuntimeException("failed to build dictionary", e);
-        }
-    }
-
-    @Override
-    public void commit(IBuildable buildable) {
-        CubeSegment cubeSegment = (CubeSegment) buildable;
-        cubeSegment.setStatus(SegmentStatusEnum.READY);
-        cubeSegment.setInputRecords(processedRowCount);
-        CubeUpdate cubeBuilder = new CubeUpdate(cubeSegment.getCubeInstance());
-        cubeBuilder.setToUpdateSegs(cubeSegment);
-        try {
-            CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).updateCube(cubeBuilder);
-        } catch (IOException e) {
-            throw new RuntimeException("failed to update CubeSegment", e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/diagnose/StreamingLogAnalyzer.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/diagnose/StreamingLogAnalyzer.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/diagnose/StreamingLogAnalyzer.java
deleted file mode 100644
index fba664d..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/diagnose/StreamingLogAnalyzer.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.streaming.diagnose;
-
-import java.io.File;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-import java.text.SimpleDateFormat;
-import java.util.List;
-import java.util.TimeZone;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.commons.io.FileUtils;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-public class StreamingLogAnalyzer {
-    public static void main(String[] args) {
-        int errorFileCount = 0;
-        List<Long> ellapsedTimes = Lists.newArrayList();
-
-        String patternStr = "(\\d{2}/\\d{2}/\\d{2} \\d{2}:\\d{2}:\\d{2})";
-        Pattern pattern = Pattern.compile(patternStr);
-
-        SimpleDateFormat format = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
-        format.setTimeZone(TimeZone.getTimeZone("GMT")); // NOTE: this must be GMT to calculate epoch date correctly
-
-        Preconditions.checkArgument(args.length == 1, "Usage: StreamingLogsAnalyser streaming_logs_folder");
-        for (File file : FileUtils.listFiles(new File(args[0]), new String[] { "log" }, false)) {
-            System.out.println("Processing file " + file.toString());
-
-            long startTime = 0;
-            long endTime = 0;
-            try {
-                List<String> contents = Files.readAllLines(file.toPath(), Charset.defaultCharset());
-                for (int i = 0; i < contents.size(); ++i) {
-                    Matcher m = pattern.matcher(contents.get(i));
-                    if (m.find()) {
-                        startTime = format.parse("20" + m.group(1)).getTime();
-                        break;
-                    }
-                }
-
-                for (int i = contents.size() - 1; i >= 0; --i) {
-                    Matcher m = pattern.matcher(contents.get(i));
-                    if (m.find()) {
-                        endTime = format.parse("20" + m.group(1)).getTime();
-                        break;
-                    }
-                }
-
-                if (startTime == 0 || endTime == 0) {
-                    throw new RuntimeException("start time or end time is not found");
-                }
-
-                if (endTime - startTime < 60000) {
-                    System.out.println("Warning: this job took less than one minute!!!! " + file.toString());
-                }
-
-                ellapsedTimes.add(endTime - startTime);
-
-            } catch (Exception e) {
-                System.out.println("Exception when processing log file " + file.toString());
-                System.out.println(e);
-                errorFileCount++;
-            }
-        }
-
-        System.out.println("Totally error files count " + errorFileCount);
-        System.out.println("Totally normal files processed " + ellapsedTimes.size());
-
-        long sum = 0;
-        for (Long x : ellapsedTimes) {
-            sum += x;
-        }
-        System.out.println("Avg build time " + (sum / ellapsedTimes.size()));
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java
deleted file mode 100644
index 55252c4..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.streaming.monitor;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-
-import javax.annotation.Nullable;
-
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.commons.httpclient.methods.ByteArrayRequestEntity;
-import org.apache.commons.httpclient.methods.PostMethod;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.common.util.MailService;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- */
-public class StreamingMonitor {
-
-    private static final Logger logger = LoggerFactory.getLogger(StreamingMonitor.class);
-
-    public void checkCountAll(List<String> receivers, String host, String authorization, String projectName, String tableName) {
-        String title = "checkCountAll job(host:" + host + " tableName:" + tableName + ") ";
-        StringBuilder stringBuilder = new StringBuilder();
-        String url = host + "/kylin/api/query";
-        PostMethod request = new PostMethod(url);
-        try {
-
-            request.addRequestHeader("Authorization", "Basic " + authorization);
-            request.addRequestHeader("Content-Type", "application/json");
-            String query = String.format("{\"sql\":\"select count(*) from %s\",\"offset\":0,\"limit\":50000,\"acceptPartial\":true,\"project\":\"%s\"}", tableName, projectName);
-            request.setRequestEntity(new ByteArrayRequestEntity(query.getBytes()));
-
-            int statusCode = new HttpClient().executeMethod(request);
-            String msg = Bytes.toString(request.getResponseBody());
-            stringBuilder.append("host:").append(host).append("\n");
-            stringBuilder.append("query:").append(query).append("\n");
-            stringBuilder.append("statusCode:").append(statusCode).append("\n");
-            if (statusCode == 200) {
-                title += "succeed";
-                final HashMap<?, ?> hashMap = JsonUtil.readValue(msg, HashMap.class);
-                stringBuilder.append("results:").append(hashMap.get("results").toString()).append("\n");
-                stringBuilder.append("duration:").append(hashMap.get("duration").toString()).append("\n");
-            } else {
-                title += "failed";
-                stringBuilder.append("response:").append(msg).append("\n");
-            }
-        } catch (Exception e) {
-            final StringWriter out = new StringWriter();
-            e.printStackTrace(new PrintWriter(out));
-            title += "failed";
-            stringBuilder.append(out.toString());
-        } finally {
-            request.releaseConnection();
-        }
-        logger.info("title:" + title);
-        logger.info("content:" + stringBuilder.toString());
-        sendMail(receivers, title, stringBuilder.toString());
-    }
-
-    public static final List<Pair<Long, Long>> findGaps(String cubeName) {
-        List<CubeSegment> segments = getSortedReadySegments(cubeName);
-        List<Pair<Long, Long>> gaps = Lists.newArrayList();
-        for (int i = 0; i < segments.size() - 1; ++i) {
-            CubeSegment first = segments.get(i);
-            CubeSegment second = segments.get(i + 1);
-            if (first.getDateRangeEnd() == second.getDateRangeStart()) {
-                continue;
-            } else if (first.getDateRangeEnd() < second.getDateRangeStart()) {
-                gaps.add(Pair.newPair(first.getDateRangeEnd(), second.getDateRangeStart()));
-            }
-        }
-        return gaps;
-    }
-
-    private static List<CubeSegment> getSortedReadySegments(String cubeName) {
-        final CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
-        Preconditions.checkNotNull(cube);
-        final List<CubeSegment> segments = cube.getSegments(SegmentStatusEnum.READY);
-        logger.info("totally " + segments.size() + " cubeSegments");
-        Collections.sort(segments);
-        return segments;
-    }
-
-    public static final List<Pair<String, String>> findOverlaps(String cubeName) {
-        List<CubeSegment> segments = getSortedReadySegments(cubeName);
-        List<Pair<String, String>> overlaps = Lists.newArrayList();
-        for (int i = 0; i < segments.size() - 1; ++i) {
-            CubeSegment first = segments.get(i);
-            CubeSegment second = segments.get(i + 1);
-            if (first.getDateRangeEnd() == second.getDateRangeStart()) {
-                continue;
-            } else {
-                overlaps.add(Pair.newPair(first.getName(), second.getName()));
-            }
-        }
-        return overlaps;
-    }
-
-    public void checkCube(List<String> receivers, String cubeName, String host) {
-        final CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
-        if (cube == null) {
-            logger.info("cube:" + cubeName + " does not exist");
-            return;
-        }
-        List<Pair<Long, Long>> gaps = findGaps(cubeName);
-        List<Pair<String, String>> overlaps = Lists.newArrayList();
-        StringBuilder content = new StringBuilder();
-        if (!gaps.isEmpty()) {
-            content.append("all gaps:").append("\n").append(StringUtils.join(Lists.transform(gaps, new Function<Pair<Long, Long>, String>() {
-                @Nullable
-                @Override
-                public String apply(Pair<Long, Long> input) {
-                    return parseInterval(input);
-                }
-            }), "\n")).append("\n");
-        }
-        if (!overlaps.isEmpty()) {
-            content.append("all overlaps:").append("\n").append(StringUtils.join(overlaps, "\n")).append("\n");
-        }
-        if (content.length() > 0) {
-            logger.info(content.toString());
-            sendMail(receivers, String.format("%s has gaps or overlaps on host %s", cubeName, host), content.toString());
-        } else {
-            logger.info("no gaps or overlaps");
-        }
-    }
-
-    private String parseInterval(Pair<Long, Long> interval) {
-        return String.format("{%d(%s), %d(%s)}", interval.getFirst(), new Date(interval.getFirst()).toString(), interval.getSecond(), new Date(interval.getSecond()).toString());
-    }
-
-    private void sendMail(List<String> receivers, String title, String content) {
-        final MailService mailService = new MailService(KylinConfig.getInstanceFromEnv());
-        mailService.sendMail(receivers, title, content, false);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java
deleted file mode 100644
index 5790bc1..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.streaming.util;
-
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.engine.streaming.IStreamingInput;
-import org.apache.kylin.engine.streaming.IStreamingOutput;
-import org.apache.kylin.engine.streaming.StreamingBatchBuilder;
-import org.apache.kylin.engine.streaming.cube.StreamingCubeBuilder;
-import org.apache.kylin.metadata.realization.RealizationType;
-
-import com.google.common.base.Preconditions;
-
-/**
- * TODO: like MRUtil, use Factory pattern to allow config
- */
-public class StreamingUtils {
-
-    public static IStreamingInput getStreamingInput() {
-        return (IStreamingInput) ClassUtil.newInstance("org.apache.kylin.source.kafka.KafkaStreamingInput");
-    }
-
-    public static IStreamingOutput getStreamingOutput() {
-        return (IStreamingOutput) ClassUtil.newInstance("org.apache.kylin.storage.hbase.steps.HBaseStreamingOutput");
-    }
-
-    public static StreamingBatchBuilder getMicroBatchBuilder(RealizationType realizationType, String realizationName) {
-        Preconditions.checkNotNull(realizationName);
-        if (realizationType == RealizationType.CUBE) {
-            return new StreamingCubeBuilder(realizationName);
-        } else {
-            throw new UnsupportedOperationException("not implemented yet");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index c30abc0..a47fcde 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -44,8 +44,8 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.CubeUpdate;
 import org.apache.kylin.engine.EngineFactory;
-import org.apache.kylin.engine.streaming.StreamingConfig;
-import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.metadata.streaming.StreamingConfig;
+import org.apache.kylin.metadata.streaming.StreamingManager;
 import org.apache.kylin.job.DeployUtil;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 09ef0e8..72e4069 100644
--- a/pom.xml
+++ b/pom.xml
@@ -225,11 +225,6 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.kylin</groupId>
-                <artifactId>kylin-engine-streaming</artifactId>
-                <version>${project.version}</version>
-            </dependency>
-            <dependency>
-                <groupId>org.apache.kylin</groupId>
                 <artifactId>kylin-engine-spark</artifactId>
                 <version>${project.version}</version>
             </dependency>
@@ -1017,7 +1012,6 @@
         <module>core-job</module>
         <module>core-storage</module>
         <module>engine-mr</module>
-        <module>engine-streaming</module>
         <module>engine-spark</module>
         <module>source-hive</module>
         <module>source-kafka</module>

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
index f3374c3..a5fb874 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
@@ -26,7 +26,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.metadata.streaming.StreamingConfig;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.rest.exception.BadRequestException;

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
index c4af5f4..34cc57f 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
@@ -32,7 +32,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.metadata.streaming.StreamingConfig;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.ColumnDesc;

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java b/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java
index abf0638..170c395 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java
@@ -29,7 +29,7 @@ import org.apache.kylin.cube.CubeDescManager;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.metadata.streaming.StreamingManager;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.Output;

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/server-base/src/main/java/org/apache/kylin/rest/service/StreamingService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingService.java b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingService.java
index e49e882..7310d9c 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingService.java
@@ -23,7 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.metadata.streaming.StreamingConfig;
 import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.exception.InternalErrorException;
 import org.springframework.beans.factory.annotation.Autowired;

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/source-kafka/pom.xml b/source-kafka/pom.xml
index d4cdfd5..e2100c4 100644
--- a/source-kafka/pom.xml
+++ b/source-kafka/pom.xml
@@ -43,12 +43,6 @@
             <artifactId>kylin-core-common</artifactId>
         </dependency>
 
-
-        <dependency>
-            <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-engine-streaming</artifactId>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka_2.10</artifactId>

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/ByteBufferBackedInputStream.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/ByteBufferBackedInputStream.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/ByteBufferBackedInputStream.java
deleted file mode 100644
index 6981096..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/ByteBufferBackedInputStream.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.source.kafka;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-
-/**
- */
-class ByteBufferBackedInputStream extends InputStream {
-
-    private ByteBuffer buf;
-
-    public ByteBufferBackedInputStream(ByteBuffer buf) {
-        this.buf = buf;
-    }
-
-    @Override
-    public int read() throws IOException {
-        if (!buf.hasRemaining()) {
-            return -1;
-        }
-        return buf.get() & 0xFF;
-    }
-
-    @Override
-    public int read(byte[] bytes, int off, int len) throws IOException {
-        if (!buf.hasRemaining()) {
-            return -1;
-        }
-
-        len = Math.min(len, buf.remaining());
-        buf.get(bytes, off, len);
-        return len;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index d039583..208c0ce 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -20,7 +20,7 @@ package org.apache.kylin.source.kafka;
 
 import com.google.common.collect.Lists;
 import org.apache.kylin.engine.mr.IMRInput;
-import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.metadata.streaming.StreamingConfig;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.source.ISource;
 import org.apache.kylin.source.ReadableTable;

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
deleted file mode 100644
index 78a67c2..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source.kafka;
-
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import com.google.common.base.Function;
-import kafka.cluster.BrokerEndPoint;
-import org.apache.kafka.common.protocol.SecurityProtocol;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.common.util.StreamingBatch;
-import org.apache.kylin.common.util.StreamingMessage;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.engine.EngineFactory;
-import org.apache.kylin.engine.streaming.IStreamingInput;
-import org.apache.kylin.engine.streaming.StreamingConfig;
-import org.apache.kylin.engine.streaming.StreamingManager;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
-import org.apache.kylin.source.kafka.util.KafkaRequester;
-import org.apache.kylin.source.kafka.util.KafkaUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-import kafka.cluster.Broker;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.PartitionMetadata;
-import kafka.message.MessageAndOffset;
-
-import javax.annotation.Nullable;
-
-@SuppressWarnings("unused")
-public class KafkaStreamingInput implements IStreamingInput {
-
-    private static final Logger logger = LoggerFactory.getLogger(KafkaStreamingInput.class);
-
-    @Override
-    public StreamingBatch getBatchWithTimeWindow(RealizationType realizationType, String realizationName, int id, long startTime, long endTime) {
-        if (realizationType != RealizationType.CUBE) {
-            throw new IllegalArgumentException("Unsupported realization in KafkaStreamingInput: " + realizationType);
-        }
-        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(realizationName);
-        final String streaming = cube.getFactTable();
-        final StreamingManager streamingManager = StreamingManager.getInstance(kylinConfig);
-        final StreamingConfig streamingConfig = streamingManager.getConfig(streaming);
-        if (streamingConfig == null) {
-            throw new IllegalArgumentException("Table " + streaming + " is not a streaming table.");
-        }
-        if (StreamingConfig.STREAMING_TYPE_KAFKA.equals(streamingConfig.getType())) {
-            logger.info(String.format("prepare to get streaming batch, name:%s, id:%d, startTime:%d, endTime:%d", streaming, id, startTime, endTime));
-
-            try {
-                final KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(kylinConfig);
-                final KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(streaming);
-                List<TblColRef> columns = EngineFactory.getJoinedFlatTableDesc(cube.getDescriptor()).getAllColumns();
-
-                final StreamingParser streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getParserProperties(), columns);
-                final ExecutorService executorService = Executors.newCachedThreadPool();
-                final List<Future<List<StreamingMessage>>> futures = Lists.newArrayList();
-                for (final KafkaClusterConfig kafkaClusterConfig : kafkaConfig.getKafkaClusterConfigs()) {
-
-                    final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
-                    for (int i = 0; i < partitionCount; ++i) {
-                        final StreamingMessageProducer producer = new StreamingMessageProducer(kafkaClusterConfig, i, Pair.newPair(startTime, endTime), kafkaConfig.getMargin(), streamingParser);
-                        final Future<List<StreamingMessage>> future = executorService.submit(producer);
-                        futures.add(future);
-                    }
-                }
-                List<StreamingMessage> messages = Lists.newLinkedList();
-                for (Future<List<StreamingMessage>> future : futures) {
-                    try {
-                        messages.addAll(future.get());
-                    } catch (InterruptedException e) {
-                        logger.warn("this thread should not be interrupted, just ignore", e);
-                        continue;
-                    } catch (ExecutionException e) {
-                        throw new RuntimeException("error when get StreamingMessages", e.getCause());
-                    }
-                }
-                final Pair<Long, Long> timeRange = Pair.newPair(startTime, endTime);
-                logger.info("finish to get streaming batch, total message count:" + messages.size());
-                return new StreamingBatch(messages, timeRange);
-            } catch (ReflectiveOperationException e) {
-                throw new RuntimeException("failed to create instance of StreamingParser", e);
-            }
-        } else {
-            throw new IllegalArgumentException("kafka is the only supported streaming type.");
-        }
-    }
-
-    private static class StreamingMessageProducer implements Callable<List<StreamingMessage>> {
-
-        private final KafkaClusterConfig kafkaClusterConfig;
-        private final int partitionId;
-        private final StreamingParser streamingParser;
-        private final Pair<Long, Long> timeRange;
-        private final long margin;
-
-        private List<Broker> replicaBrokers;
-
-        StreamingMessageProducer(KafkaClusterConfig kafkaClusterConfig, int partitionId, Pair<Long, Long> timeRange, long margin, StreamingParser streamingParser) {
-            this.kafkaClusterConfig = kafkaClusterConfig;
-            this.partitionId = partitionId;
-            this.streamingParser = streamingParser;
-            this.margin = margin;
-            this.timeRange = timeRange;
-            this.replicaBrokers = kafkaClusterConfig.getBrokers();
-        }
-
-        private Broker getLeadBroker() {
-            final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaClusterConfig.getTopic(), partitionId, replicaBrokers, kafkaClusterConfig);
-            if (partitionMetadata != null) {
-                if (partitionMetadata.errorCode() != 0) {
-                    logger.warn("PartitionMetadata errorCode: " + partitionMetadata.errorCode());
-                }
-                replicaBrokers = Lists.transform(partitionMetadata.replicas(), new Function<BrokerEndPoint, Broker>() {
-                    @Nullable
-                    @Override
-                    public Broker apply(@Nullable BrokerEndPoint brokerEndPoint) {
-                        return new Broker(brokerEndPoint, SecurityProtocol.PLAINTEXT);
-                    }
-                });
-                BrokerEndPoint leaderEndpoint = partitionMetadata.leader();
-
-                return new Broker(leaderEndpoint, SecurityProtocol.PLAINTEXT);
-            } else {
-                return null;
-            }
-        }
-
-        @Override
-        public List<StreamingMessage> call() throws Exception {
-            List<StreamingMessage> result = Lists.newLinkedList();
-            try {
-                long startTimestamp = timeRange.getFirst() - margin;
-                long offset = KafkaUtils.findClosestOffsetWithDataTimestamp(kafkaClusterConfig, partitionId, startTimestamp, streamingParser);
-                int fetchRound = 0;
-                int consumeMsgCount = 0;
-                Broker leadBroker = null;
-                String topic = kafkaClusterConfig.getTopic();
-                while (true) {
-                    boolean outOfMargin = false;
-                    int consumeMsgCountAtBeginning = consumeMsgCount;
-                    fetchRound++;
-
-                    if (leadBroker == null) {
-                        leadBroker = getLeadBroker();
-                    }
-
-                    if (leadBroker == null) {
-                        logger.warn("cannot find lead broker, wait 5s");
-                        Thread.sleep(5000);
-                        continue;
-                    }
-
-                    logger.info("fetching topic {} partition id {} offset {} leader {}", topic, String.valueOf(partitionId), String.valueOf(offset), leadBroker.toString());
-
-                    final FetchResponse fetchResponse = KafkaRequester.fetchResponse(topic, partitionId, offset, leadBroker, kafkaClusterConfig);
-                    if (fetchResponse.errorCode(topic, partitionId) != 0) {
-                        logger.warn("fetch response offset:" + offset + " errorCode:" + fetchResponse.errorCode(topic, partitionId));
-                        Thread.sleep(30000);
-                        continue;
-                    }
-
-                    for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partitionId)) {
-                        offset++;
-                        consumeMsgCount++;
-                        final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset.message().payload());
-                        streamingMessage.setOffset(messageAndOffset.offset());
-                        if (streamingParser.filter(streamingMessage)) {
-                            final long timestamp = streamingMessage.getTimestamp();
-                            if (timestamp >= timeRange.getFirst() && timestamp < timeRange.getSecond()) {
-                                result.add(streamingMessage);
-                            } else if (timestamp < timeRange.getSecond() + margin) {
-                                //do nothing
-                            } else {
-                                logger.info("thread:" + Thread.currentThread() + " message timestamp:" + timestamp + " is out of time range:" + timeRange + " margin:" + margin);
-                                outOfMargin = true;
-                                break;
-                            }
-                        }
-                    }
-                    logger.info("Number of messages consumed: " + consumeMsgCount + " offset is: " + offset + " total fetch round: " + fetchRound);
-                    if (outOfMargin) {
-                        break;
-                    }
-                    if (consumeMsgCount == consumeMsgCountAtBeginning) {//nothing this round
-                        logger.info("no message consumed this round, wait 30s");
-                        Thread.sleep(30000);
-                    }
-                }
-            } catch (InterruptedException e) {
-                logger.warn("this thread should not be interrupted, just stop fetching", e);
-            }
-            return result;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index e4c702d..633a30c 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -33,6 +33,7 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.util.StreamingMessage;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.kafka.util.ByteBufferBackedInputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
deleted file mode 100644
index b1b4011..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
+++ /dev/null
@@ -1,312 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.source.kafka.diagnose;
-
-import java.text.SimpleDateFormat;
-import java.util.Calendar;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.collect.Maps;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.AbstractApplication;
-import org.apache.kylin.common.util.DaemonThreadFactory;
-import org.apache.kylin.common.util.OptionsHelper;
-import org.apache.kylin.common.util.StreamingMessage;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.source.kafka.KafkaConfigManager;
-import org.apache.kylin.source.kafka.StreamingParser;
-import org.apache.kylin.source.kafka.TimedJsonStreamParser;
-import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
-import org.apache.kylin.source.kafka.util.KafkaRequester;
-import org.apache.kylin.source.kafka.util.KafkaUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-import kafka.api.OffsetRequest;
-import kafka.cluster.Broker;
-import kafka.javaapi.FetchResponse;
-import kafka.message.MessageAndOffset;
-
-/**
- * Continuously run this as a daemon to discover how "disordered" the kafka queue is.
- * This daemon only store a digest so it should not be space-consuming
- */
-public class KafkaInputAnalyzer extends AbstractApplication {
-
-    public class KafkaMessagePuller implements Runnable {
-
-        private final String topic;
-        private final int partitionId;
-        private final KafkaClusterConfig streamingConfig;
-        private final LinkedBlockingQueue<StreamingMessage> streamQueue;
-        private final StreamingParser streamingParser;
-        private final Broker leadBroker;
-        private long offset;
-
-        protected final Logger logger;
-
-        public KafkaMessagePuller(int clusterID, String topic, int partitionId, long startOffset, Broker leadBroker, KafkaClusterConfig kafkaClusterConfig, StreamingParser streamingParser) {
-            this.topic = topic;
-            this.partitionId = partitionId;
-            this.streamingConfig = kafkaClusterConfig;
-            this.offset = startOffset;
-            this.logger = LoggerFactory.getLogger(topic + "_cluster_" + clusterID + "_" + partitionId);
-            this.streamQueue = new LinkedBlockingQueue<StreamingMessage>(10000);
-            this.streamingParser = streamingParser;
-            this.leadBroker = leadBroker;
-        }
-
-        public BlockingQueue<StreamingMessage> getStreamQueue() {
-            return streamQueue;
-        }
-
-        @Override
-        public void run() {
-            try {
-                int consumeMsgCount = 0;
-                int fetchRound = 0;
-                while (true) {
-                    int consumeMsgCountAtBeginning = consumeMsgCount;
-                    fetchRound++;
-
-                    logger.info("fetching topic {} partition id {} offset {} leader {}", topic, String.valueOf(partitionId), String.valueOf(offset), leadBroker.toString());
-
-                    final FetchResponse fetchResponse = KafkaRequester.fetchResponse(topic, partitionId, offset, leadBroker, streamingConfig);
-                    if (fetchResponse.errorCode(topic, partitionId) != 0) {
-                        logger.warn("fetch response offset:" + offset + " errorCode:" + fetchResponse.errorCode(topic, partitionId));
-                        Thread.sleep(30000);
-                        continue;
-                    }
-
-                    for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partitionId)) {
-                        offset++;
-                        consumeMsgCount++;
-
-                        final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset.message().payload());
-                        streamingMessage.setOffset(messageAndOffset.offset());
-                        if (streamingParser.filter(streamingMessage)) {
-                            streamQueue.add(streamingMessage);
-                        }
-
-                    }
-                    logger.info("Number of messages consumed: " + consumeMsgCount + " offset is: " + offset + " total fetch round: " + fetchRound);
-
-                    if (consumeMsgCount == consumeMsgCountAtBeginning) {//nothing this round 
-                        Thread.sleep(30000);
-                    }
-                }
-            } catch (Exception e) {
-                logger.error("consumer has encountered an error", e);
-            }
-        }
-
-    }
-
-    @SuppressWarnings("static-access")
-    private static final Option OPTION_STREAMING = OptionBuilder.withArgName("streaming").hasArg().isRequired(true).withDescription("Name of the streaming").create("streaming");
-    @SuppressWarnings("static-access")
-    private static final Option OPTION_TASK = OptionBuilder.withArgName("task").hasArg().isRequired(true).withDescription("get delay or get disorder degree").create("task");
-    @SuppressWarnings("static-access")
-    private static final Option OPTION_TSCOLNAME = OptionBuilder.withArgName("tsColName").hasArg().isRequired(true).withDescription("field name of the ts").create("tsColName");
-
-    private static final Logger logger = LoggerFactory.getLogger(KafkaInputAnalyzer.class);
-
-    private StreamingParser parser;
-    private KafkaConfig kafkaConfig;
-
-    private Options options;
-
-    public KafkaInputAnalyzer() {
-        options = new Options();
-        options.addOption(OPTION_STREAMING);
-        options.addOption(OPTION_TASK);
-        options.addOption(OPTION_TSCOLNAME);
-
-    }
-
-    private List<BlockingQueue<StreamingMessage>> consume(final int clusterID, final KafkaClusterConfig kafkaClusterConfig, final int partitionCount, long whichtime) {
-        List<BlockingQueue<StreamingMessage>> result = Lists.newArrayList();
-        for (int partitionId = 0; partitionId < partitionCount; ++partitionId) {
-            final kafka.cluster.Broker leadBroker = KafkaUtils.getLeadBroker(kafkaClusterConfig, partitionId);
-            long streamingOffset = KafkaRequester.getLastOffset(kafkaClusterConfig.getTopic(), partitionId, whichtime, leadBroker, kafkaClusterConfig);
-            logger.info("starting offset:" + streamingOffset + " cluster id:" + clusterID + " partitionId:" + partitionId);
-            KafkaMessagePuller consumer = new KafkaMessagePuller(clusterID, kafkaClusterConfig.getTopic(), partitionId, streamingOffset, leadBroker, kafkaClusterConfig, parser);
-            Executors.newSingleThreadExecutor(new DaemonThreadFactory()).submit(consumer);
-            result.add(consumer.getStreamQueue());
-        }
-        return result;
-    }
-
-    private List<BlockingQueue<StreamingMessage>> consumeAll(long whichtime) {
-        int clusterId = 0;
-        final List<BlockingQueue<StreamingMessage>> queues = Lists.newLinkedList();
-
-        for (final KafkaClusterConfig kafkaClusterConfig : kafkaConfig.getKafkaClusterConfigs()) {
-            final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
-            final List<BlockingQueue<StreamingMessage>> oneClusterQueue = consume(clusterId, kafkaClusterConfig, partitionCount, whichtime);
-            queues.addAll(oneClusterQueue);
-            logger.info("Cluster {} with {} partitions", clusterId, oneClusterQueue.size());
-            clusterId++;
-        }
-        return queues;
-    }
-
-    private void analyzeLatency() throws InterruptedException {
-        long[] intervals = new long[] { 1, 5, 60, 300, 1800 };
-        final List<BlockingQueue<StreamingMessage>> allPartitionData = consumeAll(OffsetRequest.LatestTime());
-        final List<TimeHistogram> allHistograms = Lists.newArrayList();
-        final TimeHistogram overallHistogram = new TimeHistogram(intervals, "overall");
-
-        ExecutorService executorService = Executors.newFixedThreadPool(allPartitionData.size(), new DaemonThreadFactory());
-        for (int i = 0; i < allPartitionData.size(); ++i) {
-            final int index = i;
-            allHistograms.add(new TimeHistogram(intervals, "" + i));
-            executorService.submit(new Runnable() {
-                @Override
-                public void run() {
-                    while (true) {
-                        try {
-                            StreamingMessage message = allPartitionData.get(index).take();
-                            long t = message.getTimestamp();
-                            allHistograms.get(index).processMillis(System.currentTimeMillis() - t);
-                            overallHistogram.processMillis(System.currentTimeMillis() - t);
-                        } catch (InterruptedException e) {
-                            e.printStackTrace();
-                        }
-                    }
-                }
-            });
-        }
-
-        while (true) {
-            System.out.println("Printing status at : " + new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(Calendar.getInstance().getTime()));
-
-            for (TimeHistogram histogram : allHistograms) {
-                histogram.printStatus();
-            }
-            overallHistogram.printStatus();
-            Thread.sleep(300000);
-        }
-    }
-
-    private void analyzeDisorder() throws InterruptedException {
-        final List<BlockingQueue<StreamingMessage>> allPartitionData = consumeAll(OffsetRequest.EarliestTime());
-
-        final List<Long> wallClocks = Lists.newArrayList();
-        final List<Long> wallOffset = Lists.newArrayList();
-        final List<Long> maxDisorderTime = Lists.newArrayList();
-        final List<Long> maxDisorderOffset = Lists.newArrayList();
-        final List<Long> processedMessages = Lists.newArrayList();
-
-        for (int i = 0; i < allPartitionData.size(); i++) {
-            wallClocks.add(0L);
-            wallOffset.add(0L);
-            maxDisorderTime.add(0L);
-            maxDisorderOffset.add(0L);
-            processedMessages.add(0L);
-        }
-
-        ExecutorService executorService = Executors.newFixedThreadPool(allPartitionData.size(), new DaemonThreadFactory());
-        final CountDownLatch countDownLatch = new CountDownLatch(allPartitionData.size());
-        for (int i = 0; i < allPartitionData.size(); ++i) {
-            final int index = i;
-            executorService.submit(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        while (true) {
-                            StreamingMessage message = allPartitionData.get(index).poll(60, TimeUnit.SECONDS);
-                            if (message == null) {
-                                System.out.println(String.format("Thread %d is exiting", index));
-                                break;
-                            }
-                            long t = message.getTimestamp();
-                            long offset = message.getOffset();
-                            if (t < wallClocks.get(index)) {
-                                maxDisorderTime.set(index, Math.max(wallClocks.get(index) - t, maxDisorderTime.get(index)));
-                                maxDisorderOffset.set(index, Math.max(offset - wallOffset.get(index), maxDisorderOffset.get(index)));
-                            } else {
-                                wallClocks.set(index, t);
-                                wallOffset.set(index, offset);
-                            }
-                            processedMessages.set(index, processedMessages.get(index) + 1);
-
-                            if (processedMessages.get(index) % 10000 == 1) {
-                                System.out.println(String.format("Thread %d processed %d messages. Max disorder time is %d , max disorder offset is %d", //
-                                        index, processedMessages.get(index), maxDisorderTime.get(index), maxDisorderOffset.get(index)));
-                            }
-                        }
-
-                        System.out.println(String.format("Thread %d finishes after %d messages. Max disorder time is %d , max disorder offset is %d", //
-                                index, processedMessages.get(index), maxDisorderTime.get(index), maxDisorderOffset.get(index)));
-                        countDownLatch.countDown();
-                    } catch (InterruptedException e) {
-                        e.printStackTrace();
-                    }
-                }
-            });
-        }
-
-        countDownLatch.await();
-    }
-
-    @Override
-    protected Options getOptions() {
-        return options;
-    }
-
-    @Override
-    protected void execute(OptionsHelper optionsHelper) throws Exception {
-
-        String streaming = optionsHelper.getOptionValue(OPTION_STREAMING);
-        String task = optionsHelper.getOptionValue(OPTION_TASK);
-        String tsColName = optionsHelper.getOptionValue(OPTION_TSCOLNAME);
-
-        Map<String, String> properties = Maps.newHashMap();
-        properties.put(StreamingParser.PROPERTY_TS_COLUMN_NAME, tsColName);
-        kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(streaming);
-        parser = new TimedJsonStreamParser(Lists.<TblColRef> newArrayList(), properties);
-
-        if ("disorder".equalsIgnoreCase(task)) {
-            analyzeDisorder();
-        } else if ("delay".equalsIgnoreCase(task)) {
-            analyzeLatency();
-        } else {
-            optionsHelper.printUsage(this.getClass().getName(), options);
-        }
-    }
-
-    public static void main(String[] args) {
-        KafkaInputAnalyzer analyzer = new KafkaInputAnalyzer();
-        analyzer.execute(args);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaVerify.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaVerify.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaVerify.java
deleted file mode 100644
index 6a456bc..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaVerify.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.source.kafka.diagnose;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.lang.StringUtils;
-
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.MapType;
-import com.fasterxml.jackson.databind.type.SimpleType;
-
-/**
- * only for verify kylin streaming's correctness by comparing to data in original kafka topic
- */
-public class KafkaVerify {
-
-    public static void main(String[] args) throws IOException {
-
-        System.out.println("start");
-
-        ObjectMapper mapper = new ObjectMapper();
-        JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(String.class));
-
-        long start = Long.valueOf(args[0]);
-        long end = Long.valueOf(args[1]);
-        long interval = Long.valueOf(args[2]);
-        int bucket = (int) ((end - start + interval - 1) / interval);
-
-        long[] qtySum = new long[bucket];
-        long qtyTotal = 0;
-        long[] counts = new long[bucket];
-        long countTotal = 0;
-        long processed = 0;
-        long minOffset = -1;
-        long maxOffset = -1;
-
-        try (BufferedReader br = new BufferedReader(new FileReader(new File(args[3])))) {
-            String s;
-            while ((s = br.readLine()) != null) {
-                // process the line.
-                if (++processed % 10000 == 1) {
-                    System.out.println("processing " + processed);
-                }
-
-                Map<String, String> root = mapper.readValue(s, mapType);
-                String tsStr = root.get("sys_ts");
-
-                if (StringUtils.isEmpty(tsStr)) {
-                    continue;
-                }
-                long ts = Long.valueOf(tsStr);
-                if (ts < start || ts >= end) {
-                    continue;
-                }
-
-                if (minOffset == -1) {
-                    minOffset = processed - 1;
-                }
-                maxOffset = processed - 1;
-
-                long qty = Long.valueOf(root.get("qty"));
-                int index = (int) ((ts - start) / interval);
-                qtySum[index] += qty;
-                qtyTotal += qty;
-                counts[index]++;
-                countTotal++;
-            }
-        }
-
-        System.out.println("qty sum is " + Arrays.toString(qtySum));
-        System.out.println("qty total is " + qtyTotal);
-        System.out.println("count is " + Arrays.toString(counts));
-        System.out.println("count total is " + countTotal);
-        System.out.println("first processed is " + minOffset);
-        System.out.println("last processed is " + maxOffset);
-    }
-}


[5/6] kylin git commit: KYLIN-2072 further cleanup old streaming code

Posted by sh...@apache.org.
KYLIN-2072 further cleanup old streaming code


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ed643e6b
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ed643e6b
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ed643e6b

Branch: refs/heads/master
Commit: ed643e6b20e31a4c3a45d72dc8e5ff1287584764
Parents: c67fa74
Author: shaofengshi <sh...@apache.org>
Authored: Sun Oct 9 22:16:38 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Oct 10 13:32:44 2016 +0800

----------------------------------------------------------------------
 .../kylin/common/util/StreamingBatch.java       | 44 --------------------
 .../kylin/common/util/StreamingMessage.java     |  3 --
 .../org/apache/kylin/engine/EngineFactory.java  | 14 +------
 .../kylin/engine/IStreamingCubingEngine.java    | 26 ------------
 4 files changed, 1 insertion(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/ed643e6b/core-common/src/main/java/org/apache/kylin/common/util/StreamingBatch.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/StreamingBatch.java b/core-common/src/main/java/org/apache/kylin/common/util/StreamingBatch.java
deleted file mode 100644
index e000aa6..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/util/StreamingBatch.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.common.util;
-
-import java.util.List;
-
-/**
- */
-public final class StreamingBatch {
-
-    private final List<StreamingMessage> messages;
-
-    private final Pair<Long, Long> timeRange;
-
-    public StreamingBatch(List<StreamingMessage> messages, Pair<Long, Long> timeRange) {
-        this.messages = messages;
-        this.timeRange = timeRange;
-    }
-
-    public List<StreamingMessage> getMessages() {
-        return messages;
-    }
-
-    public Pair<Long, Long> getTimeRange() {
-        return timeRange;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/ed643e6b/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java b/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
index 53ab195..981c8a8 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
@@ -18,7 +18,6 @@
 
 package org.apache.kylin.common.util;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -34,8 +33,6 @@ public class StreamingMessage {
 
     private Map<String, Object> params;
 
-    public static final StreamingMessage EOF = new StreamingMessage(Collections.<String> emptyList(), 0L, 0L, Collections.<String, Object> emptyMap());
-
     public StreamingMessage(List<String> data, long offset, long timestamp, Map<String, Object> params) {
         this.data = data;
         this.offset = offset;

http://git-wip-us.apache.org/repos/asf/kylin/blob/ed643e6b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
index 7044a3e..acaa7da 100644
--- a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
+++ b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
@@ -31,23 +31,15 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 public class EngineFactory {
 
     private static ImplementationSwitch<IBatchCubingEngine> batchEngines;
-    private static ImplementationSwitch<IStreamingCubingEngine> streamingEngines;
     static {
         Map<Integer, String> impls = KylinConfig.getInstanceFromEnv().getJobEngines();
-        batchEngines = new ImplementationSwitch<IBatchCubingEngine>(impls, IBatchCubingEngine.class);
-
-        impls.clear();
-        streamingEngines = new ImplementationSwitch<IStreamingCubingEngine>(impls, IStreamingCubingEngine.class); // TODO
+        batchEngines = new ImplementationSwitch<>(impls, IBatchCubingEngine.class);
     }
 
     public static IBatchCubingEngine batchEngine(IEngineAware aware) {
         return batchEngines.get(aware.getEngineType());
     }
 
-    public static IStreamingCubingEngine streamingEngine(IEngineAware aware) {
-        return streamingEngines.get(aware.getEngineType());
-    }
-
     /** Mark deprecated to indicate for test purpose only */
     @Deprecated
     public static IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeDesc cubeDesc) {
@@ -68,8 +60,4 @@ public class EngineFactory {
         return batchEngine(mergeSegment).createBatchMergeJob(mergeSegment, submitter);
     }
 
-    public static Runnable createStreamingCubingBuilder(CubeSegment seg) {
-        return streamingEngine(seg).createStreamingCubingBuilder(seg);
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/ed643e6b/core-job/src/main/java/org/apache/kylin/engine/IStreamingCubingEngine.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/engine/IStreamingCubingEngine.java b/core-job/src/main/java/org/apache/kylin/engine/IStreamingCubingEngine.java
deleted file mode 100644
index cec57a7..0000000
--- a/core-job/src/main/java/org/apache/kylin/engine/IStreamingCubingEngine.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine;
-
-import org.apache.kylin.cube.CubeSegment;
-
-public interface IStreamingCubingEngine {
-
-    public Runnable createStreamingCubingBuilder(CubeSegment seg);
-}


[4/6] kylin git commit: KYLIN-2072 Cleanup old streaming code

Posted by sh...@apache.org.
KYLIN-2072 Cleanup old streaming code

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5aee0226
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5aee0226
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5aee0226

Branch: refs/heads/master
Commit: 5aee022612c6fa40c41e8c00063714b79b6d5237
Parents: cb2b12b
Author: shaofengshi <sh...@apache.org>
Authored: Sun Oct 9 13:10:50 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Oct 10 13:32:44 2016 +0800

----------------------------------------------------------------------
 assembly/pom.xml                                |   4 -
 .../kylin/job/streaming/KafkaDataLoader.java    |  79 ----
 build/bin/cleanup_streaming_files.sh            |  42 --
 build/bin/kylin.sh                              |  61 ---
 build/bin/streaming_build.sh                    |  33 --
 build/bin/streaming_check.sh                    |  29 --
 build/bin/streaming_fillgap.sh                  |  26 --
 build/bin/streaming_rolllog.sh                  |  29 --
 .../metadata/streaming/StreamingConfig.java     |  85 ++++
 .../metadata/streaming/StreamingManager.java    | 248 ++++++++++++
 .../.settings/org.eclipse.core.resources.prefs  |   6 -
 .../.settings/org.eclipse.jdt.core.prefs        | 386 -------------------
 .../.settings/org.eclipse.jdt.ui.prefs          |   7 -
 engine-streaming/pom.xml                        | 121 ------
 .../kylin/engine/streaming/BootstrapConfig.java |  71 ----
 .../kylin/engine/streaming/IStreamingInput.java |  30 --
 .../engine/streaming/IStreamingOutput.java      |  34 --
 .../streaming/OneOffStreamingBuilder.java       |  71 ----
 .../engine/streaming/StreamingBatchBuilder.java |  43 ---
 .../kylin/engine/streaming/StreamingConfig.java |  85 ----
 .../engine/streaming/StreamingManager.java      | 248 ------------
 .../kylin/engine/streaming/cli/MonitorCLI.java  |  88 -----
 .../engine/streaming/cli/StreamingCLI.java      | 114 ------
 .../streaming/cube/StreamingCubeBuilder.java    | 168 --------
 .../diagnose/StreamingLogAnalyzer.java          |  96 -----
 .../streaming/monitor/StreamingMonitor.java     | 172 ---------
 .../engine/streaming/util/StreamingUtils.java   |  51 ---
 .../kylin/provision/BuildCubeWithStream.java    |   4 +-
 pom.xml                                         |   6 -
 .../rest/controller/StreamingController.java    |   2 +-
 .../kylin/rest/controller/TableController.java  |   2 +-
 .../apache/kylin/rest/service/BasicService.java |   2 +-
 .../kylin/rest/service/StreamingService.java    |   2 +-
 source-kafka/pom.xml                            |   6 -
 .../kafka/ByteBufferBackedInputStream.java      |  52 ---
 .../apache/kylin/source/kafka/KafkaSource.java  |   2 +-
 .../kylin/source/kafka/KafkaStreamingInput.java | 227 -----------
 .../source/kafka/TimedJsonStreamParser.java     |   1 +
 .../kafka/diagnose/KafkaInputAnalyzer.java      | 312 ---------------
 .../source/kafka/diagnose/KafkaVerify.java      | 101 -----
 .../source/kafka/diagnose/TimeHistogram.java    |  85 ----
 .../kafka/util/ByteBufferBackedInputStream.java |  52 +++
 .../kylin/source/kafka/util/KafkaRequester.java | 191 ---------
 .../kylin/source/kafka/util/KafkaUtils.java     | 173 ---------
 .../test/java/TimedJsonStreamParserTest.java    |   4 +-
 storage-hbase/pom.xml                           |   4 -
 .../hbase/steps/HBaseStreamingOutput.java       |  98 -----
 .../apache/kylin/tool/CubeMetaExtractor.java    |   4 +-
 48 files changed, 397 insertions(+), 3360 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/assembly/pom.xml
----------------------------------------------------------------------
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 0c80afc..e6f83a8 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -47,10 +47,6 @@
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-engine-mr</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-engine-streaming</artifactId>
-        </dependency>
 
         <!-- Env & Test -->
         <dependency>

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
deleted file mode 100644
index 454f6cf..0000000
--- a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.streaming;
-
-import java.util.List;
-import java.util.Properties;
-
-import javax.annotation.Nullable;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.source.kafka.config.BrokerConfig;
-import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.Lists;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
-
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-
-/**
- * Load prepared data into kafka(for test use)
- */
-public class KafkaDataLoader extends StreamDataLoader {
-    List<KafkaClusterConfig> kafkaClusterConfigs;
-
-    public KafkaDataLoader(KafkaConfig kafkaConfig) {
-        super(kafkaConfig);
-        this.kafkaClusterConfigs = kafkaConfig.getKafkaClusterConfigs();
-    }
-
-    public void loadIntoKafka(List<String> messages) {
-
-        KafkaClusterConfig clusterConfig = kafkaClusterConfigs.get(0);
-        String brokerList = StringUtils.join(Collections2.transform(clusterConfig.getBrokerConfigs(), new Function<BrokerConfig, String>() {
-            @Nullable
-            @Override
-            public String apply(BrokerConfig brokerConfig) {
-                return brokerConfig.getHost() + ":" + brokerConfig.getPort();
-            }
-        }), ",");
-        Properties props = new Properties();
-        props.put("metadata.broker.list", brokerList);
-        props.put("serializer.class", "kafka.serializer.StringEncoder");
-        props.put("request.required.acks", "1");
-        props.put("retry.backoff.ms", "1000");
-
-        ProducerConfig config = new ProducerConfig(props);
-
-        Producer<String, String> producer = new Producer<String, String>(config);
-
-        List<KeyedMessage<String, String>> keyedMessages = Lists.newArrayList();
-        for (int i = 0; i < messages.size(); ++i) {
-            KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(clusterConfig.getTopic(), String.valueOf(i), messages.get(i));
-            keyedMessages.add(keyedMessage);
-        }
-        producer.send(keyedMessages);
-        producer.close();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/build/bin/cleanup_streaming_files.sh
----------------------------------------------------------------------
diff --git a/build/bin/cleanup_streaming_files.sh b/build/bin/cleanup_streaming_files.sh
deleted file mode 100644
index 9b31a4f..0000000
--- a/build/bin/cleanup_streaming_files.sh
+++ /dev/null
@@ -1,42 +0,0 @@
-#!/bin/bash
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-if [ $# != 1 ]
-then
-    echo 'invalid input'
-    exit -1
-fi
-
-cd $KYLIN_HOME/logs
-
-for pidfile in `find -L . -name "$1_1*"`
-do
-    pidfile=`echo "$pidfile" | cut -c 3-`
-    echo "pidfile:$pidfile"
-    pid=`cat $pidfile`
-    if [ `ps -ef | awk '{print $2}' | grep -w $pid | wc -l` = 1 ]
-    then
-        echo "pid:$pid still running"
-    else
-        echo "pid:$pid not running, try to delete files"
-        echo $pidfile | xargs rm
-        echo "streaming_$pidfile.log" | xargs rm
-    fi
-done
-

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/build/bin/kylin.sh
----------------------------------------------------------------------
diff --git a/build/bin/kylin.sh b/build/bin/kylin.sh
index e767492..039be9f 100644
--- a/build/bin/kylin.sh
+++ b/build/bin/kylin.sh
@@ -139,67 +139,6 @@ then
         exit 1    
     fi
 
-# streaming command
-elif [ "$1" == "streaming" ]
-then
-    if [ $# -lt 4 ]
-    then
-        echo "invalid input args $@"
-        exit -1
-    fi
-    if [ "$2" == "start" ]
-    then
-        retrieveDependency
-        source ${dir}/find-kafka-dependency.sh
-        
-        # KYLIN_EXTRA_START_OPTS is for customized settings, checkout bin/setenv.sh
-        hbase ${KYLIN_EXTRA_START_OPTS} \
-        -Dlog4j.configuration=kylin-log4j.properties\
-        -Dkylin.hive.dependency=${hive_dependency} \
-        -Dkylin.kafka.dependency=${kafka_dependency} \
-        -Dkylin.hbase.dependency=${hbase_dependency} \
-        org.apache.kylin.engine.streaming.cli.StreamingCLI $@ > ${KYLIN_HOME}/logs/streaming_$3_$4.log 2>&1 & echo $! > ${KYLIN_HOME}/logs/$3_$4 &
-        echo "streaming started name: $3 id: $4"
-        exit 0
-    elif [ "$2" == "stop" ]
-    then
-        if [ ! -f "${KYLIN_HOME}/$3_$4" ]
-            then
-                echo "streaming is not running, please check"
-                exit 1
-            fi
-            pid=`cat ${KYLIN_HOME}/$3_$4`
-            if [ "$pid" = "" ]
-            then
-                echo "streaming is not running, please check"
-                exit 1
-            else
-                echo "stopping streaming:$pid"
-                kill $pid
-            fi
-            rm ${KYLIN_HOME}/$3_$4
-            exit 0
-        else
-            echo
-        fi
-
-# monitor command
-elif [ "$1" == "monitor" ]
-then
-    echo "monitor job"
-
-    retrieveDependency
-    source ${dir}/find-kafka-dependency.sh
-    
-    # KYLIN_EXTRA_START_OPTS is for customized settings, checkout bin/setenv.sh
-    hbase ${KYLIN_EXTRA_START_OPTS} \
-    -Dlog4j.configuration=kylin-log4j.properties\
-    -Dkylin.hive.dependency=${hive_dependency} \
-    -Dkylin.kafka.dependency=${kafka_dependency} \
-    -Dkylin.hbase.dependency=${hbase_dependency} \
-    org.apache.kylin.engine.streaming.cli.MonitorCLI $@ > ${KYLIN_HOME}/logs/monitor.log 2>&1
-    exit 0
-
 elif [ "$1" = "version" ]
 then
     exec hbase -Dlog4j.configuration=kylin-log4j.properties org.apache.kylin.common.KylinVersion

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/build/bin/streaming_build.sh
----------------------------------------------------------------------
diff --git a/build/bin/streaming_build.sh b/build/bin/streaming_build.sh
deleted file mode 100644
index ed19036..0000000
--- a/build/bin/streaming_build.sh
+++ /dev/null
@@ -1,33 +0,0 @@
-#!/bin/bash
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-source /etc/profile
-source ~/.bash_profile
-
-CUBE=$1
-INTERVAL=$2
-DELAY=$3
-CURRENT_TIME_IN_SECOND=`date +%s`
-CURRENT_TIME=$((CURRENT_TIME_IN_SECOND * 1000))
-START=$(($CURRENT_TIME - CURRENT_TIME%INTERVAL - DELAY))
-END=$(($CURRENT_TIME - CURRENT_TIME%INTERVAL - DELAY + INTERVAL))
-
-ID="$START"_"$END"
-echo "building for ${ID}" >> ${KYLIN_HOME}/logs/build_trace.log
-sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${CUBE} ${ID} -start ${START} -end ${END} -cube ${CUBE}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/build/bin/streaming_check.sh
----------------------------------------------------------------------
diff --git a/build/bin/streaming_check.sh b/build/bin/streaming_check.sh
deleted file mode 100644
index fef0139..0000000
--- a/build/bin/streaming_check.sh
+++ /dev/null
@@ -1,29 +0,0 @@
-#!/bin/bash
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-source /etc/profile
-source ~/.bash_profile
-
-receivers=$1
-host=$2
-tablename=$3
-authorization=$4
-projectname=$5
-cubename=$6
-sh ${KYLIN_HOME}/bin/kylin.sh monitor -receivers ${receivers} -host ${host} -tableName ${tablename} -authorization ${authorization} -cubeName ${cubename} -projectName ${projectname}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/build/bin/streaming_fillgap.sh
----------------------------------------------------------------------
diff --git a/build/bin/streaming_fillgap.sh b/build/bin/streaming_fillgap.sh
deleted file mode 100644
index c67809a..0000000
--- a/build/bin/streaming_fillgap.sh
+++ /dev/null
@@ -1,26 +0,0 @@
-#!/bin/bash
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-source /etc/profile
-source ~/.bash_profile
-
-cube=$1
-
-cd ${KYLIN_HOME}
-sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${cube} fillgap -cube ${cube} -fillGap true
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/build/bin/streaming_rolllog.sh
----------------------------------------------------------------------
diff --git a/build/bin/streaming_rolllog.sh b/build/bin/streaming_rolllog.sh
deleted file mode 100644
index 8018eb8..0000000
--- a/build/bin/streaming_rolllog.sh
+++ /dev/null
@@ -1,29 +0,0 @@
-#!/bin/bash
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-source /etc/profile
-source ~/.bash_profile
-
-KYLIN_LOG_HOME=${KYLIN_HOME}/logs
-cd ${KYLIN_LOG_HOME}
-timestamp=`date +%Y_%m_%d_%H_%M_%S`
-tarfile=logs_archived_at_${timestamp}.tar
-files=`find -L . ! -name '*.tar' -type f -mtime +1` # keep two days' log
-echo ${files} | xargs tar -cvf ${tarfile}
-echo ${files} | xargs rm
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java b/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java
new file mode 100644
index 0000000..9fd6ede
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.streaming;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.common.persistence.Serializer;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/** Streaming source metadata (a name and a type); Jackson auto-detection is off, so only explicitly annotated members are mapped.
+ */
+@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
+public class StreamingConfig extends RootPersistentEntity {
+
+    public static Serializer<StreamingConfig> SERIALIZER = new JsonSerializer<StreamingConfig>(StreamingConfig.class); // JSON serializer for this type; clone() relies on it
+
+    public static final String STREAMING_TYPE_KAFKA = "kafka"; // initial value of the "type" property
+
+    @JsonProperty("name")
+    private String name;
+
+    @JsonProperty("type")
+    private String type = STREAMING_TYPE_KAFKA;
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public String getResourcePath() { // resource path derived from this config's name
+        return concatResourcePath(name);
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public static String concatResourcePath(String name) { // builds <STREAMING_RESOURCE_ROOT>/<name>.json
+        return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + name + ".json";
+    }
+
+    @Override
+    public StreamingConfig clone() { // copies by serializing to an in-memory buffer and deserializing it back
+        try {
+            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            SERIALIZER.serialize(this, new DataOutputStream(baos));
+            return SERIALIZER.deserialize(new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));
+        } catch (IOException e) {
+            throw new RuntimeException(e);// both streams are in-memory, so an IOException is not expected here
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java
new file mode 100644
index 0000000..8cfe87d
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.streaming;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.metadata.cachesync.Broadcaster;
+import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
+import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Caches StreamingConfig metadata by name for one KylinConfig and loads/saves it through the ResourceStore.
+ */
+public class StreamingManager {
+
+    private static final Logger logger = LoggerFactory.getLogger(StreamingManager.class);
+
+    // one StreamingManager per KylinConfig, created on demand by getInstance()
+    private static final ConcurrentHashMap<KylinConfig, StreamingManager> CACHE = new ConcurrentHashMap<KylinConfig, StreamingManager>();
+
+    public static final Serializer<StreamingConfig> STREAMING_SERIALIZER = new JsonSerializer<StreamingConfig>(StreamingConfig.class);
+
+    private KylinConfig config;
+
+    // name ==> StreamingConfig
+    private CaseInsensitiveStringCache<StreamingConfig> streamingMap;
+
+    public static void clearCache() { // forgets every cached manager instance
+        CACHE.clear();
+    }
+
+    private StreamingManager(KylinConfig config) throws IOException {
+        this.config = config;
+        this.streamingMap = new CaseInsensitiveStringCache<StreamingConfig>(config, "streaming");
+        
+        // touch lower level metadata before registering my listener
+        reloadAllStreaming();
+        Broadcaster.getInstance(config).registerListener(new StreamingSyncListener(), "streaming");
+    }
+
+    private class StreamingSyncListener extends Broadcaster.Listener { // applies broadcast events to this manager's cache
+        @Override
+        public void onClearAll(Broadcaster broadcaster) throws IOException {
+            clearCache(); // static call: clears the managers of all KylinConfigs, not only this one
+        }
+
+        @Override
+        public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException {
+            if (event == Event.DROP) // DROP removes the local entry; every other event reloads it from the store
+                removeStreamingLocal(cacheKey);
+            else
+                reloadStreamingConfigLocal(cacheKey);
+        }
+    }
+
+    private ResourceStore getStore() {
+        return ResourceStore.getStore(this.config);
+    }
+
+    public static StreamingManager getInstance(KylinConfig config) { // returns the cached manager for config, creating it on first use
+        StreamingManager r = CACHE.get(config);
+        if (r != null) {
+            return r;
+        }
+
+        synchronized (StreamingManager.class) {
+            r = CACHE.get(config); // re-check inside the lock so only one instance is built per config
+            if (r != null) {
+                return r;
+            }
+            try {
+                r = new StreamingManager(config);
+                CACHE.put(config, r);
+                if (CACHE.size() > 1) {
+                    logger.warn("More than one singleton exist");
+                }
+                return r;
+            } catch (IOException e) {
+                throw new IllegalStateException("Failed to init StreamingManager from " + config, e);
+            }
+        }
+    }
+
+    private static String formatStreamingConfigPath(String name) { // NOTE(review): private and not called anywhere in this class
+        return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + name + ".json";
+    }
+
+    private static String formatStreamingOutputPath(String streaming, int partition) { // NOTE(review): private and not called anywhere in this class
+        return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + partition + ".json";
+    }
+
+    private static String formatStreamingOutputPath(String streaming, List<Integer> partitions) { // NOTE(review): private and not called anywhere in this class
+        return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + StringUtils.join(partitions, "_") + ".json";
+    }
+
+    public StreamingConfig getStreamingConfig(String name) { // cache lookup only; does not read the resource store
+        return streamingMap.get(name);
+    }
+
+    public List<StreamingConfig> listAllStreaming() { // returns a new list holding the currently cached configs
+        return new ArrayList<>(streamingMap.values());
+    }
+
+    /**
+     * Reloads the named StreamingConfig from the resource store, caches it via putLocal and returns it.
+     * Called by StreamingSyncListener for every entity change event other than DROP.
+     *
+     * @param name name of the StreamingConfig, used to build its resource path
+     * @throws IOException declared by the underlying load from the resource store
+     */
+    public StreamingConfig reloadStreamingConfigLocal(String name) throws IOException {
+
+        // Resolve the resource path for this name
+        String path = StreamingConfig.concatResourcePath(name);
+
+        // Reload the StreamingConfig
+        StreamingConfig ndesc = loadStreamingConfigAt(path);
+
+        // Here replace the old one
+        streamingMap.putLocal(ndesc.getName(), ndesc);
+        return ndesc;
+    }
+
+    // Deletes the config's resource from the store, then removes its cache entry
+    public void removeStreamingConfig(StreamingConfig streamingConfig) throws IOException {
+        String path = streamingConfig.getResourcePath();
+        getStore().deleteResource(path);
+        streamingMap.remove(streamingConfig.getName());
+    }
+
+    public StreamingConfig getConfig(String name) { // like getStreamingConfig() but upper-cases the name first; a null name throws NullPointerException
+        name = name.toUpperCase();
+        return streamingMap.get(name);
+    }
+
+    public void removeStreamingLocal(String streamingName) { // cache-only removal; the resource store is not touched
+        streamingMap.removeLocal(streamingName);
+    }
+
+    /**
+     * Saves an already-known StreamingConfig to the resource store and puts it into the cache.
+     * NOTE(review): the cache receives the passed-in desc while the copy re-read from the store is returned; confirm this is intended.
+     * @param desc config to save; a null uuid or name, or a name that is not cached yet, raises IllegalArgumentException
+     * @return the StreamingConfig re-read from the store after the save
+     * @throws IOException declared by the resource store operations
+     */
+    public StreamingConfig updateStreamingConfig(StreamingConfig desc) throws IOException {
+        // Validate the StreamingConfig
+        if (desc.getUuid() == null || desc.getName() == null) {
+            throw new IllegalArgumentException("SteamingConfig Illegal.");
+        }
+        String name = desc.getName();
+        if (!streamingMap.containsKey(name)) {
+            throw new IllegalArgumentException("StreamingConfig '" + name + "' does not exist.");
+        }
+
+        // Save to the resource store
+        String path = desc.getResourcePath();
+        getStore().putResource(path, desc, STREAMING_SERIALIZER);
+
+        // Reload the StreamingConfig
+        StreamingConfig ndesc = loadStreamingConfigAt(path);
+        // Here replace the old one
+        streamingMap.put(ndesc.getName(), desc);
+
+        return ndesc;
+    }
+
+    public StreamingConfig saveStreamingConfig(StreamingConfig streamingConfig) throws IOException { // creates a new config; see the checks below
+        if (streamingConfig == null || StringUtils.isEmpty(streamingConfig.getName())) {
+            throw new IllegalArgumentException();
+        }
+
+        if (streamingMap.containsKey(streamingConfig.getName())) // refuse to overwrite a name that is already cached
+            throw new IllegalArgumentException("StreamingConfig '" + streamingConfig.getName() + "' already exists");
+
+        String path = StreamingConfig.concatResourcePath(streamingConfig.getName());
+        getStore().putResource(path, streamingConfig, StreamingConfig.SERIALIZER);
+        streamingMap.put(streamingConfig.getName(), streamingConfig);
+        return streamingConfig;
+    }
+
+    private StreamingConfig loadStreamingConfigAt(String path) throws IOException { // reads one config from the store and rejects a blank name
+        ResourceStore store = getStore();
+        StreamingConfig streamingDesc = store.getResource(path, StreamingConfig.class, STREAMING_SERIALIZER);
+
+        if (StringUtils.isBlank(streamingDesc.getName())) { // NOTE(review): no null check on streamingDesc; confirm what getResource returns for a missing path
+            throw new IllegalStateException("StreamingConfig name must not be blank");
+        }
+        return streamingDesc;
+    }
+
+    private void reloadAllStreaming() throws IOException { // rebuilds the cache from every resource found under STREAMING_RESOURCE_ROOT
+        ResourceStore store = getStore();
+        logger.info("Reloading Streaming Metadata from folder " + store.getReadableResourcePath(ResourceStore.STREAMING_RESOURCE_ROOT));
+
+        streamingMap.clear();
+
+        List<String> paths = store.collectResourceRecursively(ResourceStore.STREAMING_RESOURCE_ROOT, MetadataConstants.FILE_SURFIX);
+        for (String path : paths) {
+            StreamingConfig streamingConfig;
+            try {
+                streamingConfig = loadStreamingConfigAt(path);
+            } catch (Exception e) { // a single unreadable entry is logged and skipped so the remaining ones still load
+                logger.error("Error loading streaming desc " + path, e);
+                continue;
+            }
+            if (path.equals(streamingConfig.getResourcePath()) == false) { // the stored name must map back to the path it was read from
+                logger.error("Skip suspicious desc at " + path + ", " + streamingConfig + " should be at " + streamingConfig.getResourcePath());
+                continue;
+            }
+            if (streamingMap.containsKey(streamingConfig.getName())) { // first entry loaded for a name wins; later duplicates are skipped
+                logger.error("Dup StreamingConfig name '" + streamingConfig.getName() + "' on path " + path);
+                continue;
+            }
+
+            streamingMap.putLocal(streamingConfig.getName(), streamingConfig);
+        }
+
+        logger.debug("Loaded " + streamingMap.size() + " StreamingConfig(s)");
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/.settings/org.eclipse.core.resources.prefs
----------------------------------------------------------------------
diff --git a/engine-streaming/.settings/org.eclipse.core.resources.prefs b/engine-streaming/.settings/org.eclipse.core.resources.prefs
deleted file mode 100644
index 29abf99..0000000
--- a/engine-streaming/.settings/org.eclipse.core.resources.prefs
+++ /dev/null
@@ -1,6 +0,0 @@
-eclipse.preferences.version=1
-encoding//src/main/java=UTF-8
-encoding//src/main/resources=UTF-8
-encoding//src/test/java=UTF-8
-encoding//src/test/resources=UTF-8
-encoding/<project>=UTF-8

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/.settings/org.eclipse.jdt.core.prefs
----------------------------------------------------------------------
diff --git a/engine-streaming/.settings/org.eclipse.jdt.core.prefs b/engine-streaming/.settings/org.eclipse.jdt.core.prefs
deleted file mode 100644
index 5aaaf1e..0000000
--- a/engine-streaming/.settings/org.eclipse.jdt.core.prefs
+++ /dev/null
@@ -1,386 +0,0 @@
-eclipse.preferences.version=1
-org.eclipse.jdt.core.compiler.annotation.inheritNullAnnotations=disabled
-org.eclipse.jdt.core.compiler.annotation.missingNonNullByDefaultAnnotation=ignore
-org.eclipse.jdt.core.compiler.annotation.nonnull=org.eclipse.jdt.annotation.NonNull
-org.eclipse.jdt.core.compiler.annotation.nonnull.secondary=
-org.eclipse.jdt.core.compiler.annotation.nonnullbydefault=org.eclipse.jdt.annotation.NonNullByDefault
-org.eclipse.jdt.core.compiler.annotation.nonnullbydefault.secondary=
-org.eclipse.jdt.core.compiler.annotation.nullable=org.eclipse.jdt.annotation.Nullable
-org.eclipse.jdt.core.compiler.annotation.nullable.secondary=
-org.eclipse.jdt.core.compiler.annotation.nullanalysis=disabled
-org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
-org.eclipse.jdt.core.compiler.codegen.methodParameters=do not generate
-org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.7
-org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve
-org.eclipse.jdt.core.compiler.compliance=1.7
-org.eclipse.jdt.core.compiler.debug.lineNumber=generate
-org.eclipse.jdt.core.compiler.debug.localVariable=generate
-org.eclipse.jdt.core.compiler.debug.sourceFile=generate
-org.eclipse.jdt.core.compiler.problem.annotationSuperInterface=warning
-org.eclipse.jdt.core.compiler.problem.assertIdentifier=error
-org.eclipse.jdt.core.compiler.problem.autoboxing=ignore
-org.eclipse.jdt.core.compiler.problem.comparingIdentical=warning
-org.eclipse.jdt.core.compiler.problem.deadCode=warning
-org.eclipse.jdt.core.compiler.problem.deprecation=warning
-org.eclipse.jdt.core.compiler.problem.deprecationInDeprecatedCode=disabled
-org.eclipse.jdt.core.compiler.problem.deprecationWhenOverridingDeprecatedMethod=disabled
-org.eclipse.jdt.core.compiler.problem.discouragedReference=ignore
-org.eclipse.jdt.core.compiler.problem.emptyStatement=ignore
-org.eclipse.jdt.core.compiler.problem.enumIdentifier=error
-org.eclipse.jdt.core.compiler.problem.explicitlyClosedAutoCloseable=ignore
-org.eclipse.jdt.core.compiler.problem.fallthroughCase=ignore
-org.eclipse.jdt.core.compiler.problem.fatalOptionalError=disabled
-org.eclipse.jdt.core.compiler.problem.fieldHiding=ignore
-org.eclipse.jdt.core.compiler.problem.finalParameterBound=warning
-org.eclipse.jdt.core.compiler.problem.finallyBlockNotCompletingNormally=warning
-org.eclipse.jdt.core.compiler.problem.forbiddenReference=ignore
-org.eclipse.jdt.core.compiler.problem.hiddenCatchBlock=warning
-org.eclipse.jdt.core.compiler.problem.includeNullInfoFromAsserts=disabled
-org.eclipse.jdt.core.compiler.problem.incompatibleNonInheritedInterfaceMethod=warning
-org.eclipse.jdt.core.compiler.problem.incompleteEnumSwitch=warning
-org.eclipse.jdt.core.compiler.problem.indirectStaticAccess=ignore
-org.eclipse.jdt.core.compiler.problem.localVariableHiding=ignore
-org.eclipse.jdt.core.compiler.problem.methodWithConstructorName=warning
-org.eclipse.jdt.core.compiler.problem.missingDefaultCase=ignore
-org.eclipse.jdt.core.compiler.problem.missingDeprecatedAnnotation=ignore
-org.eclipse.jdt.core.compiler.problem.missingEnumCaseDespiteDefault=disabled
-org.eclipse.jdt.core.compiler.problem.missingHashCodeMethod=ignore
-org.eclipse.jdt.core.compiler.problem.missingOverrideAnnotation=ignore
-org.eclipse.jdt.core.compiler.problem.missingOverrideAnnotationForInterfaceMethodImplementation=enabled
-org.eclipse.jdt.core.compiler.problem.missingSerialVersion=warning
-org.eclipse.jdt.core.compiler.problem.missingSynchronizedOnInheritedMethod=ignore
-org.eclipse.jdt.core.compiler.problem.noEffectAssignment=warning
-org.eclipse.jdt.core.compiler.problem.noImplicitStringConversion=warning
-org.eclipse.jdt.core.compiler.problem.nonExternalizedStringLiteral=ignore
-org.eclipse.jdt.core.compiler.problem.nonnullParameterAnnotationDropped=warning
-org.eclipse.jdt.core.compiler.problem.nonnullTypeVariableFromLegacyInvocation=warning
-org.eclipse.jdt.core.compiler.problem.nullAnnotationInferenceConflict=error
-org.eclipse.jdt.core.compiler.problem.nullReference=warning
-org.eclipse.jdt.core.compiler.problem.nullSpecViolation=error
-org.eclipse.jdt.core.compiler.problem.nullUncheckedConversion=warning
-org.eclipse.jdt.core.compiler.problem.overridingPackageDefaultMethod=warning
-org.eclipse.jdt.core.compiler.problem.parameterAssignment=ignore
-org.eclipse.jdt.core.compiler.problem.pessimisticNullAnalysisForFreeTypeVariables=warning
-org.eclipse.jdt.core.compiler.problem.possibleAccidentalBooleanAssignment=ignore
-org.eclipse.jdt.core.compiler.problem.potentialNullReference=ignore
-org.eclipse.jdt.core.compiler.problem.potentiallyUnclosedCloseable=ignore
-org.eclipse.jdt.core.compiler.problem.rawTypeReference=ignore
-org.eclipse.jdt.core.compiler.problem.redundantNullAnnotation=warning
-org.eclipse.jdt.core.compiler.problem.redundantNullCheck=ignore
-org.eclipse.jdt.core.compiler.problem.redundantSpecificationOfTypeArguments=ignore
-org.eclipse.jdt.core.compiler.problem.redundantSuperinterface=ignore
-org.eclipse.jdt.core.compiler.problem.reportMethodCanBePotentiallyStatic=ignore
-org.eclipse.jdt.core.compiler.problem.reportMethodCanBeStatic=ignore
-org.eclipse.jdt.core.compiler.problem.specialParameterHidingField=disabled
-org.eclipse.jdt.core.compiler.problem.staticAccessReceiver=warning
-org.eclipse.jdt.core.compiler.problem.suppressOptionalErrors=disabled
-org.eclipse.jdt.core.compiler.problem.suppressWarnings=enabled
-org.eclipse.jdt.core.compiler.problem.syntacticNullAnalysisForFields=disabled
-org.eclipse.jdt.core.compiler.problem.syntheticAccessEmulation=ignore
-org.eclipse.jdt.core.compiler.problem.typeParameterHiding=warning
-org.eclipse.jdt.core.compiler.problem.unavoidableGenericTypeProblems=enabled
-org.eclipse.jdt.core.compiler.problem.uncheckedTypeOperation=ignore
-org.eclipse.jdt.core.compiler.problem.unclosedCloseable=warning
-org.eclipse.jdt.core.compiler.problem.undocumentedEmptyBlock=ignore
-org.eclipse.jdt.core.compiler.problem.unhandledWarningToken=warning
-org.eclipse.jdt.core.compiler.problem.unnecessaryElse=ignore
-org.eclipse.jdt.core.compiler.problem.unnecessaryTypeCheck=ignore
-org.eclipse.jdt.core.compiler.problem.unqualifiedFieldAccess=ignore
-org.eclipse.jdt.core.compiler.problem.unusedDeclaredThrownException=ignore
-org.eclipse.jdt.core.compiler.problem.unusedDeclaredThrownExceptionExemptExceptionAndThrowable=enabled
-org.eclipse.jdt.core.compiler.problem.unusedDeclaredThrownExceptionIncludeDocCommentReference=enabled
-org.eclipse.jdt.core.compiler.problem.unusedDeclaredThrownExceptionWhenOverriding=disabled
-org.eclipse.jdt.core.compiler.problem.unusedExceptionParameter=ignore
-org.eclipse.jdt.core.compiler.problem.unusedImport=warning
-org.eclipse.jdt.core.compiler.problem.unusedLabel=warning
-org.eclipse.jdt.core.compiler.problem.unusedLocal=warning
-org.eclipse.jdt.core.compiler.problem.unusedObjectAllocation=ignore
-org.eclipse.jdt.core.compiler.problem.unusedParameter=ignore
-org.eclipse.jdt.core.compiler.problem.unusedParameterIncludeDocCommentReference=enabled
-org.eclipse.jdt.core.compiler.problem.unusedParameterWhenImplementingAbstract=disabled
-org.eclipse.jdt.core.compiler.problem.unusedParameterWhenOverridingConcrete=disabled
-org.eclipse.jdt.core.compiler.problem.unusedPrivateMember=warning
-org.eclipse.jdt.core.compiler.problem.unusedTypeParameter=ignore
-org.eclipse.jdt.core.compiler.problem.unusedWarningToken=warning
-org.eclipse.jdt.core.compiler.problem.varargsArgumentNeedCast=warning
-org.eclipse.jdt.core.compiler.source=1.7
-org.eclipse.jdt.core.formatter.align_type_members_on_columns=false
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_allocation_expression=16
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_annotation=0
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_enum_constant=16
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_explicit_constructor_call=16
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_method_invocation=16
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_qualified_allocation_expression=16
-org.eclipse.jdt.core.formatter.alignment_for_assignment=0
-org.eclipse.jdt.core.formatter.alignment_for_binary_expression=16
-org.eclipse.jdt.core.formatter.alignment_for_compact_if=16
-org.eclipse.jdt.core.formatter.alignment_for_conditional_expression=80
-org.eclipse.jdt.core.formatter.alignment_for_enum_constants=0
-org.eclipse.jdt.core.formatter.alignment_for_expressions_in_array_initializer=16
-org.eclipse.jdt.core.formatter.alignment_for_method_declaration=0
-org.eclipse.jdt.core.formatter.alignment_for_multiple_fields=16
-org.eclipse.jdt.core.formatter.alignment_for_parameters_in_constructor_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_parameters_in_method_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_resources_in_try=80
-org.eclipse.jdt.core.formatter.alignment_for_selector_in_method_invocation=16
-org.eclipse.jdt.core.formatter.alignment_for_superclass_in_type_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_superinterfaces_in_enum_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_superinterfaces_in_type_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_constructor_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_method_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_union_type_in_multicatch=16
-org.eclipse.jdt.core.formatter.blank_lines_after_imports=1
-org.eclipse.jdt.core.formatter.blank_lines_after_package=1
-org.eclipse.jdt.core.formatter.blank_lines_before_field=0
-org.eclipse.jdt.core.formatter.blank_lines_before_first_class_body_declaration=0
-org.eclipse.jdt.core.formatter.blank_lines_before_imports=1
-org.eclipse.jdt.core.formatter.blank_lines_before_member_type=1
-org.eclipse.jdt.core.formatter.blank_lines_before_method=1
-org.eclipse.jdt.core.formatter.blank_lines_before_new_chunk=1
-org.eclipse.jdt.core.formatter.blank_lines_before_package=0
-org.eclipse.jdt.core.formatter.blank_lines_between_import_groups=1
-org.eclipse.jdt.core.formatter.blank_lines_between_type_declarations=1
-org.eclipse.jdt.core.formatter.brace_position_for_annotation_type_declaration=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_anonymous_type_declaration=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_array_initializer=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_block=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_block_in_case=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_constructor_declaration=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_enum_constant=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_enum_declaration=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_method_declaration=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_switch=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_type_declaration=end_of_line
-org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_block_comment=false
-org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_javadoc_comment=false
-org.eclipse.jdt.core.formatter.comment.format_block_comments=false
-org.eclipse.jdt.core.formatter.comment.format_header=false
-org.eclipse.jdt.core.formatter.comment.format_html=true
-org.eclipse.jdt.core.formatter.comment.format_javadoc_comments=false
-org.eclipse.jdt.core.formatter.comment.format_line_comments=false
-org.eclipse.jdt.core.formatter.comment.format_source_code=true
-org.eclipse.jdt.core.formatter.comment.indent_parameter_description=true
-org.eclipse.jdt.core.formatter.comment.indent_root_tags=true
-org.eclipse.jdt.core.formatter.comment.insert_new_line_before_root_tags=insert
-org.eclipse.jdt.core.formatter.comment.insert_new_line_for_parameter=insert
-org.eclipse.jdt.core.formatter.comment.line_length=80
-org.eclipse.jdt.core.formatter.comment.new_lines_at_block_boundaries=true
-org.eclipse.jdt.core.formatter.comment.new_lines_at_javadoc_boundaries=true
-org.eclipse.jdt.core.formatter.comment.preserve_white_space_between_code_and_line_comments=false
-org.eclipse.jdt.core.formatter.compact_else_if=true
-org.eclipse.jdt.core.formatter.continuation_indentation=2
-org.eclipse.jdt.core.formatter.continuation_indentation_for_array_initializer=2
-org.eclipse.jdt.core.formatter.disabling_tag=@formatter\:off
-org.eclipse.jdt.core.formatter.enabling_tag=@formatter\:on
-org.eclipse.jdt.core.formatter.format_guardian_clause_on_one_line=false
-org.eclipse.jdt.core.formatter.format_line_comment_starting_on_first_column=true
-org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_annotation_declaration_header=true
-org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_enum_constant_header=true
-org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_enum_declaration_header=true
-org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_type_header=true
-org.eclipse.jdt.core.formatter.indent_breaks_compare_to_cases=true
-org.eclipse.jdt.core.formatter.indent_empty_lines=false
-org.eclipse.jdt.core.formatter.indent_statements_compare_to_block=true
-org.eclipse.jdt.core.formatter.indent_statements_compare_to_body=true
-org.eclipse.jdt.core.formatter.indent_switchstatements_compare_to_cases=true
-org.eclipse.jdt.core.formatter.indent_switchstatements_compare_to_switch=false
-org.eclipse.jdt.core.formatter.indentation.size=4
-org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_field=insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_local_variable=insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_method=insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_package=insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_parameter=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_type=insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_label=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_opening_brace_in_array_initializer=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_at_end_of_file_if_missing=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_before_catch_in_try_statement=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_before_closing_brace_in_array_initializer=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_before_else_in_if_statement=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_before_finally_in_try_statement=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_before_while_in_do_statement=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_annotation_declaration=insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_anonymous_type_declaration=insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_block=insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_enum_constant=insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_enum_declaration=insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_method_body=insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_type_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_after_and_in_type_parameter=insert
-org.eclipse.jdt.core.formatter.insert_space_after_assignment_operator=insert
-org.eclipse.jdt.core.formatter.insert_space_after_at_in_annotation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_at_in_annotation_type_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_binary_operator=insert
-org.eclipse.jdt.core.formatter.insert_space_after_closing_angle_bracket_in_type_arguments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_closing_angle_bracket_in_type_parameters=insert
-org.eclipse.jdt.core.formatter.insert_space_after_closing_brace_in_block=insert
-org.eclipse.jdt.core.formatter.insert_space_after_closing_paren_in_cast=insert
-org.eclipse.jdt.core.formatter.insert_space_after_colon_in_assert=insert
-org.eclipse.jdt.core.formatter.insert_space_after_colon_in_case=insert
-org.eclipse.jdt.core.formatter.insert_space_after_colon_in_conditional=insert
-org.eclipse.jdt.core.formatter.insert_space_after_colon_in_for=insert
-org.eclipse.jdt.core.formatter.insert_space_after_colon_in_labeled_statement=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_allocation_expression=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_annotation=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_array_initializer=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_constructor_declaration_parameters=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_constructor_declaration_throws=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_enum_constant_arguments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_enum_declarations=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_explicitconstructorcall_arguments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_for_increments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_for_inits=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_declaration_parameters=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_declaration_throws=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_invocation_arguments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_multiple_field_declarations=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_multiple_local_declarations=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_parameterized_type_reference=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_superinterfaces=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_type_arguments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_type_parameters=insert
-org.eclipse.jdt.core.formatter.insert_space_after_ellipsis=insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_parameterized_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_type_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_type_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_brace_in_array_initializer=insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_bracket_in_array_allocation_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_bracket_in_array_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_annotation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_cast=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_catch=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_constructor_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_enum_constant=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_for=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_if=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_method_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_method_invocation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_parenthesized_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_switch=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_synchronized=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_try=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_while=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_postfix_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_prefix_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_question_in_conditional=insert
-org.eclipse.jdt.core.formatter.insert_space_after_question_in_wildcard=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_semicolon_in_for=insert
-org.eclipse.jdt.core.formatter.insert_space_after_semicolon_in_try_resources=insert
-org.eclipse.jdt.core.formatter.insert_space_after_unary_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_and_in_type_parameter=insert
-org.eclipse.jdt.core.formatter.insert_space_before_assignment_operator=insert
-org.eclipse.jdt.core.formatter.insert_space_before_at_in_annotation_type_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_binary_operator=insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_parameterized_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_type_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_type_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_brace_in_array_initializer=insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_bracket_in_array_allocation_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_bracket_in_array_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_annotation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_cast=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_catch=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_constructor_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_enum_constant=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_for=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_if=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_method_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_method_invocation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_parenthesized_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_switch=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_synchronized=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_try=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_while=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_assert=insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_case=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_conditional=insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_default=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_for=insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_labeled_statement=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_allocation_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_annotation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_array_initializer=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_constructor_declaration_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_constructor_declaration_throws=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_enum_constant_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_enum_declarations=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_explicitconstructorcall_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_for_increments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_for_inits=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_declaration_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_declaration_throws=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_invocation_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_multiple_field_declarations=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_multiple_local_declarations=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_parameterized_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_superinterfaces=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_type_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_type_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_ellipsis=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_parameterized_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_type_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_type_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_annotation_type_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_anonymous_type_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_array_initializer=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_block=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_constructor_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_enum_constant=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_enum_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_method_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_switch=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_type_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_allocation_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_annotation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_annotation_type_member_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_catch=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_constructor_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_enum_constant=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_for=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_if=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_method_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_method_invocation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_parenthesized_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_switch=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_synchronized=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_try=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_while=insert
-org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_return=insert
-org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_throw=insert
-org.eclipse.jdt.core.formatter.insert_space_before_postfix_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_prefix_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_question_in_conditional=insert
-org.eclipse.jdt.core.formatter.insert_space_before_question_in_wildcard=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_semicolon=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_semicolon_in_for=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_semicolon_in_try_resources=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_unary_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_brackets_in_array_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_braces_in_array_initializer=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_brackets_in_array_allocation_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_annotation_type_member_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_constructor_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_enum_constant=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_method_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_method_invocation=do not insert
-org.eclipse.jdt.core.formatter.join_lines_in_comments=true
-org.eclipse.jdt.core.formatter.join_wrapped_lines=true
-org.eclipse.jdt.core.formatter.keep_else_statement_on_same_line=false
-org.eclipse.jdt.core.formatter.keep_empty_array_initializer_on_one_line=false
-org.eclipse.jdt.core.formatter.keep_imple_if_on_one_line=false
-org.eclipse.jdt.core.formatter.keep_then_statement_on_same_line=false
-org.eclipse.jdt.core.formatter.lineSplit=999
-org.eclipse.jdt.core.formatter.never_indent_block_comments_on_first_column=false
-org.eclipse.jdt.core.formatter.never_indent_line_comments_on_first_column=false
-org.eclipse.jdt.core.formatter.number_of_blank_lines_at_beginning_of_method_body=0
-org.eclipse.jdt.core.formatter.number_of_empty_lines_to_preserve=1
-org.eclipse.jdt.core.formatter.put_empty_statement_on_new_line=true
-org.eclipse.jdt.core.formatter.tabulation.char=space
-org.eclipse.jdt.core.formatter.tabulation.size=4
-org.eclipse.jdt.core.formatter.use_on_off_tags=false
-org.eclipse.jdt.core.formatter.use_tabs_only_for_leading_indentations=false
-org.eclipse.jdt.core.formatter.wrap_before_binary_operator=true
-org.eclipse.jdt.core.formatter.wrap_before_or_operator_multicatch=true
-org.eclipse.jdt.core.formatter.wrap_outer_expressions_when_nested=true

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/.settings/org.eclipse.jdt.ui.prefs
----------------------------------------------------------------------
diff --git a/engine-streaming/.settings/org.eclipse.jdt.ui.prefs b/engine-streaming/.settings/org.eclipse.jdt.ui.prefs
deleted file mode 100644
index d521bab..0000000
--- a/engine-streaming/.settings/org.eclipse.jdt.ui.prefs
+++ /dev/null
@@ -1,7 +0,0 @@
-eclipse.preferences.version=1
-formatter_profile=_Space Indent & Long Lines
-formatter_settings_version=12
-org.eclipse.jdt.ui.ignorelowercasenames=true
-org.eclipse.jdt.ui.importorder=java;javax;org;com;
-org.eclipse.jdt.ui.ondemandthreshold=99
-org.eclipse.jdt.ui.staticondemandthreshold=99

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/pom.xml
----------------------------------------------------------------------
diff --git a/engine-streaming/pom.xml b/engine-streaming/pom.xml
deleted file mode 100644
index 876279d..0000000
--- a/engine-streaming/pom.xml
+++ /dev/null
@@ -1,121 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
- 
-     http://www.apache.org/licenses/LICENSE-2.0
- 
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>kylin-engine-streaming</artifactId>
-    <packaging>jar</packaging>
-    <name>Apache Kylin - Streaming Engine</name>
-
-    <parent>
-        <groupId>org.apache.kylin</groupId>
-        <artifactId>kylin</artifactId>
-        <version>1.6.0-SNAPSHOT</version>
-
-    </parent>
-
-    <properties>
-    </properties>
-
-    <dependencies>
-
-        <dependency>
-            <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-core-cube</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-core-storage</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-core-job</artifactId>
-        </dependency>
-
-        <!-- Env & Test -->
-        <dependency>
-            <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-core-common</artifactId>
-            <type>test-jar</type>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-hdfs</artifactId>
-            <scope>provided</scope>
-            <!-- protobuf version conflict with hbase -->
-            <exclusions>
-                <exclusion>
-                    <groupId>com.google.protobuf</groupId>
-                    <artifactId>protobuf-java</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-mapreduce-client-app</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-yarn-api</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-mapreduce-client-core</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hbase</groupId>
-            <artifactId>hbase-client</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.mrunit</groupId>
-            <artifactId>mrunit</artifactId>
-            <classifier>hadoop2</classifier>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-
-
-</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
deleted file mode 100644
index 35bdfa8..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.streaming;
-
-/**
- */
-public class BootstrapConfig {
-
-    private String cubeName;
-    private long start = 0L;
-    private long end = 0L;
-
-    private boolean fillGap;
-    private long maxFillGapRange = 4 * 3600 * 1000L;
-
-    public long getStart() {
-        return start;
-    }
-
-    public void setStart(long start) {
-        this.start = start;
-    }
-
-    public long getEnd() {
-        return end;
-    }
-
-    public void setEnd(long end) {
-        this.end = end;
-    }
-
-    public String getCubeName() {
-        return cubeName;
-    }
-
-    public void setCubeName(String cubeName) {
-        this.cubeName = cubeName;
-    }
-
-    public boolean isFillGap() {
-        return fillGap;
-    }
-
-    public void setFillGap(boolean fillGap) {
-        this.fillGap = fillGap;
-    }
-
-    public long getMaxFillGapRange() {
-        return maxFillGapRange;
-    }
-
-    public void setMaxFillGapRange(long maxFillGapRange) {
-        this.maxFillGapRange = maxFillGapRange;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java
deleted file mode 100644
index c583283..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.streaming;
-
-import org.apache.kylin.common.util.StreamingBatch;
-import org.apache.kylin.metadata.realization.RealizationType;
-
-/**
- */
-public interface IStreamingInput {
-
-    StreamingBatch getBatchWithTimeWindow(RealizationType realizationType, String realizationName, int id, long startTime, long endTime);
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingOutput.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingOutput.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingOutput.java
deleted file mode 100644
index cb15e2b..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingOutput.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.streaming;
-
-import java.util.Map;
-
-import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
-import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
-import org.apache.kylin.metadata.model.IBuildable;
-
-/**
- */
-public interface IStreamingOutput {
-
-    ICuboidWriter getCuboidWriter(IBuildable buildable);
-
-    void output(IBuildable buildable, Map<Long, HyperLogLogPlusCounter> samplingResult);
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java
deleted file mode 100644
index c9da46e..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.streaming;
-
-import java.util.Map;
-
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.common.util.StreamingBatch;
-import org.apache.kylin.engine.streaming.util.StreamingUtils;
-import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
-import org.apache.kylin.metadata.model.IBuildable;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.RealizationType;
-
-import com.google.common.base.Preconditions;
-
-/**
- */
-public class OneOffStreamingBuilder {
-
-    private final IStreamingInput streamingInput;
-    private final IStreamingOutput streamingOutput;
-    private final StreamingBatchBuilder streamingBatchBuilder;
-    private final long startTime;
-    private final long endTime;
-    private final RealizationType realizationType;
-    private final String realizationName;
-
-    public OneOffStreamingBuilder(RealizationType realizationType, String realizationName, long startTime, long endTime) {
-        Preconditions.checkArgument(startTime < endTime);
-        this.startTime = startTime;
-        this.endTime = endTime;
-        this.realizationType = Preconditions.checkNotNull(realizationType);
-        this.realizationName = Preconditions.checkNotNull(realizationName);
-        this.streamingInput = Preconditions.checkNotNull(StreamingUtils.getStreamingInput());
-        this.streamingOutput = Preconditions.checkNotNull(StreamingUtils.getStreamingOutput());
-        this.streamingBatchBuilder = Preconditions.checkNotNull(StreamingUtils.getMicroBatchBuilder(realizationType, realizationName));
-    }
-
-    public Runnable build() {
-        return new Runnable() {
-            @Override
-            public void run() {
-                StreamingBatch streamingBatch = streamingInput.getBatchWithTimeWindow(realizationType, realizationName, -1, startTime, endTime);
-                final IBuildable buildable = streamingBatchBuilder.createBuildable(streamingBatch);
-                final Map<Long, HyperLogLogPlusCounter> samplingResult = streamingBatchBuilder.sampling(streamingBatch);
-                final Map<TblColRef, Dictionary<String>> dictionaryMap = streamingBatchBuilder.buildDictionary(streamingBatch, buildable);
-                streamingBatchBuilder.build(streamingBatch, dictionaryMap, streamingOutput.getCuboidWriter(buildable));
-                streamingOutput.output(buildable, samplingResult);
-                streamingBatchBuilder.commit(buildable);
-            }
-        };
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java
deleted file mode 100644
index 8b0b8e6..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.streaming;
-
-import java.util.Map;
-
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.common.util.StreamingBatch;
-import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
-import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
-import org.apache.kylin.metadata.model.IBuildable;
-import org.apache.kylin.metadata.model.TblColRef;
-
-/**
- */
-public interface StreamingBatchBuilder {
-
-    IBuildable createBuildable(StreamingBatch streamingBatch);
-
-    Map<Long, HyperLogLogPlusCounter> sampling(StreamingBatch streamingBatch);
-
-    Map<TblColRef, Dictionary<String>> buildDictionary(StreamingBatch streamingBatch, IBuildable buildable);
-
-    void build(StreamingBatch streamingBatch, Map<TblColRef, Dictionary<String>> dictionaryMap, ICuboidWriter cuboidWriter);
-
-    void commit(IBuildable buildable);
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
deleted file mode 100644
index 9d1a0b1..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.streaming;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.kylin.common.persistence.JsonSerializer;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.persistence.RootPersistentEntity;
-import org.apache.kylin.common.persistence.Serializer;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-/**
- */
-@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
-public class StreamingConfig extends RootPersistentEntity {
-
-    public static Serializer<StreamingConfig> SERIALIZER = new JsonSerializer<StreamingConfig>(StreamingConfig.class);
-
-    public static final String STREAMING_TYPE_KAFKA = "kafka";
-
-    @JsonProperty("name")
-    private String name;
-
-    @JsonProperty("type")
-    private String type = STREAMING_TYPE_KAFKA;
-
-    public String getType() {
-        return type;
-    }
-
-    public void setType(String type) {
-        this.type = type;
-    }
-
-    public String getResourcePath() {
-        return concatResourcePath(name);
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    public static String concatResourcePath(String name) {
-        return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + name + ".json";
-    }
-
-    @Override
-    public StreamingConfig clone() {
-        try {
-            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            SERIALIZER.serialize(this, new DataOutputStream(baos));
-            return SERIALIZER.deserialize(new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));
-        } catch (IOException e) {
-            throw new RuntimeException(e);//in mem, should not happen
-        }
-    }
-
-}


[6/6] kylin git commit: KYLIN-1726 package rename

Posted by sh...@apache.org.
KYLIN-1726 package rename


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c67fa740
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c67fa740
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c67fa740

Branch: refs/heads/master
Commit: c67fa740d364d372c7a6424fd160570cc7e890c4
Parents: 5aee022
Author: shaofengshi <sh...@apache.org>
Authored: Sun Oct 9 13:24:59 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Oct 10 13:32:44 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/source/kafka/KafkaMRInput.java |   3 +
 .../kylin/source/kafka/MergeOffsetStep.java     |  80 -----------
 .../kylin/source/kafka/SeekOffsetStep.java      | 140 ------------------
 .../source/kafka/StringStreamingParser.java     |  51 -------
 .../apache/kylin/source/kafka/TopicMeta.java    |  46 ------
 .../kylin/source/kafka/UpdateTimeRangeStep.java | 117 ---------------
 .../kylin/source/kafka/job/MergeOffsetStep.java |  80 +++++++++++
 .../kylin/source/kafka/job/SeekOffsetStep.java  | 141 +++++++++++++++++++
 .../source/kafka/job/UpdateTimeRangeStep.java   | 117 +++++++++++++++
 .../kafka/util/ByteBufferBackedInputStream.java |   6 +-
 10 files changed, 344 insertions(+), 437 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index 6358ee1..4d1f5c9 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -43,6 +43,9 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.apache.kylin.source.kafka.job.MergeOffsetStep;
+import org.apache.kylin.source.kafka.job.SeekOffsetStep;
+import org.apache.kylin.source.kafka.job.UpdateTimeRangeStep;
 
 import javax.annotation.Nullable;
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java
deleted file mode 100644
index 18c959a..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.source.kafka;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class MergeOffsetStep extends AbstractExecutable {
-
-    private static final Logger logger = LoggerFactory.getLogger(MergeOffsetStep.class);
-    public MergeOffsetStep() {
-        super();
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
-        final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
-        final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
-
-        List<CubeSegment> mergingSegs = cube.getMergingSegments(segment);
-
-        Collections.sort(mergingSegs);
-
-        final CubeSegment first = mergingSegs.get(0);
-        final CubeSegment last = mergingSegs.get(mergingSegs.size() - 1);
-
-        segment.setSourceOffsetStart(first.getSourceOffsetStart());
-        segment.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetStart());
-        segment.setSourceOffsetEnd(last.getSourceOffsetEnd());
-        segment.setSourcePartitionOffsetEnd(last.getSourcePartitionOffsetEnd());
-
-        long dateRangeStart = CubeManager.minDateRangeStart(mergingSegs);
-        long dateRangeEnd = CubeManager.maxDateRangeEnd(mergingSegs);
-
-        segment.setDateRangeStart(dateRangeStart);
-        segment.setDateRangeEnd(dateRangeEnd);
-
-        CubeUpdate cubeBuilder = new CubeUpdate(cube);
-        cubeBuilder.setToUpdateSegs(segment);
-        try {
-            cubeManager.updateCube(cubeBuilder);
-            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
-        } catch (IOException e) {
-            logger.error("fail to update cube segment offset", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
deleted file mode 100644
index 151b912..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.source.kafka;
-
-import org.apache.kylin.source.kafka.util.KafkaClient;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-/**
- */
-public class SeekOffsetStep extends AbstractExecutable {
-
-    private static final Logger logger = LoggerFactory.getLogger(SeekOffsetStep.class);
-
-    public SeekOffsetStep() {
-        super();
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
-        final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
-        final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
-
-        Map<Integer, Long> startOffsets = segment.getSourcePartitionOffsetStart();
-        Map<Integer, Long> endOffsets = segment.getSourcePartitionOffsetEnd();
-
-        if (startOffsets.size() > 0 && endOffsets.size() > 0 && startOffsets.size() == endOffsets.size()) {
-            return new ExecuteResult(ExecuteResult.State.SUCCEED, "skipped, as the offset is provided.");
-        }
-
-        final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(context.getConfig()).getKafkaConfig(cube.getFactTable());
-        final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig);
-        final String topic = kafakaConfig.getTopic();
-        try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) {
-            final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
-
-            if (startOffsets.isEmpty()) {
-                // user didn't specify start offset, use the biggest offset in existing segments as start
-                for (CubeSegment seg : cube.getSegments()) {
-                    Map<Integer, Long> segEndOffset = seg.getSourcePartitionOffsetEnd();
-                    for (PartitionInfo partition : partitionInfos) {
-                        int partitionId = partition.partition();
-                        if (segEndOffset.containsKey(partitionId)) {
-                            startOffsets.put(partitionId, Math.max(startOffsets.containsKey(partitionId) ? startOffsets.get(partitionId) : 0, segEndOffset.get(partitionId)));
-                        }
-                    }
-                }
-
-                if (partitionInfos.size() > startOffsets.size()) {
-                    // has new partition added
-                    for (int x = startOffsets.size(); x < partitionInfos.size(); x++) {
-                        long earliest = KafkaClient.getEarliestOffset(consumer, topic, partitionInfos.get(x).partition());
-                        startOffsets.put(partitionInfos.get(x).partition(), earliest);
-                    }
-                }
-
-                logger.info("Get start offset for segment " + segment.getName() + ": " + startOffsets.toString());
-            }
-
-            if (endOffsets.isEmpty()) {
-                // user didn't specify end offset, use latest offset in kafka
-                for (PartitionInfo partitionInfo : partitionInfos) {
-                    long latest = KafkaClient.getLatestOffset(consumer, topic, partitionInfo.partition());
-                    endOffsets.put(partitionInfo.partition(), latest);
-                }
-
-                logger.info("Get end offset for segment " + segment.getName() + ": " + endOffsets.toString());
-            }
-        }
-
-        long totalStartOffset = 0, totalEndOffset = 0;
-        for (Long v : startOffsets.values()) {
-            totalStartOffset += v;
-        }
-        for (Long v : endOffsets.values()) {
-            totalEndOffset += v;
-        }
-
-        if (totalEndOffset > totalStartOffset) {
-            segment.setSourceOffsetStart(totalStartOffset);
-            segment.setSourceOffsetEnd(totalEndOffset);
-            segment.setSourcePartitionOffsetStart(startOffsets);
-            segment.setSourcePartitionOffsetEnd(endOffsets);
-            segment.setName(CubeSegment.makeSegmentName(0, 0, totalStartOffset, totalEndOffset));
-            CubeUpdate cubeBuilder = new CubeUpdate(cube);
-            cubeBuilder.setToUpdateSegs(segment);
-            try {
-                cubeManager.updateCube(cubeBuilder);
-            } catch (IOException e) {
-                return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-            }
-            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed, offset start: " + totalStartOffset + ", offset end: " + totalEndOffset + ", message count: " + (totalEndOffset - totalStartOffset));
-        } else {
-            CubeUpdate cubeBuilder = new CubeUpdate(cube);
-            cubeBuilder.setToRemoveSegs(segment);
-            try {
-                cubeManager.updateCube(cubeBuilder);
-            } catch (IOException e) {
-                return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-            }
-
-            return new ExecuteResult(ExecuteResult.State.DISCARDED, "No new message comes");
-        }
-
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
deleted file mode 100644
index f74df83..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source.kafka;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.kylin.common.util.StreamingMessage;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import com.google.common.collect.Lists;
-
-/**
- */
-public final class StringStreamingParser extends StreamingParser {
-
-    public static final StringStreamingParser instance = new StringStreamingParser(null, null);
-
-    private StringStreamingParser(List<TblColRef> allColumns, Map<String, String> properties) {
-    }
-
-    @Override
-    public StreamingMessage parse(ByteBuffer message) {
-        byte[] bytes = new byte[message.limit()];
-        message.get(bytes);
-        return new StreamingMessage(Lists.newArrayList(new String(bytes).split(",")), 0, 0, Collections.<String, Object> emptyMap());
-    }
-
-    @Override
-    public boolean filter(StreamingMessage streamingMessage) {
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/TopicMeta.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TopicMeta.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TopicMeta.java
deleted file mode 100644
index a73543e..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TopicMeta.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source.kafka;
-
-import java.util.Collections;
-import java.util.List;
-
-/**
- * The topic metadata should be invariant, otherwise will cause re-initialization of the Consumer
- *
- */
-public class TopicMeta {
-
-    private final String name;
-
-    private final List<Integer> partitionIds;
-
-    public TopicMeta(String name, List<Integer> partitionIds) {
-        this.name = name;
-        this.partitionIds = Collections.unmodifiableList(partitionIds);
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public List<Integer> getPartitionIds() {
-        return partitionIds;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java
deleted file mode 100644
index 9e902d8..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.source.kafka;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.time.FastDateFormat;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.metadata.datatype.DataType;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class UpdateTimeRangeStep extends AbstractExecutable {
-
-    private static final Logger logger = LoggerFactory.getLogger(UpdateTimeRangeStep.class);
-
-    public UpdateTimeRangeStep() {
-        super();
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
-        final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
-        final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
-        final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
-        final String outputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
-        final Path outputFile = new Path(outputPath, partitionCol.getName());
-
-        String minValue = null, maxValue = null, currentValue = null;
-        FSDataInputStream inputStream = null;
-        BufferedReader bufferedReader = null;
-        try {
-            FileSystem fs = HadoopUtil.getFileSystem(outputPath);
-            inputStream = fs.open(outputFile);
-            bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
-            minValue = currentValue = bufferedReader.readLine();
-            while (currentValue != null) {
-                maxValue = currentValue;
-                currentValue = bufferedReader.readLine();
-            }
-        } catch (IOException e) {
-            logger.error("fail to read file " + outputFile, e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        } finally {
-            IOUtils.closeQuietly(bufferedReader);
-            IOUtils.closeQuietly(inputStream);
-        }
-
-        final DataType partitionColType = partitionCol.getType();
-        FastDateFormat dateFormat;
-        if (partitionColType.isDate()) {
-            dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATE_PATTERN);
-        } else if (partitionColType.isDatetime() || partitionColType.isTimestamp()) {
-            dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
-        } else if (partitionColType.isStringFamily()) {
-            String partitionDateFormat = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateFormat();
-            if (StringUtils.isEmpty(partitionDateFormat)) {
-                partitionDateFormat = DateFormat.DEFAULT_DATE_PATTERN;
-            }
-            dateFormat = DateFormat.getDateFormat(partitionDateFormat);
-        } else {
-            return new ExecuteResult(ExecuteResult.State.ERROR, "Type " + partitionColType + " is not valid partition column type");
-        }
-
-        try {
-            long startTime = dateFormat.parse(minValue).getTime();
-            long endTime = dateFormat.parse(maxValue).getTime();
-            CubeUpdate cubeBuilder = new CubeUpdate(cube);
-            segment.setDateRangeStart(startTime);
-            segment.setDateRangeEnd(endTime);
-            cubeBuilder.setToUpdateSegs(segment);
-            cubeManager.updateCube(cubeBuilder);
-            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
-        } catch (Exception e) {
-            logger.error("fail to update cube segment offset", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
new file mode 100644
index 0000000..9cadd72
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.source.kafka.job;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Job step that fills in a merged segment's source offsets and date range from the segments being merged, then saves the cube. */
+public class MergeOffsetStep extends AbstractExecutable {
+
+    private static final Logger logger = LoggerFactory.getLogger(MergeOffsetStep.class);
+    public MergeOffsetStep() {
+        super();
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { // cube name and segment id are read from this step's params
+        final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
+        final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+        final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
+
+        List<CubeSegment> mergingSegs = cube.getMergingSegments(segment);
+
+        Collections.sort(mergingSegs); // natural CubeSegment order, so get(0) and get(size - 1) below are the two ends
+
+        final CubeSegment first = mergingSegs.get(0); // NOTE(review): throws IndexOutOfBoundsException if the list is empty - confirm callers guarantee at least one merging segment
+        final CubeSegment last = mergingSegs.get(mergingSegs.size() - 1);
+
+        segment.setSourceOffsetStart(first.getSourceOffsetStart()); // start side is copied from the first segment, end side from the last
+        segment.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetStart());
+        segment.setSourceOffsetEnd(last.getSourceOffsetEnd());
+        segment.setSourcePartitionOffsetEnd(last.getSourcePartitionOffsetEnd());
+
+        long dateRangeStart = CubeManager.minDateRangeStart(mergingSegs); // presumably min start / max end across mergingSegs, going by the helper names - verify in CubeManager
+        long dateRangeEnd = CubeManager.maxDateRangeEnd(mergingSegs);
+
+        segment.setDateRangeStart(dateRangeStart);
+        segment.setDateRangeEnd(dateRangeEnd);
+
+        CubeUpdate cubeBuilder = new CubeUpdate(cube);
+        cubeBuilder.setToUpdateSegs(segment);
+        try {
+            cubeManager.updateCube(cubeBuilder);
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+        } catch (IOException e) {
+            logger.error("fail to update cube segment offset", e);
+            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage()); // IOException becomes an ERROR result instead of propagating
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java
new file mode 100644
index 0000000..5751095
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.source.kafka.job;
+
+import org.apache.kylin.source.kafka.KafkaConfigManager;
+import org.apache.kylin.source.kafka.util.KafkaClient;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Job step that settles the Kafka offset range of a cube segment before it is built.
+ * <p>
+ * Per-partition offsets already present on the segment are kept. A missing start offset is taken from the
+ * largest end offset recorded by the cube's segments, or from Kafka's earliest offset when no segment has
+ * recorded that partition; a missing end offset is Kafka's latest offset. If the summed end offset does not
+ * exceed the summed start offset, the segment is removed from the cube and the step ends as DISCARDED.
+ */
+public class SeekOffsetStep extends AbstractExecutable {
+
+    private static final Logger logger = LoggerFactory.getLogger(SeekOffsetStep.class);
+
+    public SeekOffsetStep() {
+        super();
+    }
+
+    /**
+     * Fills in the segment's start/end offsets, renames the segment after the resulting range and saves the cube.
+     *
+     * @param context supplies the config used to look up the cube and the Kafka settings
+     * @return SUCCEED when both offset maps were already provided (nothing is changed) or the segment was
+     *         updated; DISCARDED when there is no new message (the segment is removed); ERROR when saving
+     *         the cube fails
+     */
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
+        final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+        final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
+
+        // these two maps are filled in place below when they come back empty
+        final Map<Integer, Long> startOffsets = segment.getSourcePartitionOffsetStart();
+        final Map<Integer, Long> endOffsets = segment.getSourcePartitionOffsetEnd();
+
+        if (!startOffsets.isEmpty() && startOffsets.size() == endOffsets.size()) {
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, "skipped, as the offset is provided.");
+        }
+
+        final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(context.getConfig()).getKafkaConfig(cube.getFactTable());
+        final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
+        final String topic = kafkaConfig.getTopic();
+        try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) {
+            final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+
+            if (startOffsets.isEmpty()) {
+                // user didn't specify start offset, use the biggest end offset in existing segments as start
+                // NOTE(review): cube.getSegments() presumably contains the segment being built as well, so
+                // caller-supplied end offsets (without start offsets) would be picked up here - confirm.
+                for (CubeSegment seg : cube.getSegments()) {
+                    Map<Integer, Long> segEndOffset = seg.getSourcePartitionOffsetEnd();
+                    for (PartitionInfo partition : partitionInfos) {
+                        int partitionId = partition.partition();
+                        if (segEndOffset.containsKey(partitionId)) {
+                            long knownStart = startOffsets.containsKey(partitionId) ? startOffsets.get(partitionId) : 0L;
+                            startOffsets.put(partitionId, Math.max(knownStart, segEndOffset.get(partitionId)));
+                        }
+                    }
+                }
+
+                // Partitions that no segment has recorded (e.g. newly added ones) start from the earliest offset.
+                // They are found by partition id: the position of an entry in partitionInfos does not tell
+                // whether that partition is already known.
+                for (PartitionInfo partition : partitionInfos) {
+                    int partitionId = partition.partition();
+                    if (!startOffsets.containsKey(partitionId)) {
+                        startOffsets.put(partitionId, KafkaClient.getEarliestOffset(consumer, topic, partitionId));
+                    }
+                }
+
+                logger.info("Get start offset for segment " + segment.getName() + ": " + startOffsets.toString());
+            }
+
+            if (endOffsets.isEmpty()) {
+                // user didn't specify end offset, use latest offset in kafka
+                for (PartitionInfo partitionInfo : partitionInfos) {
+                    long latest = KafkaClient.getLatestOffset(consumer, topic, partitionInfo.partition());
+                    endOffsets.put(partitionInfo.partition(), latest);
+                }
+
+                logger.info("Get end offset for segment " + segment.getName() + ": " + endOffsets.toString());
+            }
+        }
+
+        // the segment-level offsets are the sums over all partitions
+        long totalStartOffset = 0, totalEndOffset = 0;
+        for (long v : startOffsets.values()) {
+            totalStartOffset += v;
+        }
+        for (long v : endOffsets.values()) {
+            totalEndOffset += v;
+        }
+
+        if (totalEndOffset > totalStartOffset) {
+            segment.setSourceOffsetStart(totalStartOffset);
+            segment.setSourceOffsetEnd(totalEndOffset);
+            segment.setSourcePartitionOffsetStart(startOffsets);
+            segment.setSourcePartitionOffsetEnd(endOffsets);
+            segment.setName(CubeSegment.makeSegmentName(0, 0, totalStartOffset, totalEndOffset));
+            CubeUpdate cubeBuilder = new CubeUpdate(cube);
+            cubeBuilder.setToUpdateSegs(segment);
+            try {
+                cubeManager.updateCube(cubeBuilder);
+            } catch (IOException e) {
+                logger.error("fail to update cube segment offset", e);
+                return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+            }
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed, offset start: " + totalStartOffset + ", offset end: " + totalEndOffset + ", message count: " + (totalEndOffset - totalStartOffset));
+        } else {
+            // no message to consume: drop the segment instead of building an empty one
+            CubeUpdate cubeBuilder = new CubeUpdate(cube);
+            cubeBuilder.setToRemoveSegs(segment);
+            try {
+                cubeManager.updateCube(cubeBuilder);
+            } catch (IOException e) {
+                logger.error("fail to remove empty cube segment", e);
+                return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+            }
+
+            return new ExecuteResult(ExecuteResult.State.DISCARDED, "No new message comes");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java
new file mode 100644
index 0000000..d19aa63
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.source.kafka.job;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Job step that sets a segment's date range from the partition column values found in a file.
+ * <p>
+ * The file is named after the partition date column and is looked up under the path given by the
+ * {@code BatchConstants.CFG_OUTPUT_PATH} job parameter. Its first line is used as the range start and its last
+ * line as the range end, both parsed with a date format chosen from the partition column type.
+ */
+public class UpdateTimeRangeStep extends AbstractExecutable {
+
+    private static final Logger logger = LoggerFactory.getLogger(UpdateTimeRangeStep.class);
+
+    public UpdateTimeRangeStep() {
+        super();
+    }
+
+    /**
+     * Reads the first and last value from the file, parses them and stores them as the segment's date range.
+     *
+     * @param context supplies the config used to look up the cube
+     * @return SUCCEED once the cube is saved; ERROR when the file cannot be read or is empty, the partition
+     *         column type is not a date/datetime/timestamp/string type, or parsing or saving fails
+     */
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
+        final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+        final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
+        final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
+        final String outputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
+        final Path outputFile = new Path(outputPath, partitionCol.getName());
+
+        // first line -> min, last line -> max; assumes the file is sorted ascending - TODO confirm with the writer
+        String minValue = null, maxValue = null, currentValue = null;
+        FSDataInputStream inputStream = null;
+        BufferedReader bufferedReader = null;
+        try {
+            FileSystem fs = HadoopUtil.getFileSystem(outputPath);
+            inputStream = fs.open(outputFile);
+            // fixed charset so that decoding does not depend on the JVM's default
+            bufferedReader = new BufferedReader(new InputStreamReader(inputStream, java.nio.charset.StandardCharsets.UTF_8));
+            minValue = currentValue = bufferedReader.readLine();
+            while (currentValue != null) {
+                maxValue = currentValue;
+                currentValue = bufferedReader.readLine();
+            }
+        } catch (IOException e) {
+            logger.error("fail to read file " + outputFile, e);
+            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+        } finally {
+            IOUtils.closeQuietly(bufferedReader);
+            IOUtils.closeQuietly(inputStream);
+        }
+
+        // an empty file leaves both values null; report that instead of failing later inside parse()
+        if (minValue == null) {
+            final String message = "file " + outputFile + " is empty, cannot determine the time range";
+            logger.error(message);
+            return new ExecuteResult(ExecuteResult.State.ERROR, message);
+        }
+
+        final DataType partitionColType = partitionCol.getType();
+        FastDateFormat dateFormat;
+        if (partitionColType.isDate()) {
+            dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATE_PATTERN);
+        } else if (partitionColType.isDatetime() || partitionColType.isTimestamp()) {
+            dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
+        } else if (partitionColType.isStringFamily()) {
+            // string columns use the format configured on the model, falling back to the default date pattern
+            String partitionDateFormat = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateFormat();
+            if (StringUtils.isEmpty(partitionDateFormat)) {
+                partitionDateFormat = DateFormat.DEFAULT_DATE_PATTERN;
+            }
+            dateFormat = DateFormat.getDateFormat(partitionDateFormat);
+        } else {
+            return new ExecuteResult(ExecuteResult.State.ERROR, "Type " + partitionColType + " is not valid partition column type");
+        }
+
+        try {
+            long startTime = dateFormat.parse(minValue).getTime();
+            long endTime = dateFormat.parse(maxValue).getTime();
+            CubeUpdate cubeBuilder = new CubeUpdate(cube);
+            segment.setDateRangeStart(startTime);
+            segment.setDateRangeEnd(endTime);
+            cubeBuilder.setToUpdateSegs(segment);
+            cubeManager.updateCube(cubeBuilder);
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+        } catch (Exception e) {
+            // covers both unparsable values and a failed cube update
+            logger.error("fail to update cube segment time range", e);
+            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java
index 7a42598..894a144 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java
@@ -6,15 +6,15 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 package org.apache.kylin.source.kafka.util;
 
 import java.io.IOException;