You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by lu...@apache.org on 2015/01/07 15:47:02 UTC
[41/51] [partial] incubator-kylin git commit: migrate repo from
github.com to apache git
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/cube/src/main/java/com/kylinolap/cube/dataGen/FactTableGenerator.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/com/kylinolap/cube/dataGen/FactTableGenerator.java b/cube/src/main/java/com/kylinolap/cube/dataGen/FactTableGenerator.java
new file mode 100644
index 0000000..de63c27
--- /dev/null
+++ b/cube/src/main/java/com/kylinolap/cube/dataGen/FactTableGenerator.java
@@ -0,0 +1,645 @@
+package com.kylinolap.cube.dataGen;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.common.persistence.ResourceStore;
+import com.kylinolap.common.util.Array;
+import com.kylinolap.cube.CubeInstance;
+import com.kylinolap.cube.CubeManager;
+import com.kylinolap.metadata.MetadataManager;
+import com.kylinolap.metadata.model.cube.CubeDesc;
+import com.kylinolap.metadata.model.cube.DimensionDesc;
+import com.kylinolap.metadata.model.cube.JoinDesc;
+import com.kylinolap.metadata.model.cube.MeasureDesc;
+import com.kylinolap.metadata.model.cube.TblColRef;
+import com.kylinolap.metadata.model.schema.ColumnDesc;
+import com.kylinolap.metadata.model.schema.DataType;
+
+/**
+ * Created by hongbin on 5/20/14.
+ */
+public class FactTableGenerator {
+ CubeInstance cube = null;
+ CubeDesc desc = null;
+ ResourceStore store = null;
+ String factTableName = null;
+
+ GenConfig genConf = null;
+
+ Random r = null;
+
+ String cubeName;
+ long randomSeed;
+ int rowCount;
+ int unlinkableRowCount;
+ int unlinkableRowCountMax;
+ double conflictRatio;
+ double linkableRatio;
+
+ // the names of lookup table columns which is in relation with fact
+ // table(appear as fk in fact table)
+ Hashtable<String, LinkedList<String>> lookupTableKeys = new Hashtable<String, LinkedList<String>>();
+
+ // possible values of lookupTableKeys, extracted from existing lookup
+ // tables.
+ // The key is in the format of tablename/columnname
+ HashMap<String, ArrayList<String>> feasibleValues = new HashMap<String, ArrayList<String>>();
+
+ // lookup table name -> sets of all composite keys
+ HashMap<String, HashSet<Array<String>>> lookupTableCompositeKeyValues = new HashMap<String, HashSet<Array<String>>>();
+
+ private void init(String cubeName, int rowCount, double conflictRaio, double linkableRatio, long randomSeed) {
+ this.rowCount = rowCount;
+ this.conflictRatio = conflictRaio;
+ this.cubeName = cubeName;
+ this.randomSeed = randomSeed;
+ this.linkableRatio = linkableRatio;
+
+ this.unlinkableRowCountMax = (int) (this.rowCount * (1 - linkableRatio));
+ this.unlinkableRowCount = 0;
+
+ r = new Random(randomSeed);
+
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+ cube = CubeManager.getInstance(config).getCube(cubeName);
+ desc = cube.getDescriptor();
+ factTableName = desc.getFactTable();
+ store = ResourceStore.getStore(config);
+ }
+
+ /*
+ * users can specify the value preference for each column
+ */
+ private void loadConfig() {
+ try {
+ InputStream configStream = null;
+ configStream = store.getResource("/data/data_gen_config.json");
+ this.genConf = GenConfig.loadConfig(configStream);
+
+ if (configStream != null)
+ configStream.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void loadLookupTableValues(String lookupTableName, LinkedList<String> columnNames, int distinctRowCount) throws Exception {
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+
+ // only deal with composite keys
+ if (columnNames.size() > 1 && !lookupTableCompositeKeyValues.containsKey(lookupTableName)) {
+ lookupTableCompositeKeyValues.put(lookupTableName, new HashSet<Array<String>>());
+ }
+
+ InputStream tableStream = null;
+ BufferedReader tableReader = null;
+ try {
+ Hashtable<String, Integer> zeroBasedInice = new Hashtable<String, Integer>();
+ for (String columnName : columnNames) {
+ ColumnDesc cDesc = MetadataManager.getInstance(config).getTableDesc(lookupTableName).findColumnByName(columnName);
+ zeroBasedInice.put(columnName, cDesc.getZeroBasedIndex());
+ }
+
+ String path = "/data/" + lookupTableName + ".csv";
+ tableStream = store.getResource(path);
+ tableReader = new BufferedReader(new InputStreamReader(tableStream));
+ tableReader.mark(0);
+ int rowCount = 0;
+ int curRowNum = 0;
+ String curRow;
+
+ while (tableReader.readLine() != null)
+ rowCount++;
+
+ HashSet<Integer> rows = new HashSet<Integer>();
+ distinctRowCount = (distinctRowCount < rowCount) ? distinctRowCount : rowCount;
+ while (rows.size() < distinctRowCount) {
+ rows.add(r.nextInt(rowCount));
+ }
+
+ // reopen the stream
+ tableStream.close();
+ tableReader.close();
+ tableStream = null;
+ tableReader = null;
+
+ tableStream = store.getResource(path);
+ tableReader = new BufferedReader(new InputStreamReader(tableStream));
+
+ while ((curRow = tableReader.readLine()) != null) {
+ if (rows.contains(curRowNum)) {
+ String[] tokens = curRow.split(",");
+
+ String[] comboKeys = null;
+ int index = 0;
+ if (columnNames.size() > 1)
+ comboKeys = new String[columnNames.size()];
+
+ for (String columnName : columnNames) {
+ int zeroBasedIndex = zeroBasedInice.get(columnName);
+ if (!feasibleValues.containsKey(lookupTableName + "/" + columnName))
+ feasibleValues.put(lookupTableName + "/" + columnName, new ArrayList<String>());
+ feasibleValues.get(lookupTableName + "/" + columnName).add(tokens[zeroBasedIndex]);
+
+ if (columnNames.size() > 1) {
+ comboKeys[index] = tokens[zeroBasedIndex];
+ index++;
+ }
+ }
+
+ if (columnNames.size() > 1) {
+ Array<String> wrap = new Array<String>(comboKeys);
+ if (lookupTableCompositeKeyValues.get(lookupTableName).contains(wrap)) {
+ throw new Exception("The composite key already exist in the lookup table");
+ }
+ lookupTableCompositeKeyValues.get(lookupTableName).add(wrap);
+ }
+ }
+ curRowNum++;
+ }
+
+ if (tableStream != null)
+ tableStream.close();
+ if (tableReader != null)
+ tableReader.close();
+
+ } catch (IOException e) {
+ e.printStackTrace();
+ System.exit(1);
+ }
+ }
+
+ // prepare the candidate values for each joined column
+ private void prepare() throws Exception {
+ // load config
+ loadConfig();
+
+ HashSet<String> factTableColumns = new HashSet<String>();
+
+ for (DimensionDesc dim : desc.getDimensions()) {
+ for (TblColRef col : dim.getColumnRefs()) {
+ if (col.getTable().equals(factTableName))
+ factTableColumns.add(col.getName());
+ }
+
+ JoinDesc join = dim.getJoin();
+ if (join != null) {
+ String lookupTable = dim.getTable();
+ for (String column : join.getPrimaryKey()) {
+ if (!lookupTableKeys.containsKey(lookupTable)) {
+ lookupTableKeys.put(lookupTable, new LinkedList<String>());
+ }
+
+ if (!lookupTableKeys.get(lookupTable).contains(column))
+ lookupTableKeys.get(lookupTable).add(column);
+ }
+ }
+ }
+
+ int distinctRowCount = (int) (this.rowCount / this.conflictRatio);
+ distinctRowCount = (distinctRowCount == 0) ? 1 : distinctRowCount;
+ // lookup tables
+ for (String lookupTable : lookupTableKeys.keySet()) {
+ this.loadLookupTableValues(lookupTable, lookupTableKeys.get(lookupTable), distinctRowCount);
+ }
+ }
+
+ private List<DimensionDesc> getSortedDimentsionDescs() {
+ List<DimensionDesc> dimensions = desc.getDimensions();
+ Collections.sort(dimensions, new Comparator<DimensionDesc>() {
+ @Override
+ public int compare(DimensionDesc o1, DimensionDesc o2) {
+ JoinDesc j1 = o2.getJoin();
+ JoinDesc j2 = o1.getJoin();
+ return Integer.valueOf(j1 != null ? j1.getPrimaryKey().length : 0).compareTo(j2 != null ? j2.getPrimaryKey().length : 0);
+ }
+ });
+ return dimensions;
+ }
+
+ private void execute(String joinType) throws Exception {
+
+ // main logic here , generate the data to a temp file
+ String tempFilePath = generate();
+
+ // Write to hbase
+ File tempFile = new File(tempFilePath);
+
+ InputStream in = new FileInputStream(tempFile);
+ String factTablePath = "/data/" + factTableName + ".csv";
+ store.deleteResource(factTablePath);
+ store.putResource(factTablePath, in, System.currentTimeMillis());
+ in.close();
+
+ // duplicate a copy of this fact table, with a naming convention with
+ // jointype added
+ // so that later test cases can select different data files
+ in = new FileInputStream(tempFile);
+ String factTablePathWithJoinType = "/data/" + factTableName + ".csv." + joinType.toLowerCase();
+ store.deleteResource(factTablePathWithJoinType);
+ store.putResource(factTablePathWithJoinType, in, System.currentTimeMillis());
+ in.close();
+
+ tempFile.delete();
+
+ System.out.println();
+ System.out.println("The new fact table has been written to $KYLIN_METADATA_URL" + factTablePath);
+ System.out.println();
+ }
+
+ /**
+ * Generate the fact table and put it into a temp file
+ *
+ * @return
+ * @throws Exception
+ */
+ private String generate() throws Exception {
+ // the columns on the fact table can be classified into three groups:
+ // 1. foreign keys
+ HashMap<String, String> factTableCol2LookupCol = new HashMap<String, String>();
+ // 2. metrics or directly used dimensions
+ HashSet<String> usedCols = new HashSet<String>();
+ // 3. others, not referenced anywhere
+
+ HashMap<String, String> lookupCol2factTableCol = new HashMap<String, String>();
+
+ // find fact table columns in fks
+ List<DimensionDesc> dimensions = getSortedDimentsionDescs();
+ for (DimensionDesc dim : dimensions) {
+ JoinDesc jDesc = dim.getJoin();
+ if (jDesc != null) {
+ String[] fks = jDesc.getForeignKey();
+ String[] pks = jDesc.getPrimaryKey();
+ int num = fks.length;
+ for (int i = 0; i < num; ++i) {
+ String value = dim.getTable() + "/" + pks[i];
+
+ lookupCol2factTableCol.put(value, fks[i]);
+
+ if (factTableCol2LookupCol.containsKey(fks[i])) {
+ if (!factTableCol2LookupCol.get(fks[i]).equals(value)) {
+ System.out.println("Warning: Disambiguation on the mapping of column " + fks[i] + ", " + factTableCol2LookupCol.get(fks[i]) + "(chosen) or " + value);
+ continue;
+ }
+ }
+ factTableCol2LookupCol.put(fks[i], value);
+ }
+ }
+ //else, deal with it in next roung
+ }
+
+ // find fact table columns in direct dimension
+ // DO NOT merge this with the previous loop
+ for (DimensionDesc dim : dimensions) {
+ JoinDesc jDesc = dim.getJoin();
+ if (jDesc == null) {
+ // column on fact table used directly as a dimension
+ if (!factTableCol2LookupCol.containsKey(dim.getColumn()))
+ usedCols.add(dim.getColumn());
+ }
+ }
+
+ // find fact table columns in measures
+ for (MeasureDesc mDesc : desc.getMeasures()) {
+ List<TblColRef> pcols = mDesc.getFunction().getParameter().getColRefs();
+ if (pcols != null) {
+ for (TblColRef col : pcols) {
+ if (!factTableCol2LookupCol.containsKey(col.getName()))
+ usedCols.add(col.getName());
+ }
+ }
+ }
+
+ return createTable(this.rowCount, factTableCol2LookupCol, lookupCol2factTableCol, usedCols);
+ }
+
+ private String normToTwoDigits(int v) {
+ if (v < 10)
+ return "0" + v;
+ else
+ return Integer.toString(v);
+ }
+
+ private String randomPick(ArrayList<String> candidates) {
+ int index = r.nextInt(candidates.size());
+ return candidates.get(index);
+ }
+
+ private String createRandomCell(ColumnDesc cDesc, ArrayList<String> range) throws Exception {
+ DataType type = cDesc.getType();
+ if (type.isStringFamily()) {
+ throw new Exception("Can't handle range values for string");
+
+ } else if (type.isIntegerFamily()) {
+ int low = Integer.parseInt(range.get(0));
+ int high = Integer.parseInt(range.get(1));
+ return Integer.toString(r.nextInt(high - low) + low);
+
+ } else if (type.isDouble()) {
+ double low = Double.parseDouble(range.get(0));
+ double high = Double.parseDouble(range.get(1));
+ return String.format("%.4f", r.nextDouble() * (high - low) + low);
+
+ } else if (type.isFloat()) {
+ float low = Float.parseFloat(range.get(0));
+ float high = Float.parseFloat(range.get(1));
+ return String.format("%.4f", r.nextFloat() * (high - low) + low);
+
+ } else if (type.isDecimal()) {
+ double low = Double.parseDouble(range.get(0));
+ double high = Double.parseDouble(range.get(1));
+ return String.format("%.4f", r.nextDouble() * (high - low) + low);
+
+ } else if (type.isDateTimeFamily()) {
+
+ SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
+ Date start = format.parse(range.get(0));
+ Date end = format.parse(range.get(1));
+ long diff = end.getTime() - start.getTime();
+ Date temp = new Date(start.getTime() + (long) (diff * r.nextDouble()));
+ Calendar cal = Calendar.getInstance();
+ cal.setTime(temp);
+ // first day
+ cal.set(Calendar.DAY_OF_WEEK, cal.getFirstDayOfWeek());
+
+ return cal.get(Calendar.YEAR) + "-" + normToTwoDigits(cal.get(Calendar.MONTH) + 1) + "-" + normToTwoDigits(cal.get(Calendar.DAY_OF_MONTH));
+ } else {
+ System.out.println("The data type " + type + "is not recognized");
+ System.exit(1);
+ }
+ return null;
+ }
+
+ private String createRandomCell(ColumnDesc cDesc) {
+ String type = cDesc.getTypeName();
+ String s = type.toLowerCase();
+ if (s.equals("string") || s.equals("char") || s.equals("varchar")) {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < 2; i++) {
+ sb.append((char) ('a' + r.nextInt(10)));// there are 10*10
+ // possible strings
+ }
+ return sb.toString();
+ } else if (s.equals("bigint") || s.equals("int") || s.equals("tinyint") || s.equals("smallint")) {
+ return Integer.toString(r.nextInt(128));
+ } else if (s.equals("double")) {
+ return String.format("%.4f", r.nextDouble() * 100);
+ } else if (s.equals("float")) {
+ return String.format("%.4f", r.nextFloat() * 100);
+ } else if (s.equals("decimal")) {
+ return String.format("%.4f", r.nextDouble() * 100);
+ } else if (s.equals("date")) {
+ long date20131231 = 61349312153265L;
+ long date20010101 = 60939158400000L;
+ long diff = date20131231 - date20010101;
+ Date temp = new Date(date20010101 + (long) (diff * r.nextDouble()));
+ Calendar cal = Calendar.getInstance();
+ cal.setTime(temp);
+ // first day
+ cal.set(Calendar.DAY_OF_WEEK, cal.getFirstDayOfWeek());
+
+ return cal.get(Calendar.YEAR) + "-" + normToTwoDigits(cal.get(Calendar.MONTH) + 1) + "-" + normToTwoDigits(cal.get(Calendar.DAY_OF_MONTH));
+ } else {
+ System.out.println("The data type " + type + "is not recognized");
+ System.exit(1);
+ }
+ return null;
+ }
+
+ private String createDefaultsCell(String type) {
+ String s = type.toLowerCase();
+ if (s.equals("string") || s.equals("char") || s.equals("varchar")) {
+ return "abcde";
+ } else if (s.equals("bigint") || s.equals("int") || s.equals("tinyint") || s.equals("smallint")) {
+ return "0";
+ } else if (s.equals("double")) {
+ return "0";
+ } else if (s.equals("float")) {
+ return "0";
+ } else if (s.equals("decimal")) {
+ return "0";
+ } else if (s.equals("date")) {
+ return "1970-01-01";
+ } else {
+ System.out.println("The data type " + type + "is not recognized");
+ System.exit(1);
+ }
+ return null;
+ }
+
+ private void printColumnMappings(HashMap<String, String> factTableCol2LookupCol, HashSet<String> usedCols, HashSet<String> defaultColumns) {
+
+ System.out.println("=======================================================================");
+ System.out.format("%-30s %s", "FACT_TABLE_COLUMN", "MAPPING");
+ System.out.println();
+ System.out.println();
+ for (Map.Entry<String, String> entry : factTableCol2LookupCol.entrySet()) {
+ System.out.format("%-30s %s", entry.getKey(), entry.getValue());
+ System.out.println();
+ }
+ for (String key : usedCols) {
+ System.out.format("%-30s %s", key, "Random Values");
+ System.out.println();
+ }
+ for (String key : defaultColumns) {
+ System.out.format("%-30s %s", key, "Default Values");
+ System.out.println();
+ }
+ System.out.println("=======================================================================");
+
+ System.out.println("Parameters:");
+ System.out.println();
+ System.out.println("CubeName: " + cubeName);
+ System.out.println("RowCount: " + rowCount);
+ System.out.println("ConflictRatio: " + conflictRatio);
+ System.out.println("LinkableRatio: " + linkableRatio);
+ System.out.println("Seed: " + randomSeed);
+ System.out.println();
+ System.out.println("The number of actual unlinkable fact rows is: " + this.unlinkableRowCount);
+ System.out.println("You can vary the above parameters to generate different datasets.");
+ System.out.println();
+ }
+
+ // Any row in the column must finally appear in the flatten big table.
+ // for single-column joins the generated row is guaranteed to have a match
+ // in lookup table
+ // for composite keys we'll need an extra check
+ private boolean matchAllCompositeKeys(HashMap<String, String> lookupCol2FactTableCol, LinkedList<String> columnValues) {
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+
+ for (String lookupTable : lookupTableKeys.keySet()) {
+ if (lookupTableKeys.get(lookupTable).size() == 1)
+ continue;
+
+ String[] comboKey = new String[lookupTableKeys.get(lookupTable).size()];
+ int index = 0;
+ for (String column : lookupTableKeys.get(lookupTable)) {
+ String key = lookupTable + "/" + column;
+ String factTableCol = lookupCol2FactTableCol.get(key);
+ int cardinal = MetadataManager.getInstance(config).getTableDesc(factTableName).findColumnByName(factTableCol).getZeroBasedIndex();
+ comboKey[index] = columnValues.get(cardinal);
+
+ index++;
+ }
+ Array<String> wrap = new Array<String>(comboKey);
+ if (!lookupTableCompositeKeyValues.get(lookupTable).contains(wrap)) {
+ // System.out.println("Try " + wrap + " Failed, continue...");
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private String createCell(ColumnDesc cDesc) throws Exception {
+ ColumnConfig cConfig = null;
+
+ if ((cConfig = genConf.getColumnConfigByName(cDesc.getName())) == null) {
+ // if the column is not configured, use random values
+ return (createRandomCell(cDesc));
+
+ } else {
+ // the column has a configuration
+ if (!cConfig.isAsRange() && !cConfig.isExclusive() && r.nextBoolean()) {
+ // if the column still allows random values
+ return (createRandomCell(cDesc));
+
+ } else {
+ // use specified values
+ ArrayList<String> valueSet = cConfig.getValueSet();
+ if (valueSet == null || valueSet.size() == 0)
+ throw new Exception("Did you forget to specify value set for " + cDesc.getName());
+
+ if (!cConfig.isAsRange()) {
+ return (randomPick(valueSet));
+ } else {
+ if (valueSet.size() != 2)
+ throw new Exception("Only two values can be set for range values, the column: " + cDesc.getName());
+
+ return (createRandomCell(cDesc, valueSet));
+ }
+ }
+
+ }
+ }
+
+ private LinkedList<String> createRow(HashMap<String, String> factTableCol2LookupCol, HashSet<String> usedCols, HashSet<String> defaultColumns) throws Exception {
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+ LinkedList<String> columnValues = new LinkedList<String>();
+
+ for (ColumnDesc cDesc : MetadataManager.getInstance(config).getTableDesc(factTableName).getColumns()) {
+
+
+ String colName = cDesc.getName();
+
+ if (factTableCol2LookupCol.containsKey(colName)) {
+
+ // if the current column is a fk column in fact table
+ ArrayList<String> candidates = this.feasibleValues.get(factTableCol2LookupCol.get(colName));
+
+ columnValues.add(candidates.get(r.nextInt(candidates.size())));
+ } else if (usedCols.contains(colName)) {
+
+ // if the current column is a metric column in fact table
+ columnValues.add(createCell(cDesc));
+ } else {
+
+ // otherwise this column is not useful in OLAP
+ columnValues.add(createDefaultsCell(cDesc.getTypeName()));
+ defaultColumns.add(colName);
+ }
+ }
+
+ return columnValues;
+ }
+
+ private String createTable(int rowCount, HashMap<String, String> factTableCol2LookupCol, HashMap<String, String> lookupCol2FactTableCol, HashSet<String> usedCols) throws Exception {
+ try {
+ File tempFile = File.createTempFile("ftg", ".tmp");
+ BufferedWriter writer = new BufferedWriter(new FileWriter(tempFile));
+ HashSet<String> defaultColumns = new HashSet<String>();
+
+ for (int i = 0; i < rowCount; ) {
+
+ LinkedList<String> columnValues = createRow(factTableCol2LookupCol, usedCols, defaultColumns);
+
+ if (!matchAllCompositeKeys(lookupCol2FactTableCol, columnValues)) {
+ if (unlinkableRowCount < unlinkableRowCountMax) {
+ unlinkableRowCount++;
+ } else {
+ continue;
+ }
+ }
+
+ StringBuffer sb = new StringBuffer();
+ for (String c : columnValues)
+ sb.append(c + ",");
+ sb.deleteCharAt(sb.length() - 1);
+ writer.write(sb.toString());
+ writer.newLine();
+ i++;
+
+ // System.out.println("Just generated the " + i + "th record");
+ }
+ writer.flush();
+ writer.close();
+
+ printColumnMappings(factTableCol2LookupCol, usedCols, defaultColumns);
+
+ return tempFile.getAbsolutePath();
+
+ } catch (IOException e) {
+ e.printStackTrace();
+ System.exit(1);
+ }
+
+ return null;
+ }
+
+ /**
+ * Randomly create a fact table and put it to test_kylin_data table in hbase
+ *
+ * @param cubeName name of the cube
+ * @param rowCount expected row count generated
+ * @param linkableRatio the percentage of fact table rows that can be linked with all
+ * lookup table by INNER join
+ * @param randomSeed random seed
+ */
+ public static void generate(String cubeName, String rowCount, String linkableRatio, String randomSeed, String joinType) throws Exception {
+
+ if (cubeName == null)
+ cubeName = "test_kylin_cube_with_slr_ready";
+ if (rowCount == null)
+ rowCount = "10000";
+ if (linkableRatio == null)
+ linkableRatio = "0.6";
+
+ //if (randomSeed == null)
+ // don't give it value
+
+ // String conflictRatio = "5";//this parameter do not allow configuring
+ // any more
+
+ FactTableGenerator generator = new FactTableGenerator();
+ long seed;
+ if (randomSeed != null) {
+ seed = Long.parseLong(randomSeed);
+ } else {
+ Random r = new Random();
+ seed = r.nextLong();
+ }
+
+ generator.init(cubeName, Integer.parseInt(rowCount), 5, Double.parseDouble(linkableRatio), seed);
+ generator.prepare();
+ generator.execute(joinType);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/cube/src/main/java/com/kylinolap/cube/dataGen/GenConfig.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/com/kylinolap/cube/dataGen/GenConfig.java b/cube/src/main/java/com/kylinolap/cube/dataGen/GenConfig.java
new file mode 100644
index 0000000..a877c63
--- /dev/null
+++ b/cube/src/main/java/com/kylinolap/cube/dataGen/GenConfig.java
@@ -0,0 +1,63 @@
+package com.kylinolap.cube.dataGen;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.kylinolap.common.util.JsonUtil;
+
+/**
+ * Created by honma on 5/29/14.
+ */
+@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
+public class GenConfig {
+
+ @JsonProperty("columnConfigs")
+ private ArrayList<ColumnConfig> columnConfigs;
+
+ private HashMap<String, ColumnConfig> cache = new HashMap<String, ColumnConfig>();
+
+ public ArrayList<ColumnConfig> getColumnConfigs() {
+ return columnConfigs;
+ }
+
+ public void setColumnConfigs(ArrayList<ColumnConfig> columnConfigs) {
+ this.columnConfigs = columnConfigs;
+ }
+
+ public ColumnConfig getColumnConfigByName(String columnName) {
+ columnName = columnName.toLowerCase();
+
+ if (cache.containsKey(columnName))
+ return cache.get(columnName);
+
+ for (ColumnConfig cConfig : columnConfigs) {
+ if (cConfig.getColumnName().toLowerCase().equals(columnName)) {
+ cache.put(columnName, cConfig);
+ return cConfig;
+ }
+ }
+ cache.put(columnName, null);
+ return null;
+ }
+
+ public static GenConfig loadConfig(InputStream stream) {
+ try {
+ GenConfig config = JsonUtil.readValue(stream, GenConfig.class);
+ return config;
+ } catch (JsonMappingException e) {
+ e.printStackTrace();
+ } catch (JsonParseException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/cube/src/main/java/com/kylinolap/cube/estimation/CubeSizeEstimationCLI.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/com/kylinolap/cube/estimation/CubeSizeEstimationCLI.java b/cube/src/main/java/com/kylinolap/cube/estimation/CubeSizeEstimationCLI.java
new file mode 100644
index 0000000..494f1b0
--- /dev/null
+++ b/cube/src/main/java/com/kylinolap/cube/estimation/CubeSizeEstimationCLI.java
@@ -0,0 +1,163 @@
+package com.kylinolap.cube.estimation;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.cube.CubeInstance;
+import com.kylinolap.cube.CubeManager;
+import com.kylinolap.cube.cuboid.Cuboid;
+import com.kylinolap.cube.cuboid.CuboidScheduler;
+import com.kylinolap.metadata.model.cube.CubeDesc;
+import com.kylinolap.metadata.model.cube.DimensionDesc;
+import com.kylinolap.metadata.model.cube.HierarchyDesc;
+import com.kylinolap.metadata.model.cube.MeasureDesc;
+import com.kylinolap.metadata.model.cube.RowKeyColDesc;
+import com.kylinolap.metadata.model.cube.RowKeyDesc;
+import com.kylinolap.metadata.model.schema.DataType;
+
+/**
+ * Created by honma on 9/1/14.
+ */
+public class CubeSizeEstimationCLI {
+
+ public static class RowKeyColInfo {
+ public List<List<Integer>> hierachyColBitIndice;
+ public List<Integer> nonHierachyColBitIndice;
+ }
+
+ public static long estimatedCubeSize(String cubeName, long[] cardinality) {
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+ CubeManager cubeManager = CubeManager.getInstance(config);
+ CubeInstance cubeInstance = cubeManager.getCube(cubeName);
+ CubeDesc cubeDesc = cubeInstance.getDescriptor();
+
+ CuboidScheduler scheduler = new CuboidScheduler(cubeDesc);
+ long baseCuboid = Cuboid.getBaseCuboidId(cubeDesc);
+ LinkedList<Long> cuboidQueue = new LinkedList<Long>();
+ cuboidQueue.push(baseCuboid);
+
+ long totalSpace = 0;
+
+ while (!cuboidQueue.isEmpty()) {
+ long cuboidID = cuboidQueue.pop();
+ Collection<Long> spanningCuboid = scheduler.getSpanningCuboid(cuboidID);
+ for (Long sc : spanningCuboid) {
+ cuboidQueue.push(sc);
+ }
+
+ totalSpace += estimateCuboidSpace(cuboidID, cardinality, cubeDesc);
+ }
+ return totalSpace;
+ }
+
+ public static long estimateCuboidSpace(long cuboidID, long[] cardinality, CubeDesc cubeDesc) {
+
+ RowKeyColInfo rowKeyColInfo = extractRowKeyInfo(cubeDesc);
+ RowKeyDesc rowKeyDesc = cubeDesc.getRowkey();
+
+ long rowCount = 1;
+ int[] rowKeySpaces = estimateRowKeyColSpace(rowKeyDesc, cardinality);
+ int dimensionSpace = 0;
+ int measureSpace = getMeasureSpace(cubeDesc);
+
+ for (List<Integer> hlist : rowKeyColInfo.hierachyColBitIndice) {
+ // for hierachy columns, the cardinality of the most detailed column
+ // nominates.
+ int i;
+ for (i = 0; i < hlist.size() && rowKeyColExists(hlist.get(i), cuboidID); ++i) {
+ dimensionSpace += rowKeySpaces[hlist.get(i)];
+ }
+
+ if (i != 0)
+ rowCount *= cardinality[hlist.get(i - 1)];
+ }
+
+ for (int index : rowKeyColInfo.nonHierachyColBitIndice) {
+ if (rowKeyColExists(index, cuboidID)) {
+ rowCount *= cardinality[index];
+ dimensionSpace += rowKeySpaces[index];
+ }
+ }
+ return rowCount * (dimensionSpace + measureSpace);
+ }
+
+ private static int[] estimateRowKeyColSpace(RowKeyDesc rowKeyDesc, long[] cardinality) {
+ RowKeyColDesc[] rowKeyColDescs = rowKeyDesc.getRowKeyColumns();
+ int[] ret = new int[rowKeyColDescs.length];
+ for (int i = 0; i < rowKeyColDescs.length; ++i) {
+ RowKeyColDesc rowKeyColDesc = rowKeyColDescs[rowKeyColDescs.length - 1 - i];
+ if (rowKeyColDesc.getDictionary() == null) {
+ if (rowKeyColDesc.getLength() == 0)
+ throw new IllegalStateException("The non-dictionary col " + rowKeyColDesc.getColumn() + " has length of 0");
+ ret[i] = rowKeyColDesc.getLength();
+ } else {
+ ret[i] = estimateDictionaryColSpace(cardinality[i]);
+ }
+ }
+ return ret;
+ }
+
+ // TODO what if it's date dictionary?
+ private static int estimateDictionaryColSpace(long cardinality) {
+ long mask = 1L;
+ int i;
+ for (i = Long.SIZE - 1; i >= 0; i--) {
+ if ((cardinality & (mask << i)) != 0) {
+ break;
+ }
+ }
+
+ if (i < 0)
+ throw new IllegalStateException("the cardinality is 0");
+
+ return ((i + 1) + 7) / 8;// the bytes required to save at most
+ // cardinality numbers
+ }
+
+ private static int getMeasureSpace(CubeDesc cubeDesc) {
+ int space = 0;
+ for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
+ DataType returnType = measureDesc.getFunction().getReturnDataType();
+ space += returnType.getSpaceEstimate();
+ }
+ return space;
+ }
+
+ private static boolean rowKeyColExists(int bitIndex, long cuboidID) {
+ long mask = 1L << bitIndex;
+ return (cuboidID & mask) != 0;
+ }
+
+ private static RowKeyColInfo extractRowKeyInfo(CubeDesc cubeDesc) {
+ RowKeyDesc rowKeyDesc = cubeDesc.getRowkey();
+ RowKeyColInfo info = new RowKeyColInfo();
+ info.hierachyColBitIndice = new ArrayList<List<Integer>>();
+ info.nonHierachyColBitIndice = new ArrayList<Integer>();
+ HashSet<Integer> heirachyIndexSet = new HashSet<Integer>();
+
+ for (DimensionDesc dim : cubeDesc.getDimensions()) {
+ if (dim.getHierarchy() != null) {
+ LinkedList<Integer> hlist = new LinkedList<Integer>();
+ for (HierarchyDesc hierarchyDesc : dim.getHierarchy()) {
+ int index = rowKeyDesc.getColumnBitIndex(hierarchyDesc.getColumnRef());
+ hlist.add(index);
+ heirachyIndexSet.add(index);
+ }
+ info.hierachyColBitIndice.add(hlist);
+ }
+ }
+
+ for (int i = 0; i < rowKeyDesc.getRowKeyColumns().length; ++i) {
+ if (!heirachyIndexSet.contains(i)) {
+ info.nonHierachyColBitIndice.add(i);
+ }
+ }
+
+ return info;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/cube/src/main/java/com/kylinolap/cube/exception/CubeIntegrityException.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/com/kylinolap/cube/exception/CubeIntegrityException.java b/cube/src/main/java/com/kylinolap/cube/exception/CubeIntegrityException.java
new file mode 100644
index 0000000..a0ab338
--- /dev/null
+++ b/cube/src/main/java/com/kylinolap/cube/exception/CubeIntegrityException.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.cube.exception;
+
/**
 * Checked exception for cube integrity violations. The specific conditions are decided by the
 * code that throws it.
 *
 * @author xduo
 */
public class CubeIntegrityException extends Exception {

    private static final long serialVersionUID = -7924187859607404390L;

    /**
     * @param message description of the violation
     */
    public CubeIntegrityException(String message) {
        super(message);
    }

    /**
     * @param message description of the violation
     * @param cause   underlying failure, preserved for diagnostics
     */
    public CubeIntegrityException(String message, Throwable cause) {
        super(message, cause);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/cube/src/main/java/com/kylinolap/cube/invertedindex/BitMapContainer.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/com/kylinolap/cube/invertedindex/BitMapContainer.java b/cube/src/main/java/com/kylinolap/cube/invertedindex/BitMapContainer.java
new file mode 100644
index 0000000..f1af35e
--- /dev/null
+++ b/cube/src/main/java/com/kylinolap/cube/invertedindex/BitMapContainer.java
@@ -0,0 +1,190 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.cube.invertedindex;
+
+import it.uniroma3.mat.extendedset.intset.ConciseSet;
+
+import java.nio.ByteBuffer;
+import java.nio.IntBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import com.kylinolap.common.util.BytesUtil;
+import com.kylinolap.dict.Dictionary;
+
+/**
+ * @author yangli9
+ *
+ */
+public class BitMapContainer implements ColumnValueContainer {
+
+ int valueLen;
+ int nValues;
+ int size;
+ ConciseSet[] sets;
+ boolean closedForChange;
+
+ transient byte[] temp;
+
+ public BitMapContainer(TableRecordInfoDigest info, int col) {
+ this.valueLen = info.length(col);
+ this.size = 0;
+ this.nValues = info.getMaxID(col) + 1;
+ this.sets = null;
+ this.closedForChange = false;
+
+ this.temp = new byte[valueLen];
+ }
+
+ @Override
+ public void append(ImmutableBytesWritable valueBytes) {
+ int value = BytesUtil.readUnsigned(valueBytes.get(), valueBytes.getOffset(), valueLen);
+ append(value);
+ }
+
+ public void append(int value) {
+ checkUpdateMode();
+ if (value == Dictionary.NULL_ID[valueLen]) {
+ value = nValues; // set[nValues] holds NULL
+ }
+ sets[value].add(size);
+ size++;
+ }
+
+ @Override
+ public void getValueAt(int i, ImmutableBytesWritable valueBytes) {
+ int value = getValueIntAt(i);
+ BytesUtil.writeUnsigned(value, temp, 0, valueLen);
+ valueBytes.set(temp, 0, valueLen);
+ }
+
+ public int getValueIntAt(int i) {
+ for (int v = 0; v < nValues; v++) {
+ if (sets[v].contains(i)) {
+ return v;
+ }
+ }
+ // if v is not in [0..nValues-1], then it must be nValue (NULL)
+ return Dictionary.NULL_ID[valueLen];
+ }
+
+ private void checkUpdateMode() {
+ if (isClosedForChange()) {
+ throw new IllegalStateException();
+ }
+ if (sets == null) {
+ sets = new ConciseSet[nValues + 1];
+ for (int i = 0; i <= nValues; i++) {
+ sets[i] = new ConciseSet();
+ }
+ }
+ }
+
+ private boolean isClosedForChange() {
+ return closedForChange;
+ }
+
+ @Override
+ public void closeForChange() {
+ closedForChange = true;
+ }
+
+ @Override
+ public int getSize() {
+ return size;
+ }
+
+ public List<ImmutableBytesWritable> toBytes() {
+ if (isClosedForChange() == false)
+ closeForChange();
+
+ List<ImmutableBytesWritable> r = new ArrayList<ImmutableBytesWritable>(nValues + 1);
+ for (int i = 0; i <= nValues; i++) {
+ r.add(setToBytes(sets[i]));
+ }
+ return r;
+ }
+
+ public void fromBytes(List<ImmutableBytesWritable> bytes) {
+ assert nValues + 1 == bytes.size();
+ sets = new ConciseSet[nValues + 1];
+ size = 0;
+ for (int i = 0; i <= nValues; i++) {
+ sets[i] = bytesToSet(bytes.get(i));
+ size += sets[i].size();
+ }
+ closedForChange = true;
+ }
+
+ private ImmutableBytesWritable setToBytes(ConciseSet set) {
+ byte[] array;
+ if (set.isEmpty()) // ConciseSet.toByteBuffer() throws exception when set is empty
+ array = BytesUtil.EMPTY_BYTE_ARRAY;
+ else
+ array = set.toByteBuffer().array();
+ return new ImmutableBytesWritable(array);
+ }
+
+ private ConciseSet bytesToSet(ImmutableBytesWritable bytes) {
+ if (bytes.get() == null || bytes.getLength() == 0) {
+ return new ConciseSet();
+ } else {
+ IntBuffer intBuffer = ByteBuffer.wrap(bytes.get(), bytes.getOffset(), bytes.getLength()).asIntBuffer();
+ int[] words = new int[intBuffer.capacity()];
+ intBuffer.get(words);
+ return new ConciseSet(words, false);
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (closedForChange ? 1231 : 1237);
+ result = prime * result + nValues;
+ result = prime * result + Arrays.hashCode(sets);
+ result = prime * result + size;
+ result = prime * result + valueLen;
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ BitMapContainer other = (BitMapContainer) obj;
+ if (closedForChange != other.closedForChange)
+ return false;
+ if (nValues != other.nValues)
+ return false;
+ if (!Arrays.equals(sets, other.sets))
+ return false;
+ if (size != other.size)
+ return false;
+ if (valueLen != other.valueLen)
+ return false;
+ return true;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/cube/src/main/java/com/kylinolap/cube/invertedindex/ColumnValueContainer.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/com/kylinolap/cube/invertedindex/ColumnValueContainer.java b/cube/src/main/java/com/kylinolap/cube/invertedindex/ColumnValueContainer.java
new file mode 100644
index 0000000..a2ac4c3
--- /dev/null
+++ b/cube/src/main/java/com/kylinolap/cube/invertedindex/ColumnValueContainer.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.cube.invertedindex;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+/**
+ * @author yangli9
+ *
+ */
+public interface ColumnValueContainer {
+
+ void append(ImmutableBytesWritable valueBytes);
+
+ void closeForChange();
+
+ int getSize();
+
+ // works only after closeForChange()
+ void getValueAt(int i, ImmutableBytesWritable valueBytes);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/cube/src/main/java/com/kylinolap/cube/invertedindex/CompressedValueContainer.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/com/kylinolap/cube/invertedindex/CompressedValueContainer.java b/cube/src/main/java/com/kylinolap/cube/invertedindex/CompressedValueContainer.java
new file mode 100644
index 0000000..6874e8f
--- /dev/null
+++ b/cube/src/main/java/com/kylinolap/cube/invertedindex/CompressedValueContainer.java
@@ -0,0 +1,132 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.cube.invertedindex;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.kylinolap.common.util.BytesUtil;
+import com.ning.compress.lzf.LZFDecoder;
+import com.ning.compress.lzf.LZFEncoder;
+
+/**
+ * @author yangli9
+ *
+ */
+public class CompressedValueContainer implements ColumnValueContainer {
+ int valueLen;
+ int cap;
+ int size;
+ byte[] uncompressed;
+ byte[] compressed;
+
+ public CompressedValueContainer(TableRecordInfoDigest info, int col, int cap) {
+ this.valueLen = info.length(col);
+ this.cap = cap;
+ this.size = 0;
+ this.uncompressed = null;
+ this.compressed = null;
+ }
+
+ @Override
+ public void append(ImmutableBytesWritable valueBytes) {
+ checkUpdateMode();
+ System.arraycopy(valueBytes.get(), valueBytes.getOffset(), uncompressed, valueLen * size, valueLen);
+ size++;
+ }
+
+ @Override
+ public void getValueAt(int i, ImmutableBytesWritable valueBytes) {
+ valueBytes.set(uncompressed, valueLen * i, valueLen);
+ }
+
+ private void checkUpdateMode() {
+ if (isClosedForChange()) {
+ throw new IllegalArgumentException();
+ }
+ if (uncompressed == null) {
+ uncompressed = new byte[valueLen * cap];
+ }
+ }
+
+ private boolean isClosedForChange() {
+ return compressed != null;
+ }
+
+ @Override
+ public void closeForChange() {
+ checkUpdateMode();
+ try {
+ compressed = LZFEncoder.encode(uncompressed, 0, valueLen * size);
+ } catch (Exception e) {
+ throw new RuntimeException("LZF encode failure", e);
+ }
+ }
+
+ @Override
+ public int getSize() {
+ return size;
+ }
+
+ public ImmutableBytesWritable toBytes() {
+ if (isClosedForChange() == false)
+ closeForChange();
+ return new ImmutableBytesWritable(compressed);
+ }
+
+ public void fromBytes(ImmutableBytesWritable bytes) {
+ try {
+ uncompressed = LZFDecoder.decode(bytes.get(), bytes.getOffset(), bytes.getLength());
+ } catch (IOException e) {
+ throw new RuntimeException("LZF decode failure", e);
+ }
+ size = cap = uncompressed.length / valueLen;
+ compressed = BytesUtil.EMPTY_BYTE_ARRAY; // mark closed
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + size;
+ result = prime * result + valueLen;
+ result = prime * result + Arrays.hashCode(uncompressed);
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ CompressedValueContainer other = (CompressedValueContainer) obj;
+ if (size != other.size)
+ return false;
+ if (valueLen != other.valueLen)
+ return false;
+ if (!Bytes.equals(uncompressed, 0, size * valueLen, uncompressed, 0, size * valueLen))
+ return false;
+ return true;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/cube/src/main/java/com/kylinolap/cube/invertedindex/IIKeyValueCodec.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/com/kylinolap/cube/invertedindex/IIKeyValueCodec.java b/cube/src/main/java/com/kylinolap/cube/invertedindex/IIKeyValueCodec.java
new file mode 100644
index 0000000..7e77136
--- /dev/null
+++ b/cube/src/main/java/com/kylinolap/cube/invertedindex/IIKeyValueCodec.java
@@ -0,0 +1,243 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.cube.invertedindex;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.common.collect.Lists;
+import com.kylinolap.common.util.BytesUtil;
+
+/**
+ * @author yangli9
+ */
+public class IIKeyValueCodec {
+
+ public static final int SHARD_LEN = 2;
+ public static final int TIMEPART_LEN = 8;
+ public static final int COLNO_LEN = 2;
+
+ private TableRecordInfoDigest infoDigest;
+
+ public IIKeyValueCodec(TableRecordInfoDigest info) {
+ this.infoDigest = info;
+ }
+
+ public Collection<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> encodeKeyValue(Slice slice) {
+ ArrayList<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> result = Lists.newArrayList();
+ ColumnValueContainer[] containers = slice.containers;
+ for (int col = 0; col < containers.length; col++) {
+ if (containers[col] instanceof BitMapContainer) {
+ collectKeyValues(slice, col, (BitMapContainer) containers[col], result);
+ } else if (containers[col] instanceof CompressedValueContainer) {
+ collectKeyValues(slice, col, (CompressedValueContainer) containers[col], result);
+ } else {
+ throw new IllegalArgumentException("Unkown container class " + containers[col].getClass());
+ }
+ }
+ return result;
+ }
+
+ private void collectKeyValues(Slice slice, int col, CompressedValueContainer container, //
+ ArrayList<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> result) {
+ ImmutableBytesWritable key = encodeKey(slice.getShard(), slice.getTimestamp(), col, -1);
+ ImmutableBytesWritable value = container.toBytes();
+ result.add(new Pair<ImmutableBytesWritable, ImmutableBytesWritable>(key, value));
+ }
+
+ private void collectKeyValues(Slice slice, int col, BitMapContainer container, //
+ ArrayList<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> result) {
+ List<ImmutableBytesWritable> values = container.toBytes();
+ for (int v = 0; v < values.size(); v++) {
+ ImmutableBytesWritable key = encodeKey(slice.getShard(), slice.getTimestamp(), col, v);
+ result.add(new Pair<ImmutableBytesWritable, ImmutableBytesWritable>(key, values.get(v)));
+ }
+ }
+
+ ImmutableBytesWritable encodeKey(short shard, long timestamp, int col, int colValue) {
+ byte[] bytes = new byte[20];
+ int len = encodeKey(shard, timestamp, col, colValue, bytes, 0);
+ return new ImmutableBytesWritable(bytes, 0, len);
+ }
+
+ int encodeKey(short shard, long timestamp, int col, int colValue, byte[] buf, int offset) {
+ int i = offset;
+
+ BytesUtil.writeUnsigned(shard, buf, i, SHARD_LEN);
+ i += SHARD_LEN;
+ BytesUtil.writeLong(timestamp, buf, i, TIMEPART_LEN);
+ i += TIMEPART_LEN;
+
+ BytesUtil.writeUnsigned(col, buf, i, COLNO_LEN);
+ i += COLNO_LEN;
+
+ if (colValue >= 0) {
+ int colLen = infoDigest.length(col);
+ BytesUtil.writeUnsigned(colValue, buf, i, colLen);
+ i += colLen;
+ }
+
+ return i - offset;
+ }
+
+ public Iterable<Slice> decodeKeyValue(Iterable<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> kvs) {
+ return new Decoder(infoDigest, kvs);
+ }
+
+ private static class Decoder implements Iterable<Slice> {
+
+ TableRecordInfoDigest info;
+ Iterator<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> iterator;
+
+ Slice next = null;
+ short curShard = Short.MIN_VALUE;
+ long curSliceTimestamp = Long.MIN_VALUE;
+ int curCol = -1;
+ int curColValue = -1;
+ short lastShard = Short.MIN_VALUE;
+ long lastSliceTimestamp = Long.MIN_VALUE;
+ int lastCol = -1;
+ ColumnValueContainer[] containers = null;
+ List<ImmutableBytesWritable> bitMapValues = Lists.newArrayList();
+
+ Decoder(TableRecordInfoDigest info, Iterable<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> kvs) {
+ this.info = info;
+ this.iterator = kvs.iterator();
+ }
+
+ private void goToNext() {
+ if (next != null) { // was not fetched
+ return;
+ }
+
+ // NOTE the input keys are ordered
+ while (next == null && iterator.hasNext()) {
+ Pair<ImmutableBytesWritable, ImmutableBytesWritable> kv = iterator.next();
+ ImmutableBytesWritable k = kv.getFirst();
+ ImmutableBytesWritable v = kv.getSecond();
+ decodeKey(k);
+
+ if (curShard != lastShard || curSliceTimestamp != lastSliceTimestamp) {
+ makeNext();
+ }
+ consumeCurrent(v);
+ }
+ if (next == null) {
+ makeNext();
+ }
+ }
+
+ private void decodeKey(ImmutableBytesWritable k) {
+ byte[] buf = k.get();
+ int i = k.getOffset();
+
+ curShard = (short) BytesUtil.readUnsigned(buf, i, SHARD_LEN);
+ i += SHARD_LEN;
+ curSliceTimestamp = BytesUtil.readLong(buf, i, TIMEPART_LEN);
+ i += TIMEPART_LEN;
+
+ curCol = BytesUtil.readUnsigned(buf, i, COLNO_LEN);
+ i += COLNO_LEN;
+
+ if (i - k.getOffset() < k.getLength()) {
+ //bitmap
+ int colLen = info.length(curCol);
+ curColValue = BytesUtil.readUnsigned(buf, i, colLen);
+ i += colLen;
+ } else {
+ //value list
+ curColValue = -1;
+ }
+ }
+
+ private void consumeCurrent(ImmutableBytesWritable v) {
+ if (curCol != lastCol && bitMapValues.size() > 0) { // end of a bitmap container
+ addBitMapContainer(lastCol);
+ }
+ if (curColValue < 0) {
+ CompressedValueContainer c = new CompressedValueContainer(info, curCol, 0);
+ c.fromBytes(v);
+ addContainer(curCol, c);
+ } else {
+ assert curColValue == bitMapValues.size();
+ // make a copy, the value object from caller is typically reused through iteration
+ bitMapValues.add(new ImmutableBytesWritable(v));
+ }
+
+ lastShard = curShard;
+ lastSliceTimestamp = curSliceTimestamp;
+ lastCol = curCol;
+ }
+
+ private void makeNext() {
+ if (bitMapValues.isEmpty() == false) {
+ addBitMapContainer(lastCol);
+ }
+ if (containers != null) {
+ next = new Slice(info, lastShard, lastSliceTimestamp, containers);
+ }
+ lastSliceTimestamp = Long.MIN_VALUE;
+ lastCol = -1;
+ containers = null;
+ bitMapValues.clear();
+ }
+
+ private void addBitMapContainer(int col) {
+ BitMapContainer c = new BitMapContainer(info, col);
+ c.fromBytes(bitMapValues);
+ addContainer(col, c);
+ bitMapValues.clear();
+ }
+
+ private void addContainer(int col, ColumnValueContainer c) {
+ if (containers == null) {
+ containers = new ColumnValueContainer[info.getColumnCount()];
+ }
+ containers[col] = c;
+ }
+
+ @Override
+ public Iterator<Slice> iterator() {
+ return new Iterator<Slice>() {
+ @Override
+ public boolean hasNext() {
+ goToNext();
+ return next != null;
+ }
+
+ @Override
+ public Slice next() {
+ Slice result = next;
+ next = null;
+ return result;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/cube/src/main/java/com/kylinolap/cube/invertedindex/InvertedIndexCLI.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/com/kylinolap/cube/invertedindex/InvertedIndexCLI.java b/cube/src/main/java/com/kylinolap/cube/invertedindex/InvertedIndexCLI.java
new file mode 100644
index 0000000..059be73
--- /dev/null
+++ b/cube/src/main/java/com/kylinolap/cube/invertedindex/InvertedIndexCLI.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.cube.invertedindex;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.common.util.HadoopUtil;
+import com.kylinolap.cube.CubeInstance;
+import com.kylinolap.cube.CubeManager;
+
+/**
+ * @author yangli9
+ *
+ */
+public class InvertedIndexCLI {
+
+ public static void main(String[] args) throws IOException {
+ Configuration hconf = HadoopUtil.getDefaultConfiguration();
+ CubeManager mgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+
+ String cubeName = args[0];
+ CubeInstance cube = mgr.getCube(cubeName);
+
+ String path = args[1];
+ System.out.println("Reading from " + path + " ...");
+
+ TableRecordInfo info = new TableRecordInfo(cube.getFirstSegment());
+ IIKeyValueCodec codec = new IIKeyValueCodec(info);
+ int count = 0;
+ for (Slice slice : codec.decodeKeyValue(readSequenceKVs(hconf, path))) {
+ for (TableRecordBytes rec : slice) {
+ System.out.println((TableRecord)rec);
+ count++;
+ }
+ }
+ System.out.println("Total " + count + " records");
+ }
+
+ public static Iterable<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> readSequenceKVs(Configuration hconf, String path) throws IOException {
+ final Reader reader = new Reader(hconf, SequenceFile.Reader.file(new Path(path)));
+ return new Iterable<Pair<ImmutableBytesWritable, ImmutableBytesWritable>>() {
+ @Override
+ public Iterator<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> iterator() {
+ return new Iterator<Pair<ImmutableBytesWritable, ImmutableBytesWritable>>() {
+ ImmutableBytesWritable k = new ImmutableBytesWritable();
+ ImmutableBytesWritable v = new ImmutableBytesWritable();
+ Pair<ImmutableBytesWritable, ImmutableBytesWritable> pair = new Pair<ImmutableBytesWritable, ImmutableBytesWritable>(k, v);
+
+ @Override
+ public boolean hasNext() {
+ boolean hasNext = false;
+ try {
+ hasNext = reader.next(k, v);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ if (hasNext == false) {
+ IOUtils.closeQuietly(reader);
+ }
+ }
+ return hasNext;
+ }
+
+ @Override
+ public Pair<ImmutableBytesWritable, ImmutableBytesWritable> next() {
+ return pair;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/cube/src/main/java/com/kylinolap/cube/invertedindex/ShardingHash.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/com/kylinolap/cube/invertedindex/ShardingHash.java b/cube/src/main/java/com/kylinolap/cube/invertedindex/ShardingHash.java
new file mode 100644
index 0000000..3bd4603
--- /dev/null
+++ b/cube/src/main/java/com/kylinolap/cube/invertedindex/ShardingHash.java
@@ -0,0 +1,15 @@
+package com.kylinolap.cube.invertedindex;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+
+public class ShardingHash {
+
+ static HashFunction hashFunc = Hashing.murmur3_128();
+
+ public static long hashInt(int integer) {
+ return hashFunc.newHasher().putInt(integer).hash().asLong();
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/cube/src/main/java/com/kylinolap/cube/invertedindex/ShardingSliceBuilder.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/com/kylinolap/cube/invertedindex/ShardingSliceBuilder.java b/cube/src/main/java/com/kylinolap/cube/invertedindex/ShardingSliceBuilder.java
new file mode 100644
index 0000000..e9ffaf0
--- /dev/null
+++ b/cube/src/main/java/com/kylinolap/cube/invertedindex/ShardingSliceBuilder.java
@@ -0,0 +1,35 @@
+package com.kylinolap.cube.invertedindex;
+
+import java.util.List;
+
+import com.google.common.collect.Lists;
+
+public class ShardingSliceBuilder {
+
+ SliceBuilder[] builders;
+
+ public ShardingSliceBuilder(TableRecordInfo info) {
+ int sharding = info.getDescriptor().getSharding();
+ builders = new SliceBuilder[sharding];
+ for (short i = 0; i < sharding; i++) {
+ builders[i] = new SliceBuilder(info, i);
+ }
+ }
+
+ // NOTE: record must be appended in time order
+ public Slice append(TableRecord rec) {
+ short shard = rec.getShard();
+ return builders[shard].append(rec);
+ }
+
+ public List<Slice> close() {
+ List<Slice> result = Lists.newArrayList();
+ for (SliceBuilder builder : builders) {
+ Slice slice = builder.close();
+ if (slice != null)
+ result.add(slice);
+ }
+ return result;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/cube/src/main/java/com/kylinolap/cube/invertedindex/Slice.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/com/kylinolap/cube/invertedindex/Slice.java b/cube/src/main/java/com/kylinolap/cube/invertedindex/Slice.java
new file mode 100644
index 0000000..250ece4
--- /dev/null
+++ b/cube/src/main/java/com/kylinolap/cube/invertedindex/Slice.java
@@ -0,0 +1,175 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.cube.invertedindex;
+
+import java.util.Iterator;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+/**
+ * Within a partition (per timestampGranularity), records are further sliced
+ * (per sliceLength) to fit into HBASE cell.
+ *
+ * @author yangli9
+ */
+public class Slice implements Iterable<TableRecordBytes>, Comparable<Slice> {
+
+ TableRecordInfoDigest info;
+ int nColumns;
+
+ short shard;
+ long timestamp;
+ int nRecords;
+ ColumnValueContainer[] containers;
+
+ Slice(TableRecordInfoDigest info, short shard, long timestamp, ColumnValueContainer[] containers) {
+ this.info = info;
+ this.nColumns = info.getColumnCount();
+
+ this.shard = shard;
+ this.timestamp = timestamp;
+ this.nRecords = containers[0].getSize();
+ this.containers = containers;
+
+ assert nColumns == containers.length;
+ for (int i = 0; i < nColumns; i++) {
+ assert nRecords == containers[i].getSize();
+ }
+ }
+
+ public short getShard() {
+ return shard;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ /**
+ * Standard iterator of Slice will return a iterator of TableRecordBytes,
+ * which cannot be printed/formated to readable text.
+ * By invoking this API client can avoid code like:
+ * <p/>
+ * for (TableRecordBytes rec : slice) {
+ * TableRecord realRecord = (TableRecord) rec.clone();
+ * }
+ * <p/>
+ * Note this API cannot be called simultaneously with iterator()
+ *
+ * @return
+ */
+ public Iterator<TableRecord> readableIterator() {
+ final Iterator<TableRecordBytes> innerIterator = iterator();
+
+ return new Iterator<TableRecord>() {
+
+
+ @Override
+ public boolean hasNext() {
+ return innerIterator.hasNext();
+ }
+
+ @Override
+ public TableRecord next() {
+ return (TableRecord) innerIterator.next();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ @Override
+ public Iterator<TableRecordBytes> iterator() {
+ return new Iterator<TableRecordBytes>() {
+ int i = 0;
+ TableRecord rec = new TableRecord(info);
+ ImmutableBytesWritable temp = new ImmutableBytesWritable();
+
+ @Override
+ public boolean hasNext() {
+ return i < nRecords;
+ }
+
+ @Override
+ public TableRecordBytes next() {
+ for (int col = 0; col < nColumns; col++) {
+ containers[col].getValueAt(i, temp);
+ rec.setValueBytes(col, temp);
+ }
+ i++;
+ return rec;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ };
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#hashCode()
+ */
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((info == null) ? 0 : info.hashCode());
+ result = prime * result + shard;
+ result = prime * result + (int) (timestamp ^ (timestamp >>> 32));
+ return result;
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ Slice other = (Slice) obj;
+ if (info == null) {
+ if (other.info != null)
+ return false;
+ } else if (!info.equals(other.info))
+ return false;
+ if (shard != other.shard)
+ return false;
+ if (timestamp != other.timestamp)
+ return false;
+ return true;
+ }
+
+ @Override
+ public int compareTo(Slice o) {
+ int comp = this.shard - o.shard;
+ if (comp != 0)
+ return comp;
+
+ comp = (int) (this.timestamp - o.timestamp);
+ return comp;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/cube/src/main/java/com/kylinolap/cube/invertedindex/SliceBuilder.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/com/kylinolap/cube/invertedindex/SliceBuilder.java b/cube/src/main/java/com/kylinolap/cube/invertedindex/SliceBuilder.java
new file mode 100644
index 0000000..72523a8
--- /dev/null
+++ b/cube/src/main/java/com/kylinolap/cube/invertedindex/SliceBuilder.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.cube.invertedindex;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+
+/**
+ * @author yangli9
+ *
+ */
+public class SliceBuilder {
+
+ TableRecordInfo info;
+ private int nColumns;
+ int nRecordsCap;
+
+ short shard;
+ long sliceTimestamp;
+ int nRecords;
+ private ColumnValueContainer[] containers;
+
+ transient ImmutableBytesWritable temp = new ImmutableBytesWritable();
+
+ public SliceBuilder(TableRecordInfo info, short shard) {
+ this.info = info;
+ this.nColumns = info.getColumnCount();
+ this.nRecordsCap = Math.max(1, info.getDescriptor().getSliceSize());
+
+ this.shard = shard;
+ this.sliceTimestamp = Long.MIN_VALUE;
+ this.nRecords = 0;
+ this.containers = null;
+
+ doneSlice(); // init containers
+ }
+
+ private Slice doneSlice() {
+ Slice r = null;
+ if (nRecords > 0) {
+ for (int i = 0; i < nColumns; i++) {
+ containers[i].closeForChange();
+ }
+ r = new Slice(info, shard, sliceTimestamp, containers);
+ }
+
+ // reset for next slice
+ nRecords = 0;
+ containers = new ColumnValueContainer[nColumns];
+ for (int i : info.getDescriptor().getBitmapColumns()) {
+ containers[i] = new BitMapContainer(info, i);
+ }
+ for (int i : info.getDescriptor().getValueColumns()) {
+ containers[i] = new CompressedValueContainer(info, i, nRecordsCap);
+ }
+ for (int i : info.getDescriptor().getMetricsColumns()) {
+ containers[i] = new CompressedValueContainer(info, i, nRecordsCap);
+ }
+
+ return r;
+
+ }
+
+ // NOTE: record must be appended in time order
+ public Slice append(TableRecord rec) {
+ if (rec.getShard() != shard)
+ throw new IllegalStateException();
+
+ Slice doneSlice = null;
+
+ if (isFull()) {
+ doneSlice = doneSlice();
+ }
+
+ if (nRecords == 0) {
+ sliceTimestamp = increaseSliceTimestamp(rec.getTimestamp());
+ }
+
+ nRecords++;
+ for (int i = 0; i < nColumns; i++) {
+ rec.getValueBytes(i, temp);
+ containers[i].append(temp);
+ }
+
+ return doneSlice;
+ }
+
+ private long increaseSliceTimestamp(long timestamp) {
+ if (timestamp < sliceTimestamp)
+ throw new IllegalStateException();
+
+ if (timestamp == sliceTimestamp)
+ return ++timestamp; // ensure slice timestamp increases
+ else
+ return timestamp;
+ }
+
+ public Slice close() {
+ Slice doneSlice = doneSlice();
+ this.sliceTimestamp = Long.MIN_VALUE;
+ this.nRecords = 0;
+ return doneSlice;
+ }
+
+ private boolean isFull() {
+ return nRecords >= nRecordsCap;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/cube/src/main/java/com/kylinolap/cube/invertedindex/TableRecord.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/com/kylinolap/cube/invertedindex/TableRecord.java b/cube/src/main/java/com/kylinolap/cube/invertedindex/TableRecord.java
new file mode 100644
index 0000000..53601e2
--- /dev/null
+++ b/cube/src/main/java/com/kylinolap/cube/invertedindex/TableRecord.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.cube.invertedindex;
+
+import com.kylinolap.dict.DateStrDictionary;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * A table row held in fixed-width byte form, with accessors that translate column
+ * values to and from strings. Metrics columns are stored codec-encoded and all other
+ * columns as dictionary IDs, so a full {@link TableRecordInfo} (which supplies the
+ * codecs and dictionaries) is required rather than just a digest.
+ *
+ * @author yangli9
+ */
+public class TableRecord extends TableRecordBytes {
+
+    /**
+     * Creates a record whose buffer bytes are all reset to {@code Dictionary.NULL}.
+     *
+     * @param info record layout; must actually be a {@link TableRecordInfo}
+     * @throws IllegalStateException if {@code info} is not a {@link TableRecordInfo}
+     *         (including {@code null})
+     */
+    public TableRecord(TableRecordInfoDigest info) {
+        super(requireTableRecordInfo(info));
+    }
+
+    /** Creates a copy of {@code another} with its own byte buffer; the info is shared. */
+    public TableRecord(TableRecord another) {
+        super(another);
+    }
+
+    // Validates the constructor argument before the superclass allocates the buffer.
+    private static TableRecordInfoDigest requireTableRecordInfo(TableRecordInfoDigest info) {
+        if (!(info instanceof TableRecordInfo)) {
+            throw new IllegalStateException("Table Record must be initialized with a TableRecordInfo");
+        }
+        return info;
+    }
+
+    @Override
+    public Object clone() {
+        return new TableRecord(this);
+    }
+
+    /** Returns the timestamp column's value, parsed with {@code DateStrDictionary.stringToMillis}. */
+    public long getTimestamp() {
+        String str = getValueString(info().getTimestampColumn());
+        return DateStrDictionary.stringToMillis(str);
+    }
+
+    /** Returns the byte width of column {@code col} in this record's buffer. */
+    public int length(int col) {
+        return info.length(col);
+    }
+
+    /**
+     * Stores {@code value} into column {@code col}. Metrics columns are encoded with the
+     * column codec; other columns are translated to their dictionary ID.
+     */
+    public void setValueString(int col, String value) {
+        if (info().isMetrics(col)) {
+            LongWritable v = info().codec(col).valueOf(value);
+            setValueMetrics(col, v);
+        } else {
+            int id = info().dict(col).getIdFromValue(value);
+            setValueID(col, id);
+        }
+    }
+
+    /**
+     * Decodes column {@code col} to a string: metrics columns through the column codec,
+     * other columns through the column dictionary.
+     */
+    public String getValueString(int col) {
+        if (info().isMetrics(col))
+            return info().codec(col).toString(getValueMetrics(col));
+        else
+            return info().dict(col).getValueFromId(getValueID(col));
+    }
+
+    /**
+     * Copies {@code length(col)} bytes, starting at {@code bytes.getOffset()}, into column
+     * {@code col}. {@code bytes.getLength()} is not consulted, so the caller must supply
+     * at least that many bytes.
+     */
+    public void setValueBytes(int col, ImmutableBytesWritable bytes) {
+        System.arraycopy(bytes.get(), bytes.getOffset(), buf, info.offset(col), info.length(col));
+    }
+
+    /** Points {@code bytes} at column {@code col} inside this record's buffer (no copy). */
+    public void getValueBytes(int col, ImmutableBytesWritable bytes) {
+        bytes.set(buf, info.offset(col), info.length(col));
+    }
+
+    private void setValueMetrics(int col, LongWritable value) {
+        info().codec(col).write(value, buf, info.offset(col));
+    }
+
+    private LongWritable getValueMetrics(int col) {
+        return info().codec(col).read(buf, info.offset(col));
+    }
+
+    /**
+     * Derives the shard from a hash of the timestamp column's dictionary ID, so records
+     * sharing a timestamp ID land in the same shard.
+     *
+     * @return a shard number in {@code [0, sharding)}
+     */
+    public short getShard() {
+        int timestampID = getValueID(info().getTimestampColumn());
+        int hash = ShardingHash.hashInt(timestampID);
+        // Math.abs(hash) % n is negative when hash == Integer.MIN_VALUE. Taking the
+        // remainder first gives the same result for every other hash (Java's % truncates
+        // toward zero) and is never negative.
+        return (short) Math.abs(hash % info().getDescriptor().getSharding());
+    }
+
+    /** Returns the layout info as a {@link TableRecordInfo}; the constructors guarantee the cast. */
+    public TableRecordInfo info() {
+        return (TableRecordInfo) info;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder("[");
+        for (int col = 0; col < info.getColumnCount(); col++) {
+            if (col > 0)
+                sb.append(",");
+            sb.append(getValueString(col));
+        }
+        sb.append("]");
+        return sb.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/cube/src/main/java/com/kylinolap/cube/invertedindex/TableRecordBytes.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/com/kylinolap/cube/invertedindex/TableRecordBytes.java b/cube/src/main/java/com/kylinolap/cube/invertedindex/TableRecordBytes.java
new file mode 100644
index 0000000..a40362a
--- /dev/null
+++ b/cube/src/main/java/com/kylinolap/cube/invertedindex/TableRecordBytes.java
@@ -0,0 +1,84 @@
+package com.kylinolap.cube.invertedindex;
+
+import com.kylinolap.common.util.BytesUtil;
+import com.kylinolap.dict.Dictionary;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.util.Arrays;
+
+/**
+ * Fixed-width byte form of a table record: the values of all columns laid out back to
+ * back in one buffer, at the offsets and lengths given by a {@link TableRecordInfoDigest}.
+ *
+ * Created by honma on 11/10/14.
+ */
+public class TableRecordBytes implements Cloneable {
+    TableRecordInfoDigest info;
+    // consecutive column values at info.offset(col)/info.length(col); dictionary IDs for
+    // dimension columns (TableRecord also writes codec-encoded metrics here)
+    byte[] buf;
+
+    /** Creates an instance with {@code info} and {@code buf} left null; they must be set before use. */
+    public TableRecordBytes() {
+    }
+
+    /** Creates a record sized by {@code info.byteFormLen}, with every byte set to {@code Dictionary.NULL}. */
+    public TableRecordBytes(TableRecordInfoDigest info) {
+        this.info = info;
+        this.buf = new byte[info.byteFormLen];
+        reset();
+    }
+
+    /** Creates a copy of {@code another} with its own byte buffer; the info is shared. */
+    public TableRecordBytes(TableRecordBytes another) {
+        this.info = another.info;
+        this.buf = Bytes.copy(another.buf);
+    }
+
+    /** Writes {@code id} as an unsigned value occupying {@code info.length(col)} bytes of column {@code col}. */
+    public void setValueID(int col, int id) {
+        BytesUtil.writeUnsigned(id, buf, info.offset(col), info.length(col));
+    }
+
+    /** Reads column {@code col} as an unsigned value of {@code info.length(col)} bytes. */
+    public int getValueID(int col) {
+        return BytesUtil.readUnsigned(buf, info.offset(col), info.length(col));
+    }
+
+    /** Returns the backing buffer itself, not a copy; writes to it change this record. */
+    public byte[] getBytes() {
+        return buf;
+    }
+
+    /**
+     * Overwrites the whole buffer with {@code length} bytes of {@code bytes} starting at
+     * {@code offset}. {@code length} is expected to equal the buffer length (checked only
+     * when assertions are enabled).
+     */
+    public void setBytes(byte[] bytes, int offset, int length) {
+        assert buf.length == length : "expected " + buf.length + " bytes but got " + length;
+        System.arraycopy(bytes, offset, buf, 0, length);
+    }
+
+    /** Fills every byte of the buffer with {@code Dictionary.NULL}. */
+    public void reset() {
+        Arrays.fill(buf, Dictionary.NULL);
+    }
+
+    @Override
+    public Object clone() {
+        return new TableRecordBytes(this);
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + Arrays.hashCode(buf);
+        result = prime * result + ((info == null) ? 0 : info.hashCode());
+        return result;
+    }
+
+    /** Two records are equal when they have the same runtime class, equal bytes and equal info. */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null || getClass() != obj.getClass())
+            return false;
+        // Cast to this class rather than the TableRecord subclass so that plain
+        // TableRecordBytes instances can be compared without a ClassCastException.
+        TableRecordBytes other = (TableRecordBytes) obj;
+        if (!Arrays.equals(buf, other.buf))
+            return false;
+        return info == null ? other.info == null : info.equals(other.info);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/cube/src/main/java/com/kylinolap/cube/invertedindex/TableRecordFactory.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/com/kylinolap/cube/invertedindex/TableRecordFactory.java b/cube/src/main/java/com/kylinolap/cube/invertedindex/TableRecordFactory.java
new file mode 100644
index 0000000..6c6e02d
--- /dev/null
+++ b/cube/src/main/java/com/kylinolap/cube/invertedindex/TableRecordFactory.java
@@ -0,0 +1,8 @@
+package com.kylinolap.cube.invertedindex;
+
+/**
+ * Factory for {@link TableRecordBytes} instances; implementations choose the concrete
+ * record type and its layout.
+ *
+ * Created by honma on 11/10/14.
+ */
+public interface TableRecordFactory {
+    /** Returns a newly created table record. */
+    TableRecordBytes createTableRecord();
+}