Posted to commits@kylin.apache.org by li...@apache.org on 2015/09/16 05:05:16 UTC
[2/5] incubator-kylin git commit: KYLIN-1010 Job module with only II and tests left
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java
new file mode 100644
index 0000000..826ce0a
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.util;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.storage.hbase.steps.HBaseConnection;
+
+import com.google.common.collect.Lists;
+
+public class GridTableHBaseBenchmark {
+
+ private static final String TEST_TABLE = "GridTableTest";
+ private static final byte[] CF = "F".getBytes();
+ private static final byte[] QN = "C".getBytes();
+ private static final int N_ROWS = 10000;
+ private static final int CELL_SIZE = 128 * 1024; // 128 KB
+ private static final double DFT_HIT_RATIO = 0.3;
+ private static final double DFT_INDEX_RATIO = 0.1;
+ private static final int ROUND = 3;
+
+ public static void main(String[] args) throws IOException {
+ double hitRatio = DFT_HIT_RATIO;
+ try {
+ hitRatio = Double.parseDouble(args[0]);
+ } catch (Exception e) {
+ // fall back to the default when the argument is absent or malformed
+ }
+
+ double indexRatio = DFT_INDEX_RATIO;
+ try {
+ indexRatio = Double.parseDouble(args[1]);
+ } catch (Exception e) {
+ // fall back to the default when the argument is absent or malformed
+ }
+
+ testGridTable(hitRatio, indexRatio);
+ }
+
+ public static void testGridTable(double hitRatio, double indexRatio) throws IOException {
+ System.out.println("Testing grid table scanning, hit ratio " + hitRatio + ", index ratio " + indexRatio);
+ String hbaseUrl = "hbase"; // use hbase-site.xml on classpath
+
+ HConnection conn = HBaseConnection.get(hbaseUrl);
+ createHTableIfNeeded(conn, TEST_TABLE);
+ prepareData(conn);
+
+ Hits hits = new Hits(N_ROWS, hitRatio, indexRatio);
+
+ for (int i = 0; i < ROUND; i++) {
+ System.out.println("==================================== ROUND " + (i + 1) + " ========================================");
+ testRowScanWithIndex(conn, hits.getHitsForRowScanWithIndex());
+ testRowScanNoIndexFullScan(conn, hits.getHitsForRowScanNoIndex());
+ testRowScanNoIndexSkipScan(conn, hits.getHitsForRowScanNoIndex());
+ testColumnScan(conn, hits.getHitsForColumnScan());
+ }
+
+ }
+
+ private static void testColumnScan(HConnection conn, List<Pair<Integer, Integer>> colScans) throws IOException {
+ Stats stats = new Stats("COLUMN_SCAN");
+
+ HTableInterface table = conn.getTable(TEST_TABLE);
+ try {
+ stats.markStart();
+
+ int nLogicCols = colScans.size();
+ if (nLogicCols == 0) {
+ stats.markEnd();
+ return; // no columns were hit this round
+ }
+ int nLogicRows = colScans.get(0).getSecond() - colScans.get(0).getFirst();
+
+ Scan[] scans = new Scan[nLogicCols];
+ ResultScanner[] scanners = new ResultScanner[nLogicCols];
+ for (int i = 0; i < nLogicCols; i++) {
+ scans[i] = new Scan();
+ scans[i].addFamily(CF);
+ scanners[i] = table.getScanner(scans[i]);
+ }
+ for (int i = 0; i < nLogicRows; i++) {
+ for (int c = 0; c < nLogicCols; c++) {
+ Result r = scanners[c].next();
+ stats.consume(r);
+ }
+ dot(i, nLogicRows);
+ }
+
+ stats.markEnd();
+ } finally {
+ IOUtils.closeQuietly(table);
+ }
+ }
+
+ private static void testRowScanNoIndexFullScan(HConnection conn, boolean[] hits) throws IOException {
+ fullScan(conn, hits, new Stats("ROW_SCAN_NO_IDX_FULL"));
+ }
+
+ private static void testRowScanNoIndexSkipScan(HConnection conn, boolean[] hits) throws IOException {
+ jumpScan(conn, hits, new Stats("ROW_SCAN_NO_IDX_SKIP"));
+ }
+
+ private static void testRowScanWithIndex(HConnection conn, boolean[] hits) throws IOException {
+ jumpScan(conn, hits, new Stats("ROW_SCAN_IDX"));
+ }
+
+ private static void fullScan(HConnection conn, boolean[] hits, Stats stats) throws IOException {
+ HTableInterface table = conn.getTable(TEST_TABLE);
+ try {
+ stats.markStart();
+
+ Scan scan = new Scan();
+ scan.addFamily(CF);
+ ResultScanner scanner = table.getScanner(scan);
+ int i = 0;
+ for (Result r : scanner) {
+ if (hits[i])
+ stats.consume(r);
+ dot(i, N_ROWS);
+ i++;
+ }
+
+ stats.markEnd();
+ } finally {
+ IOUtils.closeQuietly(table);
+ }
+ }
+
+ private static void jumpScan(HConnection conn, boolean[] hits, Stats stats) throws IOException {
+
+ final int jumpThreshold = 6; // compensates for per-Scan setup overhead; value chosen empirically
+
+ HTableInterface table = conn.getTable(TEST_TABLE);
+ try {
+
+ stats.markStart();
+
+ int i = 0;
+ while (i < N_ROWS) {
+ int start, end;
+ for (start = i; start < N_ROWS; start++) {
+ if (hits[start])
+ break;
+ }
+ for (end = start + 1; end < N_ROWS; end++) {
+ boolean isEnd = true;
+ for (int j = 0; j < jumpThreshold && end + j < N_ROWS; j++)
+ if (hits[end + j])
+ isEnd = false;
+ if (isEnd)
+ break;
+ }
+
+ if (start < N_ROWS) {
+ Scan scan = new Scan();
+ scan.setStartRow(Bytes.toBytes(start));
+ scan.setStopRow(Bytes.toBytes(end));
+ scan.addFamily(CF);
+ ResultScanner scanner = table.getScanner(scan);
+ i = start;
+ for (Result r : scanner) {
+ stats.consume(r);
+ dot(i, N_ROWS);
+ i++;
+ }
+ }
+ i = end;
+ }
+
+ stats.markEnd();
+
+ } finally {
+ IOUtils.closeQuietly(table);
+ }
+ }
+
+ private static void prepareData(HConnection conn) throws IOException {
+ HTableInterface table = conn.getTable(TEST_TABLE);
+
+ try {
+ // check how many rows existing
+ int nRows = 0;
+ Scan scan = new Scan();
+ scan.setFilter(new KeyOnlyFilter());
+ ResultScanner scanner = table.getScanner(scan);
+ for (Result r : scanner) {
+ r.getRow(); // touch the key only; we are just counting rows
+ nRows++;
+ }
+
+ if (nRows > 0) {
+ System.out.println(nRows + " existing rows");
+ if (nRows != N_ROWS)
+ throw new IOException("Expected " + N_ROWS + " rows but found " + nRows);
+ return;
+ }
+
+ // insert rows into empty table
+ System.out.println("Writing " + N_ROWS + " rows to " + TEST_TABLE);
+ long nBytes = 0;
+ for (int i = 0; i < N_ROWS; i++) {
+ byte[] rowkey = Bytes.toBytes(i);
+ Put put = new Put(rowkey);
+ byte[] cell = randomBytes();
+ put.add(CF, QN, cell);
+ table.put(put);
+ nBytes += cell.length;
+ dot(i, N_ROWS);
+ }
+ System.out.println();
+ System.out.println("Wrote " + N_ROWS + " rows, " + nBytes + " bytes");
+
+ } finally {
+ IOUtils.closeQuietly(table);
+ }
+
+ }
+
+ private static void dot(int i, int nRows) {
+ if (i % (nRows / 100) == 0)
+ System.out.print(".");
+ }
+
+ private static byte[] randomBytes() {
+ byte[] bytes = new byte[CELL_SIZE];
+ Random rand = new Random();
+ rand.nextBytes(bytes);
+ return bytes;
+ }
+
+ private static void createHTableIfNeeded(HConnection conn, String tableName) throws IOException {
+ HBaseAdmin hbase = new HBaseAdmin(conn);
+
+ try {
+ boolean tableExist = false;
+ try {
+ hbase.getTableDescriptor(TableName.valueOf(tableName));
+ tableExist = true;
+ } catch (TableNotFoundException e) {
+ // expected when the table has not been created yet
+ }
+
+ if (tableExist) {
+ System.out.println("HTable '" + tableName + "' already exists");
+ return;
+ }
+
+ System.out.println("Creating HTable '" + tableName + "'");
+
+ HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
+
+ HColumnDescriptor fd = new HColumnDescriptor(CF);
+ fd.setBlocksize(CELL_SIZE);
+ desc.addFamily(fd);
+ hbase.createTable(desc);
+
+ System.out.println("HTable '" + tableName + "' created");
+ } finally {
+ hbase.close();
+ }
+ }
+
+ static class Hits {
+
+ boolean[] hitsForRowScanWithIndex;
+ boolean[] hitsForRowScanNoIndex;
+ List<Pair<Integer, Integer>> hitsForColumnScan;
+
+ public Hits(int nRows, double hitRatio, double indexRatio) {
+ Random rand = new Random();
+
+ hitsForRowScanWithIndex = new boolean[nRows];
+ hitsForRowScanNoIndex = new boolean[nRows];
+
+ // for row scan
+ int blockSize = (int) (1.0 / indexRatio);
+ int nBlocks = nRows / blockSize;
+
+ for (int i = 0; i < nBlocks; i++) {
+
+ if (rand.nextDouble() < hitRatio) {
+ for (int j = 0; j < blockSize; j++) {
+ hitsForRowScanNoIndex[i * blockSize + j] = true;
+ hitsForRowScanWithIndex[i * blockSize + j] = true;
+ }
+ } else {
+ // miss: even without an index, the scan must touch the first row of the block to detect the miss
+ hitsForRowScanNoIndex[i * blockSize] = true;
+ }
+ }
+
+ hitsForColumnScan = Lists.newArrayList();
+
+ // for column scan
+ int nColumns = 20;
+ int logicRows = nRows / nColumns;
+ for (int i = 0; i < nColumns; i++) {
+ if (rand.nextDouble() < hitRatio) {
+ hitsForColumnScan.add(new Pair<Integer, Integer>(i * logicRows, (i + 1) * logicRows));
+ }
+ }
+
+ }
+
+ public boolean[] getHitsForRowScanWithIndex() {
+ return hitsForRowScanWithIndex;
+ }
+
+ public boolean[] getHitsForRowScanNoIndex() {
+ return hitsForRowScanNoIndex;
+ }
+
+ public List<Pair<Integer, Integer>> getHitsForColumnScan() {
+ return hitsForColumnScan;
+ }
+ }
+
+ static class Stats {
+ String name;
+ long startTime;
+ long endTime;
+ long rowsRead;
+ long bytesRead;
+
+ public Stats(String name) {
+ this.name = name;
+ }
+
+ public void consume(Result r) {
+ consume(r, Integer.MAX_VALUE);
+ }
+
+ private void consume(Result r, int nBytesToConsume) {
+ Cell cell = r.getColumnLatestCell(CF, QN);
+ byte mix = 0;
+ byte[] valueArray = cell.getValueArray();
+ int n = Math.min(nBytesToConsume, cell.getValueLength());
+ for (int i = 0; i < n; i++) {
+ mix ^= valueArray[i];
+ bytesRead++;
+ }
+ discard(mix);
+ rowsRead++;
+ }
+
+ private void discard(byte n) {
+ // do nothing
+ }
+
+ public void markStart() {
+ System.out.println(name + " starts");
+ startTime = System.currentTimeMillis();
+ }
+
+ public void markEnd() {
+ endTime = System.currentTimeMillis();
+ System.out.println();
+ System.out.println(name + " ends, " + (endTime - startTime) + " ms, " + rowsRead + " rows read, " + bytesRead + " bytes read");
+ }
+ }
+
+}
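
A minimal sketch of driving the benchmark above programmatically; the wrapper class is hypothetical, and it assumes an hbase-site.xml on the classpath, as testGridTable() requires:

    import java.io.IOException;

    // Hypothetical launcher in the same package: runs the grid table benchmark
    // with an explicit 30% hit ratio and 10% index ratio (same as the defaults).
    public class GridTableBenchmarkLauncher {
        public static void main(String[] args) throws IOException {
            GridTableHBaseBenchmark.testGridTable(0.3, 0.1);
        }
    }
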
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HbaseStreamingInput.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HbaseStreamingInput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HbaseStreamingInput.java
new file mode 100644
index 0000000..adf4419
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HbaseStreamingInput.java
@@ -0,0 +1,237 @@
+package org.apache.kylin.storage.hbase.util;
+
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Semaphore;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
+import org.apache.kylin.common.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ */
+public class HbaseStreamingInput {
+ private static final Logger logger = LoggerFactory.getLogger(HbaseStreamingInput.class);
+
+ private static final int CELL_SIZE = 128 * 1024; // 128 KB
+ private static final byte[] CF = "F".getBytes();
+ private static final byte[] QN = "C".getBytes();
+
+ public static void createTable(String tableName) throws IOException {
+ HConnection conn = getConnection();
+ HBaseAdmin hadmin = new HBaseAdmin(conn);
+
+ try {
+ boolean tableExist = hadmin.tableExists(tableName);
+ if (tableExist) {
+ logger.info("HTable '" + tableName + "' already exists");
+ return;
+ }
+
+ logger.info("Creating HTable '" + tableName + "'");
+ HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
+ desc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName());//disable region split
+ desc.setMemStoreFlushSize(512 << 20);//512M
+
+ HColumnDescriptor fd = new HColumnDescriptor(CF);
+ fd.setBlocksize(CELL_SIZE);
+ desc.addFamily(fd);
+ hadmin.createTable(desc);
+
+ logger.info("HTable '" + tableName + "' created");
+ } finally {
+ hadmin.close(); // close the admin before the connection it was created from
+ conn.close();
+ }
+ }
+
+ private static void scheduleJob(Semaphore semaphore, int interval) {
+ while (true) {
+ semaphore.release();
+ try {
+ Thread.sleep(interval);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ public static void addData(String tableName) throws IOException {
+
+ createTable(tableName);
+
+ final Semaphore semaphore = new Semaphore(0);
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ scheduleJob(semaphore, 300000);//5 minutes a batch
+ }
+ }).start();
+
+ while (true) {
+ try {
+ semaphore.acquire();
+ int waiting = semaphore.availablePermits();
+ if (waiting > 0) {
+ logger.warn("There are " + waiting + " more batches waiting to be added");
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ HConnection conn = getConnection();
+ HTableInterface table = conn.getTable(tableName);
+
+ byte[] key = new byte[8 + 4];//time + id
+
+ logger.info("============================================");
+ long startTime = System.currentTimeMillis();
+ logger.info("data load start time in millis: " + startTime);
+ logger.info("data load start at " + formatTime(startTime));
+ List<Put> buffer = Lists.newArrayList();
+ for (int i = 0; i < (1 << 10); ++i) {
+ long time = System.currentTimeMillis();
+ Bytes.putLong(key, 0, time);
+ Bytes.putInt(key, 8, i);
+ Put put = new Put(key);
+ byte[] cell = randomBytes(CELL_SIZE);
+ put.add(CF, QN, cell);
+ buffer.add(put);
+ }
+ table.put(buffer);
+ table.close();
+ conn.close();
+ long endTime = System.currentTimeMillis();
+ logger.info("data load end at " + formatTime(endTime));
+ logger.info("data load time consumed: " + (endTime - startTime));
+ logger.info("============================================");
+ }
+ }
+
+ public static void randomScan(String tableName) throws IOException {
+
+ final Semaphore semaphore = new Semaphore(0);
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ scheduleJob(semaphore, 60000);//1 minute a batch
+ }
+ }).start();
+
+ while (true) {
+ try {
+ semaphore.acquire();
+ int waiting = semaphore.drainPermits();
+ if (waiting > 0) {
+ logger.warn("Too many queries to handle! Discarding " + waiting + " pending sets of scan requests");
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ Random r = new Random();
+ HConnection conn = getConnection();
+ HTableInterface table = conn.getTable(tableName);
+
+ long leftBound = getFirstKeyTime(table);
+ long rightBound = System.currentTimeMillis();
+
+ for (int t = 0; t < 5; ++t) {
+ long start = (long) (leftBound + r.nextDouble() * (rightBound - leftBound));
+ long end = start + 600000;//a period of 10 minutes
+ logger.info("A scan from " + formatTime(start) + " to " + formatTime(end));
+
+ Scan scan = new Scan();
+ scan.setStartRow(Bytes.toBytes(start));
+ scan.setStopRow(Bytes.toBytes(end));
+ scan.addFamily(CF);
+ ResultScanner scanner = table.getScanner(scan);
+ long hash = 0;
+ int rowCount = 0;
+ for (Result result : scanner) {
+ Cell cell = result.getColumnLatestCell(CF, QN);
+ byte[] value = cell.getValueArray();
+ if (cell.getValueLength() != CELL_SIZE) {
+ logger.error("value size invalid: expected " + CELL_SIZE + " but got " + cell.getValueLength());
+ }
+
+ hash += Arrays.hashCode(Arrays.copyOfRange(value, cell.getValueOffset(), cell.getValueLength() + cell.getValueOffset()));
+ rowCount++;
+ }
+ scanner.close();
+ logger.info("Scanned " + rowCount + " rows, the (meaningless) hash for the scan is " + hash);
+ }
+ table.close();
+ conn.close();
+ }
+ }
+
+ private static long getFirstKeyTime(HTableInterface table) throws IOException {
+ long startTime = 0;
+
+ Scan scan = new Scan();
+ scan.addFamily(CF);
+ ResultScanner scanner = table.getScanner(scan);
+ for (Result result : scanner) {
+ Cell cell = result.getColumnLatestCell(CF, QN);
+ byte[] key = cell.getRowArray();
+ startTime = Bytes.toLong(key, cell.getRowOffset(), 8);
+ logger.info("Retrieved first record time: " + formatTime(startTime));
+ break;//only get first one
+ }
+ scanner.close();
+ return startTime;
+
+ }
+
+ private static HConnection getConnection() throws IOException {
+ return HConnectionManager.createConnection(HBaseConfiguration.create());
+ }
+
+ private static String formatTime(long time) {
+ DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
+ Calendar cal = Calendar.getInstance();
+ cal.setTimeInMillis(time);
+ return dateFormat.format(cal.getTime());
+ }
+
+ private static byte[] randomBytes(int length) {
+ byte[] bytes = new byte[length];
+ Random rand = new Random();
+ rand.nextBytes(bytes);
+ return bytes;
+ }
+
+ public static void main(String[] args) throws Exception {
+
+ if (args[0].equalsIgnoreCase("createtable")) {
+ createTable(args[1]);
+ } else if (args[0].equalsIgnoreCase("adddata")) {
+ addData(args[1]);
+ } else if (args[0].equalsIgnoreCase("randomscan")) {
+ randomScan(args[1]);
+ }
+ }
+
+}
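
A hedged usage sketch for HbaseStreamingInput; the table name is a placeholder. Note that addData() and randomScan() loop forever, so in practice each runs as a separate process via main():

    // Hypothetical driver in the same package; "streaming_test" is a made-up table name.
    public class HbaseStreamingInputExample {
        public static void main(String[] args) throws Exception {
            HbaseStreamingInput.createTable("streaming_test");
            // Writes one batch of 1024 cells every 5 minutes, forever.
            HbaseStreamingInput.addData("streaming_test");
        }
    }
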
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HtableAlterMetadataCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HtableAlterMetadataCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HtableAlterMetadataCLI.java
new file mode 100644
index 0000000..239adcf
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HtableAlterMetadataCLI.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.util;
+
+import java.io.IOException;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+@SuppressWarnings("static-access")
+public class HtableAlterMetadataCLI extends AbstractHadoopJob {
+
+ private static final Option OPTION_METADATA_KEY = OptionBuilder.withArgName("key").hasArg().isRequired(true).withDescription("The metadata key").create("key");
+ private static final Option OPTION_METADATA_VALUE = OptionBuilder.withArgName("value").hasArg().isRequired(true).withDescription("The metadata value").create("value");
+
+ protected static final Logger logger = LoggerFactory.getLogger(HtableAlterMetadataCLI.class);
+
+ String tableName;
+ String metadataKey;
+ String metadataValue;
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Options options = new Options();
+ try {
+ options.addOption(OPTION_HTABLE_NAME);
+ options.addOption(OPTION_METADATA_KEY);
+ options.addOption(OPTION_METADATA_VALUE);
+
+ parseOptions(options, args);
+ tableName = getOptionValue(OPTION_HTABLE_NAME);
+ metadataKey = getOptionValue(OPTION_METADATA_KEY);
+ metadataValue = getOptionValue(OPTION_METADATA_VALUE);
+
+ alter();
+
+ return 0;
+ } catch (Exception e) {
+ printUsage(options);
+ throw e;
+ }
+ }
+
+ private void alter() throws IOException {
+ Configuration conf = HBaseConfiguration.create();
+ HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+ try {
+ HTableDescriptor table = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
+ hbaseAdmin.disableTable(table.getTableName());
+ table.setValue(metadataKey, metadataValue);
+ hbaseAdmin.modifyTable(table.getTableName(), table);
+ hbaseAdmin.enableTable(table.getTableName());
+ } finally {
+ hbaseAdmin.close(); // release the admin even if the alter fails midway
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ int exitCode = ToolRunner.run(new HtableAlterMetadataCLI(), args);
+ System.exit(exitCode);
+ }
+}
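
For reference, a sketch of invoking the CLI above through ToolRunner. The -key and -value flags come from the OptionBuilder calls; the flag behind OPTION_HTABLE_NAME is defined in AbstractHadoopJob, so "-htablename" is an assumption here, as are the table name and metadata values:

    import org.apache.hadoop.util.ToolRunner;

    public class HtableAlterMetadataExample {
        public static void main(String[] args) throws Exception {
            // "-htablename" is assumed; "KYLIN_EXAMPLE" and the key/value pair are placeholders.
            int exit = ToolRunner.run(new HtableAlterMetadataCLI(), new String[] {
                    "-htablename", "KYLIN_EXAMPLE", "-key", "OWNER", "-value", "kylin@example.com" });
            System.exit(exit);
        }
    }
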
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/OrphanHBaseCleanJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/OrphanHBaseCleanJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/OrphanHBaseCleanJob.java
new file mode 100644
index 0000000..f0618c9
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/OrphanHBaseCleanJob.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.metadata.realization.IRealizationConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class OrphanHBaseCleanJob extends AbstractHadoopJob {
+
+ @SuppressWarnings("static-access")
+ private static final Option OPTION_DELETE = OptionBuilder.withArgName("delete").hasArg().isRequired(false).withDescription("Delete the unused storage").create("delete");
+ @SuppressWarnings("static-access")
+ private static final Option OPTION_WHITELIST = OptionBuilder.withArgName("whitelist").hasArg().isRequired(true).withDescription("comma-separated whitelist of metadata stores").create("whitelist");
+
+ protected static final Logger logger = LoggerFactory.getLogger(OrphanHBaseCleanJob.class);
+
+ boolean delete = false;
+ Set<String> metastoreWhitelistSet = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Options options = new Options();
+
+ logger.info("jobs args: " + Arrays.toString(args));
+ try {
+ options.addOption(OPTION_DELETE);
+ options.addOption(OPTION_WHITELIST);
+ parseOptions(options, args);
+
+ logger.info("options: '" + getOptionsAsString() + "'");
+ logger.info("delete option value: '" + getOptionValue(OPTION_DELETE) + "'");
+ delete = Boolean.parseBoolean(getOptionValue(OPTION_DELETE));
+ String[] metastoreWhitelist = getOptionValue(OPTION_WHITELIST).split(",");
+
+ for (String ms : metastoreWhitelist) {
+ logger.info("metadata store in white list: " + ms);
+ metastoreWhitelistSet.add(ms);
+ }
+
+ Configuration conf = HBaseConfiguration.create(getConf());
+
+ cleanUnusedHBaseTables(conf);
+
+ return 0;
+ } catch (Exception e) {
+ e.printStackTrace(System.err);
+ throw e;
+ }
+ }
+
+ private void cleanUnusedHBaseTables(Configuration conf) throws IOException {
+
+ // get all kylin hbase tables
+ HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+ String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix;
+ HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*");
+ List<String> allTablesNeedToBeDropped = new ArrayList<String>();
+ for (HTableDescriptor desc : tableDescriptors) {
+ String host = desc.getValue(IRealizationConstants.HTableTag);
+ if (!metastoreWhitelistSet.contains(host)) {
+ logger.info("HTable {} is recognized as orphan because its tag is {}", desc.getTableName(), host);
+ //collect orphans
+ allTablesNeedToBeDropped.add(desc.getTableName().getNameAsString());
+ } else {
+ logger.info("HTable {} belongs to {}", desc.getTableName(), host);
+ }
+ }
+
+ if (delete) {
+ // drop tables
+ for (String htableName : allTablesNeedToBeDropped) {
+ logger.info("Deleting HBase table " + htableName);
+ if (hbaseAdmin.tableExists(htableName)) {
+ if (hbaseAdmin.isTableEnabled(htableName)) {
+ hbaseAdmin.disableTable(htableName);
+ }
+
+ hbaseAdmin.deleteTable(htableName);
+ logger.info("Deleted HBase table " + htableName);
+ } else {
+ logger.info("HBase table " + htableName + " does not exist");
+ }
+ }
+ } else {
+ System.out.println("--------------- Tables To Be Dropped ---------------");
+ for (String htableName : allTablesNeedToBeDropped) {
+ System.out.println(htableName);
+ }
+ System.out.println("----------------------------------------------------");
+ }
+
+ hbaseAdmin.close();
+ }
+
+ public static void main(String[] args) throws Exception {
+ int exitCode = ToolRunner.run(new OrphanHBaseCleanJob(), args);
+ System.exit(exitCode);
+ }
+}
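
A sketch of the intended dry-run-then-delete workflow; the whitelist entries are hypothetical metadata URL prefixes:

    import org.apache.hadoop.util.ToolRunner;

    public class OrphanHBaseCleanExample {
        public static void main(String[] args) throws Exception {
            // Dry run: only prints the "Tables To Be Dropped" report.
            ToolRunner.run(new OrphanHBaseCleanJob(), new String[] { "-whitelist", "kylin_metadata,kylin_staging" });
            // Destructive run: "-delete true" is needed because the value goes through Boolean.parseBoolean.
            ToolRunner.run(new OrphanHBaseCleanJob(), new String[] { "-whitelist", "kylin_metadata,kylin_staging", "-delete", "true" });
        }
    }
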
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java
new file mode 100644
index 0000000..9cd6d23
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java
@@ -0,0 +1,71 @@
+package org.apache.kylin.storage.hbase.util;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class RowCounterCLI {
+ private static final Logger logger = LoggerFactory.getLogger(RowCounterCLI.class);
+
+ public static void main(String[] args) throws IOException {
+
+ if (args == null || args.length != 3) {
+ System.out.println("Usage: hbase org.apache.hadoop.util.RunJar kylin-job-latest.jar org.apache.kylin.storage.hbase.util.RowCounterCLI [HTABLE_NAME] [STARTKEY] [ENDKEY]");
+ return; // do not proceed without the required arguments
+ }
+
+ System.out.println(args[0]);
+ String htableName = args[0];
+ System.out.println(args[1]);
+ byte[] startKey = BytesUtil.fromReadableText(args[1]);
+ System.out.println(args[2]);
+ byte[] endKey = BytesUtil.fromReadableText(args[2]);
+
+ if (startKey == null) {
+ System.out.println("startkey is null ");
+ } else {
+ System.out.println("startkey length: " + startKey.length);
+ }
+
+ System.out.println("start key in binary: " + Bytes.toStringBinary(startKey));
+ System.out.println("end key in binary: " + Bytes.toStringBinary(endKey));
+
+ Configuration conf = HBaseConfiguration.create();
+
+ Scan scan = new Scan();
+ scan.setCaching(1024);
+ scan.setCacheBlocks(true);
+ scan.setStartRow(startKey);
+ scan.setStopRow(endKey);
+
+ logger.info("My Scan " + scan.toString());
+
+ HConnection conn = HConnectionManager.createConnection(conf);
+ HTableInterface tableInterface = conn.getTable(htableName);
+
+ Iterator<Result> iterator = tableInterface.getScanner(scan).iterator();
+ int counter = 0;
+ while (iterator.hasNext()) {
+ iterator.next();
+ counter++;
+ if (counter % 1000 == 1) {
+ System.out.println("number of rows: " + counter);
+ }
+ }
+ System.out.println("number of rows: " + counter);
+ tableInterface.close();
+ conn.close();
+ }
+}
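
A sketch matching the usage string printed by the class; the table name and key literals are placeholders in whatever encoding BytesUtil.fromReadableText() expects:

    public class RowCounterExample {
        public static void main(String[] args) throws Exception {
            // Counts rows of a hypothetical table between two readable-text keys.
            RowCounterCLI.main(new String[] { "KYLIN_EXAMPLE", "\\x00\\x00\\x00\\x00", "\\xFF\\xFF\\xFF\\xFF" });
        }
    }
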
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
new file mode 100644
index 0000000..490c580
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.util;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.CliCommandExecutor;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.job.JobInstance;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.manager.ExecutableManager;
+import org.apache.kylin.metadata.realization.IRealizationConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StorageCleanupJob extends AbstractHadoopJob {
+
+ @SuppressWarnings("static-access")
+ private static final Option OPTION_DELETE = OptionBuilder.withArgName("delete").hasArg().isRequired(false).withDescription("Delete the unused storage").create("delete");
+
+ protected static final Logger logger = LoggerFactory.getLogger(StorageCleanupJob.class);
+
+ public static final long TIME_THRESHOLD = 2 * 24 * 3600 * 1000L; // 2 days
+
+ boolean delete = false;
+
+ protected static ExecutableManager executableManager = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.util.Tool#run(java.lang.String[])
+ */
+ @Override
+ public int run(String[] args) throws Exception {
+ Options options = new Options();
+
+ logger.info("jobs args: " + Arrays.toString(args));
+ try {
+ options.addOption(OPTION_DELETE);
+ parseOptions(options, args);
+
+ logger.info("options: '" + getOptionsAsString() + "'");
+ logger.info("delete option value: '" + getOptionValue(OPTION_DELETE) + "'");
+ delete = Boolean.parseBoolean(getOptionValue(OPTION_DELETE));
+
+ Configuration conf = HBaseConfiguration.create(getConf());
+
+ cleanUnusedIntermediateHiveTable(conf);
+ cleanUnusedHdfsFiles(conf);
+ cleanUnusedHBaseTables(conf);
+
+ return 0;
+ } catch (Exception e) {
+ printUsage(options);
+ throw e;
+ }
+ }
+
+ private void cleanUnusedHBaseTables(Configuration conf) throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
+ CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+ IIManager iiManager = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
+
+ // get all kylin hbase tables
+ HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+ String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix;
+ HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*");
+ List<String> allTablesNeedToBeDropped = new ArrayList<String>();
+ for (HTableDescriptor desc : tableDescriptors) {
+ String host = desc.getValue(IRealizationConstants.HTableTag);
+ String creationTime = desc.getValue(IRealizationConstants.HTableCreationTime);
+ if (KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix().equalsIgnoreCase(host)) {
+ // only consider htables that belong to this instance and were created more than 2 days ago
+ if (StringUtils.isEmpty(creationTime) || (System.currentTimeMillis() - Long.valueOf(creationTime) > TIME_THRESHOLD)) {
+ allTablesNeedToBeDropped.add(desc.getTableName().getNameAsString());
+ }
+ }
+ }
+
+ // remove every segment htable from drop list
+ for (CubeInstance cube : cubeMgr.listAllCubes()) {
+ for (CubeSegment seg : cube.getSegments()) {
+ String tablename = seg.getStorageLocationIdentifier();
+ if (allTablesNeedToBeDropped.contains(tablename)) {
+ allTablesNeedToBeDropped.remove(tablename);
+ logger.info("Exclude table " + tablename + " from drop list, as the table belongs to cube " + cube.getName() + " with status " + cube.getStatus());
+ }
+ }
+ }
+
+ // remove every ii segment htable from drop list
+ for (IIInstance ii : iiManager.listAllIIs()) {
+ for (IISegment seg : ii.getSegments()) {
+ String tablename = seg.getStorageLocationIdentifier();
+
+ if (allTablesNeedToBeDropped.contains(tablename)) {
+ allTablesNeedToBeDropped.remove(tablename);
+ logger.info("Exclude table " + tablename + " from drop list, as the table belongs to ii " + ii.getName() + " with status " + ii.getStatus());
+ }
+ }
+ }
+
+ if (delete) {
+ // drop tables
+ for (String htableName : allTablesNeedToBeDropped) {
+ logger.info("Deleting HBase table " + htableName);
+ if (hbaseAdmin.tableExists(htableName)) {
+ if (hbaseAdmin.isTableEnabled(htableName)) {
+ hbaseAdmin.disableTable(htableName);
+ }
+
+ hbaseAdmin.deleteTable(htableName);
+ logger.info("Deleted HBase table " + htableName);
+ } else {
+ logger.info("HBase table " + htableName + " does not exist");
+ }
+ }
+ } else {
+ System.out.println("--------------- Tables To Be Dropped ---------------");
+ for (String htableName : allTablesNeedToBeDropped) {
+ System.out.println(htableName);
+ }
+ System.out.println("----------------------------------------------------");
+ }
+
+ hbaseAdmin.close();
+ }
+
+ private void cleanUnusedHdfsFiles(Configuration conf) throws IOException {
+ JobEngineConfig engineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
+ CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+
+ FileSystem fs = FileSystem.get(conf);
+ List<String> allHdfsPathsNeedToBeDeleted = new ArrayList<String>();
+ // GlobFilter filter = new
+ // GlobFilter(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()
+ // + "/kylin-.*");
+ FileStatus[] fStatus = fs.listStatus(new Path(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()));
+ for (FileStatus status : fStatus) {
+ String path = status.getPath().getName();
+ // System.out.println(path);
+ if (path.startsWith(JobInstance.JOB_WORKING_DIR_PREFIX)) {
+ String kylinJobPath = engineConfig.getHdfsWorkingDirectory() + path;
+ allHdfsPathsNeedToBeDeleted.add(kylinJobPath);
+ }
+ }
+
+ List<String> allJobs = executableManager.getAllJobIds();
+ for (String jobId : allJobs) {
+ // only remove FINISHED and DISCARDED job intermediate files
+ final ExecutableState state = executableManager.getOutput(jobId).getState();
+ if (!state.isFinalState()) {
+ String path = JobInstance.getJobWorkingDir(jobId, engineConfig.getHdfsWorkingDirectory());
+ allHdfsPathsNeedToBeDeleted.remove(path);
+ logger.info("Remove " + path + " from deletion list, as the path belongs to job " + jobId + " with status " + state);
+ }
+ }
+
+ // remove every segment working dir from deletion list
+ for (CubeInstance cube : cubeMgr.listAllCubes()) {
+ for (CubeSegment seg : cube.getSegments()) {
+ String jobUuid = seg.getLastBuildJobID();
+ if (jobUuid != null && !jobUuid.equals("")) {
+ String path = JobInstance.getJobWorkingDir(jobUuid, engineConfig.getHdfsWorkingDirectory());
+ allHdfsPathsNeedToBeDeleted.remove(path);
+ logger.info("Remove " + path + " from deletion list, as the path belongs to segment " + seg + " of cube " + cube.getName());
+ }
+ }
+ }
+
+ if (delete) {
+ // remove files
+ for (String hdfsPath : allHdfsPathsNeedToBeDeleted) {
+ logger.info("Deleting hdfs path " + hdfsPath);
+ Path p = new Path(hdfsPath);
+ if (fs.exists(p)) {
+ fs.delete(p, true);
+ logger.info("Deleted hdfs path " + hdfsPath);
+ } else {
+ logger.info("Hdfs path " + hdfsPath + " does not exist");
+ }
+ }
+ } else {
+ System.out.println("--------------- HDFS Path To Be Deleted ---------------");
+ for (String hdfsPath : allHdfsPathsNeedToBeDeleted) {
+ System.out.println(hdfsPath);
+ }
+ System.out.println("-------------------------------------------------------");
+ }
+
+ }
+
+ private void cleanUnusedIntermediateHiveTable(Configuration conf) throws IOException {
+ final KylinConfig config = KylinConfig.getInstanceFromEnv();
+ final CliCommandExecutor cmdExec = config.getCliCommandExecutor();
+ final int uuidLength = 36;
+
+ final String useDatabaseHql = "USE " + config.getHiveDatabaseForIntermediateTable() + ";";
+ StringBuilder buf = new StringBuilder();
+ buf.append("hive -e \"");
+ buf.append(useDatabaseHql);
+ buf.append("show tables " + "\'kylin_intermediate_*\'" + "; ");
+ buf.append("\"");
+
+ Pair<Integer, String> result = cmdExec.execute(buf.toString());
+
+ String outputStr = result.getSecond();
+ BufferedReader reader = new BufferedReader(new StringReader(outputStr));
+ String line = null;
+ List<String> allJobs = executableManager.getAllJobIds();
+ List<String> allHiveTablesNeedToBeDeleted = new ArrayList<String>();
+ List<String> workingJobList = new ArrayList<String>();
+
+ for (String jobId : allJobs) {
+ // only remove FINISHED and DISCARDED job intermediate table
+ final ExecutableState state = executableManager.getOutput(jobId).getState();
+
+ if (!state.isFinalState()) {
+ workingJobList.add(jobId);
+ logger.info("Keep intermediate hive table for job " + jobId + " with status " + state);
+ }
+ }
+
+ while ((line = reader.readLine()) != null) {
+ if (line.startsWith("kylin_intermediate_")) {
+ boolean isNeedDel = false;
+ String uuid = line.substring(line.length() - uuidLength, line.length());
+ uuid = uuid.replace("_", "-");
+ //Check whether it's a hive table in use
+ if (allJobs.contains(uuid) && !workingJobList.contains(uuid)) {
+ isNeedDel = true;
+ }
+
+ if (isNeedDel) {
+ allHiveTablesNeedToBeDeleted.add(line);
+ }
+ }
+ }
+
+ if (delete) {
+ buf.delete(0, buf.length());
+ buf.append("hive -e \"");
+ buf.append(useDatabaseHql);
+ for (String delHive : allHiveTablesNeedToBeDeleted) {
+ buf.append("drop table if exists " + delHive + "; ");
+ logger.info("Dropping hive table " + delHive);
+ }
+ buf.append("\"");
+
+ try {
+ cmdExec.execute(buf.toString());
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ } else {
+ System.out.println("------ Intermediate Hive Tables To Be Dropped ------");
+ for (String hiveTable : allHiveTablesNeedToBeDeleted) {
+ System.out.println(hiveTable);
+ }
+ System.out.println("----------------------------------------------------");
+ }
+
+ if (reader != null)
+ reader.close();
+ }
+
+ public static void main(String[] args) throws Exception {
+ int exitCode = ToolRunner.run(new StorageCleanupJob(), args);
+ System.exit(exitCode);
+ }
+}
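
StorageCleanupJob follows the same dry-run/delete convention as OrphanHBaseCleanJob above; a sketch:

    import org.apache.hadoop.util.ToolRunner;

    public class StorageCleanupExample {
        public static void main(String[] args) throws Exception {
            // Dry run: lists candidate HBase tables, HDFS paths and intermediate hive tables.
            ToolRunner.run(new StorageCleanupJob(), new String[] {});
            // Destructive run: actually drops/deletes the candidates.
            ToolRunner.run(new StorageCleanupJob(), new String[] { "-delete", "true" });
        }
    }
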
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/TarGZUtil.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/TarGZUtil.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/TarGZUtil.java
new file mode 100644
index 0000000..f0c4c5b
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/TarGZUtil.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.util;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
+
+public class TarGZUtil {
+
+ public static void uncompressTarGZ(File tarFile, File dest) throws IOException {
+ dest.mkdirs();
+ TarArchiveInputStream tarIn = new TarArchiveInputStream(new GzipCompressorInputStream(new BufferedInputStream(new FileInputStream(tarFile))));
+
+ TarArchiveEntry tarEntry = tarIn.getNextTarEntry();
+ while (tarEntry != null) {
+ // create a file or directory with the same name as the tarEntry
+ File destPath = new File(dest, tarEntry.getName());
+ System.out.println("working: " + destPath.getCanonicalPath());
+ if (tarEntry.isDirectory()) {
+ destPath.mkdirs();
+ } else {
+ // directory entries may be missing from the archive, so ensure the parent exists
+ destPath.getParentFile().mkdirs();
+ destPath.createNewFile();
+ byte[] btoRead = new byte[1024];
+ BufferedOutputStream bout = new BufferedOutputStream(new FileOutputStream(destPath));
+ int len;
+ while ((len = tarIn.read(btoRead)) != -1) {
+ bout.write(btoRead, 0, len);
+ }
+ bout.close();
+ }
+ tarEntry = tarIn.getNextTarEntry();
+ }
+ tarIn.close();
+ }
+
+}
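
A one-line usage sketch; both paths are hypothetical:

    import java.io.File;
    import java.io.IOException;

    public class TarGZUtilExample {
        public static void main(String[] args) throws IOException {
            // Unpacks a hypothetical archive into a destination directory, creating it if needed.
            TarGZUtil.uncompressTarGZ(new File("/tmp/example.tar.gz"), new File("/tmp/example"));
        }
    }
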
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/streaming/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java b/streaming/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java
new file mode 100644
index 0000000..7b9831a
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java
@@ -0,0 +1,69 @@
+package org.apache.kylin.job.monitor;
+
+import java.util.List;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ */
+public class MonitorCLI {
+
+ private static final Logger logger = LoggerFactory.getLogger(MonitorCLI.class);
+
+ public static void main(String[] args) {
+ Preconditions.checkArgument(args[0].equals("monitor"));
+
+ int i = 1;
+ List<String> receivers = null;
+ String host = null;
+ String tableName = null;
+ String authorization = null;
+ String cubeName = null;
+ String projectName = "default";
+ while (i < args.length) {
+ String argName = args[i];
+ switch (argName) {
+ case "-receivers":
+ receivers = Lists.newArrayList(StringUtils.split(args[++i], ";"));
+ break;
+ case "-host":
+ host = args[++i];
+ break;
+ case "-tableName":
+ tableName = args[++i];
+ break;
+ case "-authorization":
+ authorization = args[++i];
+ break;
+ case "-cubeName":
+ cubeName = args[++i];
+ break;
+ case "-projectName":
+ projectName = args[++i];
+ break;
+ default:
+ throw new RuntimeException("invalid argName:" + argName);
+ }
+ i++;
+ }
+ Preconditions.checkArgument(receivers != null && receivers.size() > 0);
+ final StreamingMonitor streamingMonitor = new StreamingMonitor();
+ if (tableName != null) {
+ logger.info(String.format("check query tableName:%s host:%s receivers:%s", tableName, host, StringUtils.join(receivers, ";")));
+ Preconditions.checkNotNull(host);
+ Preconditions.checkNotNull(authorization);
+ Preconditions.checkNotNull(tableName);
+ streamingMonitor.checkCountAll(receivers, host, authorization, projectName, tableName);
+ }
+ if (cubeName != null) {
+ logger.info(String.format("check cube cubeName:%s receivers:%s", cubeName, StringUtils.join(receivers, ";")));
+ streamingMonitor.checkCube(receivers, cubeName, host);
+ }
+ System.exit(0);
+ }
+}
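
The argument shape MonitorCLI expects, reconstructed from the switch above; every value below is a placeholder:

    public class MonitorCLIExample {
        public static void main(String[] args) {
            MonitorCLI.main(new String[] { "monitor",
                    "-receivers", "dev@example.com;ops@example.com",
                    "-host", "http://kylin-host:7070",
                    "-authorization", "QURNSU46S1lMSU4=", // base64 of "user:password", placeholder
                    "-tableName", "EXAMPLE_STREAMING_TABLE" });
        }
    }
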
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/streaming/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java b/streaming/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java
new file mode 100644
index 0000000..e23f065
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java
@@ -0,0 +1,154 @@
+package org.apache.kylin.job.monitor;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+
+import javax.annotation.Nullable;
+
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.methods.ByteArrayRequestEntity;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.MailService;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ */
+public class StreamingMonitor {
+
+ private static final Logger logger = LoggerFactory.getLogger(StreamingMonitor.class);
+
+ public void checkCountAll(List<String> receivers, String host, String authorization, String projectName, String tableName) {
+ String title = "checkCountAll job(host:" + host + " tableName:" + tableName + ") ";
+ StringBuilder stringBuilder = new StringBuilder();
+ String url = host + "/kylin/api/query";
+ PostMethod request = new PostMethod(url);
+ try {
+
+ request.addRequestHeader("Authorization", "Basic " + authorization);
+ request.addRequestHeader("Content-Type", "application/json");
+ String query = String.format("{\"sql\":\"select count(*) from %s\",\"offset\":0,\"limit\":50000,\"acceptPartial\":true,\"project\":\"%s\"}", tableName, projectName);
+ request.setRequestEntity(new ByteArrayRequestEntity(query.getBytes()));
+
+ int statusCode = new HttpClient().executeMethod(request);
+ String msg = Bytes.toString(request.getResponseBody());
+ stringBuilder.append("host:").append(host).append("\n");
+ stringBuilder.append("query:").append(query).append("\n");
+ stringBuilder.append("statusCode:").append(statusCode).append("\n");
+ if (statusCode == 200) {
+ title += "succeed";
+ final HashMap hashMap = JsonUtil.readValue(msg, HashMap.class);
+ stringBuilder.append("results:").append(hashMap.get("results").toString()).append("\n");
+ stringBuilder.append("duration:").append(hashMap.get("duration").toString()).append("\n");
+ } else {
+ title += "failed";
+ stringBuilder.append("response:").append(msg).append("\n");
+ }
+ } catch (Exception e) {
+ final StringWriter out = new StringWriter();
+ e.printStackTrace(new PrintWriter(out));
+ title += "failed";
+ stringBuilder.append(out.toString());
+ } finally {
+ request.releaseConnection();
+ }
+ logger.info("title:" + title);
+ logger.info("content:" + stringBuilder.toString());
+ sendMail(receivers, title, stringBuilder.toString());
+ }
+
+ public static final List<Pair<Long, Long>> findGaps(String cubeName) {
+ List<CubeSegment> segments = getSortedReadySegments(cubeName);
+ List<Pair<Long, Long>> gaps = Lists.newArrayList();
+ for (int i = 0; i < segments.size() - 1; ++i) {
+ CubeSegment first = segments.get(i);
+ CubeSegment second = segments.get(i + 1);
+ if (first.getDateRangeEnd() == second.getDateRangeStart()) {
+ continue;
+ } else if (first.getDateRangeEnd() < second.getDateRangeStart()) {
+ gaps.add(Pair.newPair(first.getDateRangeEnd(), second.getDateRangeStart()));
+ }
+ }
+ return gaps;
+ }
+
+ private static List<CubeSegment> getSortedReadySegments(String cubeName) {
+ final CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
+ Preconditions.checkNotNull(cube);
+ final List<CubeSegment> segments = cube.getSegment(SegmentStatusEnum.READY);
+ logger.info(segments.size() + " READY cube segments in total");
+ Collections.sort(segments);
+ return segments;
+ }
+
+ public static final List<Pair<String, String>> findOverlaps(String cubeName) {
+ List<CubeSegment> segments = getSortedReadySegments(cubeName);
+ List<Pair<String, String>> overlaps = Lists.newArrayList();
+ for (int i = 0; i < segments.size() - 1; ++i) {
+ CubeSegment first = segments.get(i);
+ CubeSegment second = segments.get(i + 1);
+ if (first.getDateRangeEnd() == second.getDateRangeStart()) {
+ continue;
+ } else {
+ overlaps.add(Pair.newPair(first.getName(), second.getName()));
+ }
+ }
+ return overlaps;
+ }
+
+ public void checkCube(List<String> receivers, String cubeName, String host) {
+ final CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
+ if (cube == null) {
+ logger.info("cube:" + cubeName + " does not exist");
+ return;
+ }
+ List<Pair<Long, Long>> gaps = findGaps(cubeName);
+ List<Pair<String, String>> overlaps = findOverlaps(cubeName);
+ StringBuilder content = new StringBuilder();
+ if (!gaps.isEmpty()) {
+ content.append("all gaps:").append("\n").append(StringUtils.join(Lists.transform(gaps, new Function<Pair<Long, Long>, String>() {
+ @Nullable
+ @Override
+ public String apply(Pair<Long, Long> input) {
+ return parseInterval(input);
+ }
+ }), "\n")).append("\n");
+ }
+ if (!overlaps.isEmpty()) {
+ content.append("all overlaps:").append("\n").append(StringUtils.join(overlaps, "\n")).append("\n");
+ }
+ if (content.length() > 0) {
+ logger.info(content.toString());
+ sendMail(receivers, String.format("%s has gaps or overlaps on host %s", cubeName, host), content.toString());
+ } else {
+ logger.info("no gaps or overlaps");
+ }
+ }
+
+ private String parseInterval(Pair<Long, Long> interval) {
+ return String.format("{%d(%s), %d(%s)}", interval.getFirst(), new Date(interval.getFirst()).toString(), interval.getSecond(), new Date(interval.getSecond()).toString());
+ }
+
+ private void sendMail(List<String> receivers, String title, String content) {
+ final MailService mailService = new MailService(KylinConfig.getInstanceFromEnv());
+ mailService.sendMail(receivers, title, content, false);
+ }
+
+}
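
A sketch of calling the static gap finder directly; the cube name is hypothetical and the call needs reachable Kylin metadata:

    import org.apache.kylin.common.util.Pair;

    public class GapCheckExample {
        public static void main(String[] args) {
            // Prints each missing interval between consecutive READY segments.
            for (Pair<Long, Long> gap : StreamingMonitor.findGaps("example_streaming_cube")) {
                System.out.println("gap: " + gap.getFirst() + " -> " + gap.getSecond());
            }
        }
    }
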
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/streaming/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java b/streaming/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
new file mode 100644
index 0000000..029d4d2
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
@@ -0,0 +1,74 @@
+package org.apache.kylin.job.streaming;
+
+/**
+ */
+public class BootstrapConfig {
+
+ private String streaming;
+ private int partitionId = -1;
+
+ //one off default value set to true
+ private boolean oneOff = true;
+ private long start = 0L;
+ private long end = 0L;
+ private long margin = 0L;
+
+ private boolean fillGap;
+
+ public long getMargin() {
+ return margin;
+ }
+
+ public void setMargin(long margin) {
+ this.margin = margin;
+ }
+
+ public boolean isOneOff() {
+ return oneOff;
+ }
+
+ public void setOneOff(boolean oneOff) {
+ this.oneOff = oneOff;
+ }
+
+ public long getStart() {
+ return start;
+ }
+
+ public void setStart(long start) {
+ this.start = start;
+ }
+
+ public long getEnd() {
+ return end;
+ }
+
+ public void setEnd(long end) {
+ this.end = end;
+ }
+
+ public String getStreaming() {
+ return streaming;
+ }
+
+ public void setStreaming(String streaming) {
+ this.streaming = streaming;
+ }
+
+ public int getPartitionId() {
+ return partitionId;
+ }
+
+ public void setPartitionId(int partitionId) {
+ this.partitionId = partitionId;
+ }
+
+ public boolean isFillGap() {
+ return fillGap;
+ }
+
+ public void setFillGap(boolean fillGap) {
+ this.fillGap = fillGap;
+ }
+}
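As a usage sketch, a caller would populate this bean before launching a bootstrap run; every value below is made up for illustration:

    BootstrapConfig config = new BootstrapConfig();
    config.setStreaming("my_streaming_table"); // name of a streaming config (illustrative)
    config.setPartitionId(0);
    config.setOneOff(true);                    // build exactly one window
    config.setStart(1442300400000L);           // window start, epoch millis (made up)
    config.setEnd(1442304000000L);             // window end, epoch millis (made up)
    config.setFillGap(false);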
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/streaming/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java b/streaming/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
new file mode 100644
index 0000000..38787a8
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.job.streaming;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.cube.inmemcubing.InMemCubeBuilder;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.util.CubingUtils;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducer;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.hbase.steps.HBaseConnection;
+import org.apache.kylin.storage.hbase.steps.HBaseCuboidWriter;
+import org.apache.kylin.storage.hbase.steps.CubeHTableUtil;
+import org.apache.kylin.streaming.MicroStreamBatch;
+import org.apache.kylin.streaming.MicroStreamBatchConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MicroStreamBatchConsumer that builds each incoming micro batch
+ * into a new READY cube segment via InMemCubeBuilder.
+ */
+public class CubeStreamConsumer implements MicroStreamBatchConsumer {
+
+ private static final Logger logger = LoggerFactory.getLogger(CubeStreamConsumer.class);
+
+ private final CubeManager cubeManager;
+ private final String cubeName;
+ private final KylinConfig kylinConfig;
+ private final ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+ private int totalConsumedMessageCount = 0;
+ private int totalRawMessageCount = 0;
+
+ public CubeStreamConsumer(String cubeName) {
+ this.kylinConfig = KylinConfig.getInstanceFromEnv();
+ this.cubeManager = CubeManager.getInstance(kylinConfig);
+ this.cubeName = cubeName;
+ }
+
+ @Override
+ public void consume(MicroStreamBatch microStreamBatch) throws Exception {
+
+ totalConsumedMessageCount += microStreamBatch.size();
+ totalRawMessageCount += microStreamBatch.getRawMessageCount();
+
+ final List<List<String>> parsedStreamMessages = microStreamBatch.getStreams();
+ long startOffset = microStreamBatch.getOffset().getFirst();
+ long endOffset = microStreamBatch.getOffset().getSecond();
+ LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue<List<String>>(parsedStreamMessages);
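+ // enqueue an empty list as an end-of-input marker for the cube builder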
+ blockingQueue.put(Collections.<String> emptyList());
+
+ final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
+ final CubeDesc cubeDesc = cubeInstance.getDescriptor();
+ final CubeSegment cubeSegment = cubeManager.appendSegments(cubeManager.getCube(cubeName), microStreamBatch.getTimestamp().getFirst(), microStreamBatch.getTimestamp().getSecond(), false, false);
+ long start = System.currentTimeMillis();
+ final Map<Long, HyperLogLogPlusCounter> samplingResult = CubingUtils.sampling(cubeInstance.getDescriptor(), parsedStreamMessages);
+ logger.info(String.format("sampling of %d messages cost %d ms", parsedStreamMessages.size(), (System.currentTimeMillis() - start)));
+
+ final Configuration conf = HadoopUtil.getCurrentConfiguration();
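+ // write cuboid statistics to a local temp file, upload to the ResourceStore, then clean up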
+ final Path outputPath = new Path("file://" + BatchConstants.CFG_STATISTICS_LOCAL_DIR + UUID.randomUUID().toString());
+ FactDistinctColumnsReducer.writeCuboidStatistics(conf, outputPath, samplingResult, 100);
+ FSDataInputStream localStream = FileSystem.getLocal(conf).open(new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION));
+ ResourceStore.getStore(kylinConfig).putResource(cubeSegment.getStatisticsResourcePath(), localStream, System.currentTimeMillis());
+ localStream.close();
+ FileSystem.getLocal(conf).delete(outputPath, true);
+
+ final Map<TblColRef, Dictionary<?>> dictionaryMap = CubingUtils.buildDictionary(cubeInstance, parsedStreamMessages);
+ Map<TblColRef, Dictionary<?>> realDictMap = CubingUtils.writeDictionary(cubeSegment, dictionaryMap, startOffset, endOffset);
+
+ InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(cubeInstance.getDescriptor(), realDictMap);
+ final HTableInterface hTable = createHTable(cubeSegment);
+ final HBaseCuboidWriter gtRecordWriter = new HBaseCuboidWriter(cubeDesc, hTable);
+
+ executorService.submit(inMemCubeBuilder.buildAsRunnable(blockingQueue, gtRecordWriter)).get();
+ gtRecordWriter.flush();
+ hTable.close();
+ commitSegment(cubeSegment);
+
+ logger.info("Consumed {} messages out of {} raw messages", totalConsumedMessageCount, totalRawMessageCount);
+ }
+
+ //TODO: should we use cubeManager.promoteNewlyBuiltSegments?
+ private void commitSegment(CubeSegment cubeSegment) throws IOException {
+ cubeSegment.setStatus(SegmentStatusEnum.READY);
+ CubeUpdate cubeBuilder = new CubeUpdate(cubeSegment.getCubeInstance());
+ cubeBuilder.setToAddSegs(cubeSegment);
+ CubeManager.getInstance(kylinConfig).updateCube(cubeBuilder);
+ }
+
+ private HTableInterface createHTable(final CubeSegment cubeSegment) throws Exception {
+ final String hTableName = cubeSegment.getStorageLocationIdentifier();
+ CubeHTableUtil.createHTable(cubeSegment.getCubeDesc(), hTableName, null);
+ final HTableInterface hTable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(hTableName);
+ logger.info("hTable:" + hTableName + " for segment:" + cubeSegment.getName() + " created!");
+ return hTable;
+ }
+
+ @Override
+ public void stop() {
+ // release the build thread; consume() runs each build synchronously via get()
+ executorService.shutdown();
+ }
+
+}
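The consume() method above hands rows to InMemCubeBuilder through a blocking queue seeded with the batch plus an empty list as an end-of-input sentinel, then blocks on the Future until the build drains the queue. A stripped-down sketch of just that hand-off pattern (the println stands in for the real builder; nothing here is the actual InMemCubeBuilder API):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;

    public class QueueHandOffSketch {
        public static void main(String[] args) throws Exception {
            List<List<String>> batch = Arrays.asList(Arrays.asList("2015-09-16", "42"));
            final BlockingQueue<List<String>> queue = new LinkedBlockingQueue<List<String>>(batch);
            queue.put(Collections.<String> emptyList()); // sentinel: no more input

            ExecutorService pool = Executors.newSingleThreadExecutor();
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        List<String> row;
                        while (!(row = queue.take()).isEmpty()) { // stop at the sentinel
                            System.out.println("building row: " + row);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }).get(); // block until the "build" drains the queue, as consume() does
            pool.shutdown();
        }
    }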
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/streaming/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java b/streaming/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
new file mode 100644
index 0000000..95fbc9d
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
@@ -0,0 +1,54 @@
+package org.apache.kylin.job.streaming;
+
+import java.util.List;
+import java.util.Properties;
+
+import javax.annotation.Nullable;
+
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.streaming.BrokerConfig;
+import org.apache.kylin.streaming.KafkaClusterConfig;
+import org.apache.kylin.streaming.StreamingConfig;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+
+/**
+ * Loads prepared data into Kafka (for test use).
+ */
+public class KafkaDataLoader {
+
+ public static void loadIntoKafka(StreamingConfig streamingConfig, List<String> messages) {
+
+ KafkaClusterConfig clusterConfig = streamingConfig.getKafkaClusterConfigs().get(0);
+ String brokerList = StringUtils.join(Collections2.transform(clusterConfig.getBrokerConfigs(), new Function<BrokerConfig, String>() {
+ @Nullable
+ @Override
+ public String apply(BrokerConfig brokerConfig) {
+ return brokerConfig.getHost() + ":" + brokerConfig.getPort();
+ }
+ }), ",");
+ Properties props = new Properties();
+ props.put("metadata.broker.list", brokerList);
+ props.put("serializer.class", "kafka.serializer.StringEncoder");
+ props.put("request.required.acks", "1");
+
+ ProducerConfig config = new ProducerConfig(props);
+
+ Producer<String, String> producer = new Producer<String, String>(config);
+
+ List<KeyedMessage<String, String>> keyedMessages = Lists.newArrayList();
+ for (int i = 0; i < messages.size(); ++i) {
+ KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(streamingConfig.getTopic(), String.valueOf(i), messages.get(i));
+ keyedMessages.add(keyedMessage);
+ }
+ producer.send(keyedMessages);
+ producer.close();
+ }
+
+}
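A side note on the broker-list construction above: on Java 8 the Guava transform-and-join can be written with streams, assuming the same BrokerConfig accessors the code already uses:

    String brokerList = clusterConfig.getBrokerConfigs().stream()
            .map(b -> b.getHost() + ":" + b.getPort())
            .collect(java.util.stream.Collectors.joining(","));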