Posted to commits@kylin.apache.org by li...@apache.org on 2015/09/16 05:05:16 UTC

[2/5] incubator-kylin git commit: KYLIN-1010 Job module with only II and tests left

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java
new file mode 100644
index 0000000..826ce0a
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.util;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.storage.hbase.steps.HBaseConnection;
+
+import com.google.common.collect.Lists;
+
+public class GridTableHBaseBenchmark {
+
+    private static final String TEST_TABLE = "GridTableTest";
+    private static final byte[] CF = "F".getBytes();
+    private static final byte[] QN = "C".getBytes();
+    private static final int N_ROWS = 10000;
+    private static final int CELL_SIZE = 128 * 1024; // 128 KB
+    private static final double DFT_HIT_RATIO = 0.3;
+    private static final double DFT_INDEX_RATIO = 0.1;
+    private static final int ROUND = 3;
+
+    public static void main(String[] args) throws IOException {
+        double hitRatio = DFT_HIT_RATIO;
+        try {
+            hitRatio = Double.parseDouble(args[0]);
+        } catch (Exception e) {
+            // no argument or not a number; keep the default
+        }
+
+        double indexRatio = DFT_INDEX_RATIO;
+        try {
+            indexRatio = Double.parseDouble(args[1]);
+        } catch (Exception e) {
+            // no argument or not a number; keep the default
+        }
+
+        testGridTable(hitRatio, indexRatio);
+    }
+
+    public static void testGridTable(double hitRatio, double indexRatio) throws IOException {
+        System.out.println("Testing grid table scanning, hit ratio " + hitRatio + ", index ratio " + indexRatio);
+        String hbaseUrl = "hbase"; // use hbase-site.xml on classpath
+
+        HConnection conn = HBaseConnection.get(hbaseUrl);
+        createHTableIfNeeded(conn, TEST_TABLE);
+        prepareData(conn);
+
+        Hits hits = new Hits(N_ROWS, hitRatio, indexRatio);
+
+        for (int i = 0; i < ROUND; i++) {
+            System.out.println("==================================== ROUND " + (i + 1) + " ========================================");
+            testRowScanWithIndex(conn, hits.getHitsForRowScanWithIndex());
+            testRowScanNoIndexFullScan(conn, hits.getHitsForRowScanNoIndex());
+            testRowScanNoIndexSkipScan(conn, hits.getHitsForRowScanNoIndex());
+            testColumnScan(conn, hits.getHitsForColumnScan());
+        }
+
+    }
+
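+    // Approximates a column-store read: one family-wide scanner per logical column,
+    // all advanced in lockstep one logical row at a time.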
+    private static void testColumnScan(HConnection conn, List<Pair<Integer, Integer>> colScans) throws IOException {
+        Stats stats = new Stats("COLUMN_SCAN");
+
+        HTableInterface table = conn.getTable(TEST_TABLE);
+        try {
+            stats.markStart();
+
+            int nLogicCols = colScans.size();
+            int nLogicRows = colScans.get(0).getSecond() - colScans.get(0).getFirst();
+
+            Scan[] scans = new Scan[nLogicCols];
+            ResultScanner[] scanners = new ResultScanner[nLogicCols];
+            for (int i = 0; i < nLogicCols; i++) {
+                scans[i] = new Scan();
+                scans[i].addFamily(CF);
+                scanners[i] = table.getScanner(scans[i]);
+            }
+            for (int i = 0; i < nLogicRows; i++) {
+                for (int c = 0; c < nLogicCols; c++) {
+                    Result r = scanners[c].next();
+                    stats.consume(r);
+                }
+                dot(i, nLogicRows);
+            }
+
+            stats.markEnd();
+        } finally {
+            IOUtils.closeQuietly(table);
+        }
+    }
+
+    private static void testRowScanNoIndexFullScan(HConnection conn, boolean[] hits) throws IOException {
+        fullScan(conn, hits, new Stats("ROW_SCAN_NO_IDX_FULL"));
+    }
+
+    private static void testRowScanNoIndexSkipScan(HConnection conn, boolean[] hits) throws IOException {
+        jumpScan(conn, hits, new Stats("ROW_SCAN_NO_IDX_SKIP"));
+    }
+
+    private static void testRowScanWithIndex(HConnection conn, boolean[] hits) throws IOException {
+        jumpScan(conn, hits, new Stats("ROW_SCAN_IDX"));
+    }
+
+    private static void fullScan(HConnection conn, boolean[] hits, Stats stats) throws IOException {
+        HTableInterface table = conn.getTable(TEST_TABLE);
+        try {
+            stats.markStart();
+
+            Scan scan = new Scan();
+            scan.addFamily(CF);
+            ResultScanner scanner = table.getScanner(scan);
+            int i = 0;
+            for (Result r : scanner) {
+                if (hits[i])
+                    stats.consume(r);
+                dot(i, N_ROWS);
+                i++;
+            }
+
+            stats.markEnd();
+        } finally {
+            IOUtils.closeQuietly(table);
+        }
+    }
+
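+    // Issues one Scan per cluster of hit rows; a cluster keeps extending until it is
+    // followed by at least jumpThreshold consecutive misses, so short gaps are read
+    // through rather than paying the setup cost of another Scan.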
+    private static void jumpScan(HConnection conn, boolean[] hits, Stats stats) throws IOException {
+
+        final int jumpThreshold = 6; // compensates for per-Scan setup overhead; value chosen empirically
+
+        HTableInterface table = conn.getTable(TEST_TABLE);
+        try {
+
+            stats.markStart();
+
+            int i = 0;
+            while (i < N_ROWS) {
+                int start, end;
+                for (start = i; start < N_ROWS; start++) {
+                    if (hits[start])
+                        break;
+                }
+                for (end = start + 1; end < N_ROWS; end++) {
+                    boolean isEnd = true;
+                    for (int j = 0; j < jumpThreshold && end + j < N_ROWS; j++)
+                        if (hits[end + j])
+                            isEnd = false;
+                    if (isEnd)
+                        break;
+                }
+
+                if (start < N_ROWS) {
+                    Scan scan = new Scan();
+                    scan.setStartRow(Bytes.toBytes(start));
+                    scan.setStopRow(Bytes.toBytes(end));
+                    scan.addFamily(CF);
+                    ResultScanner scanner = table.getScanner(scan);
+                    i = start;
+                    for (Result r : scanner) {
+                        stats.consume(r);
+                        dot(i, N_ROWS);
+                        i++;
+                    }
+                }
+                i = end;
+            }
+
+            stats.markEnd();
+
+        } finally {
+            IOUtils.closeQuietly(table);
+        }
+    }
+
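+    // Populates the table with N_ROWS rows of CELL_SIZE random bytes on first use;
+    // if rows already exist, only the row count is verified.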
+    private static void prepareData(HConnection conn) throws IOException {
+        HTableInterface table = conn.getTable(TEST_TABLE);
+
+        try {
+            // check how many rows already exist
+            int nRows = 0;
+            Scan scan = new Scan();
+            scan.setFilter(new KeyOnlyFilter());
+            ResultScanner scanner = table.getScanner(scan);
+            for (Result r : scanner) {
+                r.getRow(); // nothing to do
+                nRows++;
+            }
+
+            if (nRows > 0) {
+                System.out.println(nRows + " existing rows");
+                if (nRows != N_ROWS)
+                    throw new IOException("Expected " + N_ROWS + " rows but found " + nRows);
+                return;
+            }
+
+            // insert rows into empty table
+            System.out.println("Writing " + N_ROWS + " rows to " + TEST_TABLE);
+            long nBytes = 0;
+            for (int i = 0; i < N_ROWS; i++) {
+                byte[] rowkey = Bytes.toBytes(i);
+                Put put = new Put(rowkey);
+                byte[] cell = randomBytes();
+                put.add(CF, QN, cell);
+                table.put(put);
+                nBytes += cell.length;
+                dot(i, N_ROWS);
+            }
+            System.out.println();
+            System.out.println("Written " + N_ROWS + " rows, " + nBytes + " bytes");
+
+        } finally {
+            IOUtils.closeQuietly(table);
+        }
+
+    }
+
+    private static void dot(int i, int nRows) {
+        if (i % (nRows / 100) == 0)
+            System.out.print(".");
+    }
+
+    private static byte[] randomBytes() {
+        byte[] bytes = new byte[CELL_SIZE];
+        Random rand = new Random();
+        rand.nextBytes(bytes);
+        return bytes;
+    }
+
+    private static void createHTableIfNeeded(HConnection conn, String tableName) throws IOException {
+        HBaseAdmin hbase = new HBaseAdmin(conn);
+
+        try {
+            boolean tableExist = false;
+            try {
+                hbase.getTableDescriptor(TableName.valueOf(tableName));
+                tableExist = true;
+            } catch (TableNotFoundException e) {
+            }
+
+            if (tableExist) {
+                System.out.println("HTable '" + tableName + "' already exists");
+                return;
+            }
+
+            System.out.println("Creating HTable '" + tableName + "'");
+
+            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
+
+            HColumnDescriptor fd = new HColumnDescriptor(CF);
+            fd.setBlocksize(CELL_SIZE);
+            desc.addFamily(fd);
+            hbase.createTable(desc);
+
+            System.out.println("HTable '" + tableName + "' created");
+        } finally {
+            hbase.close();
+        }
+    }
+
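+    // Pre-computed random hit patterns: for row scans, whole blocks of size 1/indexRatio
+    // are hit with probability hitRatio (on a miss only the block's first row is marked
+    // for the no-index scan); for column scans, each of the 20 logical column ranges is
+    // selected with probability hitRatio.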
+    static class Hits {
+
+        boolean[] hitsForRowScanWithIndex;
+        boolean[] hitsForRowScanNoIndex;
+        List<Pair<Integer, Integer>> hitsForColumnScan;
+
+        public Hits(int nRows, double hitRatio, double indexRatio) {
+            Random rand = new Random();
+
+            hitsForRowScanWithIndex = new boolean[nRows];
+            hitsForRowScanNoIndex = new boolean[nRows];
+
+            // for row scan
+            int blockSize = (int) (1.0 / indexRatio);
+            int nBlocks = nRows / blockSize;
+
+            for (int i = 0; i < nBlocks; i++) {
+
+                if (rand.nextDouble() < hitRatio) {
+                    for (int j = 0; j < blockSize; j++) {
+                        hitsForRowScanNoIndex[i * blockSize + j] = true;
+                        hitsForRowScanWithIndex[i * blockSize + j] = true;
+                    }
+                } else {
+                    // miss: the no-index scan still has to read the first row of the block
+                    hitsForRowScanNoIndex[i * blockSize] = true;
+                }
+            }
+
+            hitsForColumnScan = Lists.newArrayList();
+
+            // for column scan
+            int nColumns = 20;
+            int logicRows = nRows / nColumns;
+            for (int i = 0; i < nColumns; i++) {
+                if (rand.nextDouble() < hitRatio) {
+                    hitsForColumnScan.add(new Pair<Integer, Integer>(i * logicRows, (i + 1) * logicRows));
+                }
+            }
+
+        }
+
+        public boolean[] getHitsForRowScanWithIndex() {
+            return hitsForRowScanWithIndex;
+        }
+
+        public boolean[] getHitsForRowScanNoIndex() {
+            return hitsForRowScanNoIndex;
+        }
+
+        public List<Pair<Integer, Integer>> getHitsForColumnScan() {
+            return hitsForColumnScan;
+        }
+    }
+
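+    // Wall-clock timing plus row/byte counters; consume() XORs every value byte into a
+    // throwaway variable so the cell content is actually read, not just referenced.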
+    static class Stats {
+        String name;
+        long startTime;
+        long endTime;
+        long rowsRead;
+        long bytesRead;
+
+        public Stats(String name) {
+            this.name = name;
+        }
+
+        public void consume(Result r) {
+            consume(r, Integer.MAX_VALUE);
+        }
+
+        private void consume(Result r, int nBytesToConsume) {
+            Cell cell = r.getColumnLatestCell(CF, QN);
+            byte mix = 0;
+            byte[] valueArray = cell.getValueArray();
+            int n = Math.min(nBytesToConsume, cell.getValueLength());
+            for (int i = 0; i < n; i++) {
+                mix ^= valueArray[cell.getValueOffset() + i];
+                bytesRead++;
+            }
+            discard(mix);
+            rowsRead++;
+        }
+
+        private void discard(byte n) {
+            // do nothing
+        }
+
+        public void markStart() {
+            System.out.println(name + " starts");
+            startTime = System.currentTimeMillis();
+        }
+
+        public void markEnd() {
+            endTime = System.currentTimeMillis();
+            System.out.println();
+            System.out.println(name + " ends, " + (endTime - startTime) + " ms, " + rowsRead + " rows read, " + bytesRead + " bytes read");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HbaseStreamingInput.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HbaseStreamingInput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HbaseStreamingInput.java
new file mode 100644
index 0000000..adf4419
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HbaseStreamingInput.java
@@ -0,0 +1,237 @@
+package org.apache.kylin.storage.hbase.util;
+
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Semaphore;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
+import org.apache.kylin.common.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Load/scan test utility that keeps writing timed batches into an HTable and randomly
+ * scans time windows back out of it, to mimic streaming input.
+ */
+public class HbaseStreamingInput {
+    private static final Logger logger = LoggerFactory.getLogger(HbaseStreamingInput.class);
+
+    private static final int CELL_SIZE = 128 * 1024; // 128 KB
+    private static final byte[] CF = "F".getBytes();
+    private static final byte[] QN = "C".getBytes();
+
+    public static void createTable(String tableName) throws IOException {
+        HConnection conn = getConnection();
+        HBaseAdmin hadmin = new HBaseAdmin(conn);
+
+        try {
+            boolean tableExist = hadmin.tableExists(tableName);
+            if (tableExist) {
+                logger.info("HTable '" + tableName + "' already exists");
+                return;
+            }
+
+            logger.info("Creating HTable '" + tableName + "'");
+            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
+            desc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName()); // disable region split
+            desc.setMemStoreFlushSize(512 << 20); // 512 MB
+
+            HColumnDescriptor fd = new HColumnDescriptor(CF);
+            fd.setBlocksize(CELL_SIZE);
+            desc.addFamily(fd);
+            hadmin.createTable(desc);
+
+            logger.info("HTable '" + tableName + "' created");
+        } finally {
+            conn.close();
+            hadmin.close();
+        }
+    }
+
+    private static void scheduleJob(Semaphore semaphore, int interval) {
+        while (true) {
+            semaphore.release();
+            try {
+                Thread.sleep(interval);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
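+    // Creates the table if needed, then on a 5-minute schedule writes a batch of 1024 rows;
+    // each row key is 12 bytes (current time in millis + sequence id) and each cell holds
+    // CELL_SIZE random bytes.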
+    public static void addData(String tableName) throws IOException {
+
+        createTable(tableName);
+
+        final Semaphore semaphore = new Semaphore(0);
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                scheduleJob(semaphore, 300000); // one batch every 5 minutes
+            }
+        }).start();
+
+        while (true) {
+            try {
+                semaphore.acquire();
+                int waiting = semaphore.availablePermits();
+                if (waiting > 0) {
+                    logger.warn("There are another " + waiting + " batches waiting to be added");
+                }
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+
+            HConnection conn = getConnection();
+            HTableInterface table = conn.getTable(tableName);
+
+            byte[] key = new byte[8 + 4]; // 8 bytes of time + 4 bytes of id
+
+            logger.info("============================================");
+            long startTime = System.currentTimeMillis();
+            logger.info("data load start time in millis: " + startTime);
+            logger.info("data load start at " + formatTime(startTime));
+            List<Put> buffer = Lists.newArrayList();
+            for (int i = 0; i < (1 << 10); ++i) {
+                long time = System.currentTimeMillis();
+                Bytes.putLong(key, 0, time);
+                Bytes.putInt(key, 8, i);
+                Put put = new Put(key);
+                byte[] cell = randomBytes(CELL_SIZE);
+                put.add(CF, QN, cell);
+                buffer.add(put);
+            }
+            table.put(buffer);
+            table.close();
+            conn.close();
+            long endTime = System.currentTimeMillis();
+            logger.info("data load end at " + formatTime(endTime));
+            logger.info("data load time consumed: " + (endTime - startTime));
+            logger.info("============================================");
+        }
+    }
+
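+    // Once a minute (dropping any backlog), picks 5 random 10-minute windows between the
+    // first row's timestamp and now, scans each window and hashes the values so the data
+    // is really read from the region servers.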
+    public static void randomScan(String tableName) throws IOException {
+
+        final Semaphore semaphore = new Semaphore(0);
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                scheduleJob(semaphore, 60000); // one batch every minute
+            }
+        }).start();
+
+        while (true) {
+            try {
+                semaphore.acquire();
+                int waiting = semaphore.drainPermits();
+                if (waiting > 0) {
+                    logger.warn("Too many queries to handle! Blocking " + waiting + " sets of scan requests");
+                }
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+
+            Random r = new Random();
+            HConnection conn = getConnection();
+            HTableInterface table = conn.getTable(tableName);
+
+            long leftBound = getFirstKeyTime(table);
+            long rightBound = System.currentTimeMillis();
+
+            for (int t = 0; t < 5; ++t) {
+                long start = (long) (leftBound + r.nextDouble() * (rightBound - leftBound));
+                long end = start + 600000;//a period of 10 minutes
+                logger.info("A scan from " + formatTime(start) + " to " + formatTime(end));
+
+                Scan scan = new Scan();
+                scan.setStartRow(Bytes.toBytes(start));
+                scan.setStopRow(Bytes.toBytes(end));
+                scan.addFamily(CF);
+                ResultScanner scanner = table.getScanner(scan);
+                long hash = 0;
+                int rowCount = 0;
+                for (Result result : scanner) {
+                    Cell cell = result.getColumnLatestCell(CF, QN);
+                    byte[] value = cell.getValueArray();
+                    if (cell.getValueLength() != CELL_SIZE) {
+                        logger.error("value size invalid!!!!!");
+                    }
+
+                    hash += Arrays.hashCode(Arrays.copyOfRange(value, cell.getValueOffset(), cell.getValueLength() + cell.getValueOffset()));
+                    rowCount++;
+                }
+                scanner.close();
+                logger.info("Scanned " + rowCount + " rows, the (meaningless) hash for the scan is " + hash);
+            }
+            table.close();
+            conn.close();
+        }
+    }
+
+    private static long getFirstKeyTime(HTableInterface table) throws IOException {
+        long startTime = 0;
+
+        Scan scan = new Scan();
+        scan.addFamily(CF);
+        ResultScanner scanner = table.getScanner(scan);
+        for (Result result : scanner) {
+            Cell cell = result.getColumnLatestCell(CF, QN);
+            byte[] key = cell.getRowArray();
+            startTime = Bytes.toLong(key, cell.getRowOffset(), 8);
+            logger.info("Retrieved first record time: " + formatTime(startTime));
+            break;//only get first one
+        }
+        scanner.close();
+        return startTime;
+
+    }
+
+    private static HConnection getConnection() throws IOException {
+        return HConnectionManager.createConnection(HBaseConfiguration.create());
+    }
+
+    private static String formatTime(long time) {
+        DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
+        Calendar cal = Calendar.getInstance();
+        cal.setTimeInMillis(time);
+        return dateFormat.format(cal.getTime());
+    }
+
+    private static byte[] randomBytes(int length) {
+        byte[] bytes = new byte[length];
+        Random rand = new Random();
+        rand.nextBytes(bytes);
+        return bytes;
+    }
+
+    public static void main(String[] args) throws Exception {
+
+        if (args[0].equalsIgnoreCase("createtable")) {
+            createTable(args[1]);
+        } else if (args[0].equalsIgnoreCase("adddata")) {
+            addData(args[1]);
+        } else if (args[0].equalsIgnoreCase("randomscan")) {
+            randomScan(args[1]);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HtableAlterMetadataCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HtableAlterMetadataCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HtableAlterMetadataCLI.java
new file mode 100644
index 0000000..239adcf
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HtableAlterMetadataCLI.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.util;
+
+import java.io.IOException;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * CLI that sets a single metadata key/value on an existing HTable descriptor
+ * (disable, modify, re-enable).
+ */
+@SuppressWarnings("static-access")
+public class HtableAlterMetadataCLI extends AbstractHadoopJob {
+
+    private static final Option OPTION_METADATA_KEY = OptionBuilder.withArgName("key").hasArg().isRequired(true).withDescription("The metadata key").create("key");
+    private static final Option OPTION_METADATA_VALUE = OptionBuilder.withArgName("value").hasArg().isRequired(true).withDescription("The metadata value").create("value");
+
+    protected static final Logger logger = LoggerFactory.getLogger(HtableAlterMetadataCLI.class);
+
+    String tableName;
+    String metadataKey;
+    String metadataValue;
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+        try {
+            options.addOption(OPTION_HTABLE_NAME);
+            options.addOption(OPTION_METADATA_KEY);
+            options.addOption(OPTION_METADATA_VALUE);
+
+            parseOptions(options, args);
+            tableName = getOptionValue(OPTION_HTABLE_NAME);
+            metadataKey = getOptionValue(OPTION_METADATA_KEY);
+            metadataValue = getOptionValue(OPTION_METADATA_VALUE);
+
+            alter();
+
+            return 0;
+        } catch (Exception e) {
+            printUsage(options);
+            throw e;
+        }
+    }
+
+    private void alter() throws IOException {
+        Configuration conf = HBaseConfiguration.create();
+        HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+        HTableDescriptor table = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
+
+        hbaseAdmin.disableTable(table.getTableName());
+        table.setValue(metadataKey, metadataValue);
+        hbaseAdmin.modifyTable(table.getTableName(), table);
+        hbaseAdmin.enableTable(table.getTableName());
+        hbaseAdmin.close();
+    }
+
+    public static void main(String[] args) throws Exception {
+        int exitCode = ToolRunner.run(new HtableAlterMetadataCLI(), args);
+        System.exit(exitCode);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/OrphanHBaseCleanJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/OrphanHBaseCleanJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/OrphanHBaseCleanJob.java
new file mode 100644
index 0000000..f0618c9
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/OrphanHBaseCleanJob.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.metadata.realization.IRealizationConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Finds Kylin HTables whose metadata-store tag is not in the given whitelist
+ * and optionally drops them.
+ */
+public class OrphanHBaseCleanJob extends AbstractHadoopJob {
+
+    @SuppressWarnings("static-access")
+    private static final Option OPTION_DELETE = OptionBuilder.withArgName("delete").hasArg().isRequired(false).withDescription("Delete the unused storage").create("delete");
+    @SuppressWarnings("static-access")
+    private static final Option OPTION_WHITELIST = OptionBuilder.withArgName("whitelist").hasArg().isRequired(true).withDescription("metadata store whitelist, separated with comma").create("whitelist");
+
+    protected static final Logger logger = LoggerFactory.getLogger(OrphanHBaseCleanJob.class);
+
+    boolean delete = false;
+    Set<String> metastoreWhitelistSet = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        logger.info("jobs args: " + Arrays.toString(args));
+        try {
+            options.addOption(OPTION_DELETE);
+            options.addOption(OPTION_WHITELIST);
+            parseOptions(options, args);
+
+            logger.info("options: '" + getOptionsAsString() + "'");
+            logger.info("delete option value: '" + getOptionValue(OPTION_DELETE) + "'");
+            delete = Boolean.parseBoolean(getOptionValue(OPTION_DELETE));
+            String[] metastoreWhitelist = getOptionValue(OPTION_WHITELIST).split(",");
+
+            for (String ms : metastoreWhitelist) {
+                logger.info("metadata store in white list: " + ms);
+                metastoreWhitelistSet.add(ms);
+            }
+
+            Configuration conf = HBaseConfiguration.create(getConf());
+
+            cleanUnusedHBaseTables(conf);
+
+            return 0;
+        } catch (Exception e) {
+            e.printStackTrace(System.err);
+            throw e;
+        }
+    }
+
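+    // An HTable with the shared Kylin storage prefix is treated as an orphan when its
+    // metadata-store tag is not in the whitelist; orphans are only dropped when
+    // "-delete true" is passed, otherwise they are just listed.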
+    private void cleanUnusedHBaseTables(Configuration conf) throws IOException {
+
+        // get all kylin hbase tables
+        HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+        String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix;
+        HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*");
+        List<String> allTablesNeedToBeDropped = new ArrayList<String>();
+        for (HTableDescriptor desc : tableDescriptors) {
+            String host = desc.getValue(IRealizationConstants.HTableTag);
+            if (!metastoreWhitelistSet.contains(host)) {
+                logger.info("HTable {} is recognized as orphan because its tag is {}", desc.getTableName(), host);
+                //collect orphans
+                allTablesNeedToBeDropped.add(desc.getTableName().getNameAsString());
+            } else {
+                logger.info("HTable {} belongs to {}", desc.getTableName(), host);
+            }
+        }
+
+        if (delete == true) {
+            // drop tables
+            for (String htableName : allTablesNeedToBeDropped) {
+                logger.info("Deleting HBase table " + htableName);
+                if (hbaseAdmin.tableExists(htableName)) {
+                    if (hbaseAdmin.isTableEnabled(htableName)) {
+                        hbaseAdmin.disableTable(htableName);
+                    }
+
+                    hbaseAdmin.deleteTable(htableName);
+                    logger.info("Deleted HBase table " + htableName);
+                } else {
+                    logger.info("HBase table" + htableName + " does not exist");
+                }
+            }
+        } else {
+            System.out.println("--------------- Tables To Be Dropped ---------------");
+            for (String htableName : allTablesNeedToBeDropped) {
+                System.out.println(htableName);
+            }
+            System.out.println("----------------------------------------------------");
+        }
+
+        hbaseAdmin.close();
+    }
+
+    public static void main(String[] args) throws Exception {
+        int exitCode = ToolRunner.run(new OrphanHBaseCleanJob(), args);
+        System.exit(exitCode);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java
new file mode 100644
index 0000000..9cd6d23
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java
@@ -0,0 +1,71 @@
+package org.apache.kylin.storage.hbase.util;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Counts the rows of an HTable within a given key range.
+ */
+public class RowCounterCLI {
+    private static final Logger logger = LoggerFactory.getLogger(RowCounterCLI.class);
+
+    public static void main(String[] args) throws IOException {
+
+        if (args == null || args.length != 3) {
+            System.out.println("Usage: hbase org.apache.hadoop.util.RunJar kylin-job-latest.jar org.apache.kylin.storage.hbase.util.RowCounterCLI [HTABLE_NAME] [STARTKEY] [ENDKEY]");
+            return;
+        }
+
+        System.out.println(args[0]);
+        String htableName = args[0];
+        System.out.println(args[1]);
+        byte[] startKey = BytesUtil.fromReadableText(args[1]);
+        System.out.println(args[2]);
+        byte[] endKey = BytesUtil.fromReadableText(args[2]);
+
+        if (startKey == null) {
+            System.out.println("startkey is null ");
+        } else {
+            System.out.println("startkey lenght: " + startKey.length);
+        }
+
+        System.out.println("start key in binary: " + Bytes.toStringBinary(startKey));
+        System.out.println("end key in binary: " + Bytes.toStringBinary(endKey));
+
+        Configuration conf = HBaseConfiguration.create();
+
+        Scan scan = new Scan();
+        scan.setCaching(1024);
+        scan.setCacheBlocks(true);
+        scan.setStartRow(startKey);
+        scan.setStopRow(endKey);
+
+        logger.info("My Scan " + scan.toString());
+
+        HConnection conn = HConnectionManager.createConnection(conf);
+        HTableInterface tableInterface = conn.getTable(htableName);
+
+        Iterator<Result> iterator = tableInterface.getScanner(scan).iterator();
+        int counter = 0;
+        while (iterator.hasNext()) {
+            iterator.next();
+            counter++;
+            if (counter % 1000 == 1) {
+                System.out.println("number of rows: " + counter);
+            }
+        }
+        System.out.println("number of rows: " + counter);
+        tableInterface.close();
+        conn.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
new file mode 100644
index 0000000..490c580
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.util;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.CliCommandExecutor;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.job.JobInstance;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.manager.ExecutableManager;
+import org.apache.kylin.metadata.realization.IRealizationConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StorageCleanupJob extends AbstractHadoopJob {
+
+    @SuppressWarnings("static-access")
+    private static final Option OPTION_DELETE = OptionBuilder.withArgName("delete").hasArg().isRequired(false).withDescription("Delete the unused storage").create("delete");
+
+    protected static final Logger logger = LoggerFactory.getLogger(StorageCleanupJob.class);
+
+    public static final long TIME_THRESHOLD = 2 * 24 * 3600 * 1000L; // 2 days
+
+    boolean delete = false;
+
+    protected static ExecutableManager executableManager = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.hadoop.util.Tool#run(java.lang.String[])
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        logger.info("jobs args: " + Arrays.toString(args));
+        try {
+            options.addOption(OPTION_DELETE);
+            parseOptions(options, args);
+
+            logger.info("options: '" + getOptionsAsString() + "'");
+            logger.info("delete option value: '" + getOptionValue(OPTION_DELETE) + "'");
+            delete = Boolean.parseBoolean(getOptionValue(OPTION_DELETE));
+
+            Configuration conf = HBaseConfiguration.create(getConf());
+
+            cleanUnusedIntermediateHiveTable(conf);
+            cleanUnusedHdfsFiles(conf);
+            cleanUnusedHBaseTables(conf);
+
+            return 0;
+        } catch (Exception e) {
+            printUsage(options);
+            throw e;
+        }
+    }
+
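+    // Collects this instance's Kylin HTables that are older than TIME_THRESHOLD, then
+    // removes from the drop list every table still referenced by a cube or II segment.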
+    private void cleanUnusedHBaseTables(Configuration conf) throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
+        CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+        IIManager iiManager = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
+
+        // get all kylin hbase tables
+        HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+        String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix;
+        HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*");
+        List<String> allTablesNeedToBeDropped = new ArrayList<String>();
+        for (HTableDescriptor desc : tableDescriptors) {
+            String host = desc.getValue(IRealizationConstants.HTableTag);
+            String creationTime = desc.getValue(IRealizationConstants.HTableCreationTime);
+            if (KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix().equalsIgnoreCase(host)) {
+                // only consider htables that belong to this instance and were created more than 2 days ago
+                if (StringUtils.isEmpty(creationTime) || (System.currentTimeMillis() - Long.valueOf(creationTime) > TIME_THRESHOLD)) {
+                    allTablesNeedToBeDropped.add(desc.getTableName().getNameAsString());
+                }
+            }
+        }
+
+        // remove every segment htable from drop list
+        for (CubeInstance cube : cubeMgr.listAllCubes()) {
+            for (CubeSegment seg : cube.getSegments()) {
+                String tablename = seg.getStorageLocationIdentifier();
+                if (allTablesNeedToBeDropped.contains(tablename)) {
+                    allTablesNeedToBeDropped.remove(tablename);
+                    logger.info("Exclude table " + tablename + " from drop list, as the table belongs to cube " + cube.getName() + " with status " + cube.getStatus());
+                }
+            }
+        }
+
+        // remove every ii segment htable from drop list
+        for (IIInstance ii : iiManager.listAllIIs()) {
+            for (IISegment seg : ii.getSegments()) {
+                String tablename = seg.getStorageLocationIdentifier();
+
+                if (allTablesNeedToBeDropped.contains(tablename)) {
+                    allTablesNeedToBeDropped.remove(tablename);
+                    logger.info("Exclude table " + tablename + " from drop list, as the table belongs to ii " + ii.getName() + " with status " + ii.getStatus());
+                }
+            }
+        }
+
+        if (delete == true) {
+            // drop tables
+            for (String htableName : allTablesNeedToBeDropped) {
+                logger.info("Deleting HBase table " + htableName);
+                if (hbaseAdmin.tableExists(htableName)) {
+                    if (hbaseAdmin.isTableEnabled(htableName)) {
+                        hbaseAdmin.disableTable(htableName);
+                    }
+
+                    hbaseAdmin.deleteTable(htableName);
+                    logger.info("Deleted HBase table " + htableName);
+                } else {
+                    logger.info("HBase table" + htableName + " does not exist");
+                }
+            }
+        } else {
+            System.out.println("--------------- Tables To Be Dropped ---------------");
+            for (String htableName : allTablesNeedToBeDropped) {
+                System.out.println(htableName);
+            }
+            System.out.println("----------------------------------------------------");
+        }
+
+        hbaseAdmin.close();
+    }
+
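+    // Collects kylin-* job working directories under the HDFS working directory and keeps
+    // those of unfinished jobs as well as those recorded as the last build of any cube segment.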
+    private void cleanUnusedHdfsFiles(Configuration conf) throws IOException {
+        JobEngineConfig engineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
+        CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+
+        FileSystem fs = FileSystem.get(conf);
+        List<String> allHdfsPathsNeedToBeDeleted = new ArrayList<String>();
+        // GlobFilter filter = new
+        // GlobFilter(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()
+        // + "/kylin-.*");
+        FileStatus[] fStatus = fs.listStatus(new Path(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()));
+        for (FileStatus status : fStatus) {
+            String path = status.getPath().getName();
+            // System.out.println(path);
+            if (path.startsWith(JobInstance.JOB_WORKING_DIR_PREFIX)) {
+                String kylinJobPath = engineConfig.getHdfsWorkingDirectory() + path;
+                allHdfsPathsNeedToBeDeleted.add(kylinJobPath);
+            }
+        }
+
+        List<String> allJobs = executableManager.getAllJobIds();
+        for (String jobId : allJobs) {
+            // only remove FINISHED and DISCARDED job intermediate files
+            final ExecutableState state = executableManager.getOutput(jobId).getState();
+            if (!state.isFinalState()) {
+                String path = JobInstance.getJobWorkingDir(jobId, engineConfig.getHdfsWorkingDirectory());
+                allHdfsPathsNeedToBeDeleted.remove(path);
+                logger.info("Remove " + path + " from deletion list, as the path belongs to job " + jobId + " with status " + state);
+            }
+        }
+
+        // remove every segment working dir from deletion list
+        for (CubeInstance cube : cubeMgr.listAllCubes()) {
+            for (CubeSegment seg : cube.getSegments()) {
+                String jobUuid = seg.getLastBuildJobID();
+                if (jobUuid != null && jobUuid.equals("") == false) {
+                    String path = JobInstance.getJobWorkingDir(jobUuid, engineConfig.getHdfsWorkingDirectory());
+                    allHdfsPathsNeedToBeDeleted.remove(path);
+                    logger.info("Remove " + path + " from deletion list, as the path belongs to segment " + seg + " of cube " + cube.getName());
+                }
+            }
+        }
+
+        if (delete == true) {
+            // remove files
+            for (String hdfsPath : allHdfsPathsNeedToBeDeleted) {
+                logger.info("Deleting hdfs path " + hdfsPath);
+                Path p = new Path(hdfsPath);
+                if (fs.exists(p) == true) {
+                    fs.delete(p, true);
+                    logger.info("Deleted hdfs path " + hdfsPath);
+                } else {
+                    logger.info("Hdfs path " + hdfsPath + "does not exist");
+                }
+            }
+        } else {
+            System.out.println("--------------- HDFS Path To Be Deleted ---------------");
+            for (String hdfsPath : allHdfsPathsNeedToBeDeleted) {
+                System.out.println(hdfsPath);
+            }
+            System.out.println("-------------------------------------------------------");
+        }
+
+    }
+
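+    // Lists kylin_intermediate_* tables through the hive CLI, recovers the job UUID from the
+    // table-name suffix, and only drops a table whose job exists and has reached a final state.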
+    private void cleanUnusedIntermediateHiveTable(Configuration conf) throws IOException {
+        final KylinConfig config = KylinConfig.getInstanceFromEnv();
+        final CliCommandExecutor cmdExec = config.getCliCommandExecutor();
+        final int uuidLength = 36;
+        
+        final String useDatabaseHql = "USE " + config.getHiveDatabaseForIntermediateTable() + ";";
+        StringBuilder buf = new StringBuilder();
+        buf.append("hive -e \"");
+        buf.append(useDatabaseHql);
+        buf.append("show tables " + "\'kylin_intermediate_*\'" + "; ");
+        buf.append("\"");
+        
+        Pair<Integer, String> result = cmdExec.execute(buf.toString());
+
+        String outputStr = result.getSecond();
+        BufferedReader reader = new BufferedReader(new StringReader(outputStr));
+        String line = null;
+        List<String> allJobs = executableManager.getAllJobIds();
+        List<String> allHiveTablesNeedToBeDeleted = new ArrayList<String>();
+        List<String> workingJobList = new ArrayList<String>();
+
+        for (String jobId : allJobs) {
+            // only remove FINISHED and DISCARDED job intermediate table
+            final ExecutableState state = executableManager.getOutput(jobId).getState();
+
+            if (!state.isFinalState()) {
+                workingJobList.add(jobId);
+                logger.info("Remove intermediate hive table with job id " + jobId + " with job status " + state);
+            }
+        }
+
+        while ((line = reader.readLine()) != null) {
+            if (line.startsWith("kylin_intermediate_")) {
+                boolean isNeedDel = false;
+                String uuid = line.substring(line.length() - uuidLength, line.length());
+                uuid = uuid.replace("_", "-");
+                //Check whether it's a hive table in use
+                if (allJobs.contains(uuid) && !workingJobList.contains(uuid)) {
+                    isNeedDel = true;
+                }
+
+                if (isNeedDel) {
+                    allHiveTablesNeedToBeDeleted.add(line);
+                }
+            }
+        }
+
+        if (delete == true) {
+            buf.delete(0, buf.length());
+            buf.append("hive -e \"");
+            buf.append(useDatabaseHql);
+            for (String delHive : allHiveTablesNeedToBeDeleted) {
+                buf.append("drop table if exists " + delHive + "; ");
+                logger.info("Remove " + delHive + " from hive tables.");
+            }
+            buf.append("\"");
+            
+            try {
+                cmdExec.execute(buf.toString());
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        } else {
+            System.out.println("------ Intermediate Hive Tables To Be Dropped ------");
+            for (String hiveTable : allHiveTablesNeedToBeDeleted) {
+                System.out.println(hiveTable);
+            }
+            System.out.println("----------------------------------------------------");
+        }
+
+        if (reader != null)
+            reader.close();
+    }
+
+    public static void main(String[] args) throws Exception {
+        int exitCode = ToolRunner.run(new StorageCleanupJob(), args);
+        System.exit(exitCode);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/TarGZUtil.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/TarGZUtil.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/TarGZUtil.java
new file mode 100644
index 0000000..f0c4c5b
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/TarGZUtil.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.util;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
+
+public class TarGZUtil {
+
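+    // Unpacks a gzip-compressed tar into dest, creating directories as they appear and
+    // copying file entries through a 1 KB buffer; entry names are used as-is.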
+    public static void uncompressTarGZ(File tarFile, File dest) throws IOException {
+        dest.mkdir();
+        TarArchiveInputStream tarIn = new TarArchiveInputStream(new GzipCompressorInputStream(new BufferedInputStream(new FileInputStream(tarFile))));
+
+        TarArchiveEntry tarEntry = tarIn.getNextTarEntry();
+        // tarIn is a TarArchiveInputStream
+        while (tarEntry != null) {// create a file with the same name as the tarEntry
+            File destPath = new File(dest, tarEntry.getName());
+            System.out.println("working: " + destPath.getCanonicalPath());
+            if (tarEntry.isDirectory()) {
+                destPath.mkdirs();
+            } else {
+                destPath.createNewFile();
+                byte[] btoRead = new byte[1024];
+                BufferedOutputStream bout = new BufferedOutputStream(new FileOutputStream(destPath));
+                int len = 0;
+
+                while ((len = tarIn.read(btoRead)) != -1) {
+                    bout.write(btoRead, 0, len);
+                }
+
+                bout.close();
+                btoRead = null;
+
+            }
+            tarEntry = tarIn.getNextTarEntry();
+        }
+        tarIn.close();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/streaming/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java b/streaming/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java
new file mode 100644
index 0000000..7b9831a
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java
@@ -0,0 +1,69 @@
+package org.apache.kylin.job.monitor;
+
+import java.util.List;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Command line entry point that parses monitoring arguments and dispatches the
+ * checks to StreamingMonitor.
+ */
+public class MonitorCLI {
+
+    private static final Logger logger = LoggerFactory.getLogger(MonitorCLI.class);
+
+    public static void main(String[] args) {
+        Preconditions.checkArgument(args[0].equals("monitor"));
+
+        int i = 1;
+        List<String> receivers = null;
+        String host = null;
+        String tableName = null;
+        String authorization = null;
+        String cubeName = null;
+        String projectName = "default";
+        while (i < args.length) {
+            String argName = args[i];
+            switch (argName) {
+            case "-receivers":
+                receivers = Lists.newArrayList(StringUtils.split(args[++i], ";"));
+                break;
+            case "-host":
+                host = args[++i];
+                break;
+            case "-tableName":
+                tableName = args[++i];
+                break;
+            case "-authorization":
+                authorization = args[++i];
+                break;
+            case "-cubeName":
+                cubeName = args[++i];
+                break;
+            case "-projectName":
+                projectName = args[++i];
+                break;
+            default:
+                throw new RuntimeException("invalid argName:" + argName);
+            }
+            i++;
+        }
+        Preconditions.checkArgument(receivers != null && receivers.size() > 0);
+        final StreamingMonitor streamingMonitor = new StreamingMonitor();
+        if (tableName != null) {
+            logger.info(String.format("check query tableName:%s host:%s receivers:%s", tableName, host, StringUtils.join(receivers, ";")));
+            Preconditions.checkNotNull(host);
+            Preconditions.checkNotNull(authorization);
+            Preconditions.checkNotNull(tableName);
+            streamingMonitor.checkCountAll(receivers, host, authorization, projectName, tableName);
+        }
+        if (cubeName != null) {
+            logger.info(String.format("check cube cubeName:%s receivers:%s", cubeName, StringUtils.join(receivers, ";")));
+            streamingMonitor.checkCube(receivers, cubeName, host);
+        }
+        System.exit(0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/streaming/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java b/streaming/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java
new file mode 100644
index 0000000..e23f065
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java
@@ -0,0 +1,154 @@
+package org.apache.kylin.job.monitor;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+
+import javax.annotation.Nullable;
+
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.methods.ByteArrayRequestEntity;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.MailService;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Monitors streaming cubes: verifies that a table is queryable through the Kylin REST API and
+ * checks cube segments for gaps and overlaps, mailing the findings to the given receivers.
+ */
+public class StreamingMonitor {
+
+    private static final Logger logger = LoggerFactory.getLogger(StreamingMonitor.class);
+
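+    /**
+     * Issues a "select count(*)" query against the Kylin REST API on the given host and
+     * mails the result (or the failure details) to the receivers.
+     */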
+    public void checkCountAll(List<String> receivers, String host, String authorization, String projectName, String tableName) {
+        String title = "checkCountAll job(host:" + host + " tableName:" + tableName + ") ";
+        StringBuilder stringBuilder = new StringBuilder();
+        String url = host + "/kylin/api/query";
+        PostMethod request = new PostMethod(url);
+        try {
+
+            request.addRequestHeader("Authorization", "Basic " + authorization);
+            request.addRequestHeader("Content-Type", "application/json");
+            String query = String.format("{\"sql\":\"select count(*) from %s\",\"offset\":0,\"limit\":50000,\"acceptPartial\":true,\"project\":\"%s\"}", tableName, projectName);
+            request.setRequestEntity(new ByteArrayRequestEntity(query.getBytes()));
+
+            int statusCode = new HttpClient().executeMethod(request);
+            String msg = Bytes.toString(request.getResponseBody());
+            stringBuilder.append("host:").append(host).append("\n");
+            stringBuilder.append("query:").append(query).append("\n");
+            stringBuilder.append("statusCode:").append(statusCode).append("\n");
+            if (statusCode == 200) {
+                title += "succeed";
+                final HashMap hashMap = JsonUtil.readValue(msg, HashMap.class);
+                stringBuilder.append("results:").append(hashMap.get("results").toString()).append("\n");
+                stringBuilder.append("duration:").append(hashMap.get("duration").toString()).append("\n");
+            } else {
+                title += "failed";
+                stringBuilder.append("response:").append(msg).append("\n");
+            }
+        } catch (Exception e) {
+            final StringWriter out = new StringWriter();
+            e.printStackTrace(new PrintWriter(out));
+            title += "failed";
+            stringBuilder.append(out.toString());
+        } finally {
+            request.releaseConnection();
+        }
+        logger.info("title:" + title);
+        logger.info("content:" + stringBuilder.toString());
+        sendMail(receivers, title, stringBuilder.toString());
+    }
+
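+    /**
+     * Walks the READY segments in order and returns every (dateRangeEnd, dateRangeStart)
+     * interval where the next segment starts later than the previous one ends.
+     */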
+    public static final List<Pair<Long, Long>> findGaps(String cubeName) {
+        List<CubeSegment> segments = getSortedReadySegments(cubeName);
+        List<Pair<Long, Long>> gaps = Lists.newArrayList();
+        for (int i = 0; i < segments.size() - 1; ++i) {
+            CubeSegment first = segments.get(i);
+            CubeSegment second = segments.get(i + 1);
+            if (first.getDateRangeEnd() == second.getDateRangeStart()) {
+                continue;
+            } else if (first.getDateRangeEnd() < second.getDateRangeStart()) {
+                gaps.add(Pair.newPair(first.getDateRangeEnd(), second.getDateRangeStart()));
+            }
+        }
+        return gaps;
+    }
+
+    private static List<CubeSegment> getSortedReadySegments(String cubeName) {
+        final CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
+        Preconditions.checkNotNull(cube);
+        final List<CubeSegment> segments = cube.getSegment(SegmentStatusEnum.READY);
+        logger.info("totally " + segments.size() + " cubeSegments");
+        Collections.sort(segments);
+        return segments;
+    }
+
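+    /**
+     * Walks the READY segments in order and returns the names of every adjacent pair whose
+     * date ranges overlap.
+     */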
+    public static final List<Pair<String, String>> findOverlaps(String cubeName) {
+        List<CubeSegment> segments = getSortedReadySegments(cubeName);
+        List<Pair<String, String>> overlaps = Lists.newArrayList();
+        for (int i = 0; i < segments.size() - 1; ++i) {
+            CubeSegment first = segments.get(i);
+            CubeSegment second = segments.get(i + 1);
+            if (first.getDateRangeEnd() <= second.getDateRangeStart()) {
+                continue;
+            } else {
+                overlaps.add(Pair.newPair(first.getName(), second.getName()));
+            }
+        }
+        return overlaps;
+    }
+
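+    /**
+     * Reloads the cube, collects its segment gaps and overlaps, and mails a report to the
+     * receivers when either list is non-empty.
+     */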
+    public void checkCube(List<String> receivers, String cubeName, String host) {
+        final CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
+        if (cube == null) {
+            logger.info("cube:" + cubeName + " does not exist");
+            return;
+        }
+        List<Pair<Long, Long>> gaps = findGaps(cubeName);
+        List<Pair<String, String>> overlaps = findOverlaps(cubeName);
+        StringBuilder content = new StringBuilder();
+        if (!gaps.isEmpty()) {
+            content.append("all gaps:").append("\n").append(StringUtils.join(Lists.transform(gaps, new Function<Pair<Long, Long>, String>() {
+                @Nullable
+                @Override
+                public String apply(Pair<Long, Long> input) {
+                    return parseInterval(input);
+                }
+            }), "\n")).append("\n");
+        }
+        if (!overlaps.isEmpty()) {
+            content.append("all overlaps:").append("\n").append(StringUtils.join(overlaps, "\n")).append("\n");
+        }
+        if (content.length() > 0) {
+            logger.info(content.toString());
+            sendMail(receivers, String.format("%s has gaps or overlaps on host %s", cubeName, host), content.toString());
+        } else {
+            logger.info("no gaps or overlaps");
+        }
+    }
+
+    private String parseInterval(Pair<Long, Long> interval) {
+        return String.format("{%d(%s), %d(%s)}", interval.getFirst(), new Date(interval.getFirst()).toString(), interval.getSecond(), new Date(interval.getSecond()).toString());
+    }
+
+    private void sendMail(List<String> receivers, String title, String content) {
+        final MailService mailService = new MailService(KylinConfig.getInstanceFromEnv());
+        mailService.sendMail(receivers, title, content, false);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/streaming/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java b/streaming/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
new file mode 100644
index 0000000..029d4d2
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
@@ -0,0 +1,74 @@
+package org.apache.kylin.job.streaming;
+
+/**
+ * Options for a streaming bootstrap run: which streaming config and partition to consume,
+ * and either a one-off [start, end) time window (with an optional margin) or gap filling.
+ */
+public class BootstrapConfig {
+
+    private String streaming;
+    private int partitionId = -1;
+
+    // one-off mode is the default
+    private boolean oneOff = true;
+    private long start = 0L;
+    private long end = 0L;
+    private long margin = 0L;
+
+    private boolean fillGap;
+
+    public long getMargin() {
+        return margin;
+    }
+
+    public void setMargin(long margin) {
+        this.margin = margin;
+    }
+
+    public boolean isOneOff() {
+        return oneOff;
+    }
+
+    public void setOneOff(boolean oneOff) {
+        this.oneOff = oneOff;
+    }
+
+    public long getStart() {
+        return start;
+    }
+
+    public void setStart(long start) {
+        this.start = start;
+    }
+
+    public long getEnd() {
+        return end;
+    }
+
+    public void setEnd(long end) {
+        this.end = end;
+    }
+
+    public String getStreaming() {
+        return streaming;
+    }
+
+    public void setStreaming(String streaming) {
+        this.streaming = streaming;
+    }
+
+    public int getPartitionId() {
+        return partitionId;
+    }
+
+    public void setPartitionId(int partitionId) {
+        this.partitionId = partitionId;
+    }
+
+    public boolean isFillGap() {
+        return fillGap;
+    }
+
+    public void setFillGap(boolean fillGap) {
+        this.fillGap = fillGap;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/streaming/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java b/streaming/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
new file mode 100644
index 0000000..38787a8
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.job.streaming;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.cube.inmemcubing.InMemCubeBuilder;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.util.CubingUtils;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducer;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.hbase.steps.HBaseConnection;
+import org.apache.kylin.storage.hbase.steps.HBaseCuboidWriter;
+import org.apache.kylin.storage.hbase.steps.CubeHTableUtil;
+import org.apache.kylin.streaming.MicroStreamBatch;
+import org.apache.kylin.streaming.MicroStreamBatchConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Consumes micro-batches of parsed stream messages, builds each batch into a new cube
+ * segment in memory, writes the cuboids to HBase and marks the segment READY.
+ */
+public class CubeStreamConsumer implements MicroStreamBatchConsumer {
+
+    private static final Logger logger = LoggerFactory.getLogger(CubeStreamConsumer.class);
+
+    private final CubeManager cubeManager;
+    private final String cubeName;
+    private final KylinConfig kylinConfig;
+    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+    private int totalConsumedMessageCount = 0;
+    private int totalRawMessageCount = 0;
+
+    public CubeStreamConsumer(String cubeName) {
+        this.kylinConfig = KylinConfig.getInstanceFromEnv();
+        this.cubeManager = CubeManager.getInstance(kylinConfig);
+        this.cubeName = cubeName;
+    }
+
+    @Override
+    public void consume(MicroStreamBatch microStreamBatch) throws Exception {
+
+        totalConsumedMessageCount += microStreamBatch.size();
+        totalRawMessageCount += microStreamBatch.getRawMessageCount();
+
+        final List<List<String>> parsedStreamMessages = microStreamBatch.getStreams();
+        long startOffset = microStreamBatch.getOffset().getFirst();
+        long endOffset = microStreamBatch.getOffset().getSecond();
+        LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue<List<String>>(parsedStreamMessages);
+        blockingQueue.put(Collections.<String> emptyList());
+
+        final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
+        final CubeDesc cubeDesc = cubeInstance.getDescriptor();
+        final CubeSegment cubeSegment = cubeManager.appendSegments(cubeManager.getCube(cubeName), microStreamBatch.getTimestamp().getFirst(), microStreamBatch.getTimestamp().getSecond(), false, false);
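+        // sample the batch to estimate per-cuboid cardinality with HyperLogLog counters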
+        long start = System.currentTimeMillis();
+        final Map<Long, HyperLogLogPlusCounter> samplingResult = CubingUtils.sampling(cubeInstance.getDescriptor(), parsedStreamMessages);
+        logger.info(String.format("sampling of %d messages cost %d ms", parsedStreamMessages.size(), (System.currentTimeMillis() - start)));
+
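+        // persist the sampled cuboid statistics under the new segment's statistics resource path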
+        final Configuration conf = HadoopUtil.getCurrentConfiguration();
+        final Path outputPath = new Path("file://" + BatchConstants.CFG_STATISTICS_LOCAL_DIR + UUID.randomUUID().toString());
+        FactDistinctColumnsReducer.writeCuboidStatistics(conf, outputPath, samplingResult, 100);
+        FSDataInputStream localStream = FileSystem.getLocal(conf).open(new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION));
+        ResourceStore.getStore(kylinConfig).putResource(cubeSegment.getStatisticsResourcePath(), localStream, System.currentTimeMillis());
+        localStream.close();
+        FileSystem.getLocal(conf).delete(outputPath, true);
+
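+        // build dictionaries from this batch and register them with the new segment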
+        final Map<TblColRef, Dictionary<?>> dictionaryMap = CubingUtils.buildDictionary(cubeInstance, parsedStreamMessages);
+        Map<TblColRef, Dictionary<?>> realDictMap = CubingUtils.writeDictionary(cubeSegment, dictionaryMap, startOffset, endOffset);
+
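+        // build the cuboids in memory and write them straight into the segment's HTable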
+        InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(cubeInstance.getDescriptor(), realDictMap);
+        final HTableInterface hTable = createHTable(cubeSegment);
+        final HBaseCuboidWriter gtRecordWriter = new HBaseCuboidWriter(cubeDesc, hTable);
+
+        executorService.submit(inMemCubeBuilder.buildAsRunnable(blockingQueue, gtRecordWriter)).get();
+        gtRecordWriter.flush();
+        hTable.close();
+        commitSegment(cubeSegment);
+
+        logger.info("Consumed {} messages out of {} raw messages", totalConsumedMessageCount, totalRawMessageCount);
+    }
+
+    //TODO: should we use cubeManager.promoteNewlyBuiltSegments?
+    private void commitSegment(CubeSegment cubeSegment) throws IOException {
+        cubeSegment.setStatus(SegmentStatusEnum.READY);
+        CubeUpdate cubeBuilder = new CubeUpdate(cubeSegment.getCubeInstance());
+        cubeBuilder.setToAddSegs(cubeSegment);
+        CubeManager.getInstance(kylinConfig).updateCube(cubeBuilder);
+    }
+
+    private HTableInterface createHTable(final CubeSegment cubeSegment) throws Exception {
+        final String hTableName = cubeSegment.getStorageLocationIdentifier();
+        CubeHTableUtil.createHTable(cubeSegment.getCubeDesc(), hTableName, null);
+        final HTableInterface hTable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(hTableName);
+        logger.info("hTable:" + hTableName + " for segment:" + cubeSegment.getName() + " created!");
+        return hTable;
+    }
+
+    @Override
+    public void stop() {
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/streaming/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java b/streaming/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
new file mode 100644
index 0000000..95fbc9d
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
@@ -0,0 +1,54 @@
+package org.apache.kylin.job.streaming;
+
+import java.util.List;
+import java.util.Properties;
+
+import javax.annotation.Nullable;
+
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.streaming.BrokerConfig;
+import org.apache.kylin.streaming.KafkaClusterConfig;
+import org.apache.kylin.streaming.StreamingConfig;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+
+/**
+ * Loads prepared data into Kafka (for test use).
+ */
+public class KafkaDataLoader {
+
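+    /**
+     * Sends the messages to the topic of the given streaming config, using the brokers of
+     * the first configured Kafka cluster; each message is keyed by its list index.
+     */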
+    public static void loadIntoKafka(StreamingConfig streamingConfig, List<String> messages) {
+
+        KafkaClusterConfig clusterConfig = streamingConfig.getKafkaClusterConfigs().get(0);
+        String brokerList = StringUtils.join(Collections2.transform(clusterConfig.getBrokerConfigs(), new Function<BrokerConfig, String>() {
+            @Nullable
+            @Override
+            public String apply(BrokerConfig brokerConfig) {
+                return brokerConfig.getHost() + ":" + brokerConfig.getPort();
+            }
+        }), ",");
+        Properties props = new Properties();
+        props.put("metadata.broker.list", brokerList);
+        props.put("serializer.class", "kafka.serializer.StringEncoder");
+        props.put("request.required.acks", "1");
+
+        ProducerConfig config = new ProducerConfig(props);
+
+        Producer<String, String> producer = new Producer<String, String>(config);
+
+        List<KeyedMessage<String, String>> keyedMessages = Lists.newArrayList();
+        for (int i = 0; i < messages.size(); ++i) {
+            KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(streamingConfig.getTopic(), String.valueOf(i), messages.get(i));
+            keyedMessages.add(keyedMessage);
+        }
+        producer.send(keyedMessages);
+        producer.close();
+    }
+
+}