You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/09/17 11:45:17 UTC
[8/9] incubator-kylin git commit: KYLIN-1010 Decompose project job
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/assembly/src/test/java/org/apache/kylin/job/dataGen/ColumnConfig.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/dataGen/ColumnConfig.java b/assembly/src/test/java/org/apache/kylin/job/dataGen/ColumnConfig.java
new file mode 100644
index 0000000..44ba8f4
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/job/dataGen/ColumnConfig.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.dataGen;
+
+import java.util.ArrayList;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Generation settings for a single column, bound from the JSON data-gen
+ * configuration.
+ * <p>
+ * Automatic detection of fields, getters and setters is switched off by the
+ * class-level {@code @JsonAutoDetect}; only the fields explicitly annotated
+ * with {@code @JsonProperty} are mapped.
+ */
+@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
+public class ColumnConfig {
+
+    /** Name of the column these settings apply to. */
+    @JsonProperty("columnName")
+    private String columnName;
+
+    /**
+     * Configured values. FactTableGenerator picks one of them at random, or,
+     * when {@link #asRange} is set, reads exactly two entries as the low and
+     * high end of a range.
+     */
+    @JsonProperty("valueSet")
+    private ArrayList<String> valueSet;
+
+    /**
+     * When false (and {@link #asRange} is false too) FactTableGenerator may
+     * still emit a purely random value instead of one from the value set.
+     */
+    @JsonProperty("exclusive")
+    private boolean exclusive;
+
+    /** Whether {@link #valueSet} is a two-element range rather than a list of candidates. */
+    @JsonProperty("asRange")
+    private boolean asRange;
+
+    // ---- columnName ----
+
+    public String getColumnName() {
+        return this.columnName;
+    }
+
+    public void setColumnName(String columnName) {
+        this.columnName = columnName;
+    }
+
+    // ---- valueSet ----
+
+    public ArrayList<String> getValueSet() {
+        return this.valueSet;
+    }
+
+    public void setValueSet(ArrayList<String> valueSet) {
+        this.valueSet = valueSet;
+    }
+
+    // ---- exclusive ----
+
+    public boolean isExclusive() {
+        return this.exclusive;
+    }
+
+    public void setExclusive(boolean exclusive) {
+        this.exclusive = exclusive;
+    }
+
+    // ---- asRange ----
+
+    public boolean isAsRange() {
+        return this.asRange;
+    }
+
+    public void setAsRange(boolean asRange) {
+        this.asRange = asRange;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java b/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
new file mode 100644
index 0000000..a965753
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
@@ -0,0 +1,647 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.dataGen;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.Array;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.DimensionDesc;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.metadata.model.JoinDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ */
+public class FactTableGenerator {
+ // Cube looked up by name in init(), its descriptor, the resource store the
+ // generator reads its /data/ resources from, and the descriptor's fact table name.
+ CubeInstance cube = null;
+ CubeDesc desc = null;
+ ResourceStore store = null;
+ String factTableName = null;
+
+ // Per-column value preferences, loaded by loadConfig() from
+ // /data/data_gen_config.json; stays null if loading fails.
+ GenConfig genConf = null;
+
+ // Single random source for all generated values, seeded with randomSeed in init().
+ Random r = null;
+
+ // Generation parameters as passed to init().
+ String cubeName;
+ long randomSeed;
+ int rowCount;
+ // Number of emitted rows that failed the composite-key match, and its upper
+ // bound, computed in init() as (int) (rowCount * (1 - linkableRatio)).
+ int unlinkableRowCount;
+ int unlinkableRowCountMax;
+ // prepare() samples at most rowCount / conflictRatio rows (at least one) from each lookup table.
+ double conflictRatio;
+ double linkableRatio;
+
+ // lookup table name -> names of its primary key columns that take part in
+ // a join with the fact table (they appear as foreign keys in the fact table)
+ TreeMap<String, LinkedList<String>> lookupTableKeys = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+
+ // Possible values for the columns in lookupTableKeys, extracted from the
+ // existing lookup tables.
+ // The key has the format tablename/columnname.
+ TreeMap<String, ArrayList<String>> feasibleValues = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+
+ // lookup table name -> set of all sampled composite keys (only filled for
+ // lookup tables joined on more than one column)
+ TreeMap<String, HashSet<Array<String>>> lookupTableCompositeKeyValues = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+
+ private void init(String cubeName, int rowCount, double conflictRaio, double linkableRatio, long randomSeed) {
+ this.rowCount = rowCount;
+ this.conflictRatio = conflictRaio;
+ this.cubeName = cubeName;
+ this.randomSeed = randomSeed;
+ this.linkableRatio = linkableRatio;
+
+ this.unlinkableRowCountMax = (int) (this.rowCount * (1 - linkableRatio));
+ this.unlinkableRowCount = 0;
+
+ r = new Random(randomSeed);
+
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+ cube = CubeManager.getInstance(config).getCube(cubeName);
+ desc = cube.getDescriptor();
+ factTableName = desc.getFactTable();
+ store = ResourceStore.getStore(config);
+ }
+
+    /*
+     * Users can specify the value preference for each column.
+     *
+     * Reads /data/data_gen_config.json from the resource store into genConf.
+     * The stream is closed even when parsing fails. An IOException is only
+     * printed, in which case genConf keeps its previous value.
+     */
+    private void loadConfig() {
+        // try-with-resources skips close() for a null resource, like the old null check
+        try (InputStream configStream = store.getResource("/data/data_gen_config.json")) {
+            this.genConf = GenConfig.loadConfig(configStream);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+ private void loadLookupTableValues(String lookupTableName, LinkedList<String> columnNames, int distinctRowCount) throws Exception {
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+
+ // only deal with composite keys
+ if (columnNames.size() > 1 && !lookupTableCompositeKeyValues.containsKey(lookupTableName)) {
+ lookupTableCompositeKeyValues.put(lookupTableName, new HashSet<Array<String>>());
+ }
+
+ InputStream tableStream = null;
+ BufferedReader tableReader = null;
+ try {
+ TreeMap<String, Integer> zeroBasedInice = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+ for (String columnName : columnNames) {
+ ColumnDesc cDesc = MetadataManager.getInstance(config).getTableDesc(lookupTableName).findColumnByName(columnName);
+ zeroBasedInice.put(columnName, cDesc.getZeroBasedIndex());
+ }
+
+ String path = "/data/" + lookupTableName + ".csv";
+ tableStream = store.getResource(path);
+ tableReader = new BufferedReader(new InputStreamReader(tableStream));
+ tableReader.mark(0);
+ int rowCount = 0;
+ int curRowNum = 0;
+ String curRow;
+
+ while (tableReader.readLine() != null)
+ rowCount++;
+
+ HashSet<Integer> rows = new HashSet<Integer>();
+ distinctRowCount = (distinctRowCount < rowCount) ? distinctRowCount : rowCount;
+ while (rows.size() < distinctRowCount) {
+ rows.add(r.nextInt(rowCount));
+ }
+
+ // reopen the stream
+ tableReader.close();
+ tableStream.close();
+ tableStream = null;
+ tableReader = null;
+
+ tableStream = store.getResource(path);
+ tableReader = new BufferedReader(new InputStreamReader(tableStream));
+
+ while ((curRow = tableReader.readLine()) != null) {
+ if (rows.contains(curRowNum)) {
+ String[] tokens = curRow.split(",");
+
+ String[] comboKeys = null;
+ int index = 0;
+ if (columnNames.size() > 1)
+ comboKeys = new String[columnNames.size()];
+
+ for (String columnName : columnNames) {
+ int zeroBasedIndex = zeroBasedInice.get(columnName);
+ if (!feasibleValues.containsKey(lookupTableName + "/" + columnName))
+ feasibleValues.put(lookupTableName + "/" + columnName, new ArrayList<String>());
+ feasibleValues.get(lookupTableName + "/" + columnName).add(tokens[zeroBasedIndex]);
+
+ if (columnNames.size() > 1) {
+ comboKeys[index] = tokens[zeroBasedIndex];
+ index++;
+ }
+ }
+
+ if (columnNames.size() > 1) {
+ Array<String> wrap = new Array<String>(comboKeys);
+ if (lookupTableCompositeKeyValues.get(lookupTableName).contains(wrap)) {
+ throw new Exception("The composite key already exist in the lookup table");
+ }
+ lookupTableCompositeKeyValues.get(lookupTableName).add(wrap);
+ }
+ }
+ curRowNum++;
+ }
+
+ if (tableStream != null)
+ tableStream.close();
+ if (tableReader != null)
+ tableReader.close();
+
+ } catch (IOException e) {
+ e.printStackTrace();
+ System.exit(1);
+ }
+ }
+
+    /**
+     * Prepares the candidate values for each joined column.
+     * <p>
+     * Loads the data-gen configuration, collects the primary key columns of
+     * every joined lookup table into lookupTableKeys and then samples the
+     * lookup tables for feasible key values.
+     *
+     * @throws Exception if sampling a lookup table fails
+     */
+    private void prepare() throws Exception {
+        // load config
+        loadConfig();
+
+        for (DimensionDesc dim : desc.getDimensions()) {
+            JoinDesc join = dim.getJoin();
+            if (join == null) {
+                continue;
+            }
+
+            String lookupTable = dim.getTable();
+            for (String column : join.getPrimaryKey()) {
+                LinkedList<String> keys = lookupTableKeys.get(lookupTable);
+                if (keys == null) {
+                    keys = new LinkedList<String>();
+                    lookupTableKeys.put(lookupTable, keys);
+                }
+
+                if (!keys.contains(column))
+                    keys.add(column);
+            }
+        }
+
+        // sample at least one row per lookup table
+        int distinctRowCount = (int) (this.rowCount / this.conflictRatio);
+        distinctRowCount = (distinctRowCount == 0) ? 1 : distinctRowCount;
+        // lookup tables
+        for (Map.Entry<String, LinkedList<String>> entry : lookupTableKeys.entrySet()) {
+            this.loadLookupTableValues(entry.getKey(), entry.getValue(), distinctRowCount);
+        }
+    }
+
+    /**
+     * Returns the cube's dimensions ordered by the number of primary key
+     * columns of their join, largest first; dimensions without a join count
+     * as zero. The sort is stable, so ties keep the descriptor's order.
+     * <p>
+     * A copy is sorted so the list owned by the cube descriptor is not
+     * reordered as a side effect.
+     */
+    private List<DimensionDesc> getSortedDimentsionDescs() {
+        List<DimensionDesc> dimensions = new ArrayList<DimensionDesc>(desc.getDimensions());
+        Collections.sort(dimensions, new Comparator<DimensionDesc>() {
+            @Override
+            public int compare(DimensionDesc o1, DimensionDesc o2) {
+                // arguments swapped on purpose: descending order
+                return Integer.compare(primaryKeyCount(o2), primaryKeyCount(o1));
+            }
+
+            private int primaryKeyCount(DimensionDesc dim) {
+                JoinDesc join = dim.getJoin();
+                return join != null ? join.getPrimaryKey().length : 0;
+            }
+        });
+        return dimensions;
+    }
+
+    /**
+     * Classifies the fact table columns and generates the fact table as text.
+     * <p>
+     * Columns fall into three groups: foreign keys (mapped to a
+     * "lookupTable/pkColumn" key), columns used directly as a dimension or in
+     * a measure, and all others, which are not referenced anywhere.
+     *
+     * @return the generated table, one row per line
+     * @throws Exception if row generation fails
+     */
+    private String cookData() throws Exception {
+        // group 1: foreign key column -> "lookupTable/pkColumn"
+        TreeMap<String, String> fkToLookupCol = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+        // reverse direction: "lookupTable/pkColumn" -> foreign key column
+        TreeMap<String, String> lookupColToFk = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+        // group 2: metrics or directly used dimensions
+        TreeSet<String> referencedCols = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
+        // group 3 (everything else) is not tracked here
+
+        List<DimensionDesc> sortedDims = getSortedDimentsionDescs();
+
+        // pass 1: fact table columns that appear as foreign keys
+        for (DimensionDesc dim : sortedDims) {
+            JoinDesc join = dim.getJoin();
+            if (join == null) {
+                // handled in the next pass
+                continue;
+            }
+
+            String[] foreignKeys = join.getForeignKey();
+            String[] primaryKeys = join.getPrimaryKey();
+            for (int i = 0; i < foreignKeys.length; ++i) {
+                String fk = foreignKeys[i];
+                String lookupCol = dim.getTable() + "/" + primaryKeys[i];
+
+                lookupColToFk.put(lookupCol, fk);
+
+                boolean conflicting = fkToLookupCol.containsKey(fk) && !fkToLookupCol.get(fk).equals(lookupCol);
+                if (conflicting) {
+                    // the first mapping wins
+                    System.out.println("Warning: Disambiguation on the mapping of column " + fk + ", " + fkToLookupCol.get(fk) + "(chosen) or " + lookupCol);
+                    continue;
+                }
+                fkToLookupCol.put(fk, lookupCol);
+            }
+        }
+
+        // pass 2: fact table columns used directly as a dimension.
+        // DO NOT merge this with the previous pass: it needs the complete fk map.
+        for (DimensionDesc dim : sortedDims) {
+            if (dim.getJoin() != null) {
+                continue;
+            }
+            for (String directCol : dim.getColumn()) {
+                if (!fkToLookupCol.containsKey(directCol))
+                    referencedCols.add(directCol);
+            }
+        }
+
+        // pass 3: fact table columns used as measure parameters
+        for (MeasureDesc measure : desc.getMeasures()) {
+            List<TblColRef> paramCols = measure.getFunction().getParameter().getColRefs();
+            if (paramCols == null) {
+                continue;
+            }
+            for (TblColRef paramCol : paramCols) {
+                if (!fkToLookupCol.containsKey(paramCol.getName()))
+                    referencedCols.add(paramCol.getName());
+            }
+        }
+
+        return createTable(this.rowCount, fkToLookupCol, lookupColToFk, referencedCols);
+    }
+
+    /** Formats {@code v} in decimal, prefixing a "0" when it is below 10. */
+    private String normToTwoDigits(int v) {
+        String padding = (v < 10) ? "0" : "";
+        return padding + v;
+    }
+
+    /** Returns one element of {@code candidates}, chosen with the generator's random source. */
+    private String randomPick(ArrayList<String> candidates) {
+        return candidates.get(r.nextInt(candidates.size()));
+    }
+
+    /**
+     * Creates a random value for a column within a configured range.
+     *
+     * @param cDesc the column to create a value for
+     * @param range two entries: the low end at index 0 and the high end at
+     *              index 1, both parsed according to the column's data type
+     * @return the value as text; for integers the high end is exclusive,
+     *         for dates the result is moved to the first day of its week
+     * @throws Exception if the column is of a string type or a date cannot be parsed
+     * @throws IllegalArgumentException if an integer range has high &lt; low
+     */
+    private String createRandomCell(ColumnDesc cDesc, ArrayList<String> range) throws Exception {
+        DataType type = cDesc.getType();
+        if (type.isStringFamily()) {
+            throw new Exception("Can't handle range values for string");
+
+        } else if (type.isIntegerFamily()) {
+            int low = Integer.parseInt(range.get(0));
+            int high = Integer.parseInt(range.get(1));
+            if (high < low) {
+                throw new IllegalArgumentException("Invalid range [" + low + ", " + high + ") for column " + cDesc.getName());
+            }
+            if (high == low) {
+                // Random.nextInt(0) would throw; a one-point range has only one possible value
+                return Integer.toString(low);
+            }
+            return Integer.toString(r.nextInt(high - low) + low);
+
+        } else if (type.isDouble()) {
+            double low = Double.parseDouble(range.get(0));
+            double high = Double.parseDouble(range.get(1));
+            return String.format("%.4f", r.nextDouble() * (high - low) + low);
+
+        } else if (type.isFloat()) {
+            float low = Float.parseFloat(range.get(0));
+            float high = Float.parseFloat(range.get(1));
+            return String.format("%.4f", r.nextFloat() * (high - low) + low);
+
+        } else if (type.isDecimal()) {
+            double low = Double.parseDouble(range.get(0));
+            double high = Double.parseDouble(range.get(1));
+            return String.format("%.4f", r.nextDouble() * (high - low) + low);
+
+        } else if (type.isDateTimeFamily()) {
+            if (!type.isDate()) {
+                throw new RuntimeException("Does not support " + type);
+            }
+
+            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
+            Date start = format.parse(range.get(0));
+            Date end = format.parse(range.get(1));
+            long diff = end.getTime() - start.getTime();
+            Date temp = new Date(start.getTime() + (long) (diff * r.nextDouble()));
+            Calendar cal = Calendar.getInstance();
+            cal.setTime(temp);
+            // snap to the first day of the week of the picked date
+            cal.set(Calendar.DAY_OF_WEEK, cal.getFirstDayOfWeek());
+
+            return cal.get(Calendar.YEAR) + "-" + normToTwoDigits(cal.get(Calendar.MONTH) + 1) + "-" + normToTwoDigits(cal.get(Calendar.DAY_OF_MONTH));
+        } else {
+            System.out.println("The data type " + type + " is not recognized");
+            System.exit(1);
+        }
+        return null;
+    }
+
+    /**
+     * Creates an unconstrained random value for a column, based on its type name.
+     * <p>
+     * Strings are two letters out of 'a'..'j', integers lie in [0, 128),
+     * double/float/decimal values lie in [0, 100) with four decimals, and
+     * dates are moved to the first day of their week. An unrecognized type
+     * name terminates the JVM.
+     *
+     * @param cDesc the column to create a value for
+     * @return the value as text
+     */
+    private String createRandomCell(ColumnDesc cDesc) {
+        String type = cDesc.getTypeName();
+        // Locale.ROOT keeps the comparison independent of the default locale
+        // (e.g. "INT" does not lower-case to "int" under a Turkish locale)
+        String s = type.toLowerCase(java.util.Locale.ROOT);
+        if (s.equals("string") || s.equals("char") || s.equals("varchar")) {
+            StringBuilder sb = new StringBuilder();
+            for (int i = 0; i < 2; i++) {
+                sb.append((char) ('a' + r.nextInt(10)));// there are 10*10
+                                                        // possible strings
+            }
+            return sb.toString();
+        } else if (s.equals("bigint") || s.equals("int") || s.equals("tinyint") || s.equals("smallint")) {
+            return Integer.toString(r.nextInt(128));
+        } else if (s.equals("double")) {
+            return String.format("%.4f", r.nextDouble() * 100);
+        } else if (s.equals("float")) {
+            return String.format("%.4f", r.nextFloat() * 100);
+        } else if (s.equals("decimal")) {
+            return String.format("%.4f", r.nextDouble() * 100);
+        } else if (s.equals("date")) {
+            // NOTE(review): as epoch milliseconds these two constants fall in the
+            // years 3901 and 3914, not 2001 and 2013 as their names say. Left
+            // unchanged so seeded output stays the same; confirm intent before fixing.
+            long date20131231 = 61349312153265L;
+            long date20010101 = 60939158400000L;
+            long diff = date20131231 - date20010101;
+            Date temp = new Date(date20010101 + (long) (diff * r.nextDouble()));
+            Calendar cal = Calendar.getInstance();
+            cal.setTime(temp);
+            // snap to the first day of the week of the picked date
+            cal.set(Calendar.DAY_OF_WEEK, cal.getFirstDayOfWeek());
+
+            return cal.get(Calendar.YEAR) + "-" + normToTwoDigits(cal.get(Calendar.MONTH) + 1) + "-" + normToTwoDigits(cal.get(Calendar.DAY_OF_MONTH));
+        } else {
+            System.out.println("The data type " + type + " is not recognized");
+            System.exit(1);
+        }
+        return null;
+    }
+
+    /**
+     * Returns the fixed placeholder value for a column that is not
+     * referenced by the cube: "abcde" for string types, "0" for numeric
+     * types and "1970-01-01" for dates. An unrecognized type name
+     * terminates the JVM.
+     *
+     * @param type the column's type name, compared case-insensitively
+     * @return the placeholder value as text
+     */
+    private String createDefaultsCell(String type) {
+        // Locale.ROOT keeps the comparison independent of the default locale
+        String s = type.toLowerCase(java.util.Locale.ROOT);
+        if (s.equals("string") || s.equals("char") || s.equals("varchar")) {
+            return "abcde";
+        } else if (s.equals("bigint") || s.equals("int") || s.equals("tinyint") || s.equals("smallint") //
+                || s.equals("double") || s.equals("float") || s.equals("decimal")) {
+            return "0";
+        } else if (s.equals("date")) {
+            return "1970-01-01";
+        } else {
+            System.out.println("The data type " + type + " is not recognized");
+            System.exit(1);
+        }
+        return null;
+    }
+
+ private void printColumnMappings(TreeMap<String, String> factTableCol2LookupCol, TreeSet<String> usedCols, TreeSet<String> defaultColumns) {
+
+ System.out.println("=======================================================================");
+ System.out.format("%-30s %s", "FACT_TABLE_COLUMN", "MAPPING");
+ System.out.println();
+ System.out.println();
+ for (Map.Entry<String, String> entry : factTableCol2LookupCol.entrySet()) {
+ System.out.format("%-30s %s", entry.getKey(), entry.getValue());
+ System.out.println();
+ }
+ for (String key : usedCols) {
+ System.out.format("%-30s %s", key, "Random Values");
+ System.out.println();
+ }
+ for (String key : defaultColumns) {
+ System.out.format("%-30s %s", key, "Default Values");
+ System.out.println();
+ }
+ System.out.println("=======================================================================");
+
+ System.out.println("Parameters:");
+ System.out.println();
+ System.out.println("CubeName: " + cubeName);
+ System.out.println("RowCount: " + rowCount);
+ System.out.println("ConflictRatio: " + conflictRatio);
+ System.out.println("LinkableRatio: " + linkableRatio);
+ System.out.println("Seed: " + randomSeed);
+ System.out.println();
+ System.out.println("The number of actual unlinkable fact rows is: " + this.unlinkableRowCount);
+ System.out.println("You can vary the above parameters to generate different datasets.");
+ System.out.println();
+ }
+
+    /**
+     * Checks that a generated row has a match in every lookup table that is
+     * joined through a composite key.
+     * <p>
+     * Every row must finally appear in the flattened big table. For
+     * single-column joins the generated value was taken from the lookup
+     * table, so a match is guaranteed; composite keys need this extra check.
+     *
+     * @param lookupCol2FactTableCol "lookupTable/pkColumn" -> fact table column
+     * @param columnValues           the row, indexed by the fact table's zero-based column index
+     * @return false as soon as one composite key is not among the sampled keys
+     */
+    private boolean matchAllCompositeKeys(TreeMap<String, String> lookupCol2FactTableCol, LinkedList<String> columnValues) {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+
+        for (Map.Entry<String, LinkedList<String>> entry : lookupTableKeys.entrySet()) {
+            String lookupTable = entry.getKey();
+            LinkedList<String> keyColumns = entry.getValue();
+            if (keyColumns.size() == 1) {
+                // single-column join, nothing to verify
+                continue;
+            }
+
+            // assemble the row's key in the order of the lookup table's key columns
+            String[] comboKey = new String[keyColumns.size()];
+            int pos = 0;
+            for (String keyColumn : keyColumns) {
+                String factTableCol = lookupCol2FactTableCol.get(lookupTable + "/" + keyColumn);
+                int cardinal = MetadataManager.getInstance(config).getTableDesc(factTableName).findColumnByName(factTableCol).getZeroBasedIndex();
+                comboKey[pos++] = columnValues.get(cardinal);
+            }
+
+            boolean known = lookupTableCompositeKeyValues.get(lookupTable).contains(new Array<String>(comboKey));
+            if (!known) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Creates the value for a referenced, non-foreign-key column, honoring
+     * the column's data-gen configuration when there is one.
+     *
+     * @param cDesc the column to create a value for
+     * @return the value as text
+     * @throws Exception if the configured value set is missing or empty, or
+     *                   a range does not consist of exactly two values
+     */
+    private String createCell(ColumnDesc cDesc) throws Exception {
+        ColumnConfig cConfig = genConf.getColumnConfigByName(cDesc.getName());
+
+        // not configured: use random values
+        if (cConfig == null) {
+            return createRandomCell(cDesc);
+        }
+
+        // configured, but random values are still allowed: flip a coin.
+        // The coin is only flipped when the first two conditions hold.
+        boolean randomAllowed = !cConfig.isAsRange() && !cConfig.isExclusive();
+        if (randomAllowed && r.nextBoolean()) {
+            return createRandomCell(cDesc);
+        }
+
+        // use specified values
+        ArrayList<String> valueSet = cConfig.getValueSet();
+        if (valueSet == null || valueSet.size() == 0)
+            throw new Exception("Did you forget to specify value set for " + cDesc.getName());
+
+        if (!cConfig.isAsRange()) {
+            return randomPick(valueSet);
+        }
+
+        if (valueSet.size() != 2)
+            throw new Exception("Only two values can be set for range values, the column: " + cDesc.getName());
+
+        return createRandomCell(cDesc, valueSet);
+    }
+
+    /**
+     * Generates one fact table row with a value for every column of the
+     * fact table, in the table's column order.
+     *
+     * @param factTableCol2LookupCol foreign key column -> "lookupTable/pkColumn"
+     * @param usedCols               columns used as a metric or a direct dimension
+     * @param defaultColumns         output: receives the names of columns filled with defaults
+     * @return the row's values
+     * @throws Exception if a configured column cannot be generated
+     */
+    private LinkedList<String> createRow(TreeMap<String, String> factTableCol2LookupCol, TreeSet<String> usedCols, TreeSet<String> defaultColumns) throws Exception {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        LinkedList<String> row = new LinkedList<String>();
+
+        for (ColumnDesc cDesc : MetadataManager.getInstance(config).getTableDesc(factTableName).getColumns()) {
+            String colName = cDesc.getName();
+            String cell;
+
+            if (factTableCol2LookupCol.containsKey(colName)) {
+                // fk column: pick one of the values sampled from the lookup table
+                cell = randomPick(this.feasibleValues.get(factTableCol2LookupCol.get(colName)));
+            } else if (usedCols.contains(colName)) {
+                // metric or directly used dimension
+                cell = createCell(cDesc);
+            } else {
+                // not useful in OLAP: fixed default value
+                cell = createDefaultsCell(cDesc.getTypeName());
+                defaultColumns.add(colName);
+            }
+
+            row.add(cell);
+        }
+
+        return row;
+    }
+
+ /**
+ * return the text of table contents(one line one row)
+ *
+ * @param rowCount
+ * @param factTableCol2LookupCol
+ * @param lookupCol2FactTableCol
+ * @param usedCols
+ * @return
+ * @throws Exception
+ */
+ private String createTable(int rowCount, TreeMap<String, String> factTableCol2LookupCol, TreeMap<String, String> lookupCol2FactTableCol, TreeSet<String> usedCols) throws Exception {
+ try {
+ TreeSet<String> defaultColumns = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
+
+ StringBuffer sb = new StringBuffer();
+ for (int i = 0; i < rowCount;) {
+
+ LinkedList<String> columnValues = createRow(factTableCol2LookupCol, usedCols, defaultColumns);
+
+ if (!matchAllCompositeKeys(lookupCol2FactTableCol, columnValues)) {
+ if (unlinkableRowCount < unlinkableRowCountMax) {
+ unlinkableRowCount++;
+ } else {
+ continue;
+ }
+ }
+
+ for (String c : columnValues)
+ sb.append(c + ",");
+ sb.deleteCharAt(sb.length() - 1);
+ sb.append(System.getProperty("line.separator"));
+
+ i++;
+
+ // System.out.println("Just generated the " + i + "th record");
+ }
+
+ printColumnMappings(factTableCol2LookupCol, usedCols, defaultColumns);
+
+ return sb.toString();
+
+ } catch (IOException e) {
+ e.printStackTrace();
+ System.exit(1);
+ }
+
+ return null;
+ }
+
+    /**
+     * Randomly creates a fact table and returns the table content.
+     *
+     * @param cubeName      name of the cube
+     * @param rowCount      expected number of generated rows; "10000" when null
+     * @param linkableRatio the percentage of fact table rows that can be linked with
+     *                      all lookup tables by INNER join; "0.6" when null
+     * @param randomSeed    random seed; a random one is drawn when null
+     * @return the table text, one row per line
+     * @throws Exception if a numeric argument cannot be parsed or generation fails
+     */
+    public static String generate(String cubeName, String rowCount, String linkableRatio, String randomSeed) throws Exception {
+        final String rowCountText = (rowCount != null) ? rowCount : "10000";
+        final String linkableRatioText = (linkableRatio != null) ? linkableRatio : "0.6";
+
+        FactTableGenerator generator = new FactTableGenerator();
+
+        // without an explicit seed every run produces a different data set
+        final long seed = (randomSeed != null) ? Long.parseLong(randomSeed) : new Random().nextLong();
+
+        // the conflict ratio is fixed at 5; it is not configurable any more
+        generator.init(cubeName, Integer.parseInt(rowCountText), 5, Double.parseDouble(linkableRatioText), seed);
+        generator.prepare();
+        return generator.cookData();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/assembly/src/test/java/org/apache/kylin/job/dataGen/GenConfig.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/dataGen/GenConfig.java b/assembly/src/test/java/org/apache/kylin/job/dataGen/GenConfig.java
new file mode 100644
index 0000000..c58cfb6
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/job/dataGen/GenConfig.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.dataGen;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.apache.kylin.common.util.JsonUtil;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+
+/**
+ * Data-gen configuration: the list of per-column settings, bound from JSON.
+ * <p>
+ * Automatic detection of fields, getters and setters is switched off by the
+ * class-level {@code @JsonAutoDetect}; only {@code columnConfigs} is mapped.
+ */
+@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
+public class GenConfig {
+
+    @JsonProperty("columnConfigs")
+    private ArrayList<ColumnConfig> columnConfigs;
+
+    // lower-cased column name -> config; a null value records "no config for this column"
+    private HashMap<String, ColumnConfig> cache = new HashMap<String, ColumnConfig>();
+
+    public ArrayList<ColumnConfig> getColumnConfigs() {
+        return columnConfigs;
+    }
+
+    public void setColumnConfigs(ArrayList<ColumnConfig> columnConfigs) {
+        this.columnConfigs = columnConfigs;
+        // lookups cached against the previous list are stale now
+        this.cache.clear();
+    }
+
+    /**
+     * Finds the configuration of a column, ignoring case.
+     *
+     * @param columnName the column name to look up
+     * @return the first matching configuration, or null when the column is
+     *         not configured (also when no column configs were loaded at all)
+     */
+    public ColumnConfig getColumnConfigByName(String columnName) {
+        // Locale.ROOT keeps the comparison independent of the default locale
+        columnName = columnName.toLowerCase(java.util.Locale.ROOT);
+
+        if (cache.containsKey(columnName))
+            return cache.get(columnName);
+
+        ColumnConfig found = null;
+        // columnConfigs is null when the JSON has no "columnConfigs" property
+        if (columnConfigs != null) {
+            for (ColumnConfig cConfig : columnConfigs) {
+                if (cConfig == null || cConfig.getColumnName() == null)
+                    continue;
+                if (cConfig.getColumnName().toLowerCase(java.util.Locale.ROOT).equals(columnName)) {
+                    found = cConfig;
+                    break;
+                }
+            }
+        }
+        cache.put(columnName, found);
+        return found;
+    }
+
+    /**
+     * Parses a configuration from a JSON stream.
+     *
+     * @param stream the JSON input; it is not closed by this method
+     * @return the parsed configuration, or null when reading or mapping
+     *         fails (the stack trace is printed)
+     */
+    public static GenConfig loadConfig(InputStream stream) {
+        try {
+            GenConfig config = JsonUtil.readValue(stream, GenConfig.class);
+            return config;
+        } catch (JsonMappingException e) {
+            e.printStackTrace();
+        } catch (JsonParseException e) {
+            e.printStackTrace();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+        return null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
new file mode 100644
index 0000000..dcd460b
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
@@ -0,0 +1,240 @@
+package org.apache.kylin.job.hadoop.invertedindex;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.kylin.common.util.FIFOIterable;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.index.Slice;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodecWithState;
+import org.apache.kylin.invertedindex.model.IIRow;
+import org.apache.kylin.invertedindex.model.KeyValueCodec;
+import org.apache.kylin.metadata.filter.ColumnTupleFilter;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.ConstantTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.ParameterDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter;
+import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector;
+import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
+import org.apache.kylin.storage.hbase.common.coprocessor.FilterDecorator;
+import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.ClearTextDictionary;
+import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.EndpointAggregators;
+import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.IIEndpoint;
+import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.generated.IIProtos;
+import org.apache.kylin.streaming.MicroStreamBatch;
+import org.apache.kylin.streaming.ParsedStreamMessage;
+import org.apache.kylin.streaming.StreamMessage;
+import org.apache.kylin.streaming.StreamParser;
+import org.apache.kylin.streaming.StringStreamParser;
+import org.apache.kylin.streaming.invertedindex.SliceBuilder;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * Builds an inverted-index slice from three sample fact rows, encodes it into
+ * {@link IIRow}s, and then checks both the row-to-slice decoding and the
+ * aggregation performed by {@link IIEndpoint}.
+ */
+public class IITest extends LocalFileMetadataTestCase {
+
+    String iiName = "test_kylin_ii_inner_join";
+    IIInstance ii;
+    IIDesc iiDesc;
+    String cubeName = "test_kylin_cube_with_slr_empty";
+
+    List<IIRow> iiRows;
+
+    final String[] inputData = new String[] { //
+            "FP-non GTC,0,15,145970,0,28,Toys,2008-10-08 07:18:40,USER_Y,Toys & Hobbies,Models & Kits,Automotive,0,Ebay,USER_S,15,Professional-Other,2012-08-16,2012-08-11,0,2012-08-16,145970,10000329,26.8551,0", //
+            "ABIN,0,-99,43479,0,21,Photo,2012-09-11 20:26:04,USER_Y,Cameras & Photo,Film Photography,Other,0,Ebay,USER_S,-99,Not Applicable,2012-08-16,2012-08-11,0,2012-08-16,43479,10000807,26.2474,0", //
+            "ABIN,0,16,80053,0,12,Computers,2012-06-19 21:15:09,USER_Y,Computers/Tablets & Networking,MonitorProjectors & Accs,Monitors,0,Ebay,USER_S,16,Consumer-Other,2012-08-16,2012-08-11,0,2012-08-16,80053,10000261,94.2273,0" };
+
+    /**
+     * Parses {@link #inputData} into one micro batch, builds a slice from it and
+     * stores the encoded rows in {@link #iiRows}.
+     */
+    @Before
+    public void setUp() throws Exception {
+        this.createTestMetadata();
+        this.ii = IIManager.getInstance(getTestConfig()).getII(iiName);
+        this.iiDesc = ii.getDescriptor();
+
+        List<StreamMessage> streamMessages = Lists.transform(Arrays.asList(inputData), new Function<String, StreamMessage>() {
+            @Nullable
+            @Override
+            public StreamMessage apply(String input) {
+                return new StreamMessage(System.currentTimeMillis(), input.getBytes());
+            }
+        });
+
+        StreamParser parser = StringStreamParser.instance;
+
+        MicroStreamBatch batch = new MicroStreamBatch(0);
+        for (StreamMessage message : streamMessages) {
+            ParsedStreamMessage parsedStreamMessage = parser.parse(message);
+            if (parsedStreamMessage.isAccepted()) {
+                batch.add(parsedStreamMessage);
+            }
+        }
+
+        iiRows = Lists.newArrayList();
+        final Slice slice = new SliceBuilder(iiDesc, (short) 0, true).buildSlice(batch);
+        IIKeyValueCodec codec = new IIKeyValueCodec(slice.getInfo());
+        for (IIRow iiRow : codec.encodeKeyValue(slice)) {
+            iiRows.add(iiRow);
+        }
+    }
+
+    @After
+    public void after() throws Exception {
+        cleanupTestMetadata();
+    }
+
+    /**
+     * simulate stream building into slices, and encode the slice into IIRows.
+     * Then reconstruct the IIRows to slice.
+     */
+    @Test
+    public void basicTest() {
+        Queue<IIRow> buffer = Lists.newLinkedList();
+        FIFOIterable bufferIterable = new FIFOIterable(buffer);
+        TableRecordInfo info = new TableRecordInfo(iiDesc);
+        TableRecordInfoDigest digest = info.getDigest();
+        KeyValueCodec codec = new IIKeyValueCodecWithState(digest);
+        Iterator<Slice> slices = codec.decodeKeyValue(bufferIterable).iterator();
+
+        Assert.assertFalse(slices.hasNext());
+        Assert.assertEquals(iiRows.size(), digest.getColumnCount());
+
+        // A slice must only become available once the row of the last column was fed in.
+        for (int i = 0; i < digest.getColumnCount(); ++i) {
+            buffer.add(iiRows.get(i));
+
+            if (i != digest.getColumnCount() - 1) {
+                Assert.assertFalse(slices.hasNext());
+            } else {
+                Assert.assertTrue(slices.hasNext());
+            }
+        }
+
+        Slice newSlice = slices.next();
+        // expected value first, so a failure message reads the right way round
+        Assert.assertEquals(2, newSlice.getLocalDictionaries()[0].getSize());
+    }
+
+    /**
+     * Runs SUM(PRICE) through the endpoint, projected on lstg_format_name and
+     * filtered by cal_dt >= 2012-08-16, over a scanner that replays {@link #iiRows}.
+     * Expects two result rows whose sums are 120.4747 and 26.8551.
+     */
+    @Test
+    public void IIEndpointTest() {
+        TableRecordInfo info = new TableRecordInfo(ii.getDescriptor());
+        CoprocessorRowType type = CoprocessorRowType.fromTableRecordInfo(info, ii.getFirstSegment().getColumns());
+        CoprocessorProjector projector = CoprocessorProjector.makeForEndpoint(info, Collections.singletonList(ii.getDescriptor().findColumnRef("default.test_kylin_fact", "lstg_format_name")));
+
+        FunctionDesc f1 = new FunctionDesc();
+        f1.setExpression("SUM");
+        ParameterDesc p1 = new ParameterDesc();
+        p1.setType("column");
+        p1.setValue("PRICE");
+        f1.setParameter(p1);
+        f1.setReturnType("decimal(19,4)");
+
+        TblColRef column = ii.getDescriptor().findColumnRef("default.test_kylin_fact", "cal_dt");
+        CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GTE);
+        ColumnTupleFilter columnFilter = new ColumnTupleFilter(column);
+        compareFilter.addChild(columnFilter);
+        ConstantTupleFilter constantFilter = new ConstantTupleFilter("2012-08-16");
+        compareFilter.addChild(constantFilter);
+
+        EndpointAggregators aggregators = EndpointAggregators.fromFunctions(info, Collections.singletonList(f1));
+        CoprocessorFilter filter = CoprocessorFilter.fromFilter(new ClearTextDictionary(info), compareFilter, FilterDecorator.FilterConstantsTreatment.AS_IT_IS);
+
+        final Iterator<IIRow> iiRowIterator = iiRows.iterator();
+
+        IIEndpoint endpoint = new IIEndpoint();
+        // Only nextRaw(List) is backed by data; every other scanner method throws.
+        IIProtos.IIResponseInternal response = endpoint.getResponse(new RegionScanner() {
+            @Override
+            public HRegionInfo getRegionInfo() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public boolean isFilterDone() throws IOException {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public boolean reseek(byte[] row) throws IOException {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public long getMaxResultSize() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public long getMvccReadPoint() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public boolean nextRaw(List<Cell> result) throws IOException {
+                if (iiRowIterator.hasNext()) {
+                    IIRow iiRow = iiRowIterator.next();
+                    result.addAll(iiRow.makeCells());
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+
+            @Override
+            public boolean nextRaw(List<Cell> result, int limit) throws IOException {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public boolean next(List<Cell> results) throws IOException {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public boolean next(List<Cell> result, int limit) throws IOException {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public void close() throws IOException {
+                throw new NotImplementedException();
+            }
+        }, type, projector, aggregators, filter);
+
+        Assert.assertEquals(2, response.getRowsList().size());
+        Set<String> answers = Sets.newHashSet("120.4747", "26.8551");
+        for (IIProtos.IIResponseInternal.IIRow responseRow : response.getRowsList()) {
+            byte[] measuresBytes = responseRow.getMeasures().toByteArray();
+            List<Object> metrics = aggregators.deserializeMetricValues(measuresBytes, 0);
+            Assert.assertTrue(answers.contains(metrics.get(0)));
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/assembly/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java b/assembly/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
new file mode 100644
index 0000000..be4fa26
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
@@ -0,0 +1,90 @@
+package org.apache.kylin.job.streaming;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import org.apache.hadoop.util.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractKylinTestCase;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.job.DeployUtil;
+import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
+import org.apache.kylin.streaming.StreamBuilder;
+import org.apache.kylin.streaming.StreamMessage;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Feeds the lines of a local file as stream messages into a periodical
+ * {@link StreamBuilder} that uses a {@link CubeStreamConsumer}. Disabled via
+ * {@code @Ignore}; setup deploys metadata from the sandbox test data.
+ */
+@Ignore
+public class CubeStreamConsumerTest {
+
+    private static final Logger logger = LoggerFactory.getLogger(CubeStreamConsumerTest.class);
+
+    private KylinConfig kylinConfig;
+
+    private static final String CUBE_NAME = "test_kylin_cube_without_slr_left_join_ready";
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+        System.setProperty("hdp.version", "2.2.0.0-2041"); // mapred-site.xml ref this
+    }
+
+    /**
+     * Deploys the test metadata and removes every existing segment of the cube.
+     */
+    @Before
+    public void before() throws Exception {
+        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+
+        kylinConfig = KylinConfig.getInstanceFromEnv();
+        DeployUtil.initCliWorkDir();
+        DeployUtil.deployMetadata();
+        DeployUtil.overrideJobJarLocations();
+        final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(CUBE_NAME);
+        CubeUpdate cubeBuilder = new CubeUpdate(cube);
+        cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()]));
+        // remove all existing segments
+        CubeManager.getInstance(kylinConfig).updateCube(cubeBuilder);
+
+    }
+
+    @Test
+    public void test() throws Exception {
+        LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<>();
+        List<BlockingQueue<StreamMessage>> queues = Lists.newArrayList();
+        queues.add(queue);
+        StreamBuilder cubeStreamBuilder = StreamBuilder.newPeriodicalStreamBuilder(CUBE_NAME, queues, new CubeStreamConsumer(CUBE_NAME), System.currentTimeMillis(), 30L * 1000);
+        final java.util.concurrent.ExecutorService executor = Executors.newSingleThreadExecutor();
+        try {
+            final Future<?> future = executor.submit(cubeStreamBuilder);
+            loadDataFromLocalFile(queue, 100000);
+            future.get();
+        } finally {
+            // The worker thread is non-daemon: without a shutdown it outlives the test,
+            // in particular when loading the file fails before EOF is queued.
+            executor.shutdownNow();
+        }
+    }
+
+    /**
+     * Reads tab-separated lines from {@code ../table.txt}, re-joins the fields with
+     * commas and queues each line as a message, followed by {@link StreamMessage#EOF}.
+     *
+     * @param queue    destination of the messages
+     * @param maxCount maximum number of lines to queue
+     */
+    private void loadDataFromLocalFile(BlockingQueue<StreamMessage> queue, final int maxCount) throws IOException, InterruptedException {
+        // try-with-resources closes the file even when reading or queueing fails
+        try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("../table.txt")))) {
+            String line;
+            int count = 0;
+            while ((line = br.readLine()) != null && count++ < maxCount) {
+                final List<String> strings = Arrays.asList(line.split("\t"));
+                queue.put(new StreamMessage(System.currentTimeMillis(), StringUtils.join(",", strings).getBytes()));
+            }
+        }
+        queue.put(StreamMessage.EOF);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/assembly/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java b/assembly/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java
new file mode 100644
index 0000000..dc6d312
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java
@@ -0,0 +1,144 @@
+package org.apache.kylin.job.streaming;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.common.util.TimeUtil;
+import org.apache.kylin.streaming.MicroStreamBatch;
+import org.apache.kylin.streaming.MicroStreamBatchConsumer;
+import org.apache.kylin.streaming.ParsedStreamMessage;
+import org.apache.kylin.streaming.StreamBuilder;
+import org.apache.kylin.streaming.StreamMessage;
+import org.apache.kylin.streaming.StreamParser;
+import org.apache.kylin.streaming.StreamingManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Exercises a StreamBuilder created through StreamBuilder.newPeriodicalStreamBuilder:
+ * the test pushes one message roughly every 10 ms into two queues, each message
+ * carrying the current wall-clock time as its payload and a running counter as
+ * its offset, and afterwards compares the offsets reported by StreamingManager
+ * for both partitions with the offset of the last message that was sent before
+ * the first period ended.
+ *
+ * The test is driven by System.currentTimeMillis() and Thread.sleep, so the order
+ * of the statements in the producer loop matters.
+ */
+public class PeriodicalStreamBuilderTest extends LocalFileMetadataTestCase {
+
+ private static final Logger logger = LoggerFactory.getLogger(PeriodicalStreamBuilderTest.class);
+
+ @Before
+ public void setup() {
+ this.createTestMetadata();
+
+ }
+
+ @After
+ public void clear() {
+ this.cleanupTestMetadata();
+ }
+
+ private List<StreamMessage> prepareTestData(long start, long end, int count) { // not called anywhere in this class
+ double step = (double) (end - start) / (count - 1); // distance between consecutive timestamps
+ long ts = start;
+ int offset = 0;
+ ArrayList<StreamMessage> result = Lists.newArrayList();
+ for (int i = 0; i < count - 1; ++i) {
+ result.add(new StreamMessage(offset++, String.valueOf(ts).getBytes()));
+ ts += step; // compound assignment narrows the double sum back to long on every pass
+ }
+ result.add(new StreamMessage(offset++, String.valueOf(end).getBytes())); // last message always carries exactly 'end'
+ assertEquals(count, result.size());
+ assertEquals(start + "", new String(result.get(0).getRawData()));
+ assertEquals(end + "", new String(result.get(count - 1).getRawData()));
+ return result;
+ }
+
+ @Test
+ public void test() throws ExecutionException, InterruptedException {
+
+ List<BlockingQueue<StreamMessage>> queues = Lists.newArrayList();
+ queues.add(new LinkedBlockingQueue<StreamMessage>());
+ queues.add(new LinkedBlockingQueue<StreamMessage>());
+
+ final long interval = 3000L; // period length, in the same unit as System.currentTimeMillis()
+ final long nextPeriodStart = TimeUtil.getNextPeriodStart(System.currentTimeMillis(), interval);
+
+ final List<Integer> partitionIds = Lists.newArrayList();
+ for (int i = 0; i < queues.size(); i++) {
+ partitionIds.add(i); // one partition id per queue: 0, 1, ...
+ }
+
+ final MicroStreamBatchConsumer consumer = new MicroStreamBatchConsumer() { // consumer that only logs what it receives
+ @Override
+ public void consume(MicroStreamBatch microStreamBatch) throws Exception {
+ logger.info("consuming batch:" + microStreamBatch.getPartitionId() + " count:" + microStreamBatch.size() + " timestamp:" + microStreamBatch.getTimestamp() + " offset:" + microStreamBatch.getOffset());
+ }
+
+ @Override
+ public void stop() {
+ logger.info("consumer stopped");
+ }
+ };
+ final StreamBuilder streamBuilder = StreamBuilder.newPeriodicalStreamBuilder("test", queues, consumer, nextPeriodStart, interval);
+
+ streamBuilder.setStreamParser(new StreamParser() { // the payload is the decimal timestamp written by the producer loop below
+ @Override
+ public ParsedStreamMessage parse(StreamMessage streamMessage) {
+ return new ParsedStreamMessage(Collections.<String> emptyList(), streamMessage.getOffset(), Long.parseLong(new String(streamMessage.getRawData())), true);
+ }
+ });
+
+ Future<?> future = Executors.newSingleThreadExecutor().submit(streamBuilder);
+ long timeout = nextPeriodStart + interval; // end of the first period
+ int messageCount = 0;
+ int inPeriodMessageCount = 0;
+ int expectedOffset = 0; // 0 doubles as "not determined yet"
+ logger.info("prepare to add StreamMessage");
+ while (true) {
+ long ts = System.currentTimeMillis();
+ if (ts >= timeout + interval) { // keep producing for one more interval after the first period, then stop
+ break;
+ }
+ if (ts >= nextPeriodStart && ts < timeout) {
+ inPeriodMessageCount++; // counts loop passes inside the first period; only logged below
+ }
+ for (BlockingQueue<StreamMessage> queue : queues) {
+ queue.put(new StreamMessage(messageCount, String.valueOf(ts).getBytes())); // same offset and timestamp for every queue
+ }
+ if (expectedOffset == 0 && ts >= timeout) {
+ expectedOffset = messageCount - 1; // offset of the last message sent before the first period ended
+ }
+ messageCount++;
+ Thread.sleep(10);
+ }
+ logger.info("totally put " + messageCount + " StreamMessages");
+ logger.info("totally in period " + inPeriodMessageCount + " StreamMessages");
+
+ for (BlockingQueue<StreamMessage> queue : queues) {
+ queue.put(StreamMessage.EOF); // NOTE(review): presumably tells the builder to finish - confirm in StreamBuilder
+ }
+
+ future.get(); // wait until the builder task has returned
+
+ for (BlockingQueue<StreamMessage> queue : queues) {
+ queue.take(); // removes one remaining element per queue; take() blocks if a queue is empty
+ }
+
+ final Map<Integer, Long> offsets = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getOffset("test", partitionIds);
+ logger.info("offset:" + offsets);
+ for (Long offset : offsets.values()) {
+ assertEquals(expectedOffset, offset.longValue());
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/assembly/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java b/assembly/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java
new file mode 100644
index 0000000..075a048
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java
@@ -0,0 +1,76 @@
+package org.apache.kylin.job.streaming;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.SortedMultiset;
+import com.google.common.collect.TreeMultiset;
+
+/**
+ * this is for generating fact table data for test_streaming_table (cube streaming)
+ */
+public class StreamingTableDataGenerator {
+
+ private static final Logger logger = LoggerFactory.getLogger(StreamingTableDataGenerator.class);
+ private static final ObjectMapper mapper = new ObjectMapper();
+
+ public static List<String> generate(int recordCount, long startTime, long endTime, String tableName) {
+ Preconditions.checkArgument(startTime < endTime);
+ Preconditions.checkArgument(recordCount > 0);
+
+ KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ TableDesc tableDesc = MetadataManager.getInstance(kylinConfig).getTableDesc(tableName);
+
+ SortedMultiset<Long> times = TreeMultiset.create();
+ Random r = new Random();
+ for (int i = 0; i < recordCount; i++) {
+ long t = startTime + (long) ((endTime - startTime) * r.nextDouble());
+ times.add(t);
+ }
+
+ List<String> ret = Lists.newArrayList();
+ HashMap<String, String> kvs = Maps.newHashMap();
+ for (long time : times) {
+ kvs.clear();
+ kvs.put("timestamp", String.valueOf(time));
+ for (ColumnDesc columnDesc : tableDesc.getColumns()) {
+ String lowerCaseColumnName = columnDesc.getName().toLowerCase();
+ DataType dataType = columnDesc.getType();
+ if (dataType.isDateTimeFamily()) {
+ //TimedJsonStreamParser will derived minute_start,hour_start,day_start from timestamp
+ continue;
+ } else if (dataType.isStringFamily()) {
+ char c = (char) ('A' + (int) (26 * r.nextDouble()));
+ kvs.put(lowerCaseColumnName, String.valueOf(c));
+ } else if (dataType.isIntegerFamily()) {
+ int v = r.nextInt(10000);
+ kvs.put(lowerCaseColumnName, String.valueOf(v));
+ } else if (dataType.isNumberFamily()) {
+ String v = String.format("%.4f", r.nextDouble() * 100);
+ kvs.put(lowerCaseColumnName, v);
+ }
+ }
+ try {
+ ret.add(mapper.writeValueAsString(kvs));
+ } catch (JsonProcessingException e) {
+ logger.error("error!", e);
+ }
+ }
+
+ return ret;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/assembly/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java b/assembly/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
new file mode 100644
index 0000000..8218d51
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Reloads two Hive tables through HiveSourceTableLoader and checks that exactly
+ * those tables are reported as loaded. Does nothing unless the sandbox is in use.
+ */
+public class ITHiveSourceTableLoaderTest extends HBaseMetadataTestCase {
+
+    @Before
+    public void setup() throws Exception {
+        super.createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        super.cleanupTestMetadata();
+    }
+
+    @Test
+    public void test() throws IOException {
+        // Without the sandbox there is no Hive to load from, so the test is a no-op.
+        if (useSandbox()) {
+            final KylinConfig testConfig = getTestConfig();
+            final String[] requestedTables = { "DEFAULT.TEST_KYLIN_FACT", "EDW.TEST_CAL_DT" };
+            final Set<String> loadedTables = HiveSourceTableLoader.reloadHiveTables(requestedTables, testConfig);
+
+            assertTrue(loadedTables.size() == requestedTables.length);
+            for (int i = 0; i < requestedTables.length; i++) {
+                assertTrue(loadedTables.contains(requestedTables[i]));
+            }
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/assembly/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java b/assembly/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java
new file mode 100644
index 0000000..57c0be3
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import java.io.IOException;
+
+import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * This test case needs the Hive runtime; please run it with the sandbox.
+ * @author shaoshi
+ *
+ * It is in the exclude list of the default profile in pom.xml
+ */
+public class ITHiveTableReaderTest extends HBaseMetadataTestCase {
+
+    /**
+     * Reads default.test_kylin_fact row by row and expects 10000 rows of 9 columns each.
+     */
+    @Test
+    public void test() throws IOException {
+        HiveTableReader reader = new HiveTableReader("default", "test_kylin_fact");
+        int rowNumber = 0;
+        try {
+            while (reader.next()) {
+                String[] row = reader.getRow();
+                Assert.assertEquals(9, row.length);
+                rowNumber++;
+            }
+        } finally {
+            // close the reader even when a read or an assertion above fails
+            reader.close();
+        }
+
+        Assert.assertEquals(10000, rowNumber);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/assembly/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java b/assembly/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
new file mode 100644
index 0000000..0df632a
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.kylin.dict.lookup.SnapshotManager;
+import org.apache.kylin.dict.lookup.SnapshotTable;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.ReadableTable.TableReader;
+import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Builds a snapshot of a source table and verifies that reading the snapshot
+ * back yields the same rows as reading the source table directly.
+ *
+ * @author yangli9
+ */
+public class ITSnapshotManagerTest extends HBaseMetadataTestCase {
+
+    SnapshotManager snapshotMgr;
+
+    @Before
+    public void setup() throws Exception {
+        createTestMetadata();
+        snapshotMgr = SnapshotManager.getInstance(getTestConfig());
+    }
+
+    @After
+    public void after() throws Exception {
+        cleanupTestMetadata();
+    }
+
+    @Test
+    public void basicTest() throws Exception {
+        final TableDesc sitesDesc = MetadataManager.getInstance(getTestConfig()).getTableDesc("EDW.TEST_SITES");
+        final ReadableTable sourceTable = SourceFactory.createReadableTable(sitesDesc);
+        final String snapshotPath = snapshotMgr.buildSnapshot(sourceTable, sitesDesc).getResourcePath();
+
+        // Drop cached snapshots so the lookup below has to go through the stored resource.
+        snapshotMgr.wipeoutCache();
+
+        final SnapshotTable reloaded = snapshotMgr.getSnapshotTable(snapshotPath);
+
+        // Walk both readers in lock step: they must end together and agree on every row.
+        final TableReader expectedRows = sourceTable.getReader();
+        final TableReader actualRows = reloaded.getReader();
+
+        boolean moreExpected;
+        do {
+            moreExpected = expectedRows.next();
+            boolean moreActual = actualRows.next();
+            assertEquals(moreExpected, moreActual);
+
+            if (moreExpected) {
+                String[] expectedRow = expectedRows.getRow();
+                String[] actualRow = actualRows.getRow();
+                assertArrayEquals(expectedRow, actualRow);
+            }
+        } while (moreExpected);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/build/script/prepare_libs.sh
----------------------------------------------------------------------
diff --git a/build/script/prepare_libs.sh b/build/script/prepare_libs.sh
index a80d4f4..2012b3f 100755
--- a/build/script/prepare_libs.sh
+++ b/build/script/prepare_libs.sh
@@ -13,7 +13,7 @@ echo "version ${version}"
echo "copy lib file"
rm -rf build/lib
mkdir build/lib
-cp job/target/kylin-job-${version}-job.jar build/lib/kylin-job-${version}.jar
+cp assembly/target/kylin-job-${version}-job.jar build/lib/kylin-job-${version}.jar
cp storage-hbase/target/kylin-storage-hbase-${version}-coprocessor.jar build/lib/kylin-coprocessor-${version}.jar
cp jdbc/target/kylin-jdbc-${version}.jar build/lib/kylin-jdbc-${version}.jar
# Copied file becomes 000 for some env (e.g. my Cygwin)
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/core-job/src/main/java/org/apache/kylin/job/JobInstance.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/JobInstance.java b/core-job/src/main/java/org/apache/kylin/job/JobInstance.java
index 001dfe5..e5a4540 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JobInstance.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JobInstance.java
@@ -37,6 +37,7 @@ import com.fasterxml.jackson.annotation.JsonManagedReference;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Lists;
+@SuppressWarnings("serial")
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
public class JobInstance extends RootPersistentEntity implements Comparable<JobInstance> {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
index 9ebe335..1e34f39 100644
--- a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
+++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Maps;
/**
*/
+@SuppressWarnings("serial")
@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
public class ExecutableOutputPO extends RootPersistentEntity {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
index 52df562..75717e0 100644
--- a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
+++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
@@ -29,6 +29,7 @@ import com.google.common.collect.Maps;
/**
*/
+@SuppressWarnings("serial")
@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
public class ExecutablePO extends RootPersistentEntity {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
----------------------------------------------------------------------
diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
new file mode 100644
index 0000000..ecac973
--- /dev/null
+++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.impl.threadpool;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.lock.MockJobLock;
+import org.apache.kylin.job.manager.ExecutableManager;
+import org.junit.After;
+import org.junit.Before;
+
+/**
+ */
+public abstract class BaseSchedulerTest extends LocalFileMetadataTestCase {
+
+ private DefaultScheduler scheduler;
+
+ protected ExecutableManager jobService;
+
+ @Before
+ public void setup() throws Exception {
+ createTestMetadata();
+ setFinalStatic(ExecutableConstants.class.getField("DEFAULT_SCHEDULER_INTERVAL_SECONDS"), 10);
+ jobService = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
+ scheduler = DefaultScheduler.getInstance();
+ scheduler.init(new JobEngineConfig(KylinConfig.getInstanceFromEnv()), new MockJobLock());
+ if (!scheduler.hasStarted()) {
+ throw new RuntimeException("scheduler has not been started");
+ }
+
+ }
+
+ @After
+ public void after() throws Exception {
+ cleanupTestMetadata();
+ }
+
+ static void setFinalStatic(Field field, Object newValue) throws Exception {
+ field.setAccessible(true);
+
+ Field modifiersField = Field.class.getDeclaredField("modifiers");
+ modifiersField.setAccessible(true);
+ modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
+
+ field.set(null, newValue);
+ }
+
+ protected void waitForJobFinish(String jobId) {
+ while (true) {
+ AbstractExecutable job = jobService.getJob(jobId);
+ final ExecutableState status = job.getStatus();
+ if (status == ExecutableState.SUCCEED || status == ExecutableState.ERROR || status == ExecutableState.STOPPED || status == ExecutableState.DISCARDED) {
+ break;
+ } else {
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ protected void waitForJobStatus(String jobId, ExecutableState state, long interval) {
+ while (true) {
+ AbstractExecutable job = jobService.getJob(jobId);
+ if (job.getStatus() == state) {
+ break;
+ } else {
+ try {
+ Thread.sleep(interval);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
----------------------------------------------------------------------
diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
new file mode 100644
index 0000000..d50baad
--- /dev/null
+++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.impl.threadpool;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kylin.job.BaseTestExecutable;
+import org.apache.kylin.job.ErrorTestExecutable;
+import org.apache.kylin.job.FailedTestExecutable;
+import org.apache.kylin.job.SelfStopExecutable;
+import org.apache.kylin.job.SucceedTestExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.junit.Test;
+
+/**
+ * Runs chained jobs built from the test executables through the scheduler set
+ * up by {@link BaseSchedulerTest} and checks the resulting job and task states.
+ */
+public class DefaultSchedulerTest extends BaseSchedulerTest {
+
+    /** One {@link SucceedTestExecutable}: job and task are both expected to end in SUCCEED. */
+    @Test
+    public void testSingleTaskJob() throws Exception {
+        DefaultChainedExecutable job = new DefaultChainedExecutable();
+        BaseTestExecutable task1 = new SucceedTestExecutable();
+        job.addTask(task1);
+        jobService.addJob(job);
+        waitForJobFinish(job.getId());
+        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
+        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
+    }
+
+    /** Two {@link SucceedTestExecutable}s: the job and both tasks are expected to end in SUCCEED. */
+    @Test
+    public void testSucceed() throws Exception {
+        DefaultChainedExecutable job = new DefaultChainedExecutable();
+        BaseTestExecutable task1 = new SucceedTestExecutable();
+        BaseTestExecutable task2 = new SucceedTestExecutable();
+        job.addTask(task1);
+        job.addTask(task2);
+        jobService.addJob(job);
+        waitForJobFinish(job.getId());
+        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
+        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
+        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task2.getId()).getState());
+    }
+
+    /**
+     * A {@link SucceedTestExecutable} followed by a {@link FailedTestExecutable}:
+     * expects the job in ERROR, the first task in SUCCEED and the second in ERROR.
+     */
+    @Test
+    public void testSucceedAndFailed() throws Exception {
+        DefaultChainedExecutable job = new DefaultChainedExecutable();
+        BaseTestExecutable task1 = new SucceedTestExecutable();
+        BaseTestExecutable task2 = new FailedTestExecutable();
+        job.addTask(task1);
+        job.addTask(task2);
+        jobService.addJob(job);
+        waitForJobFinish(job.getId());
+        assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState());
+        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
+        assertEquals(ExecutableState.ERROR, jobService.getOutput(task2.getId()).getState());
+    }
+
+    /**
+     * An {@link ErrorTestExecutable} followed by a {@link SucceedTestExecutable}:
+     * expects the job and the first task in ERROR while the second task is still READY.
+     */
+    @Test
+    public void testSucceedAndError() throws Exception {
+        DefaultChainedExecutable job = new DefaultChainedExecutable();
+        BaseTestExecutable task1 = new ErrorTestExecutable();
+        BaseTestExecutable task2 = new SucceedTestExecutable();
+        job.addTask(task1);
+        job.addTask(task2);
+        jobService.addJob(job);
+        waitForJobFinish(job.getId());
+        assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState());
+        assertEquals(ExecutableState.ERROR, jobService.getOutput(task1.getId()).getState());
+        assertEquals(ExecutableState.READY, jobService.getOutput(task2.getId()).getState());
+    }
+
+    /**
+     * Discards a job holding a {@link SelfStopExecutable} once it is RUNNING and
+     * expects both the job and its task to end in DISCARDED.
+     */
+    @Test
+    public void testDiscard() throws Exception {
+        DefaultChainedExecutable job = new DefaultChainedExecutable();
+        BaseTestExecutable task1 = new SelfStopExecutable();
+        job.addTask(task1);
+        jobService.addJob(job);
+        waitForJobStatus(job.getId(), ExecutableState.RUNNING, 500);
+        jobService.discardJob(job.getId());
+        waitForJobFinish(job.getId());
+        assertEquals(ExecutableState.DISCARDED, jobService.getOutput(job.getId()).getState());
+        assertEquals(ExecutableState.DISCARDED, jobService.getOutput(task1.getId()).getState());
+        // NOTE(review): the trailing sleep and print look like debugging leftovers - confirm before removing.
+        Thread.sleep(5000);
+        System.out.println(job);
+    }
+
+    /**
+     * Checks how {@link ScheduledExecutorService#scheduleAtFixedRate} treats a
+     * periodic task that throws: an uncaught exception suppresses all later
+     * runs, while a task that catches its own exception keeps being run.
+     */
+    @Test
+    public void testSchedulerPool() throws InterruptedException {
+        ScheduledExecutorService fetchPool = Executors.newScheduledThreadPool(1);
+        try {
+            // Well-behaved task: runs three times and can still be cancelled afterwards.
+            final CountDownLatch countDownLatch = new CountDownLatch(3);
+            ScheduledFuture<?> future = fetchPool.scheduleAtFixedRate(new Runnable() {
+                @Override
+                public void run() {
+                    countDownLatch.countDown();
+                }
+            }, 5, 5, TimeUnit.SECONDS);
+            assertTrue("countDownLatch should reach zero in 15 secs", countDownLatch.await(20, TimeUnit.SECONDS));
+            assertTrue("future should still running", future.cancel(true));
+
+            // Task that throws: it runs once, so the latch never reaches zero and cancel() reports false.
+            final CountDownLatch countDownLatch2 = new CountDownLatch(3);
+            ScheduledFuture<?> future2 = fetchPool.scheduleAtFixedRate(new Runnable() {
+                @Override
+                public void run() {
+                    countDownLatch2.countDown();
+                    throw new RuntimeException();
+                }
+            }, 5, 5, TimeUnit.SECONDS);
+            assertFalse("countDownLatch2 should NOT reach zero in 15 secs", countDownLatch2.await(20, TimeUnit.SECONDS));
+            assertFalse("future2 should has been stopped", future2.cancel(true));
+
+            // Task that throws but catches the exception itself: it keeps being run.
+            final CountDownLatch countDownLatch3 = new CountDownLatch(3);
+            ScheduledFuture<?> future3 = fetchPool.scheduleAtFixedRate(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        countDownLatch3.countDown();
+                        throw new RuntimeException();
+                    } catch (Exception ignored) {
+                        // Swallowed on purpose: the point is that a task handling its own failure keeps running.
+                    }
+                }
+            }, 5, 5, TimeUnit.SECONDS);
+            assertTrue("countDownLatch3 should reach zero in 15 secs", countDownLatch3.await(20, TimeUnit.SECONDS));
+            assertTrue("future3 should still running", future3.cancel(true));
+        } finally {
+            // The pool's worker thread is not a daemon; stop it so it does not outlive the test.
+            fetchPool.shutdownNow();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/core-storage/pom.xml
----------------------------------------------------------------------
diff --git a/core-storage/pom.xml b/core-storage/pom.xml
index 4bb7695..e17d13f 100644
--- a/core-storage/pom.xml
+++ b/core-storage/pom.xml
@@ -42,12 +42,6 @@
</dependency>
<dependency>
- <groupId>org.apache.kylin</groupId>
- <artifactId>kylin-invertedindex</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
-
- <dependency>
<groupId>net.sf.ehcache</groupId>
<artifactId>ehcache</artifactId>
<version>2.8.1</version>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/engine-spark/pom.xml
----------------------------------------------------------------------
diff --git a/engine-spark/pom.xml b/engine-spark/pom.xml
index 3aa01e3..d2b150e 100644
--- a/engine-spark/pom.xml
+++ b/engine-spark/pom.xml
@@ -86,14 +86,6 @@
</dependency>
<dependency>
- <groupId>org.apache.kylin</groupId>
- <artifactId>kylin-job</artifactId>
- <type>test-jar</type>
- <scope>test</scope>
- <version>${project.parent.version}</version>
- </dependency>
-
- <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index dd87782..896aa80 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -25,6 +25,7 @@ import com.google.common.collect.Maps;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import com.google.common.primitives.UnsignedBytes;
+
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
@@ -66,7 +67,7 @@ import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
-import org.apache.kylin.storage.hbase.steps.HBaseConnection;
+import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.kylin.storage.hbase.steps.CreateHTableJob;
import org.apache.kylin.storage.hbase.steps.CubeHTableUtil;
import org.apache.spark.SparkConf;
@@ -80,9 +81,11 @@ import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;
+
import scala.Tuple2;
import javax.annotation.Nullable;
+
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;