Posted to commits@kylin.apache.org by sh...@apache.org on 2016/10/09 14:17:38 UTC

[6/9] kylin git commit: KYLIN-2072 Cleanup old streaming code

http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
deleted file mode 100644
index 271bf41..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.streaming;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.JsonSerializer;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.persistence.Serializer;
-import org.apache.kylin.metadata.MetadataConstants;
-import org.apache.kylin.metadata.cachesync.Broadcaster;
-import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
-import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class StreamingManager {
-
-    private static final Logger logger = LoggerFactory.getLogger(StreamingManager.class);
-
-    // static cached instances
-    private static final ConcurrentHashMap<KylinConfig, StreamingManager> CACHE = new ConcurrentHashMap<KylinConfig, StreamingManager>();
-
-    public static final Serializer<StreamingConfig> STREAMING_SERIALIZER = new JsonSerializer<StreamingConfig>(StreamingConfig.class);
-
-    private KylinConfig config;
-
-    // name ==> StreamingConfig
-    private CaseInsensitiveStringCache<StreamingConfig> streamingMap;
-
-    public static void clearCache() {
-        CACHE.clear();
-    }
-
-    private StreamingManager(KylinConfig config) throws IOException {
-        this.config = config;
-        this.streamingMap = new CaseInsensitiveStringCache<StreamingConfig>(config, "streaming");
-        
-        // touch lower level metadata before registering my listener
-        reloadAllStreaming();
-        Broadcaster.getInstance(config).registerListener(new StreamingSyncListener(), "streaming");
-    }
-
-    private class StreamingSyncListener extends Broadcaster.Listener {
-        @Override
-        public void onClearAll(Broadcaster broadcaster) throws IOException {
-            clearCache();
-        }
-
-        @Override
-        public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException {
-            if (event == Event.DROP)
-                removeStreamingLocal(cacheKey);
-            else
-                reloadStreamingConfigLocal(cacheKey);
-        }
-    }
-
-    private ResourceStore getStore() {
-        return ResourceStore.getStore(this.config);
-    }
-
-    public static StreamingManager getInstance(KylinConfig config) {
-        StreamingManager r = CACHE.get(config);
-        if (r != null) {
-            return r;
-        }
-
-        synchronized (StreamingManager.class) {
-            r = CACHE.get(config);
-            if (r != null) {
-                return r;
-            }
-            try {
-                r = new StreamingManager(config);
-                CACHE.put(config, r);
-                if (CACHE.size() > 1) {
-                    logger.warn("More than one singleton exist");
-                }
-                return r;
-            } catch (IOException e) {
-                throw new IllegalStateException("Failed to init StreamingManager from " + config, e);
-            }
-        }
-    }
-
-    private static String formatStreamingConfigPath(String name) {
-        return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + name + ".json";
-    }
-
-    private static String formatStreamingOutputPath(String streaming, int partition) {
-        return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + partition + ".json";
-    }
-
-    private static String formatStreamingOutputPath(String streaming, List<Integer> partitions) {
-        return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + StringUtils.join(partitions, "_") + ".json";
-    }
-
-    public StreamingConfig getStreamingConfig(String name) {
-        return streamingMap.get(name);
-    }
-
-    public List<StreamingConfig> listAllStreaming() {
-        return new ArrayList<>(streamingMap.values());
-    }
-
-    /**
-     * Reload StreamingConfig from the resource store. It will be triggered by a desc
-     * update event.
-     *
-     * @param name
-     * @throws IOException
-     */
-    public StreamingConfig reloadStreamingConfigLocal(String name) throws IOException {
-
-        // Save Source
-        String path = StreamingConfig.concatResourcePath(name);
-
-        // Reload the StreamingConfig
-        StreamingConfig ndesc = loadStreamingConfigAt(path);
-
-        // Here replace the old one
-        streamingMap.putLocal(ndesc.getName(), ndesc);
-        return ndesc;
-    }
-
-    // remove streamingConfig
-    public void removeStreamingConfig(StreamingConfig streamingConfig) throws IOException {
-        String path = streamingConfig.getResourcePath();
-        getStore().deleteResource(path);
-        streamingMap.remove(streamingConfig.getName());
-    }
-
-    public StreamingConfig getConfig(String name) {
-        name = name.toUpperCase();
-        return streamingMap.get(name);
-    }
-
-    public void removeStreamingLocal(String streamingName) {
-        streamingMap.removeLocal(streamingName);
-    }
-
-    /**
-     * Update StreamingConfig with the input. Broadcast the event into the cluster.
-     *
-     * @param desc
-     * @return
-     * @throws IOException
-     */
-    public StreamingConfig updateStreamingConfig(StreamingConfig desc) throws IOException {
-        // Validate StreamingConfig
-        if (desc.getUuid() == null || desc.getName() == null) {
-            throw new IllegalArgumentException("SteamingConfig Illegal.");
-        }
-        String name = desc.getName();
-        if (!streamingMap.containsKey(name)) {
-            throw new IllegalArgumentException("StreamingConfig '" + name + "' does not exist.");
-        }
-
-        // Save Source
-        String path = desc.getResourcePath();
-        getStore().putResource(path, desc, STREAMING_SERIALIZER);
-
-        // Reload the StreamingConfig
-        StreamingConfig ndesc = loadStreamingConfigAt(path);
-        // Here replace the old one
-        streamingMap.put(ndesc.getName(), desc);
-
-        return ndesc;
-    }
-
-    public StreamingConfig saveStreamingConfig(StreamingConfig streamingConfig) throws IOException {
-        if (streamingConfig == null || StringUtils.isEmpty(streamingConfig.getName())) {
-            throw new IllegalArgumentException();
-        }
-
-        if (streamingMap.containsKey(streamingConfig.getName()))
-            throw new IllegalArgumentException("StreamingConfig '" + streamingConfig.getName() + "' already exists");
-
-        String path = StreamingConfig.concatResourcePath(streamingConfig.getName());
-        getStore().putResource(path, streamingConfig, StreamingConfig.SERIALIZER);
-        streamingMap.put(streamingConfig.getName(), streamingConfig);
-        return streamingConfig;
-    }
-
-    private StreamingConfig loadStreamingConfigAt(String path) throws IOException {
-        ResourceStore store = getStore();
-        StreamingConfig streamingDesc = store.getResource(path, StreamingConfig.class, STREAMING_SERIALIZER);
-
-        if (StringUtils.isBlank(streamingDesc.getName())) {
-            throw new IllegalStateException("StreamingConfig name must not be blank");
-        }
-        return streamingDesc;
-    }
-
-    private void reloadAllStreaming() throws IOException {
-        ResourceStore store = getStore();
-        logger.info("Reloading Streaming Metadata from folder " + store.getReadableResourcePath(ResourceStore.STREAMING_RESOURCE_ROOT));
-
-        streamingMap.clear();
-
-        List<String> paths = store.collectResourceRecursively(ResourceStore.STREAMING_RESOURCE_ROOT, MetadataConstants.FILE_SURFIX);
-        for (String path : paths) {
-            StreamingConfig streamingConfig;
-            try {
-                streamingConfig = loadStreamingConfigAt(path);
-            } catch (Exception e) {
-                logger.error("Error loading streaming desc " + path, e);
-                continue;
-            }
-            if (path.equals(streamingConfig.getResourcePath()) == false) {
-                logger.error("Skip suspicious desc at " + path + ", " + streamingConfig + " should be at " + streamingConfig.getResourcePath());
-                continue;
-            }
-            if (streamingMap.containsKey(streamingConfig.getName())) {
-                logger.error("Dup StreamingConfig name '" + streamingConfig.getName() + "' on path " + path);
-                continue;
-            }
-
-            streamingMap.putLocal(streamingConfig.getName(), streamingConfig);
-        }
-
-        logger.debug("Loaded " + streamingMap.size() + " StreamingConfig(s)");
-    }
-}
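
For reference, getInstance() above is the per-KylinConfig double-checked locking idiom shared by Kylin's manager classes: a lock-free cache lookup first, then a second lookup under the class lock before constructing. A minimal self-contained sketch of the pattern (illustrative names, not part of this commit):

    import java.util.concurrent.ConcurrentHashMap;

    // Illustrative sketch of the per-config singleton cache: a lock-free
    // fast path, then a re-check under the class lock before constructing.
    class ConfigKeyedSingleton {
        private static final ConcurrentHashMap<Object, ConfigKeyedSingleton> CACHE = new ConcurrentHashMap<>();

        private final Object config; // stands in for KylinConfig

        private ConfigKeyedSingleton(Object config) {
            this.config = config;
        }

        static ConfigKeyedSingleton getInstance(Object config) {
            ConfigKeyedSingleton r = CACHE.get(config); // fast path, no lock
            if (r != null)
                return r;
            synchronized (ConfigKeyedSingleton.class) {
                r = CACHE.get(config); // re-check: another thread may have won
                if (r == null) {
                    r = new ConfigKeyedSingleton(config);
                    CACHE.put(config, r);
                }
                return r;
            }
        }
    }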

http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/MonitorCLI.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/MonitorCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/MonitorCLI.java
deleted file mode 100644
index 32030ad..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/MonitorCLI.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.streaming.cli;
-
-import java.util.List;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.engine.streaming.monitor.StreamingMonitor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- */
-public class MonitorCLI {
-
-    private static final Logger logger = LoggerFactory.getLogger(MonitorCLI.class);
-
-    public static void main(String[] args) {
-        Preconditions.checkArgument(args[0].equals("monitor"));
-
-        int i = 1;
-        List<String> receivers = null;
-        String host = null;
-        String tableName = null;
-        String authorization = null;
-        String cubeName = null;
-        String projectName = "default";
-        while (i < args.length) {
-            String argName = args[i];
-            switch (argName) {
-            case "-receivers":
-                receivers = Lists.newArrayList(StringUtils.split(args[++i], ";"));
-                break;
-            case "-host":
-                host = args[++i];
-                break;
-            case "-tableName":
-                tableName = args[++i];
-                break;
-            case "-authorization":
-                authorization = args[++i];
-                break;
-            case "-cubeName":
-                cubeName = args[++i];
-                break;
-            case "-projectName":
-                projectName = args[++i];
-                break;
-            default:
-                throw new RuntimeException("invalid argName:" + argName);
-            }
-            i++;
-        }
-        Preconditions.checkArgument(receivers != null && receivers.size() > 0);
-        final StreamingMonitor streamingMonitor = new StreamingMonitor();
-        if (tableName != null) {
-            logger.info(String.format("check query tableName:%s host:%s receivers:%s", tableName, host, StringUtils.join(receivers, ";")));
-            Preconditions.checkNotNull(host);
-            Preconditions.checkNotNull(authorization);
-            Preconditions.checkNotNull(tableName);
-            streamingMonitor.checkCountAll(receivers, host, authorization, projectName, tableName);
-        }
-        if (cubeName != null) {
-            logger.info(String.format("check cube cubeName:%s receivers:%s", cubeName, StringUtils.join(receivers, ";")));
-            streamingMonitor.checkCube(receivers, cubeName, host);
-        }
-        System.exit(0);
-    }
-}
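
Aside: the while/switch loop above is a plain "-name value" argument scanner in which each recognized flag consumes the following token as its value. A standalone sketch of the same pattern, with hypothetical flags:

    import java.util.HashMap;
    import java.util.Map;

    // Sketch of the "-name value" scanning loop used by MonitorCLI and
    // StreamingCLI: each recognized flag consumes the next token.
    class ArgScanDemo {
        public static void main(String[] args) {
            args = new String[] { "-host", "http://localhost:7070", "-cubeName", "demo_cube" };
            Map<String, String> parsed = new HashMap<>();
            int i = 0;
            while (i < args.length) {
                switch (args[i]) {
                case "-host":
                case "-cubeName":
                    parsed.put(args[i], args[++i]); // flag consumes next token
                    break;
                default:
                    throw new RuntimeException("invalid argName:" + args[i]);
                }
                i++;
            }
            System.out.println(parsed);
        }
    }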

http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
deleted file mode 100644
index 1d66b41..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.streaming.cli;
-
-import java.util.List;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.engine.streaming.BootstrapConfig;
-import org.apache.kylin.engine.streaming.OneOffStreamingBuilder;
-import org.apache.kylin.engine.streaming.monitor.StreamingMonitor;
-import org.apache.kylin.metadata.realization.RealizationType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-public class StreamingCLI {
-
-    private static final Logger logger = LoggerFactory.getLogger(StreamingCLI.class);
-
-    public static void main(String[] args) {
-        try {
-            Preconditions.checkArgument(args[0].equals("streaming"));
-            Preconditions.checkArgument(args[1].equals("start"));
-
-            int i = 2;
-            BootstrapConfig bootstrapConfig = new BootstrapConfig();
-            while (i < args.length) {
-                String argName = args[i];
-                switch (argName) {
-                case "-start":
-                    bootstrapConfig.setStart(Long.parseLong(args[++i]));
-                    break;
-                case "-end":
-                    bootstrapConfig.setEnd(Long.parseLong(args[++i]));
-                    break;
-                case "-cube":
-                    bootstrapConfig.setCubeName(args[++i]);
-                    break;
-                case "-fillGap":
-                    bootstrapConfig.setFillGap(Boolean.parseBoolean(args[++i]));
-                    break;
-                case "-maxFillGapRange":
-                    bootstrapConfig.setMaxFillGapRange(Long.parseLong(args[++i]));
-                    break;
-                default:
-                    logger.warn("ignore this arg:" + argName);
-                }
-                i++;
-            }
-            if (bootstrapConfig.isFillGap()) {
-                final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(bootstrapConfig.getCubeName());
-                logger.info("all gaps:" + StringUtils.join(gaps, ","));
-                for (Pair<Long, Long> gap : gaps) {
-                    List<Pair<Long, Long>> splitGaps = splitGap(gap, bootstrapConfig.getMaxFillGapRange());
-                    for (Pair<Long, Long> splitGap : splitGaps) {
-                        logger.info("start filling the gap from " + splitGap.getFirst() + " to " + splitGap.getSecond());
-                        startOneOffCubeStreaming(bootstrapConfig.getCubeName(), splitGap.getFirst(), splitGap.getSecond());
-                        logger.info("finish filling the gap from " + splitGap.getFirst() + " to " + splitGap.getSecond());
-                    }
-                }
-            } else {
-                startOneOffCubeStreaming(bootstrapConfig.getCubeName(), bootstrapConfig.getStart(), bootstrapConfig.getEnd());
-                logger.info("streaming process finished, exit with 0");
-                System.exit(0);
-            }
-        } catch (Exception e) {
-            printArgsError(args);
-            logger.error("error start streaming", e);
-            System.exit(-1);
-        }
-    }
-
-    private static List<Pair<Long, Long>> splitGap(Pair<Long, Long> gap, long maxFillGapRange) {
-        List<Pair<Long, Long>> gaps = Lists.newArrayList();
-        Long startTime = gap.getFirst();
-
-        while (startTime < gap.getSecond()) {
-            Long endTime = gap.getSecond() <= startTime + maxFillGapRange ? gap.getSecond() : startTime + maxFillGapRange;
-            gaps.add(Pair.newPair(startTime, endTime));
-            startTime = endTime;
-        }
-
-        return gaps;
-    }
-
-    private static void startOneOffCubeStreaming(String cubeName, long start, long end) {
-        final Runnable runnable = new OneOffStreamingBuilder(RealizationType.CUBE, cubeName, start, end).build();
-        runnable.run();
-    }
-
-    private static void printArgsError(String[] args) {
-        logger.warn("invalid args:" + StringUtils.join(args, " "));
-    }
-
-}
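
Note: splitGap() above caps each backfill range at maxFillGapRange milliseconds. The same logic in a dependency-free sketch (long[] pairs stand in for Kylin's Pair), with a tiny demonstration:

    import java.util.ArrayList;
    import java.util.List;

    // Standalone illustration of StreamingCLI.splitGap(): chunk [start, end)
    // into sub-ranges no longer than maxRange milliseconds.
    class SplitGapDemo {
        static List<long[]> splitGap(long start, long end, long maxRange) {
            List<long[]> gaps = new ArrayList<>();
            while (start < end) {
                long chunkEnd = Math.min(end, start + maxRange);
                gaps.add(new long[] { start, chunkEnd });
                start = chunkEnd;
            }
            return gaps;
        }

        public static void main(String[] args) {
            // A 25-minute gap with a 10-minute cap yields chunks of 10, 10 and 5 minutes.
            for (long[] g : splitGap(0, 25 * 60_000L, 10 * 60_000L))
                System.out.println(g[0] + " -> " + g[1]);
        }
    }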

http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
deleted file mode 100644
index 350a5f8..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.streaming.cube;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import javax.annotation.Nullable;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.common.util.StreamingBatch;
-import org.apache.kylin.common.util.StreamingMessage;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
-import org.apache.kylin.cube.inmemcubing.InMemCubeBuilder;
-import org.apache.kylin.cube.util.CubingUtils;
-import org.apache.kylin.engine.EngineFactory;
-import org.apache.kylin.engine.streaming.StreamingBatchBuilder;
-import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
-import org.apache.kylin.metadata.model.IBuildable;
-import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-
-/**
- */
-public class StreamingCubeBuilder implements StreamingBatchBuilder {
-
-    private static final Logger logger = LoggerFactory.getLogger(StreamingCubeBuilder.class);
-
-    private final String cubeName;
-    private int processedRowCount = 0;
-
-    public StreamingCubeBuilder(String cubeName) {
-        this.cubeName = cubeName;
-    }
-
-    @Override
-    public void build(StreamingBatch streamingBatch, Map<TblColRef, Dictionary<String>> dictionaryMap, ICuboidWriter cuboidWriter) {
-        try {
-            CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-            final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
-            final IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cubeInstance.getDescriptor());
-            
-            LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue<List<String>>();
-            InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(cubeInstance.getDescriptor(), flatDesc, dictionaryMap);
-            final Future<?> future = Executors.newCachedThreadPool().submit(inMemCubeBuilder.buildAsRunnable(blockingQueue, cuboidWriter));
-            processedRowCount = streamingBatch.getMessages().size();
-            for (StreamingMessage streamingMessage : streamingBatch.getMessages()) {
-                blockingQueue.put(streamingMessage.getData());
-            }
-            blockingQueue.put(Collections.<String> emptyList());
-            future.get();
-            cuboidWriter.flush();
-
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
-        } catch (ExecutionException e) {
-            throw new RuntimeException("error build cube from StreamingBatch", e.getCause());
-        } catch (IOException e) {
-            throw new RuntimeException("error build cube from StreamingBatch", e.getCause());
-        } finally {
-            try {
-                cuboidWriter.close();
-            } catch (IOException e) {
-                throw new RuntimeException("error build cube from StreamingBatch", e.getCause());
-            }
-        }
-    }
-
-    @Override
-    public IBuildable createBuildable(StreamingBatch streamingBatch) {
-        CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-        final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
-        try {
-            CubeSegment segment = cubeManager.appendSegment(cubeInstance, streamingBatch.getTimeRange().getFirst(), streamingBatch.getTimeRange().getSecond());
-            segment.setLastBuildJobID(segment.getUuid()); // give a fake job id
-            segment.setInputRecords(streamingBatch.getMessages().size());
-            segment.setLastBuildTime(System.currentTimeMillis());
-            return segment;
-        } catch (IOException e) {
-            throw new RuntimeException("failed to create IBuildable", e);
-        }
-    }
-
-    @Override
-    public Map<Long, HyperLogLogPlusCounter> sampling(StreamingBatch streamingBatch) {
-        final CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-        final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
-        final IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cubeInstance.getDescriptor());
-        long start = System.currentTimeMillis();
-
-        final Map<Long, HyperLogLogPlusCounter> samplingResult = CubingUtils.sampling(cubeInstance.getDescriptor(), flatDesc, Lists.transform(streamingBatch.getMessages(), new Function<StreamingMessage, List<String>>() {
-            @Nullable
-            @Override
-            public List<String> apply(@Nullable StreamingMessage input) {
-                return input.getData();
-            }
-        }));
-        logger.info(String.format("sampling of %d messages cost %d ms", streamingBatch.getMessages().size(), (System.currentTimeMillis() - start)));
-        return samplingResult;
-    }
-
-    @Override
-    public Map<TblColRef, Dictionary<String>> buildDictionary(StreamingBatch streamingBatch, IBuildable buildable) {
-        final CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-        final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
-        final Map<TblColRef, Dictionary<String>> dictionaryMap;
-        try {
-            dictionaryMap = CubingUtils.buildDictionary(cubeInstance, Lists.transform(streamingBatch.getMessages(), new Function<StreamingMessage, List<String>>() {
-                @Nullable
-                @Override
-                public List<String> apply(@Nullable StreamingMessage input) {
-                    return input.getData();
-                }
-            }));
-            Map<TblColRef, Dictionary<String>> realDictMap = CubingUtils.writeDictionary((CubeSegment) buildable, dictionaryMap, streamingBatch.getTimeRange().getFirst(), streamingBatch.getTimeRange().getSecond());
-            return realDictMap;
-        } catch (IOException e) {
-            throw new RuntimeException("failed to build dictionary", e);
-        }
-    }
-
-    @Override
-    public void commit(IBuildable buildable) {
-        CubeSegment cubeSegment = (CubeSegment) buildable;
-        cubeSegment.setStatus(SegmentStatusEnum.READY);
-        cubeSegment.setInputRecords(processedRowCount);
-        CubeUpdate cubeBuilder = new CubeUpdate(cubeSegment.getCubeInstance());
-        cubeBuilder.setToUpdateSegs(cubeSegment);
-        try {
-            CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).updateCube(cubeBuilder);
-        } catch (IOException e) {
-            throw new RuntimeException("failed to update CubeSegment", e);
-        }
-    }
-}
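
Note: build() above streams rows to InMemCubeBuilder over a LinkedBlockingQueue and marks end-of-input with an empty list. An illustrative sketch of that poison-pill handshake (plain JDK types, not the Kylin API):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.LinkedBlockingQueue;

    // Illustrative poison-pill handshake: the producer enqueues rows, then an
    // empty list so the consumer knows input is complete (as build() does).
    class PoisonPillDemo {
        public static void main(String[] args) throws Exception {
            BlockingQueue<List<String>> queue = new LinkedBlockingQueue<>();
            ExecutorService pool = Executors.newSingleThreadExecutor();

            Future<?> consumer = pool.submit(() -> {
                try {
                    while (true) {
                        List<String> row = queue.take();
                        if (row.isEmpty()) // poison pill: end of input
                            return;
                        System.out.println("consumed " + row);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });

            queue.put(Arrays.asList("a", "1"));
            queue.put(Arrays.asList("b", "2"));
            queue.put(Collections.<String> emptyList()); // signal completion
            consumer.get(); // wait for the consumer, like future.get() above
            pool.shutdown();
        }
    }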

http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/diagnose/StreamingLogAnalyzer.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/diagnose/StreamingLogAnalyzer.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/diagnose/StreamingLogAnalyzer.java
deleted file mode 100644
index fba664d..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/diagnose/StreamingLogAnalyzer.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.streaming.diagnose;
-
-import java.io.File;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-import java.text.SimpleDateFormat;
-import java.util.List;
-import java.util.TimeZone;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.commons.io.FileUtils;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-public class StreamingLogAnalyzer {
-    public static void main(String[] args) {
-        int errorFileCount = 0;
-        List<Long> ellapsedTimes = Lists.newArrayList();
-
-        String patternStr = "(\\d{2}/\\d{2}/\\d{2} \\d{2}:\\d{2}:\\d{2})";
-        Pattern pattern = Pattern.compile(patternStr);
-
-        SimpleDateFormat format = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
-        format.setTimeZone(TimeZone.getTimeZone("GMT")); // NOTE: this must be GMT to calculate epoch date correctly
-
-        Preconditions.checkArgument(args.length == 1, "Usage: StreamingLogAnalyzer streaming_logs_folder");
-        for (File file : FileUtils.listFiles(new File(args[0]), new String[] { "log" }, false)) {
-            System.out.println("Processing file " + file.toString());
-
-            long startTime = 0;
-            long endTime = 0;
-            try {
-                List<String> contents = Files.readAllLines(file.toPath(), Charset.defaultCharset());
-                for (int i = 0; i < contents.size(); ++i) {
-                    Matcher m = pattern.matcher(contents.get(i));
-                    if (m.find()) {
-                        startTime = format.parse("20" + m.group(1)).getTime();
-                        break;
-                    }
-                }
-
-                for (int i = contents.size() - 1; i >= 0; --i) {
-                    Matcher m = pattern.matcher(contents.get(i));
-                    if (m.find()) {
-                        endTime = format.parse("20" + m.group(1)).getTime();
-                        break;
-                    }
-                }
-
-                if (startTime == 0 || endTime == 0) {
-                    throw new RuntimeException("start time or end time is not found");
-                }
-
-                if (endTime - startTime < 60000) {
-                    System.out.println("Warning: this job took less than one minute!!!! " + file.toString());
-                }
-
-                ellapsedTimes.add(endTime - startTime);
-
-            } catch (Exception e) {
-                System.out.println("Exception when processing log file " + file.toString());
-                System.out.println(e);
-                errorFileCount++;
-            }
-        }
-
-        System.out.println("Totally error files count " + errorFileCount);
-        System.out.println("Totally normal files processed " + ellapsedTimes.size());
-
-        long sum = 0;
-        for (Long x : ellapsedTimes) {
-            sum += x;
-        }
-        System.out.println("Avg build time " + (sum / ellapsedTimes.size()));
-    }
-}
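
Note: the analyzer prepends the century to the two-digit-year timestamps and parses them in GMT so getTime() yields a correct epoch value. A small sketch of just that timestamp handling, assuming the same "yy/MM/dd HH:mm:ss" log format:

    import java.text.SimpleDateFormat;
    import java.util.TimeZone;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    // Parse a two-digit-year log timestamp the way StreamingLogAnalyzer does:
    // prepend the century and parse in GMT so getTime() is a true epoch value.
    class LogTimestampDemo {
        public static void main(String[] args) throws Exception {
            Pattern p = Pattern.compile("(\\d{2}/\\d{2}/\\d{2} \\d{2}:\\d{2}:\\d{2})");
            SimpleDateFormat f = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
            f.setTimeZone(TimeZone.getTimeZone("GMT"));

            Matcher m = p.matcher("16/10/09 14:17:38 INFO job started");
            if (m.find()) {
                long epochMs = f.parse("20" + m.group(1)).getTime();
                System.out.println("epoch millis: " + epochMs);
            }
        }
    }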

http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java
deleted file mode 100644
index 55252c4..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.streaming.monitor;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-
-import javax.annotation.Nullable;
-
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.commons.httpclient.methods.ByteArrayRequestEntity;
-import org.apache.commons.httpclient.methods.PostMethod;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.common.util.MailService;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- */
-public class StreamingMonitor {
-
-    private static final Logger logger = LoggerFactory.getLogger(StreamingMonitor.class);
-
-    public void checkCountAll(List<String> receivers, String host, String authorization, String projectName, String tableName) {
-        String title = "checkCountAll job(host:" + host + " tableName:" + tableName + ") ";
-        StringBuilder stringBuilder = new StringBuilder();
-        String url = host + "/kylin/api/query";
-        PostMethod request = new PostMethod(url);
-        try {
-
-            request.addRequestHeader("Authorization", "Basic " + authorization);
-            request.addRequestHeader("Content-Type", "application/json");
-            String query = String.format("{\"sql\":\"select count(*) from %s\",\"offset\":0,\"limit\":50000,\"acceptPartial\":true,\"project\":\"%s\"}", tableName, projectName);
-            request.setRequestEntity(new ByteArrayRequestEntity(query.getBytes()));
-
-            int statusCode = new HttpClient().executeMethod(request);
-            String msg = Bytes.toString(request.getResponseBody());
-            stringBuilder.append("host:").append(host).append("\n");
-            stringBuilder.append("query:").append(query).append("\n");
-            stringBuilder.append("statusCode:").append(statusCode).append("\n");
-            if (statusCode == 200) {
-                title += "succeed";
-                final HashMap<?, ?> hashMap = JsonUtil.readValue(msg, HashMap.class);
-                stringBuilder.append("results:").append(hashMap.get("results").toString()).append("\n");
-                stringBuilder.append("duration:").append(hashMap.get("duration").toString()).append("\n");
-            } else {
-                title += "failed";
-                stringBuilder.append("response:").append(msg).append("\n");
-            }
-        } catch (Exception e) {
-            final StringWriter out = new StringWriter();
-            e.printStackTrace(new PrintWriter(out));
-            title += "failed";
-            stringBuilder.append(out.toString());
-        } finally {
-            request.releaseConnection();
-        }
-        logger.info("title:" + title);
-        logger.info("content:" + stringBuilder.toString());
-        sendMail(receivers, title, stringBuilder.toString());
-    }
-
-    public static final List<Pair<Long, Long>> findGaps(String cubeName) {
-        List<CubeSegment> segments = getSortedReadySegments(cubeName);
-        List<Pair<Long, Long>> gaps = Lists.newArrayList();
-        for (int i = 0; i < segments.size() - 1; ++i) {
-            CubeSegment first = segments.get(i);
-            CubeSegment second = segments.get(i + 1);
-            if (first.getDateRangeEnd() == second.getDateRangeStart()) {
-                continue;
-            } else if (first.getDateRangeEnd() < second.getDateRangeStart()) {
-                gaps.add(Pair.newPair(first.getDateRangeEnd(), second.getDateRangeStart()));
-            }
-        }
-        return gaps;
-    }
-
-    private static List<CubeSegment> getSortedReadySegments(String cubeName) {
-        final CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
-        Preconditions.checkNotNull(cube);
-        final List<CubeSegment> segments = cube.getSegments(SegmentStatusEnum.READY);
-        logger.info("totally " + segments.size() + " cubeSegments");
-        Collections.sort(segments);
-        return segments;
-    }
-
-    public static final List<Pair<String, String>> findOverlaps(String cubeName) {
-        List<CubeSegment> segments = getSortedReadySegments(cubeName);
-        List<Pair<String, String>> overlaps = Lists.newArrayList();
-        for (int i = 0; i < segments.size() - 1; ++i) {
-            CubeSegment first = segments.get(i);
-            CubeSegment second = segments.get(i + 1);
-            if (first.getDateRangeEnd() == second.getDateRangeStart()) {
-                continue;
-            } else {
-                overlaps.add(Pair.newPair(first.getName(), second.getName()));
-            }
-        }
-        return overlaps;
-    }
-
-    public void checkCube(List<String> receivers, String cubeName, String host) {
-        final CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
-        if (cube == null) {
-            logger.info("cube:" + cubeName + " does not exist");
-            return;
-        }
-        List<Pair<Long, Long>> gaps = findGaps(cubeName);
-        List<Pair<String, String>> overlaps = Lists.newArrayList();
-        StringBuilder content = new StringBuilder();
-        if (!gaps.isEmpty()) {
-            content.append("all gaps:").append("\n").append(StringUtils.join(Lists.transform(gaps, new Function<Pair<Long, Long>, String>() {
-                @Nullable
-                @Override
-                public String apply(Pair<Long, Long> input) {
-                    return parseInterval(input);
-                }
-            }), "\n")).append("\n");
-        }
-        if (!overlaps.isEmpty()) {
-            content.append("all overlaps:").append("\n").append(StringUtils.join(overlaps, "\n")).append("\n");
-        }
-        if (content.length() > 0) {
-            logger.info(content.toString());
-            sendMail(receivers, String.format("%s has gaps or overlaps on host %s", cubeName, host), content.toString());
-        } else {
-            logger.info("no gaps or overlaps");
-        }
-    }
-
-    private String parseInterval(Pair<Long, Long> interval) {
-        return String.format("{%d(%s), %d(%s)}", interval.getFirst(), new Date(interval.getFirst()).toString(), interval.getSecond(), new Date(interval.getSecond()).toString());
-    }
-
-    private void sendMail(List<String> receivers, String title, String content) {
-        final MailService mailService = new MailService(KylinConfig.getInstanceFromEnv());
-        mailService.sendMail(receivers, title, content, false);
-    }
-
-}
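
Note: findGaps() above scans the READY segments in sorted order and reports a hole wherever consecutive date ranges fail to touch. The same scan in miniature, with plain long[] ranges in place of CubeSegment:

    import java.util.ArrayList;
    import java.util.List;

    // Miniature version of StreamingMonitor.findGaps(): given segments sorted
    // by start time, report [end(i), start(i+1)) whenever they do not touch.
    class GapScanDemo {
        static List<long[]> findGaps(long[][] segments) {
            List<long[]> gaps = new ArrayList<>();
            for (int i = 0; i < segments.length - 1; i++) {
                long firstEnd = segments[i][1];
                long secondStart = segments[i + 1][0];
                if (firstEnd < secondStart)
                    gaps.add(new long[] { firstEnd, secondStart });
            }
            return gaps;
        }

        public static void main(String[] args) {
            long[][] segs = { { 0, 10 }, { 10, 20 }, { 30, 40 } }; // hole between 20 and 30
            for (long[] g : findGaps(segs))
                System.out.println("gap: " + g[0] + " -> " + g[1]);
        }
    }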

http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java
deleted file mode 100644
index 5790bc1..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.streaming.util;
-
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.engine.streaming.IStreamingInput;
-import org.apache.kylin.engine.streaming.IStreamingOutput;
-import org.apache.kylin.engine.streaming.StreamingBatchBuilder;
-import org.apache.kylin.engine.streaming.cube.StreamingCubeBuilder;
-import org.apache.kylin.metadata.realization.RealizationType;
-
-import com.google.common.base.Preconditions;
-
-/**
- * TODO: like MRUtil, use Factory pattern to allow config
- */
-public class StreamingUtils {
-
-    public static IStreamingInput getStreamingInput() {
-        return (IStreamingInput) ClassUtil.newInstance("org.apache.kylin.source.kafka.KafkaStreamingInput");
-    }
-
-    public static IStreamingOutput getStreamingOutput() {
-        return (IStreamingOutput) ClassUtil.newInstance("org.apache.kylin.storage.hbase.steps.HBaseStreamingOutput");
-    }
-
-    public static StreamingBatchBuilder getMicroBatchBuilder(RealizationType realizationType, String realizationName) {
-        Preconditions.checkNotNull(realizationName);
-        if (realizationType == RealizationType.CUBE) {
-            return new StreamingCubeBuilder(realizationName);
-        } else {
-            throw new UnsupportedOperationException("not implemented yet");
-        }
-    }
-}
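
Aside: loading implementations by fully-qualified class name, as StreamingUtils does, keeps engine-streaming free of compile-time dependencies on source-kafka and storage-hbase. A generic sketch of that reflective factory style (hypothetical stand-in types, not the Kylin ClassUtil API):

    // Sketch of the reflective factory style used by StreamingUtils: load an
    // implementation by fully-qualified name so this module need not depend
    // on the implementing module at compile time.
    class ReflectiveFactoryDemo {
        interface Input { }                          // stand-in for IStreamingInput
        static class KafkaInput implements Input { } // stand-in implementation

        static Input newInput(String className) {
            try {
                return (Input) Class.forName(className).getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new RuntimeException("cannot instantiate " + className, e);
            }
        }

        public static void main(String[] args) {
            Input in = newInput("ReflectiveFactoryDemo$KafkaInput");
            System.out.println("loaded: " + in.getClass().getName());
        }
    }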

http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index c30abc0..a47fcde 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -44,8 +44,8 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.CubeUpdate;
 import org.apache.kylin.engine.EngineFactory;
-import org.apache.kylin.engine.streaming.StreamingConfig;
-import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.metadata.streaming.StreamingConfig;
+import org.apache.kylin.metadata.streaming.StreamingManager;
 import org.apache.kylin.job.DeployUtil;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;

http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 09ef0e8..72e4069 100644
--- a/pom.xml
+++ b/pom.xml
@@ -225,11 +225,6 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.kylin</groupId>
-                <artifactId>kylin-engine-streaming</artifactId>
-                <version>${project.version}</version>
-            </dependency>
-            <dependency>
-                <groupId>org.apache.kylin</groupId>
                 <artifactId>kylin-engine-spark</artifactId>
                 <version>${project.version}</version>
             </dependency>
@@ -1017,7 +1012,6 @@
         <module>core-job</module>
         <module>core-storage</module>
         <module>engine-mr</module>
-        <module>engine-streaming</module>
         <module>engine-spark</module>
         <module>source-hive</module>
         <module>source-kafka</module>

http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
index f3374c3..a5fb874 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
@@ -26,7 +26,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.metadata.streaming.StreamingConfig;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.rest.exception.BadRequestException;

http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
index c4af5f4..34cc57f 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
@@ -32,7 +32,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.metadata.streaming.StreamingConfig;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.ColumnDesc;

http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java b/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java
index abf0638..170c395 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java
@@ -29,7 +29,7 @@ import org.apache.kylin.cube.CubeDescManager;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.metadata.streaming.StreamingManager;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.Output;

http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/server-base/src/main/java/org/apache/kylin/rest/service/StreamingService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingService.java b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingService.java
index e49e882..7310d9c 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingService.java
@@ -23,7 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.metadata.streaming.StreamingConfig;
 import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.exception.InternalErrorException;
 import org.springframework.beans.factory.annotation.Autowired;

http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/source-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/source-kafka/pom.xml b/source-kafka/pom.xml
index d4cdfd5..e2100c4 100644
--- a/source-kafka/pom.xml
+++ b/source-kafka/pom.xml
@@ -43,12 +43,6 @@
             <artifactId>kylin-core-common</artifactId>
         </dependency>
 
-
-        <dependency>
-            <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-engine-streaming</artifactId>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka_2.10</artifactId>

http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/source-kafka/src/main/java/org/apache/kylin/source/kafka/ByteBufferBackedInputStream.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/ByteBufferBackedInputStream.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/ByteBufferBackedInputStream.java
deleted file mode 100644
index 6981096..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/ByteBufferBackedInputStream.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.source.kafka;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-
-/**
- */
-class ByteBufferBackedInputStream extends InputStream {
-
-    private ByteBuffer buf;
-
-    public ByteBufferBackedInputStream(ByteBuffer buf) {
-        this.buf = buf;
-    }
-
-    @Override
-    public int read() throws IOException {
-        if (!buf.hasRemaining()) {
-            return -1;
-        }
-        return buf.get() & 0xFF;
-    }
-
-    @Override
-    public int read(byte[] bytes, int off, int len) throws IOException {
-        if (!buf.hasRemaining()) {
-            return -1;
-        }
-
-        len = Math.min(len, buf.remaining());
-        buf.get(bytes, off, len);
-        return len;
-    }
-}
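
Note: the deleted class adapts a ByteBuffer to the InputStream contract, returning -1 when the buffer is exhausted and masking each byte to 0..255. The same adapter idea as a quick self-contained sketch:

    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    // Same adapter idea as the deleted class: expose a ByteBuffer's remaining
    // bytes through the InputStream contract (-1 at end, bytes masked to 0..255).
    class ByteBufferStreamDemo {
        public static void main(String[] args) throws IOException {
            ByteBuffer buf = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
            InputStream in = new InputStream() {
                @Override
                public int read() {
                    return buf.hasRemaining() ? (buf.get() & 0xFF) : -1;
                }
            };
            int b;
            while ((b = in.read()) != -1)
                System.out.print((char) b); // prints "hello"
            System.out.println();
        }
    }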

http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index d039583..208c0ce 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -20,7 +20,7 @@ package org.apache.kylin.source.kafka;
 
 import com.google.common.collect.Lists;
 import org.apache.kylin.engine.mr.IMRInput;
-import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.metadata.streaming.StreamingConfig;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.source.ISource;
 import org.apache.kylin.source.ReadableTable;

http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
deleted file mode 100644
index 78a67c2..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source.kafka;
-
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import com.google.common.base.Function;
-import kafka.cluster.BrokerEndPoint;
-import org.apache.kafka.common.protocol.SecurityProtocol;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.common.util.StreamingBatch;
-import org.apache.kylin.common.util.StreamingMessage;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.engine.EngineFactory;
-import org.apache.kylin.engine.streaming.IStreamingInput;
-import org.apache.kylin.engine.streaming.StreamingConfig;
-import org.apache.kylin.engine.streaming.StreamingManager;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
-import org.apache.kylin.source.kafka.util.KafkaRequester;
-import org.apache.kylin.source.kafka.util.KafkaUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-import kafka.cluster.Broker;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.PartitionMetadata;
-import kafka.message.MessageAndOffset;
-
-import javax.annotation.Nullable;
-
-@SuppressWarnings("unused")
-public class KafkaStreamingInput implements IStreamingInput {
-
-    private static final Logger logger = LoggerFactory.getLogger(KafkaStreamingInput.class);
-
-    @Override
-    public StreamingBatch getBatchWithTimeWindow(RealizationType realizationType, String realizationName, int id, long startTime, long endTime) {
-        if (realizationType != RealizationType.CUBE) {
-            throw new IllegalArgumentException("Unsupported realization in KafkaStreamingInput: " + realizationType);
-        }
-        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(realizationName);
-        final String streaming = cube.getFactTable();
-        final StreamingManager streamingManager = StreamingManager.getInstance(kylinConfig);
-        final StreamingConfig streamingConfig = streamingManager.getConfig(streaming);
-        if (streamingConfig == null) {
-            throw new IllegalArgumentException("Table " + streaming + " is not a streaming table.");
-        }
-        if (StreamingConfig.STREAMING_TYPE_KAFKA.equals(streamingConfig.getType())) {
-            logger.info(String.format("prepare to get streaming batch, name:%s, id:%d, startTime:%d, endTime:%d", streaming, id, startTime, endTime));
-
-            try {
-                final KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(kylinConfig);
-                final KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(streaming);
-                List<TblColRef> columns = EngineFactory.getJoinedFlatTableDesc(cube.getDescriptor()).getAllColumns();
-
-                final StreamingParser streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getParserProperties(), columns);
-                final ExecutorService executorService = Executors.newCachedThreadPool();
-                final List<Future<List<StreamingMessage>>> futures = Lists.newArrayList();
-                for (final KafkaClusterConfig kafkaClusterConfig : kafkaConfig.getKafkaClusterConfigs()) {
-
-                    final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
-                    for (int i = 0; i < partitionCount; ++i) {
-                        final StreamingMessageProducer producer = new StreamingMessageProducer(kafkaClusterConfig, i, Pair.newPair(startTime, endTime), kafkaConfig.getMargin(), streamingParser);
-                        final Future<List<StreamingMessage>> future = executorService.submit(producer);
-                        futures.add(future);
-                    }
-                }
-                List<StreamingMessage> messages = Lists.newLinkedList();
-                for (Future<List<StreamingMessage>> future : futures) {
-                    try {
-                        messages.addAll(future.get());
-                    } catch (InterruptedException e) {
-                        logger.warn("this thread should not be interrupted, just ignore", e);
-                        continue;
-                    } catch (ExecutionException e) {
-                        throw new RuntimeException("error when get StreamingMessages", e.getCause());
-                    }
-                }
-                final Pair<Long, Long> timeRange = Pair.newPair(startTime, endTime);
-                logger.info("finish to get streaming batch, total message count:" + messages.size());
-                return new StreamingBatch(messages, timeRange);
-            } catch (ReflectiveOperationException e) {
-                throw new RuntimeException("failed to create instance of StreamingParser", e);
-            }
-        } else {
-            throw new IllegalArgumentException("kafka is the only supported streaming type.");
-        }
-    }
-
-    private static class StreamingMessageProducer implements Callable<List<StreamingMessage>> {
-
-        private final KafkaClusterConfig kafkaClusterConfig;
-        private final int partitionId;
-        private final StreamingParser streamingParser;
-        private final Pair<Long, Long> timeRange;
-        private final long margin;
-
-        private List<Broker> replicaBrokers;
-
-        StreamingMessageProducer(KafkaClusterConfig kafkaClusterConfig, int partitionId, Pair<Long, Long> timeRange, long margin, StreamingParser streamingParser) {
-            this.kafkaClusterConfig = kafkaClusterConfig;
-            this.partitionId = partitionId;
-            this.streamingParser = streamingParser;
-            this.margin = margin;
-            this.timeRange = timeRange;
-            this.replicaBrokers = kafkaClusterConfig.getBrokers();
-        }
-
-        private Broker getLeadBroker() {
-            final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaClusterConfig.getTopic(), partitionId, replicaBrokers, kafkaClusterConfig);
-            if (partitionMetadata != null) {
-                if (partitionMetadata.errorCode() != 0) {
-                    logger.warn("PartitionMetadata errorCode: " + partitionMetadata.errorCode());
-                }
-                replicaBrokers = Lists.transform(partitionMetadata.replicas(), new Function<BrokerEndPoint, Broker>() {
-                    @Nullable
-                    @Override
-                    public Broker apply(@Nullable BrokerEndPoint brokerEndPoint) {
-                        return new Broker(brokerEndPoint, SecurityProtocol.PLAINTEXT);
-                    }
-                });
-                BrokerEndPoint leaderEndpoint = partitionMetadata.leader();
-
-                return new Broker(leaderEndpoint, SecurityProtocol.PLAINTEXT);
-            } else {
-                return null;
-            }
-        }
-
-        @Override
-        public List<StreamingMessage> call() throws Exception {
-            List<StreamingMessage> result = Lists.newLinkedList();
-            try {
-                long startTimestamp = timeRange.getFirst() - margin;
-                long offset = KafkaUtils.findClosestOffsetWithDataTimestamp(kafkaClusterConfig, partitionId, startTimestamp, streamingParser);
-                int fetchRound = 0;
-                int consumeMsgCount = 0;
-                Broker leadBroker = null;
-                String topic = kafkaClusterConfig.getTopic();
-                while (true) {
-                    boolean outOfMargin = false;
-                    int consumeMsgCountAtBeginning = consumeMsgCount;
-                    fetchRound++;
-
-                    if (leadBroker == null) {
-                        leadBroker = getLeadBroker();
-                    }
-
-                    if (leadBroker == null) {
-                        logger.warn("cannot find lead broker, wait 5s");
-                        Thread.sleep(5000);
-                        continue;
-                    }
-
-                    logger.info("fetching topic {} partition id {} offset {} leader {}", topic, String.valueOf(partitionId), String.valueOf(offset), leadBroker.toString());
-
-                    final FetchResponse fetchResponse = KafkaRequester.fetchResponse(topic, partitionId, offset, leadBroker, kafkaClusterConfig);
-                    if (fetchResponse.errorCode(topic, partitionId) != 0) {
-                        logger.warn("fetch response offset:" + offset + " errorCode:" + fetchResponse.errorCode(topic, partitionId));
-                        Thread.sleep(30000);
-                        continue;
-                    }
-
-                    for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partitionId)) {
-                        offset++;
-                        consumeMsgCount++;
-                        final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset.message().payload());
-                        streamingMessage.setOffset(messageAndOffset.offset());
-                        if (streamingParser.filter(streamingMessage)) {
-                            final long timestamp = streamingMessage.getTimestamp();
-                            if (timestamp >= timeRange.getFirst() && timestamp < timeRange.getSecond()) {
-                                result.add(streamingMessage);
-                            } else if (timestamp < timeRange.getSecond() + margin) {
-                                //do nothing
-                            } else {
-                                logger.info("thread:" + Thread.currentThread() + " message timestamp:" + timestamp + " is out of time range:" + timeRange + " margin:" + margin);
-                                outOfMargin = true;
-                                break;
-                            }
-                        }
-                    }
-                    logger.info("Number of messages consumed: " + consumeMsgCount + " offset is: " + offset + " total fetch round: " + fetchRound);
-                    if (outOfMargin) {
-                        break;
-                    }
-                    if (consumeMsgCount == consumeMsgCountAtBeginning) {//nothing this round
-                        logger.info("no message consumed this round, wait 30s");
-                        Thread.sleep(30000);
-                    }
-                }
-            } catch (InterruptedException e) {
-                logger.warn("this thread should not be interrupted, just stop fetching", e);
-            }
-            return result;
-        }
-    }
-
-}
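
The deleted KafkaStreamingInput was the old engine's micro-batch reader: per partition it found the lead broker, located a starting offset near startTime - margin via KafkaUtils.findClosestOffsetWithDataTimestamp(), then fetched, parsed and filtered messages until timestamps ran past endTime + margin. With kafka-clients 0.10.1+ the same starting-offset search is built in as offsetsForTimes(); a minimal sketch of that direction, not of Kylin's actual replacement (all names illustrative):

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.TopicPartition;

    public class TimeWindowReader {

        /** Reads one partition from roughly startTime until record timestamps pass endTime. */
        public static void readWindow(String brokers, String topic, int partition, long startTime, long endTime) {
            Properties props = new Properties();
            props.put("bootstrap.servers", brokers);
            props.put("group.id", "window-reader"); // placeholder
            props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                TopicPartition tp = new TopicPartition(topic, partition);
                consumer.assign(Collections.singletonList(tp));

                // Replaces the hand-rolled offset search in the deleted code above.
                Map<TopicPartition, OffsetAndTimestamp> start = consumer.offsetsForTimes(Collections.singletonMap(tp, startTime));
                OffsetAndTimestamp oat = start.get(tp);
                consumer.seek(tp, oat == null ? 0L : oat.offset());

                boolean done = false;
                while (!done) {
                    ConsumerRecords<byte[], byte[]> records = consumer.poll(1000L);
                    for (ConsumerRecord<byte[], byte[]> record : records) {
                        if (record.timestamp() >= endTime) {
                            done = true;
                            break;
                        }
                        if (record.timestamp() >= startTime) {
                            // parse + filter here, as StreamingParser did in the deleted code
                        }
                    }
                }
            }
        }
    }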

http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index e4c702d..633a30c 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -33,6 +33,7 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.util.StreamingMessage;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.kafka.util.ByteBufferBackedInputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
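
TimedJsonStreamParser itself is unchanged apart from picking up the relocated ByteBufferBackedInputStream. What the helper enables is feeding a Kafka message payload (a ByteBuffer) straight to Jackson without first copying it into a byte[]; a hedged usage sketch (class and method names illustrative):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.HashMap;

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kylin.source.kafka.util.ByteBufferBackedInputStream;

    public class PayloadJson {
        private static final ObjectMapper mapper = new ObjectMapper();

        /** Parses a message payload as a flat JSON object without an intermediate copy. */
        public static HashMap<?, ?> parse(ByteBuffer payload) throws IOException {
            return mapper.readValue(new ByteBufferBackedInputStream(payload), HashMap.class);
        }
    }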
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
deleted file mode 100644
index b1b4011..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
+++ /dev/null
@@ -1,312 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.source.kafka.diagnose;
-
-import java.text.SimpleDateFormat;
-import java.util.Calendar;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.collect.Maps;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.AbstractApplication;
-import org.apache.kylin.common.util.DaemonThreadFactory;
-import org.apache.kylin.common.util.OptionsHelper;
-import org.apache.kylin.common.util.StreamingMessage;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.source.kafka.KafkaConfigManager;
-import org.apache.kylin.source.kafka.StreamingParser;
-import org.apache.kylin.source.kafka.TimedJsonStreamParser;
-import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
-import org.apache.kylin.source.kafka.util.KafkaRequester;
-import org.apache.kylin.source.kafka.util.KafkaUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-import kafka.api.OffsetRequest;
-import kafka.cluster.Broker;
-import kafka.javaapi.FetchResponse;
-import kafka.message.MessageAndOffset;
-
-/**
- * Run continuously as a daemon to discover how "disordered" the Kafka queue is.
- * The daemon only stores a digest, so it should not be space-consuming.
- */
-public class KafkaInputAnalyzer extends AbstractApplication {
-
-    public class KafkaMessagePuller implements Runnable {
-
-        private final String topic;
-        private final int partitionId;
-        private final KafkaClusterConfig streamingConfig;
-        private final LinkedBlockingQueue<StreamingMessage> streamQueue;
-        private final StreamingParser streamingParser;
-        private final Broker leadBroker;
-        private long offset;
-
-        protected final Logger logger;
-
-        public KafkaMessagePuller(int clusterID, String topic, int partitionId, long startOffset, Broker leadBroker, KafkaClusterConfig kafkaClusterConfig, StreamingParser streamingParser) {
-            this.topic = topic;
-            this.partitionId = partitionId;
-            this.streamingConfig = kafkaClusterConfig;
-            this.offset = startOffset;
-            this.logger = LoggerFactory.getLogger(topic + "_cluster_" + clusterID + "_" + partitionId);
-            this.streamQueue = new LinkedBlockingQueue<StreamingMessage>(10000);
-            this.streamingParser = streamingParser;
-            this.leadBroker = leadBroker;
-        }
-
-        public BlockingQueue<StreamingMessage> getStreamQueue() {
-            return streamQueue;
-        }
-
-        @Override
-        public void run() {
-            try {
-                int consumeMsgCount = 0;
-                int fetchRound = 0;
-                while (true) {
-                    int consumeMsgCountAtBeginning = consumeMsgCount;
-                    fetchRound++;
-
-                    logger.info("fetching topic {} partition id {} offset {} leader {}", topic, String.valueOf(partitionId), String.valueOf(offset), leadBroker.toString());
-
-                    final FetchResponse fetchResponse = KafkaRequester.fetchResponse(topic, partitionId, offset, leadBroker, streamingConfig);
-                    if (fetchResponse.errorCode(topic, partitionId) != 0) {
-                        logger.warn("fetch response offset:" + offset + " errorCode:" + fetchResponse.errorCode(topic, partitionId));
-                        Thread.sleep(30000);
-                        continue;
-                    }
-
-                    for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partitionId)) {
-                        offset++;
-                        consumeMsgCount++;
-
-                        final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset.message().payload());
-                        streamingMessage.setOffset(messageAndOffset.offset());
-                        if (streamingParser.filter(streamingMessage)) {
-                            streamQueue.add(streamingMessage);
-                        }
-
-                    }
-                    logger.info("Number of messages consumed: " + consumeMsgCount + " offset is: " + offset + " total fetch round: " + fetchRound);
-
-                    if (consumeMsgCount == consumeMsgCountAtBeginning) {//nothing this round 
-                        Thread.sleep(30000);
-                    }
-                }
-            } catch (Exception e) {
-                logger.error("consumer has encountered an error", e);
-            }
-        }
-
-    }
-
-    @SuppressWarnings("static-access")
-    private static final Option OPTION_STREAMING = OptionBuilder.withArgName("streaming").hasArg().isRequired(true).withDescription("Name of the streaming").create("streaming");
-    @SuppressWarnings("static-access")
-    private static final Option OPTION_TASK = OptionBuilder.withArgName("task").hasArg().isRequired(true).withDescription("get delay or get disorder degree").create("task");
-    @SuppressWarnings("static-access")
-    private static final Option OPTION_TSCOLNAME = OptionBuilder.withArgName("tsColName").hasArg().isRequired(true).withDescription("field name of the ts").create("tsColName");
-
-    private static final Logger logger = LoggerFactory.getLogger(KafkaInputAnalyzer.class);
-
-    private StreamingParser parser;
-    private KafkaConfig kafkaConfig;
-
-    private Options options;
-
-    public KafkaInputAnalyzer() {
-        options = new Options();
-        options.addOption(OPTION_STREAMING);
-        options.addOption(OPTION_TASK);
-        options.addOption(OPTION_TSCOLNAME);
-
-    }
-
-    private List<BlockingQueue<StreamingMessage>> consume(final int clusterID, final KafkaClusterConfig kafkaClusterConfig, final int partitionCount, long whichtime) {
-        List<BlockingQueue<StreamingMessage>> result = Lists.newArrayList();
-        for (int partitionId = 0; partitionId < partitionCount; ++partitionId) {
-            final kafka.cluster.Broker leadBroker = KafkaUtils.getLeadBroker(kafkaClusterConfig, partitionId);
-            long streamingOffset = KafkaRequester.getLastOffset(kafkaClusterConfig.getTopic(), partitionId, whichtime, leadBroker, kafkaClusterConfig);
-            logger.info("starting offset:" + streamingOffset + " cluster id:" + clusterID + " partitionId:" + partitionId);
-            KafkaMessagePuller consumer = new KafkaMessagePuller(clusterID, kafkaClusterConfig.getTopic(), partitionId, streamingOffset, leadBroker, kafkaClusterConfig, parser);
-            Executors.newSingleThreadExecutor(new DaemonThreadFactory()).submit(consumer);
-            result.add(consumer.getStreamQueue());
-        }
-        return result;
-    }
-
-    private List<BlockingQueue<StreamingMessage>> consumeAll(long whichtime) {
-        int clusterId = 0;
-        final List<BlockingQueue<StreamingMessage>> queues = Lists.newLinkedList();
-
-        for (final KafkaClusterConfig kafkaClusterConfig : kafkaConfig.getKafkaClusterConfigs()) {
-            final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
-            final List<BlockingQueue<StreamingMessage>> oneClusterQueue = consume(clusterId, kafkaClusterConfig, partitionCount, whichtime);
-            queues.addAll(oneClusterQueue);
-            logger.info("Cluster {} with {} partitions", clusterId, oneClusterQueue.size());
-            clusterId++;
-        }
-        return queues;
-    }
-
-    private void analyzeLatency() throws InterruptedException {
-        long[] intervals = new long[] { 1, 5, 60, 300, 1800 };
-        final List<BlockingQueue<StreamingMessage>> allPartitionData = consumeAll(OffsetRequest.LatestTime());
-        final List<TimeHistogram> allHistograms = Lists.newArrayList();
-        final TimeHistogram overallHistogram = new TimeHistogram(intervals, "overall");
-
-        ExecutorService executorService = Executors.newFixedThreadPool(allPartitionData.size(), new DaemonThreadFactory());
-        for (int i = 0; i < allPartitionData.size(); ++i) {
-            final int index = i;
-            allHistograms.add(new TimeHistogram(intervals, "" + i));
-            executorService.submit(new Runnable() {
-                @Override
-                public void run() {
-                    while (true) {
-                        try {
-                            StreamingMessage message = allPartitionData.get(index).take();
-                            long t = message.getTimestamp();
-                            allHistograms.get(index).processMillis(System.currentTimeMillis() - t);
-                            overallHistogram.processMillis(System.currentTimeMillis() - t);
-                        } catch (InterruptedException e) {
-                            e.printStackTrace();
-                        }
-                    }
-                }
-            });
-        }
-
-        while (true) {
-            System.out.println("Printing status at : " + new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(Calendar.getInstance().getTime()));
-
-            for (TimeHistogram histogram : allHistograms) {
-                histogram.printStatus();
-            }
-            overallHistogram.printStatus();
-            Thread.sleep(300000);
-        }
-    }
-
-    private void analyzeDisorder() throws InterruptedException {
-        final List<BlockingQueue<StreamingMessage>> allPartitionData = consumeAll(OffsetRequest.EarliestTime());
-
-        final List<Long> wallClocks = Lists.newArrayList();
-        final List<Long> wallOffset = Lists.newArrayList();
-        final List<Long> maxDisorderTime = Lists.newArrayList();
-        final List<Long> maxDisorderOffset = Lists.newArrayList();
-        final List<Long> processedMessages = Lists.newArrayList();
-
-        for (int i = 0; i < allPartitionData.size(); i++) {
-            wallClocks.add(0L);
-            wallOffset.add(0L);
-            maxDisorderTime.add(0L);
-            maxDisorderOffset.add(0L);
-            processedMessages.add(0L);
-        }
-
-        ExecutorService executorService = Executors.newFixedThreadPool(allPartitionData.size(), new DaemonThreadFactory());
-        final CountDownLatch countDownLatch = new CountDownLatch(allPartitionData.size());
-        for (int i = 0; i < allPartitionData.size(); ++i) {
-            final int index = i;
-            executorService.submit(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        while (true) {
-                            StreamingMessage message = allPartitionData.get(index).poll(60, TimeUnit.SECONDS);
-                            if (message == null) {
-                                System.out.println(String.format("Thread %d is exiting", index));
-                                break;
-                            }
-                            long t = message.getTimestamp();
-                            long offset = message.getOffset();
-                            if (t < wallClocks.get(index)) {
-                                maxDisorderTime.set(index, Math.max(wallClocks.get(index) - t, maxDisorderTime.get(index)));
-                                maxDisorderOffset.set(index, Math.max(offset - wallOffset.get(index), maxDisorderOffset.get(index)));
-                            } else {
-                                wallClocks.set(index, t);
-                                wallOffset.set(index, offset);
-                            }
-                            processedMessages.set(index, processedMessages.get(index) + 1);
-
-                            if (processedMessages.get(index) % 10000 == 1) {
-                                System.out.println(String.format("Thread %d processed %d messages. Max disorder time is %d , max disorder offset is %d", //
-                                        index, processedMessages.get(index), maxDisorderTime.get(index), maxDisorderOffset.get(index)));
-                            }
-                        }
-
-                        System.out.println(String.format("Thread %d finishes after %d messages. Max disorder time is %d , max disorder offset is %d", //
-                                index, processedMessages.get(index), maxDisorderTime.get(index), maxDisorderOffset.get(index)));
-                        countDownLatch.countDown();
-                    } catch (InterruptedException e) {
-                        e.printStackTrace();
-                    }
-                }
-            });
-        }
-
-        countDownLatch.await();
-    }
-
-    @Override
-    protected Options getOptions() {
-        return options;
-    }
-
-    @Override
-    protected void execute(OptionsHelper optionsHelper) throws Exception {
-
-        String streaming = optionsHelper.getOptionValue(OPTION_STREAMING);
-        String task = optionsHelper.getOptionValue(OPTION_TASK);
-        String tsColName = optionsHelper.getOptionValue(OPTION_TSCOLNAME);
-
-        Map<String, String> properties = Maps.newHashMap();
-        properties.put(StreamingParser.PROPERTY_TS_COLUMN_NAME, tsColName);
-        kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(streaming);
-        parser = new TimedJsonStreamParser(Lists.<TblColRef> newArrayList(), properties);
-
-        if ("disorder".equalsIgnoreCase(task)) {
-            analyzeDisorder();
-        } else if ("delay".equalsIgnoreCase(task)) {
-            analyzeLatency();
-        } else {
-            optionsHelper.printUsage(this.getClass().getName(), options);
-        }
-    }
-
-    public static void main(String[] args) {
-        KafkaInputAnalyzer analyzer = new KafkaInputAnalyzer();
-        analyzer.execute(args);
-    }
-}
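
The analyzer's disorder metric is a per-partition high-water mark over message timestamps: any message older than the mark counts as disordered, and the maxima of (mark - timestamp) and (offset - mark's offset) are what get printed. That bookkeeping, extracted into a standalone sketch (class name illustrative):

    /** Tracks how far out of order one partition runs, as analyzeDisorder() did above. */
    public class DisorderTracker {
        private long wallClock;          // highest timestamp seen so far
        private long wallOffset;         // offset at which it was seen
        private long maxDisorderTime;
        private long maxDisorderOffset;

        public void observe(long timestamp, long offset) {
            if (timestamp < wallClock) {
                // Late message: record how far behind the high-water mark it is.
                maxDisorderTime = Math.max(wallClock - timestamp, maxDisorderTime);
                maxDisorderOffset = Math.max(offset - wallOffset, maxDisorderOffset);
            } else {
                wallClock = timestamp;
                wallOffset = offset;
            }
        }

        public long maxDisorderMillis() {
            return maxDisorderTime;
        }

        public long maxDisorderOffsetGap() {
            return maxDisorderOffset;
        }
    }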

http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaVerify.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaVerify.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaVerify.java
deleted file mode 100644
index 6a456bc..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaVerify.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.source.kafka.diagnose;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.lang.StringUtils;
-
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.MapType;
-import com.fasterxml.jackson.databind.type.SimpleType;
-
-/**
- * Only for verifying Kylin streaming's correctness, by comparing against the data in the original Kafka topic.
- */
-public class KafkaVerify {
-
-    public static void main(String[] args) throws IOException {
-
-        System.out.println("start");
-
-        ObjectMapper mapper = new ObjectMapper();
-        JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(String.class));
-
-        long start = Long.valueOf(args[0]);
-        long end = Long.valueOf(args[1]);
-        long interval = Long.valueOf(args[2]);
-        int bucket = (int) ((end - start + interval - 1) / interval);
-
-        long[] qtySum = new long[bucket];
-        long qtyTotal = 0;
-        long[] counts = new long[bucket];
-        long countTotal = 0;
-        long processed = 0;
-        long minOffset = -1;
-        long maxOffset = -1;
-
-        try (BufferedReader br = new BufferedReader(new FileReader(new File(args[3])))) {
-            String s;
-            while ((s = br.readLine()) != null) {
-                // process the line.
-                if (++processed % 10000 == 1) {
-                    System.out.println("processing " + processed);
-                }
-
-                Map<String, String> root = mapper.readValue(s, mapType);
-                String tsStr = root.get("sys_ts");
-
-                if (StringUtils.isEmpty(tsStr)) {
-                    continue;
-                }
-                long ts = Long.valueOf(tsStr);
-                if (ts < start || ts >= end) {
-                    continue;
-                }
-
-                if (minOffset == -1) {
-                    minOffset = processed - 1;
-                }
-                maxOffset = processed - 1;
-
-                long qty = Long.valueOf(root.get("qty"));
-                int index = (int) ((ts - start) / interval);
-                qtySum[index] += qty;
-                qtyTotal += qty;
-                counts[index]++;
-                countTotal++;
-            }
-        }
-
-        System.out.println("qty sum is " + Arrays.toString(qtySum));
-        System.out.println("qty total is " + qtyTotal);
-        System.out.println("count is " + Arrays.toString(counts));
-        System.out.println("count total is " + countTotal);
-        System.out.println("first processed is " + minOffset);
-        System.out.println("last processed is " + maxOffset);
-    }
-}
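
KafkaVerify sizes its buckets with ceiling division, so a time range that does not divide evenly by the interval still gets a final partial bucket. A worked example of just that arithmetic:

    public class BucketMath {
        public static void main(String[] args) {
            long start = 1470000000000L, end = 1470003600000L, interval = 600000L;
            // (end - start + interval - 1) / interval rounds up:
            // a 60-minute range at 10-minute intervals yields exactly 6 buckets,
            // while a 61-minute range would yield 7, the last one partial.
            int bucket = (int) ((end - start + interval - 1) / interval);
            System.out.println(bucket); // prints 6
        }
    }

The tool runs as a plain main() with four positional arguments: start, end and interval in epoch milliseconds, then the path to a file holding one JSON message per line.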