You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/23 13:21:57 UTC
[20/23] incubator-kylin git commit: KYLIN-875 half way
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
new file mode 100644
index 0000000..c90c5d3
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.cube.gridtable.CubeCodeSystem;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.dict.NumberDictionaryBuilder;
+import org.apache.kylin.dict.StringBytesConverter;
+import org.apache.kylin.dict.TrieDictionaryBuilder;
+import org.apache.kylin.gridtable.GTInfo.Builder;
+import org.apache.kylin.gridtable.memstore.GTSimpleMemStore;
+import org.apache.kylin.metadata.filter.ColumnTupleFilter;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.ConstantTupleFilter;
+import org.apache.kylin.metadata.filter.ExtractTupleFilter;
+import org.apache.kylin.metadata.filter.LogicalTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.measure.LongMutable;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class DictGridTableTest {
+
+ @Test
+ public void test() throws IOException { // single JUnit entry point: every verify* helper below runs against the same table
+ GridTable table = newTestTable(); // ten-row fixture, see newTestTable()
+ verifyScanRangePlanner(table);
+ verifyFirstRow(table);
+ verifyScanWithUnevaluatableFilter(table);
+ verifyScanWithEvaluatableFilter(table);
+ verifyConvertFilterConstants1(table);
+ verifyConvertFilterConstants2(table);
+ verifyConvertFilterConstants3(table);
+ verifyConvertFilterConstants4(table);
+ }
+
+ private void verifyScanRangePlanner(GridTable table) { // checks the scan ranges GTScanRangePlanner derives from several filter shapes
+ GTInfo info = table.getInfo();
+ GTScanRangePlanner planner = new GTScanRangePlanner(info);
+
+ CompareTupleFilter timeComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14")); // column 0 is the timestamp column
+ CompareTupleFilter timeComp2 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-13"));
+ CompareTupleFilter timeComp3 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-15"));
+ CompareTupleFilter timeComp4 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "2015-01-15"));
+ CompareTupleFilter ageComp1 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "10")); // column 1 is the dictionary-encoded integer column
+ CompareTupleFilter ageComp2 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "20"));
+ CompareTupleFilter ageComp3 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "30"));
+ CompareTupleFilter ageComp4 = compare(info.colRef(1), FilterOperatorEnum.NEQ, enc(info, 1, "30"));
+
+ // AND over an OR is flattened into one range; the two age constants surface as HBase fuzzy keys
+ {
+ LogicalTupleFilter filter = and(timeComp1, or(ageComp1, ageComp2));
+ List<GTScanRange> r = planner.planScanRanges(filter);
+ assertEquals(1, r.size());
+ assertEquals("[1421193600000, 10]-[null, null]", r.get(0).toString()); // 1421193600000 = 2015-01-14T00:00:00Z in epoch millis
+ assertEquals("[[10], [20]]", r.get(0).hbaseFuzzyKeys.toString());
+ }
+
+ // contradictory time bounds (> 01-14 AND < 01-13) are pre-evaluated as always false: no ranges at all
+ {
+ LogicalTupleFilter filter = and(timeComp1, timeComp2);
+ List<GTScanRange> r = planner.planScanRanges(filter);
+ assertEquals(0, r.size());
+ }
+
+ // (time > 01-14) OR (age != 30) is pre-evaluated as always true: one unbounded range
+ {
+ LogicalTupleFilter filter = or(timeComp1, ageComp4);
+ List<GTScanRange> r = planner.planScanRanges(filter);
+ assertEquals("[[null, null]-[null, null]]", r.toString());
+ }
+
+ // overlapping ranges (time > 01-14) OR (time < 01-15) are merged into one unbounded range
+ {
+ LogicalTupleFilter filter = or(timeComp1, timeComp3);
+ List<GTScanRange> r = planner.planScanRanges(filter);
+ assertEquals("[[null, null]-[null, null]]", r.toString());
+ }
+
+ // three point ranges by default; merged into one covering range when planned with the extra argument 2
+ {
+ LogicalTupleFilter filter = or(and(timeComp4, ageComp1), and(timeComp4, ageComp2), and(timeComp4, ageComp3));
+ List<GTScanRange> r = planner.planScanRanges(filter);
+ assertEquals(3, r.size());
+ assertEquals("[1421280000000, 10]-[1421280000000, 10]", r.get(0).toString()); // 1421280000000 = 2015-01-15T00:00:00Z in epoch millis
+ assertEquals("[1421280000000, 20]-[1421280000000, 20]", r.get(1).toString());
+ assertEquals("[1421280000000, 30]-[1421280000000, 30]", r.get(2).toString());
+ List<GTScanRange> r2 = planner.planScanRanges(filter, 2); // presumably 2 caps the number of ranges - TODO confirm against GTScanRangePlanner
+ assertEquals("[[1421280000000, 10]-[1421280000000, 30]]", r2.toString());
+ }
+ }
+
+ private void verifyFirstRow(GridTable table) throws IOException { // scan built from the table info alone; only the first returned row is compared
+ doScanAndVerify(table, new GTScanRequest(table.getInfo()), "[1421193600000, 30, Yang, 10, 10.5]");
+ }
+
+ private void verifyScanWithUnevaluatableFilter(GridTable table) throws IOException { // aggregated scan whose filter contains EXTRACT nodes on column 1
+ GTInfo info = table.getInfo();
+
+ CompareTupleFilter fComp = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
+ ExtractTupleFilter fUnevaluatable = unevaluatable(info.colRef(1));
+ LogicalTupleFilter fNotPlusUnevaluatable = not(unevaluatable(info.colRef(1)));
+ LogicalTupleFilter filter = and(fComp, fUnevaluatable, fNotPlusUnevaluatable);
+
+ GTScanRequest req = new GTScanRequest(info, null, setOf(0), setOf(3), new String[] { "sum" }, filter); // group by column 0, "sum" over column 3 (roles as echoed by toString() below)
+
+ // note the unEvaluatable column 1 in filter is added to group by; both EXTRACT-based children print as [null] in filterPushDown
+ assertEquals("GTScanRequest [range=[null, null]-[null, null], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], [null], [null]], aggrGroupBy={0, 1}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString());
+
+ doScanAndVerify(table, req, "[1421280000000, 20, null, 20, null]"); // first group: 2015-01-15 / 20, two fixture rows of metric 10 each
+ }
+
+ private void verifyScanWithEvaluatableFilter(GridTable table) throws IOException { // aggregated scan whose filter consists of plain comparisons only
+ GTInfo info = table.getInfo();
+
+ CompareTupleFilter fComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
+ CompareTupleFilter fComp2 = compare(info.colRef(1), FilterOperatorEnum.GT, enc(info, 1, "10"));
+ LogicalTupleFilter filter = and(fComp1, fComp2);
+
+ GTScanRequest req = new GTScanRequest(info, null, setOf(0), setOf(3), new String[] { "sum" }, filter); // group by column 0, "sum" over column 3 (roles as echoed by toString() below)
+
+ // note the evaluatable column 1 in filter is added to returned columns but not in group by
+ assertEquals("GTScanRequest [range=[null, null]-[null, null], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 GT [\\x00]], aggrGroupBy={0}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString());
+
+ doScanAndVerify(table, req, "[1421280000000, 20, null, 30, null]", "[1421366400000, 20, null, 40, null]"); // sums 30 and 40: three fixture rows on 01-15 and four on 01-16
+ }
+
+ private void verifyConvertFilterConstants1(GridTable table) { // constants that exist in the dictionary are converted to their encoded form
+ GTInfo info = table.getInfo();
+
+ TableDesc extTable = TableDesc.mockup("ext"); // mock external table whose columns A and B are passed in as colMapping
+ TblColRef extColA = new TblColRef(ColumnDesc.mockup(extTable, 1, "A", "timestamp"));
+ TblColRef extColB = new TblColRef(ColumnDesc.mockup(extTable, 2, "B", "integer"));
+
+ CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
+ CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.EQ, "10");
+ LogicalTupleFilter filter = and(fComp1, fComp2);
+
+ List<TblColRef> colMapping = Lists.newArrayList();
+ colMapping.add(extColA);
+ colMapping.add(extColB);
+
+ TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+ assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 EQ [\\x00]]", newFilter.toString()); // A and B now appear as GT columns 0 and 1; "10" is encoded as \x00
+ }
+
+ private void verifyConvertFilterConstants2(GridTable table) { // LT with a constant that is missing from the dictionary
+ GTInfo info = table.getInfo();
+
+ TableDesc extTable = TableDesc.mockup("ext"); // mock external table whose columns A and B are passed in as colMapping
+ TblColRef extColA = new TblColRef(ColumnDesc.mockup(extTable, 1, "A", "timestamp"));
+ TblColRef extColB = new TblColRef(ColumnDesc.mockup(extTable, 2, "B", "integer"));
+
+ CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
+ CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.LT, "9");
+ LogicalTupleFilter filter = and(fComp1, fComp2);
+
+ List<TblColRef> colMapping = Lists.newArrayList();
+ colMapping.add(extColA);
+ colMapping.add(extColB);
+
+ // "9" is not in the integer dictionary, so $1<"9" is rounded up to $1<"10" ("10" encodes as \x00)
+ TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+ assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 LT [\\x00]]", newFilter.toString());
+ }
+
+ private void verifyConvertFilterConstants3(GridTable table) { // LTE with a constant below every dictionary value
+ GTInfo info = table.getInfo();
+
+ TableDesc extTable = TableDesc.mockup("ext"); // mock external table whose columns A and B are passed in as colMapping
+ TblColRef extColA = new TblColRef(ColumnDesc.mockup(extTable, 1, "A", "timestamp"));
+ TblColRef extColB = new TblColRef(ColumnDesc.mockup(extTable, 2, "B", "integer"));
+
+ CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
+ CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.LTE, "9");
+ LogicalTupleFilter filter = and(fComp1, fComp2);
+
+ List<TblColRef> colMapping = Lists.newArrayList();
+ colMapping.add(extColA);
+ colMapping.add(extColB);
+
+ // no dictionary value is <= "9", so $1<="9" is rounded down to FALSE (printed as an empty [] below)
+ TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+ assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], []]", newFilter.toString());
+ }
+
+ private void verifyConvertFilterConstants4(GridTable table) { // IN list in which only some constants exist in the dictionary
+ GTInfo info = table.getInfo();
+
+ TableDesc extTable = TableDesc.mockup("ext"); // mock external table whose columns A and B are passed in as colMapping
+ TblColRef extColA = new TblColRef(ColumnDesc.mockup(extTable, 1, "A", "timestamp"));
+ TblColRef extColB = new TblColRef(ColumnDesc.mockup(extTable, 2, "B", "integer"));
+
+ CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
+ CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.IN, "9", "10", "15");
+ LogicalTupleFilter filter = and(fComp1, fComp2);
+
+ List<TblColRef> colMapping = Lists.newArrayList();
+ colMapping.add(extColA);
+ colMapping.add(extColB);
+
+ // "9" and "15" are not dictionary values, so $1 in ("9", "10", "15") has only "10" left
+ TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+ assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 IN [\\x00]]", newFilter.toString());
+ }
+
+ private void doScanAndVerify(GridTable table, GTScanRequest req, String... verifyRows) throws IOException { // scans with req and compares the leading rows' toString() against verifyRows
+ System.out.println(req);
+ IGTScanner scanner = table.scan(req);
+ int i = 0; // index of the row currently being visited
+ for (GTRecord r : scanner) {
+ System.out.println(r);
+ if (verifyRows != null && i < verifyRows.length) { // rows beyond verifyRows.length are printed but not checked
+ assertEquals(verifyRows[i], r.toString());
+ }
+ i++;
+ }
+ scanner.close(); // NOTE(review): skipped when an assertion above throws, and a scan yielding fewer rows than verifyRows still passes
+ }
+
+ private Object enc(GTInfo info, int col, String value) { // encodes value for column col with the table's code system
+ ByteBuffer encoded = ByteBuffer.allocate(info.getMaxColumnLength());
+ info.codeSystem.encodeColumnValue(col, value, encoded);
+ return ByteArray.copyOf(encoded.array(), encoded.arrayOffset(), encoded.position()); // copy only the bytes written so far
+ }
+
+ private ExtractTupleFilter unevaluatable(TblColRef col) { // EXTRACT filter over col, used by the tests as an unevaluatable filter
+ ExtractTupleFilter extract = new ExtractTupleFilter(FilterOperatorEnum.EXTRACT);
+ extract.addChild(new ColumnTupleFilter(col));
+ return extract;
+ }
+
+ private CompareTupleFilter compare(TblColRef col, FilterOperatorEnum op, Object... value) { // builds "col <op> value(s)"
+ CompareTupleFilter comparison = new CompareTupleFilter(op);
+ comparison.addChild(new ColumnTupleFilter(col));
+ comparison.addChild(new ConstantTupleFilter(Arrays.asList(value))); // all constants travel in one ConstantTupleFilter child
+ return comparison;
+ }
+
+ private LogicalTupleFilter and(TupleFilter... children) { // AND node over the given children
+ return logic(FilterOperatorEnum.AND, children);
+ }
+
+ private LogicalTupleFilter or(TupleFilter... children) { // OR node over the given children
+ return logic(FilterOperatorEnum.OR, children);
+ }
+
+ private LogicalTupleFilter not(TupleFilter child) { // NOT node wrapping a single child
+ return logic(FilterOperatorEnum.NOT, child);
+ }
+
+ private LogicalTupleFilter logic(FilterOperatorEnum op, TupleFilter... children) { // shared builder behind and()/or()/not()
+ LogicalTupleFilter node = new LogicalTupleFilter(op);
+ for (TupleFilter child : children) {
+ node.addChild(child); // children are attached in argument order
+ }
+ return node;
+ }
+
+ static GridTable newTestTable() throws IOException { // builds the ten-row fixture table used by test()
+ GTInfo info = newInfo();
+ GTSimpleMemStore store = new GTSimpleMemStore(info);
+ GridTable table = new GridTable(info, store);
+
+ GTRecord r = new GTRecord(table.getInfo()); // one record object, refilled via setValues() for every row written
+ GTBuilder builder = table.rebuild();
+
+ builder.write(r.setValues("2015-01-14", "30", "Yang", new LongMutable(10), new BigDecimal("10.5"))); // every row carries the same metrics: 10 and 10.5
+ builder.write(r.setValues("2015-01-14", "30", "Luke", new LongMutable(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-15", "20", "Dong", new LongMutable(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-15", "20", "Jason", new LongMutable(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-15", "30", "Xu", new LongMutable(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-16", "20", "Mahone", new LongMutable(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-16", "20", "Qianhao", new LongMutable(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-16", "30", "George", new LongMutable(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-16", "30", "Shaofeng", new LongMutable(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-17", "10", "Kejia", new LongMutable(10), new BigDecimal("10.5")));
+ builder.close();
+
+ return table;
+ }
+
+ static GTInfo newInfo() { // five-column table layout with a dictionary-backed code system
+ Builder builder = GTInfo.builder();
+ builder.setCodeSystem(newDictCodeSystem()); // dictionaries are registered for columns 1 and 2
+ builder.setColumns( //
+ DataType.getInstance("timestamp"), //
+ DataType.getInstance("integer"), //
+ DataType.getInstance("varchar(10)"), //
+ DataType.getInstance("bigint"), //
+ DataType.getInstance("decimal") //
+ );
+ builder.setPrimaryKey(setOf(0, 1)); // primary key = timestamp + integer columns
+ builder.setColumnPreferIndex(setOf(0));
+ builder.enableColumnBlock(new ImmutableBitSet[] { setOf(0, 1), setOf(2), setOf(3, 4) }); // three column blocks: {0,1}, {2}, {3,4}
+ builder.enableRowBlock(4);
+ GTInfo info = builder.build();
+ return info;
+ }
+
+ @SuppressWarnings("rawtypes")
+ private static CubeCodeSystem newDictCodeSystem() { // code system with dictionaries keyed by column index
+ Map<Integer, Dictionary> dictByColumn = Maps.newHashMap();
+ dictByColumn.put(1, newDictionaryOfInteger()); // column 1: integer dictionary
+ dictByColumn.put(2, newDictionaryOfString()); // column 2: string dictionary
+ return new CubeCodeSystem(dictByColumn);
+ }
+
+ @SuppressWarnings("rawtypes")
+ private static Dictionary newDictionaryOfString() { // trie dictionary holding the ten names written by newTestTable()
+ TrieDictionaryBuilder<String> names = new TrieDictionaryBuilder<>(new StringBytesConverter());
+ names.addValue("Dong");
+ names.addValue("George");
+ names.addValue("Jason");
+ names.addValue("Kejia");
+ names.addValue("Luke");
+ names.addValue("Mahone");
+ names.addValue("Qianhao");
+ names.addValue("Shaofeng");
+ names.addValue("Xu");
+ names.addValue("Yang");
+ return names.build(0);
+ }
+
+ @SuppressWarnings("rawtypes")
+ private static Dictionary newDictionaryOfInteger() { // number dictionary holding the multiples of ten from "10" to "100"
+ NumberDictionaryBuilder<String> numbers = new NumberDictionaryBuilder<>(new StringBytesConverter());
+ numbers.addValue("10");
+ numbers.addValue("20");
+ numbers.addValue("30");
+ numbers.addValue("40");
+ numbers.addValue("50");
+ numbers.addValue("60");
+ numbers.addValue("70");
+ numbers.addValue("80");
+ numbers.addValue("90");
+ numbers.addValue("100");
+ return numbers.build(0);
+ }
+
+ private static ImmutableBitSet setOf(int... values) { // immutable bit set with exactly the given bit positions set
+ BitSet bits = new BitSet();
+ for (int position : values)
+ bits.set(position);
+ return new ImmutableBitSet(bits);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
new file mode 100644
index 0000000..c87e970
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.gridtable.GTBuilder;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GridTable;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.gridtable.UnitTestSupport;
+import org.apache.kylin.gridtable.memstore.GTSimpleMemStore;
+import org.apache.kylin.metadata.measure.LongMutable;
+import org.junit.Test;
+
+public class SimpleGridTableTest {
+
+ @Test
+ public void testBasics() throws IOException { // written and scanned counters must agree for the basic layout
+ GTInfo gtInfo = UnitTestSupport.basicInfo();
+ GTSimpleMemStore memStore = new GTSimpleMemStore(gtInfo);
+ GridTable gridTable = new GridTable(gtInfo, memStore);
+
+ GTBuilder written = rebuild(gridTable);
+ IGTScanner scanned = scan(gridTable);
+ assertEquals(written.getWrittenRowBlockCount(), scanned.getScannedRowBlockCount());
+ assertEquals(written.getWrittenRowCount(), scanned.getScannedRowCount());
+ }
+
+ @Test
+ public void testAdvanced() throws IOException { // same round trip as testBasics, on the advanced layout
+ GTInfo gtInfo = UnitTestSupport.advancedInfo();
+ GTSimpleMemStore memStore = new GTSimpleMemStore(gtInfo);
+ GridTable gridTable = new GridTable(gtInfo, memStore);
+
+ GTBuilder written = rebuild(gridTable);
+ IGTScanner scanned = scan(gridTable);
+ assertEquals(written.getWrittenRowBlockCount(), scanned.getScannedRowBlockCount());
+ assertEquals(written.getWrittenRowCount(), scanned.getScannedRowCount());
+ }
+
+ @Test
+ public void testAggregate() throws IOException { // aggregated scan must still report every written block and row as scanned
+ GTInfo gtInfo = UnitTestSupport.advancedInfo();
+ GTSimpleMemStore memStore = new GTSimpleMemStore(gtInfo);
+ GridTable gridTable = new GridTable(gtInfo, memStore);
+
+ GTBuilder written = rebuild(gridTable);
+ IGTScanner scanned = scanAndAggregate(gridTable);
+ assertEquals(written.getWrittenRowBlockCount(), scanned.getScannedRowBlockCount());
+ assertEquals(written.getWrittenRowCount(), scanned.getScannedRowCount());
+ }
+
+ @Test
+ public void testAppend() throws IOException { // table filled through several append() sessions instead of one rebuild()
+ GTInfo gtInfo = UnitTestSupport.advancedInfo();
+ GTSimpleMemStore memStore = new GTSimpleMemStore(gtInfo);
+ GridTable gridTable = new GridTable(gtInfo, memStore);
+
+ rebuildViaAppend(gridTable);
+ IGTScanner scanned = scan(gridTable);
+ assertEquals(3, scanned.getScannedRowBlockCount());
+ assertEquals(10, scanned.getScannedRowCount());
+ }
+
+ private IGTScanner scan(GridTable table) throws IOException { // full scan that sanity-checks every record and returns the closed scanner
+ GTScanRequest fullScan = new GTScanRequest(table.getInfo());
+ IGTScanner scanner = table.scan(fullScan);
+ for (GTRecord rec : scanner) {
+ Object[] cols = rec.getValues();
+ assertTrue(((String) cols[0]).startsWith("2015-"));
+ assertTrue(((String) cols[2]).equals("Food"));
+ assertTrue(((LongMutable) cols[3]).get() == 10);
+ assertTrue(((BigDecimal) cols[4]).doubleValue() == 10.5);
+ System.out.println(rec);
+ }
+ scanner.close(); // the counters below are read after close()
+ System.out.println("Scanned Row Block Count: " + scanner.getScannedRowBlockCount());
+ System.out.println("Scanned Row Count: " + scanner.getScannedRowCount());
+ return scanner;
+ }
+
+ private IGTScanner scanAndAggregate(GridTable table) throws IOException { // aggregated scan; checks each result row by position and returns the closed scanner
+ GTScanRequest req = new GTScanRequest(table.getInfo(), null, setOf(0, 2), setOf(3, 4), new String[] { "count", "sum" }, null); // presumably group by columns 0 and 2, "count" on column 3 and "sum" on column 4 - TODO confirm argument roles
+ IGTScanner scanner = table.scan(req);
+ int i = 0; // position of the current aggregated row
+ for (GTRecord r : scanner) {
+ Object[] v = r.getValues();
+ switch (i) { // expected values are 2x, 3x, 4x and 1x the per-row 10 / 10.5 that scan() asserts
+ case 0:
+ assertTrue(((LongMutable) v[3]).get() == 20);
+ assertTrue(((BigDecimal) v[4]).doubleValue() == 21.0);
+ break;
+ case 1:
+ assertTrue(((LongMutable) v[3]).get() == 30);
+ assertTrue(((BigDecimal) v[4]).doubleValue() == 31.5);
+ break;
+ case 2:
+ assertTrue(((LongMutable) v[3]).get() == 40);
+ assertTrue(((BigDecimal) v[4]).doubleValue() == 42.0);
+ break;
+ case 3:
+ assertTrue(((LongMutable) v[3]).get() == 10);
+ assertTrue(((BigDecimal) v[4]).doubleValue() == 10.5);
+ break;
+ default:
+ fail(); // a fifth aggregated row is an error
+ }
+ i++;
+ System.out.println(r);
+ }
+ scanner.close(); // NOTE(review): fewer than four aggregated rows would go unnoticed; i is never checked after the loop
+ System.out.println("Scanned Row Block Count: " + scanner.getScannedRowBlockCount());
+ System.out.println("Scanned Row Count: " + scanner.getScannedRowCount());
+ return scanner;
+ }
+
+ static GTBuilder rebuild(GridTable table) throws IOException { // rewrites the table with ten mock records and returns the closed builder
+ GTBuilder writer = table.rebuild();
+ for (GTRecord mockRow : UnitTestSupport.mockupData(table.getInfo(), 10)) {
+ writer.write(mockRow);
+ }
+ writer.close(); // the counters below are read after close()
+
+ System.out.println("Written Row Block Count: " + writer.getWrittenRowBlockCount());
+ System.out.println("Written Row Count: " + writer.getWrittenRowCount());
+ return writer;
+ }
+
+ static void rebuildViaAppend(GridTable table) throws IOException { // writes the ten mock records through four append() sessions of 4, 3, 2 and 1 records
+ List<GTRecord> data = UnitTestSupport.mockupData(table.getInfo(), 10);
+ GTBuilder builder;
+ int i = 0; // cursor into data, advanced by every write below
+
+ builder = table.append(); // session 1: records 0-3
+ builder.write(data.get(i++));
+ builder.write(data.get(i++));
+ builder.write(data.get(i++));
+ builder.write(data.get(i++));
+ builder.close();
+ System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount());
+ System.out.println("Written Row Count: " + builder.getWrittenRowCount());
+
+ builder = table.append(); // session 2: records 4-6
+ builder.write(data.get(i++));
+ builder.write(data.get(i++));
+ builder.write(data.get(i++));
+ builder.close();
+ System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount());
+ System.out.println("Written Row Count: " + builder.getWrittenRowCount());
+
+ builder = table.append(); // session 3: records 7-8
+ builder.write(data.get(i++));
+ builder.write(data.get(i++));
+ builder.close();
+ System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount());
+ System.out.println("Written Row Count: " + builder.getWrittenRowCount());
+
+ builder = table.append(); // session 4: record 9
+ builder.write(data.get(i++));
+ builder.close();
+ System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount());
+ System.out.println("Written Row Count: " + builder.getWrittenRowCount());
+ }
+
+ private static ImmutableBitSet setOf(int... values) { // immutable bit set with exactly the given bit positions set
+ BitSet bits = new BitSet();
+ for (int position : values)
+ bits.set(position);
+ return new ImmutableBitSet(bits);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleInvertedIndexTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleInvertedIndexTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleInvertedIndexTest.java
new file mode 100644
index 0000000..f5c9645
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleInvertedIndexTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+import static org.junit.Assert.*;
+import it.uniroma3.mat.extendedset.intset.ConciseSet;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTInvertedIndex;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTRowBlock;
+import org.apache.kylin.gridtable.UnitTestSupport;
+import org.apache.kylin.metadata.filter.ColumnTupleFilter;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.ConstantTupleFilter;
+import org.apache.kylin.metadata.filter.LogicalTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.measure.LongMutable;
+import org.apache.kylin.metadata.measure.serializer.StringSerializer;
+import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class SimpleInvertedIndexTest {
+
+ GTInfo info;
+ GTInvertedIndex index;
+ ArrayList<CompareTupleFilter> basicFilters = Lists.newArrayList(); // filter k ...
+ ArrayList<ConciseSet> basicResults = Lists.newArrayList(); // ... is expected to select the block ids in entry k
+
+ public SimpleInvertedIndexTest() { // builds a ten-block index plus the parallel filter/expectation lists
+
+ info = UnitTestSupport.advancedInfo();
+ TblColRef colA = info.colRef(0);
+
+ // block i contains value "i" in column 0 (i = 0..8); the last block (9) contains NULL
+ index = new GTInvertedIndex(info);
+ GTRowBlock mockBlock = GTRowBlock.allocate(info); // one block object, refilled on every iteration
+ GTRowBlock.Writer writer = mockBlock.getWriter();
+ GTRecord record = new GTRecord(info);
+ for (int i = 0; i < 10; i++) {
+ record.setValues(i < 9 ? "" + i : null, "", "", new LongMutable(0), new BigDecimal(0));
+ for (int j = 0; j < info.getRowBlockSize(); j++) { // append the same record getRowBlockSize() times
+ writer.append(record);
+ }
+ writer.readyForFlush();
+ index.add(mockBlock);
+
+ writer.clearForNext();
+ }
+
+ basicFilters.add(compare(colA, FilterOperatorEnum.ISNULL));
+ basicResults.add(set(9)); // only the NULL block
+
+ basicFilters.add(compare(colA, FilterOperatorEnum.ISNOTNULL));
+ basicResults.add(set(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)); // every block expected, including the NULL block 9
+
+ basicFilters.add(compare(colA, FilterOperatorEnum.EQ, 0));
+ basicResults.add(set(0));
+
+ basicFilters.add(compare(colA, FilterOperatorEnum.NEQ, 0));
+ basicResults.add(set(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)); // every block expected, including block 0
+
+ basicFilters.add(compare(colA, FilterOperatorEnum.IN, 0, 5));
+ basicResults.add(set(0, 5));
+
+ basicFilters.add(compare(colA, FilterOperatorEnum.NOTIN, 0, 5));
+ basicResults.add(set(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)); // every block expected, including blocks 0 and 5
+
+ basicFilters.add(compare(colA, FilterOperatorEnum.LT, 3));
+ basicResults.add(set(0, 1, 2));
+
+ basicFilters.add(compare(colA, FilterOperatorEnum.LTE, 3));
+ basicResults.add(set(0, 1, 2, 3));
+
+ basicFilters.add(compare(colA, FilterOperatorEnum.GT, 3));
+ basicResults.add(set(4, 5, 6, 7, 8)); // the NULL block 9 is not expected
+
+ basicFilters.add(compare(colA, FilterOperatorEnum.GTE, 3));
+ basicResults.add(set(3, 4, 5, 6, 7, 8)); // the NULL block 9 is not expected
+ }
+
+ @Test
+ public void testBasics() { // each basic filter must select exactly its expected block set
+ for (int k = 0; k < basicFilters.size(); k++) {
+ assertEquals(basicResults.get(k), index.filter(basicFilters.get(k)));
+ }
+ }
+
+ @Test
+ public void testLogicalAnd() { // AND of any two basic filters must equal the intersection of their expected sets
+ for (int a = 0; a < basicFilters.size(); a++) {
+ for (int b = 0; b < basicFilters.size(); b++) {
+ LogicalTupleFilter conjunction = logical(FilterOperatorEnum.AND, basicFilters.get(a), basicFilters.get(b));
+ ConciseSet expected = basicResults.get(a).clone(); // work on a copy so basicResults stays untouched
+ expected.retainAll(basicResults.get(b));
+ assertEquals(expected, index.filter(conjunction));
+ }
+ }
+ }
+
+ @Test
+ public void testLogicalOr() { // OR of any two basic filters must equal the union of their expected sets
+ for (int a = 0; a < basicFilters.size(); a++) {
+ for (int b = 0; b < basicFilters.size(); b++) {
+ LogicalTupleFilter disjunction = logical(FilterOperatorEnum.OR, basicFilters.get(a), basicFilters.get(b));
+ ConciseSet expected = basicResults.get(a).clone(); // work on a copy so basicResults stays untouched
+ expected.addAll(basicResults.get(b));
+ assertEquals(expected, index.filter(disjunction));
+ }
+ }
+ }
+
+ @Test
+ public void testNotEvaluable() { // a filter on column 1 is expected not to narrow the block set at all
+ ConciseSet all = set(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+
+ CompareTupleFilter notEvaluable = compare(info.colRef(1), FilterOperatorEnum.EQ, 0); // presumably column 1 is not covered by the index - TODO confirm in advancedInfo()
+ assertEquals(all, index.filter(notEvaluable));
+
+ LogicalTupleFilter or = logical(FilterOperatorEnum.OR, basicFilters.get(0), notEvaluable);
+ assertEquals(all, index.filter(or)); // OR with it: still every block
+
+ LogicalTupleFilter and = logical(FilterOperatorEnum.AND, basicFilters.get(0), notEvaluable);
+ assertEquals(basicResults.get(0), index.filter(and)); // AND with it: only the other operand (ISNULL on column 0) decides
+ }
+
+ public static CompareTupleFilter compare(TblColRef col, TupleFilter.FilterOperatorEnum op, int... ids) { // builds "col <op> ids..."
+ CompareTupleFilter comparison = new CompareTupleFilter(op);
+ comparison.addChild(columnFilter(col));
+ for (int id : ids) {
+ comparison.addChild(constFilter(id)); // one constant child per id
+ }
+ return comparison;
+ }
+
+ public static LogicalTupleFilter logical(TupleFilter.FilterOperatorEnum op, TupleFilter... filters) { // logical node of type op over the given filters
+ LogicalTupleFilter node = new LogicalTupleFilter(op);
+ for (TupleFilter child : filters)
+ node.addChild(child);
+ return node;
+ }
+
+ public static ColumnTupleFilter columnFilter(TblColRef col) { // wraps col in a ColumnTupleFilter
+ return new ColumnTupleFilter(col);
+ }
+
+ public static ConstantTupleFilter constFilter(int id) { // constant filter holding id serialized as a string
+ byte[] backing = new byte[10];
+ ByteBuffer out = ByteBuffer.wrap(backing);
+ StringSerializer serializer = new StringSerializer(DataType.getInstance("string"));
+ serializer.serialize("" + id, out);
+ ByteArray encoded = new ByteArray(out.array(), out.arrayOffset(), out.position()); // covers only the bytes written so far
+ return new ConstantTupleFilter(encoded);
+ }
+
+ public static ConciseSet set(int... ints) { // ConciseSet containing exactly the given ints
+ ConciseSet members = new ConciseSet();
+ for (int member : ints)
+ members.add(member);
+ return members;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/core-job/pom.xml
----------------------------------------------------------------------
diff --git a/core-job/pom.xml b/core-job/pom.xml
index b51c1cf..95339f6 100644
--- a/core-job/pom.xml
+++ b/core-job/pom.xml
@@ -41,6 +41,11 @@
<version>${project.parent.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ </dependency>
+
<!-- Env & Test -->
<dependency>
<groupId>junit</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/core-job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java b/core-job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
new file mode 100644
index 0000000..6b814ef
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+public class BuildEngineFactory {
+
+ private static IBatchCubingEngine defaultBatchEngine;
+
+ public static IBatchCubingEngine defaultBatchEngine() {
+ if (defaultBatchEngine == null) {
+ KylinConfig conf = KylinConfig.getInstanceFromEnv();
+ if (conf.isCubingInMem()) {
+ defaultBatchEngine = (IBatchCubingEngine) ClassUtil.newInstance("org.apache.kylin.engine.mr.MRBatchCubingEngine2");
+ } else {
+ defaultBatchEngine = (IBatchCubingEngine) ClassUtil.newInstance("org.apache.kylin.engine.mr.MRBatchCubingEngine");
+ }
+ }
+ return defaultBatchEngine;
+ }
+
+ /** Build a new cube segment, typically its time range appends to the end of current cube. */
+ public static DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
+ return defaultBatchEngine().createBatchCubingJob(newSegment, submitter);
+ }
+
+ /** Merge multiple small segments into a big one. */
+ public static DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
+ return defaultBatchEngine().createBatchMergeJob(mergeSegment, submitter);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java b/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
new file mode 100644
index 0000000..904f557
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+/** Contract of a batch cubing engine: it creates the jobs that build and merge cube segments. */
+public interface IBatchCubingEngine {
+
+    /** Build a new cube segment, typically its time range appends to the end of current cube. */
+    DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter);
+
+    /** Merge multiple small segments into a big one. */
+    DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter);
+
+    /** NOTE(review): presumably the interface a data source must provide to this engine -- confirm with implementations. */
+    Class<?> getSourceInterface();
+
+    /** NOTE(review): presumably the interface a storage must provide to this engine -- confirm with implementations. */
+    Class<?> getStorageInterface();
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/core-job/src/main/java/org/apache/kylin/engine/IStreamingCubingEngine.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/engine/IStreamingCubingEngine.java b/core-job/src/main/java/org/apache/kylin/engine/IStreamingCubingEngine.java
new file mode 100644
index 0000000..0359ce9
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/engine/IStreamingCubingEngine.java
@@ -0,0 +1,8 @@
+package org.apache.kylin.engine;
+
+import org.apache.kylin.cube.CubeSegment;
+
+/** Contract of a streaming cubing engine. */
+public interface IStreamingCubingEngine {
+
+    /** Creates a {@link Runnable} builder for the given cube segment. */
+    Runnable createStreamingCubingBuilder(CubeSegment seg);
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/core-job/src/main/java/org/apache/kylin/job/JobInstance.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/JobInstance.java b/core-job/src/main/java/org/apache/kylin/job/JobInstance.java
new file mode 100644
index 0000000..e7f5772
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/JobInstance.java
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonBackReference;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonManagedReference;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Lists;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.cube.model.CubeBuildTypeEnum;
+import org.apache.kylin.job.constant.JobStatusEnum;
+import org.apache.kylin.job.constant.JobStepCmdTypeEnum;
+import org.apache.kylin.job.constant.JobStepStatusEnum;
+import org.apache.kylin.job.engine.JobEngineConfig;
+
+@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
+public class JobInstance extends RootPersistentEntity implements Comparable<JobInstance> {
+
+ public static final String JOB_WORKING_DIR_PREFIX = "kylin-";
+
+ public static final String YARN_APP_ID = "yarn_application_id";
+ public static final String YARN_APP_URL = "yarn_application_tracking_url";
+ public static final String MR_JOB_ID = "mr_job_id";
+ public static final String HDFS_BYTES_WRITTEN = "hdfs_bytes_written";
+ public static final String SOURCE_RECORDS_COUNT = "source_records_count";
+ public static final String SOURCE_RECORDS_SIZE = "source_records_size";
+
+ public static String getStepIdentity(JobInstance jobInstance, JobStep jobStep) {
+ return jobInstance.getRelatedCube() + "." + jobInstance.getUuid() + "." + jobStep.getSequenceID();
+ }
+
+ public static String getJobIdentity(JobInstance jobInstance) {
+ return jobInstance.getRelatedCube() + "." + jobInstance.getUuid();
+ }
+
+ public static String getJobWorkingDir(JobInstance jobInstance, JobEngineConfig engineConfig) {
+ return getJobWorkingDir(jobInstance.getUuid(), engineConfig.getHdfsWorkingDirectory());
+ }
+
+ public static String getJobWorkingDir(String jobUuid, String hdfsWorkdingDir) {
+ if (jobUuid == null || jobUuid.equals("")) {
+ throw new IllegalArgumentException("jobUuid can't be null or empty");
+ }
+ return hdfsWorkdingDir + "/" + JOB_WORKING_DIR_PREFIX + jobUuid;
+ }
+
+ @JsonProperty("name")
+ private String name;
+
+ @JsonProperty("type")
+ private CubeBuildTypeEnum type; // java implementation
+ @JsonProperty("duration")
+ private long duration;
+ @JsonProperty("related_cube")
+ private String relatedCube;
+ @JsonProperty("related_segment")
+ private String relatedSegment;
+ @JsonProperty("exec_start_time")
+ private long execStartTime;
+ @JsonProperty("exec_end_time")
+ private long execEndTime;
+ @JsonProperty("mr_waiting")
+ private long mrWaiting = 0;
+ @JsonManagedReference
+ @JsonProperty("steps")
+ private List<JobStep> steps;
+ @JsonProperty("submitter")
+ private String submitter;
+ @JsonProperty("job_status")
+ private JobStatusEnum status;
+
+ public JobStep getRunningStep() {
+ for (JobStep step : this.getSteps()) {
+ if (step.getStatus().equals(JobStepStatusEnum.RUNNING) || step.getStatus().equals(JobStepStatusEnum.WAITING)) {
+ return step;
+ }
+ }
+
+ return null;
+ }
+
+ /**
+  * Percentage (0-100) of steps whose status is FINISHED.
+  *
+  * @return 0.0 when the job has no steps (the plain ratio would be 0/0 = NaN),
+  *         otherwise 100 * finished steps / total steps
+  */
+ @JsonProperty("progress")
+ public double getProgress() {
+     List<JobStep> allSteps = this.getSteps();
+     if (allSteps.isEmpty()) {
+         // avoid 0.0 / 0 == NaN for a job that has no steps yet
+         return 0.0;
+     }
+
+     int completedStepCount = 0;
+     for (JobStep step : allSteps) {
+         // constant-first equals() tolerates a step whose status was never set
+         if (JobStepStatusEnum.FINISHED.equals(step.getStatus())) {
+             completedStepCount++;
+         }
+     }
+
+     return 100.0 * completedStepCount / allSteps.size();
+ }
+
+ public JobStatusEnum getStatus() {
+ return this.status;
+ }
+
+ public void setStatus(JobStatusEnum status) {
+ this.status = status;
+ }
+
+// @JsonProperty("job_status")
+// public JobStatusEnum getStatus() {
+//
+// // JobStatusEnum finalJobStatus;
+// int compositResult = 0;
+//
+// // if steps status are all NEW, then job status is NEW
+// // if steps status are all FINISHED, then job status is FINISHED
+// // if steps status are all PENDING, then job status is PENDING
+// // if steps status are FINISHED and PENDING, the job status is PENDING
+// // if one of steps status is RUNNING, then job status is RUNNING
+// // if one of steps status is ERROR, then job status is ERROR
+// // if one of steps status is KILLED, then job status is KILLED
+// // default status is RUNNING
+//
+// System.out.println(this.getName());
+//
+// for (JobStep step : this.getSteps()) {
+// //System.out.println("step: " + step.getSequenceID() + "'s status:" + step.getStatus());
+// compositResult = compositResult | step.getStatus().getCode();
+// }
+//
+// System.out.println();
+//
+// if (compositResult == JobStatusEnum.FINISHED.getCode()) {
+// return JobStatusEnum.FINISHED;
+// } else if (compositResult == JobStatusEnum.NEW.getCode()) {
+// return JobStatusEnum.NEW;
+// } else if (compositResult == JobStatusEnum.PENDING.getCode()) {
+// return JobStatusEnum.PENDING;
+// } else if (compositResult == (JobStatusEnum.FINISHED.getCode() | JobStatusEnum.PENDING.getCode())) {
+// return JobStatusEnum.PENDING;
+// } else if ((compositResult & JobStatusEnum.ERROR.getCode()) == JobStatusEnum.ERROR.getCode()) {
+// return JobStatusEnum.ERROR;
+// } else if ((compositResult & JobStatusEnum.DISCARDED.getCode()) == JobStatusEnum.DISCARDED.getCode()) {
+// return JobStatusEnum.DISCARDED;
+// } else if ((compositResult & JobStatusEnum.RUNNING.getCode()) == JobStatusEnum.RUNNING.getCode()) {
+// return JobStatusEnum.RUNNING;
+// }
+//
+// return JobStatusEnum.RUNNING;
+// }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public CubeBuildTypeEnum getType() {
+ return type;
+ }
+
+ public void setType(CubeBuildTypeEnum type) {
+ this.type = type;
+ }
+
+ public long getDuration() {
+ return duration;
+ }
+
+ public void setDuration(long duration) {
+ this.duration = duration;
+ }
+
+ public String getRelatedCube() {
+ return relatedCube;
+ }
+
+ public void setRelatedCube(String relatedCube) {
+ this.relatedCube = relatedCube;
+ }
+
+ public String getRelatedSegment() {
+ return relatedSegment;
+ }
+
+ public void setRelatedSegment(String relatedSegment) {
+ this.relatedSegment = relatedSegment;
+ }
+
+ /**
+ * @return the execStartTime
+ */
+ public long getExecStartTime() {
+ return execStartTime;
+ }
+
+ /**
+ * @param execStartTime the execStartTime to set
+ */
+ public void setExecStartTime(long execStartTime) {
+ this.execStartTime = execStartTime;
+ }
+
+ /**
+ * @return the execEndTime
+ */
+ public long getExecEndTime() {
+ return execEndTime;
+ }
+
+ /**
+ * @param execEndTime the execEndTime to set
+ */
+ public void setExecEndTime(long execEndTime) {
+ this.execEndTime = execEndTime;
+ }
+
+ public long getMrWaiting() {
+ return this.mrWaiting;
+ }
+
+ public void setMrWaiting(long mrWaiting) {
+ this.mrWaiting = mrWaiting;
+ }
+
+ /** Returns the step list, creating an empty list on first access so callers never see null. */
+ public List<JobStep> getSteps() {
+     if (steps != null) {
+         return steps;
+     }
+     steps = Lists.newArrayList();
+     return steps;
+ }
+
+ public void clearSteps() {
+ getSteps().clear();
+ }
+
+ public void addSteps(Collection<JobStep> steps) {
+ this.getSteps().addAll(steps);
+ }
+
+ public void addStep(JobStep step) {
+ getSteps().add(step);
+ }
+
+ public void addStep(int index, JobStep step) {
+ getSteps().add(index, step);
+ }
+
+ public JobStep findStep(String stepName) {
+ for (JobStep step : getSteps()) {
+ if (stepName.equals(step.getName())) {
+ return step;
+ }
+ }
+ return null;
+ }
+
+
+ public String getSubmitter() {
+ return submitter;
+ }
+
+ public void setSubmitter(String submitter) {
+ this.submitter = submitter;
+ }
+
+
+
+
+ @JsonIgnoreProperties(ignoreUnknown = true)
+ public static class JobStep implements Comparable<JobStep> {
+
+ @JsonBackReference
+ private JobInstance jobInstance;
+
+ @JsonProperty("id")
+ private String id;
+
+ @JsonProperty("name")
+ private String name;
+
+ @JsonProperty("sequence_id")
+ private int sequenceID;
+
+ @JsonProperty("exec_cmd")
+ private String execCmd;
+
+ @JsonProperty("interrupt_cmd")
+ private String InterruptCmd;
+
+ @JsonProperty("exec_start_time")
+ private long execStartTime;
+ @JsonProperty("exec_end_time")
+ private long execEndTime;
+ @JsonProperty("exec_wait_time")
+ private long execWaitTime;
+
+ @JsonProperty("step_status")
+ private JobStepStatusEnum status;
+
+ @JsonProperty("cmd_type")
+ private JobStepCmdTypeEnum cmdType = JobStepCmdTypeEnum.SHELL_CMD_HADOOP;
+
+ @JsonProperty("info")
+ private ConcurrentHashMap<String, String> info = new ConcurrentHashMap<String, String>();
+
+ @JsonProperty("run_async")
+ private boolean runAsync = false;
+
+ private ConcurrentHashMap<String, String> getInfo() {
+ return info;
+ }
+
+ public void putInfo(String key, String value) {
+ getInfo().put(key, value);
+ }
+
+ public String getInfo(String key) {
+ return getInfo().get(key);
+ }
+
+ public void clearInfo() {
+ getInfo().clear();
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public int getSequenceID() {
+ return sequenceID;
+ }
+
+ public void setSequenceID(int sequenceID) {
+ this.sequenceID = sequenceID;
+ }
+
+ public String getExecCmd() {
+ return execCmd;
+ }
+
+ public void setExecCmd(String execCmd) {
+ this.execCmd = execCmd;
+ }
+
+ public JobStepStatusEnum getStatus() {
+ return status;
+ }
+
+ public void setStatus(JobStepStatusEnum status) {
+ this.status = status;
+ }
+
+
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ /**
+ * @return the execStartTime
+ */
+ public long getExecStartTime() {
+ return execStartTime;
+ }
+
+ /**
+ * @param execStartTime the execStartTime to set
+ */
+ public void setExecStartTime(long execStartTime) {
+ this.execStartTime = execStartTime;
+ }
+
+ /**
+ * @return the execEndTime
+ */
+ public long getExecEndTime() {
+ return execEndTime;
+ }
+
+ /**
+ * @param execEndTime the execEndTime to set
+ */
+ public void setExecEndTime(long execEndTime) {
+ this.execEndTime = execEndTime;
+ }
+
+ public long getExecWaitTime() {
+ return execWaitTime;
+ }
+
+ public void setExecWaitTime(long execWaitTime) {
+ this.execWaitTime = execWaitTime;
+ }
+
+ public String getInterruptCmd() {
+ return InterruptCmd;
+ }
+
+ public void setInterruptCmd(String interruptCmd) {
+ InterruptCmd = interruptCmd;
+ }
+
+ public JobStepCmdTypeEnum getCmdType() {
+ return cmdType;
+ }
+
+ public void setCmdType(JobStepCmdTypeEnum cmdType) {
+ this.cmdType = cmdType;
+ }
+
+ /**
+ * @return the runAsync
+ */
+ public boolean isRunAsync() {
+ return runAsync;
+ }
+
+ /**
+ * @param runAsync the runAsync to set
+ */
+ public void setRunAsync(boolean runAsync) {
+ this.runAsync = runAsync;
+ }
+
+ /**
+ * @return the jobInstance
+ */
+ public JobInstance getJobInstance() {
+ return jobInstance;
+ }
+
+ /** Hash over name and sequenceID, the same fields compared by equals(). */
+ @Override
+ public int hashCode() {
+     int nameHash = (name == null) ? 0 : name.hashCode();
+     // same arithmetic as the usual "result = 31 * result + field" chain starting at 1
+     return 31 * (31 + nameHash) + sequenceID;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ JobStep other = (JobStep) obj;
+ if (name == null) {
+ if (other.name != null)
+ return false;
+ } else if (!name.equals(other.name))
+ return false;
+ if (sequenceID != other.sequenceID)
+ return false;
+ return true;
+ }
+
+ /** Orders steps by ascending sequenceID. */
+ @Override
+ public int compareTo(JobStep o) {
+     if (this.sequenceID == o.sequenceID) {
+         return 0;
+     }
+     return (this.sequenceID < o.sequenceID) ? -1 : 1;
+ }
+ }
+
+ /** Orders jobs by descending lastModified, so the most recently modified job sorts first. */
+ @Override
+ public int compareTo(JobInstance o) {
+     if (o.lastModified < this.lastModified) {
+         return -1;
+     }
+     if (o.lastModified > this.lastModified) {
+         return 1;
+     }
+     return 0;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
new file mode 100644
index 0000000..932da38
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.IntermediateColumnDesc;
+import org.apache.kylin.metadata.model.JoinDesc;
+import org.apache.kylin.metadata.model.LookupDesc;
+import org.apache.kylin.metadata.model.PartitionDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.w3c.dom.Document;
+import org.w3c.dom.NodeList;
+import org.xml.sax.SAXException;
+
+/**
+ * @author George Song (ysong1)
+ *
+ */
+
+public class JoinedFlatTable {
+
+ public static final String FACT_TABLE_ALIAS = "FACT_TABLE";
+
+ public static final String LOOKUP_TABLE_ALAIS_PREFIX = "LOOKUP_";
+
+ public static String getTableDir(IJoinedFlatTableDesc intermediateTableDesc, String storageDfsDir) {
+ return storageDfsDir + "/" + intermediateTableDesc.getTableName();
+ }
+
+ public static String generateCreateTableStatement(IJoinedFlatTableDesc intermediateTableDesc, String storageDfsDir) {
+ StringBuilder ddl = new StringBuilder();
+
+ ddl.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + intermediateTableDesc.getTableName() + "\n");
+
+ ddl.append("(" + "\n");
+ for (int i = 0; i < intermediateTableDesc.getColumnList().size(); i++) {
+ IntermediateColumnDesc col = intermediateTableDesc.getColumnList().get(i);
+ if (i > 0) {
+ ddl.append(",");
+ }
+ ddl.append(colName(col.getCanonicalName()) + " " + getHiveDataType(col.getDataType()) + "\n");
+ }
+ ddl.append(")" + "\n");
+
+ ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\177'" + "\n");
+ ddl.append("STORED AS SEQUENCEFILE" + "\n");
+ ddl.append("LOCATION '" + storageDfsDir + "/" + intermediateTableDesc.getTableName() + "';").append("\n");
+ // ddl.append("TBLPROPERTIES ('serialization.null.format'='\\\\N')" +
+ // ";\n");
+ return ddl.toString();
+ }
+
+ /** Generates the Hive statement that drops the intermediate flat table if it exists. */
+ public static String generateDropTableStatement(IJoinedFlatTableDesc intermediateTableDesc) {
+     String tableName = intermediateTableDesc.getTableName();
+     return "DROP TABLE IF EXISTS " + tableName + ";" + "\n";
+ }
+
+ /**
+  * Generates the Hive script that fills the intermediate flat table.
+  * <p>
+  * If the Hive conf file named by the engine config exists, every
+  * {@code <property>} in it except {@code tmpjars} is emitted first as a
+  * {@code SET name=value;} line. The name and value are read from inside each
+  * property element, so a property without a {@code <value>} (or with an empty
+  * one) yields an empty value instead of shifting later pairs or raising a
+  * NullPointerException. A property without a usable name is skipped.
+  *
+  * @throws IOException if the conf file cannot be read or parsed
+  */
+ public static String generateInsertDataStatement(IJoinedFlatTableDesc intermediateTableDesc, JobEngineConfig engineConfig) throws IOException {
+     StringBuilder sql = new StringBuilder();
+
+     File hadoopPropertiesFile = new File(engineConfig.getHiveConfFilePath());
+
+     if (hadoopPropertiesFile.exists()) {
+         try {
+             DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
+             Document doc = builder.parse(hadoopPropertiesFile);
+             NodeList properties = doc.getElementsByTagName("property");
+             for (int i = 0; i < properties.getLength(); i++) {
+                 // getElementsByTagName only returns elements, so the cast is safe
+                 org.w3c.dom.Element property = (org.w3c.dom.Element) properties.item(i);
+                 String name = firstChildValue(property, "name");
+                 if (name == null) {
+                     continue;
+                 }
+                 String value = firstChildValue(property, "value");
+                 if (name.equals("tmpjars") == false) {
+                     sql.append("SET " + name + "=" + (value == null ? "" : value) + ";").append("\n");
+                 }
+             }
+
+         } catch (ParserConfigurationException e) {
+             throw new IOException(e);
+         } catch (SAXException e) {
+             throw new IOException(e);
+         }
+     }
+
+     sql.append("INSERT OVERWRITE TABLE " + intermediateTableDesc.getTableName() + " " + generateSelectDataStatement(intermediateTableDesc) + ";").append("\n");
+
+     return sql.toString();
+ }
+
+ /** Returns the value of the first child node of the first {@code tagName} element under parent, or null if absent. */
+ private static String firstChildValue(org.w3c.dom.Element parent, String tagName) {
+     NodeList matches = parent.getElementsByTagName(tagName);
+     if (matches.getLength() == 0 || matches.item(0).getFirstChild() == null) {
+         return null;
+     }
+     return matches.item(0).getFirstChild().getNodeValue();
+ }
+
+ /**
+  * Generates the SELECT that produces the flat table rows: one
+  * {@code <alias>.<column>} per descriptor column, followed by the FROM/JOIN
+  * clauses and the optional WHERE clause.
+  */
+ public static String generateSelectDataStatement(IJoinedFlatTableDesc intermediateTableDesc) {
+     StringBuilder sql = new StringBuilder();
+     sql.append("SELECT" + "\n");
+     Map<String, String> tableAliasMap = buildTableAliasMap(intermediateTableDesc.getDataModel());
+     for (int i = 0; i < intermediateTableDesc.getColumnList().size(); i++) {
+         IntermediateColumnDesc col = intermediateTableDesc.getColumnList().get(i);
+         if (i > 0) {
+             sql.append(",");
+         }
+         // buildTableAliasMap() keys aliases by upper-cased table name, so look up the same way
+         String tableName = col.getTableName();
+         String tableAlias = tableAliasMap.get(tableName == null ? null : tableName.toUpperCase());
+         sql.append(tableAlias + "." + col.getColumnName() + "\n");
+     }
+     appendJoinStatement(intermediateTableDesc, sql, tableAliasMap);
+     appendWhereStatement(intermediateTableDesc, sql, tableAliasMap);
+     return sql.toString();
+ }
+
+ /**
+  * Maps upper-cased table names to SQL aliases: the fact table gets
+  * FACT_TABLE_ALIAS and every lookup that has a join gets
+  * LOOKUP_TABLE_ALAIS_PREFIX followed by a counter starting at 1.
+  */
+ private static Map<String, String> buildTableAliasMap(DataModelDesc dataModelDesc) {
+     Map<String, String> aliases = new HashMap<String, String>();
+     aliases.put(dataModelDesc.getFactTable().toUpperCase(), FACT_TABLE_ALIAS);
+
+     int nextLookupNo = 1;
+     for (LookupDesc lookup : dataModelDesc.getLookups()) {
+         // lookups without a join get no alias and do not consume a number
+         if (lookup.getJoin() == null) {
+             continue;
+         }
+         aliases.put(lookup.getTable().toUpperCase(), LOOKUP_TABLE_ALAIS_PREFIX + nextLookupNo);
+         nextLookupNo++;
+     }
+     return aliases;
+ }
+
+ /**
+  * Appends the FROM clause for the fact table and one JOIN ... ON clause per
+  * distinct lookup table that has a join with a non-empty type.
+  * <p>
+  * Aliases are looked up by upper-cased table name, matching the keys that
+  * buildTableAliasMap() stores; a raw-name lookup would yield "null" whenever
+  * a table name is not already upper case.
+  *
+  * @throws RuntimeException if a join has different numbers of primary and foreign key columns
+  */
+ private static void appendJoinStatement(IJoinedFlatTableDesc intermediateTableDesc, StringBuilder sql, Map<String, String> tableAliasMap) {
+     // lookup tables already joined, so each table is joined at most once
+     Set<String> dimTableCache = new HashSet<String>();
+
+     DataModelDesc dataModelDesc = intermediateTableDesc.getDataModel();
+     String factTableName = dataModelDesc.getFactTable();
+     String factTableAlias = tableAliasMap.get(factTableName.toUpperCase());
+     sql.append("FROM " + factTableName + " as " + factTableAlias + " \n");
+
+     for (LookupDesc lookupDesc : dataModelDesc.getLookups()) {
+         JoinDesc join = lookupDesc.getJoin();
+         if (join != null && join.getType().equals("") == false) {
+             String joinType = join.getType().toUpperCase();
+             String dimTableName = lookupDesc.getTable();
+             if (!dimTableCache.contains(dimTableName)) {
+                 TblColRef[] pk = join.getPrimaryKeyColumns();
+                 TblColRef[] fk = join.getForeignKeyColumns();
+                 if (pk.length != fk.length) {
+                     throw new RuntimeException("Invalid join condition of lookup table:" + lookupDesc);
+                 }
+                 String dimTableAlias = tableAliasMap.get(dimTableName.toUpperCase());
+                 sql.append(joinType + " JOIN " + dimTableName + " as " + dimTableAlias + "\n");
+                 sql.append("ON ");
+                 for (int i = 0; i < pk.length; i++) {
+                     if (i > 0) {
+                         sql.append(" AND ");
+                     }
+                     sql.append(factTableAlias + "." + fk[i].getName() + " = " + dimTableAlias + "." + pk[i].getName());
+                 }
+                 sql.append("\n");
+
+                 dimTableCache.add(dimTableName);
+             }
+         }
+     }
+ }
+
+ /**
+  * Appends a WHERE clause built from the model's filter condition and, when the
+  * descriptor carries a cube segment whose date range is not [0, Long.MAX_VALUE],
+  * the partition date-range condition. Nothing is appended when neither applies
+  * or when the descriptor is not a CubeJoinedFlatTableDesc.
+  */
+ private static void appendWhereStatement(IJoinedFlatTableDesc intermediateTableDesc, StringBuilder sql, Map<String, String> tableAliasMap) {
+     if (!(intermediateTableDesc instanceof CubeJoinedFlatTableDesc)) {
+         return;//TODO: for now only cube segments support filter and partition
+     }
+     CubeJoinedFlatTableDesc desc = (CubeJoinedFlatTableDesc) intermediateTableDesc;
+
+     StringBuilder whereBuilder = new StringBuilder("WHERE");
+     boolean hasCondition = false;
+
+     DataModelDesc model = desc.getCubeDesc().getModel();
+
+     String filterCondition = model.getFilterCondition();
+     if (filterCondition != null && !filterCondition.equals("")) {
+         whereBuilder.append(" (").append(filterCondition).append(") ");
+         hasCondition = true;
+     }
+
+     CubeSegment cubeSegment = desc.getCubeSegment();
+     if (cubeSegment != null) {
+         PartitionDesc partDesc = model.getPartitionDesc();
+         long dateStart = cubeSegment.getDateRangeStart();
+         long dateEnd = cubeSegment.getDateRangeEnd();
+
+         // the range [0, Long.MAX_VALUE] adds no date restriction
+         boolean restrictsDates = dateStart != 0 || dateEnd != Long.MAX_VALUE;
+         if (restrictsDates) {
+             whereBuilder.append(hasCondition ? " AND (" : " (");
+             whereBuilder.append(partDesc.getPartitionConditionBuilder().buildDateRangeCondition(partDesc, dateStart, dateEnd, tableAliasMap));
+             whereBuilder.append(")\n");
+             hasCondition = true;
+         }
+     }
+
+     if (!hasCondition) {
+         return;
+     }
+     sql.append(whereBuilder.toString());
+ }
+
+ /** Turns a canonical column name into a flat-table column name by replacing every '.' with '_'. */
+ private static String colName(String canonicalColName) {
+     return canonicalColName.replace('.', '_');
+ }
+
+ /**
+  * Maps a column data type name to the type used in the Hive DDL: anything
+  * starting with "integer" becomes "int", anything starting with "varchar"
+  * becomes "string", and every other type is passed through lower-cased.
+  */
+ private static String getHiveDataType(String javaDataType) {
+     String lowered = javaDataType.toLowerCase();
+     if (lowered.startsWith("integer")) {
+         return "int";
+     }
+     if (lowered.startsWith("varchar")) {
+         return "string";
+     }
+     return lowered;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/core-job/src/main/java/org/apache/kylin/job/Scheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/Scheduler.java b/core-job/src/main/java/org/apache/kylin/job/Scheduler.java
new file mode 100644
index 0000000..2ed2fc2
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/Scheduler.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job;
+
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.exception.SchedulerException;
+import org.apache.kylin.job.execution.Executable;
+import org.apache.kylin.job.lock.JobLock;
+
+/**
+ * Lifecycle contract of a job scheduler for executables of type {@code T}.
+ * Every operation reports failure through {@link SchedulerException}.
+ */
+public interface Scheduler<T extends Executable> {
+
+ /** Initializes the scheduler with the given job engine configuration and job lock. */
+ void init(JobEngineConfig jobEngineConfig, JobLock jobLock) throws SchedulerException;
+
+ /** Shuts the scheduler down. */
+ void shutdown() throws SchedulerException;
+
+ /**
+  * Stops the given executable.
+  * NOTE(review): the boolean presumably reports whether the stop succeeded -- confirm with implementations.
+  */
+ boolean stop(T executable) throws SchedulerException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/core-job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java b/core-job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java
new file mode 100644
index 0000000..29b5324
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.cmd;
+
+/**
+ * Base class for command outputs that treats every logged message as command
+ * output: {@code log} forwards straight to {@code appendOutput}.
+ */
+public abstract class BaseCommandOutput implements ICommandOutput {
+
+    /** Appends the logged message to the command output. */
+    @Override
+    public void log(String message) {
+        appendOutput(message);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/core-job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java b/core-job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java
new file mode 100644
index 0000000..6cab6a3
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.cmd;
+
+import org.apache.kylin.common.util.Logger;
+import org.apache.kylin.job.constant.JobStepStatusEnum;
+
+/**
+ * Mutable record of a command run: job step status, captured output text and
+ * exit code. It is also a {@link Logger}, so it can be handed to code that logs.
+ *
+ * @author xjiang
+ */
+public interface ICommandOutput extends Logger {
+
+    /** Records the step status of the command. */
+    void setStatus(JobStepStatusEnum status);
+
+    /** Returns the step status held by this output. */
+    JobStepStatusEnum getStatus();
+
+    /** Adds a message to the captured output. */
+    void appendOutput(String message);
+
+    /** Returns the captured output as a single string. */
+    String getOutput();
+
+    /** Records the exit code of the command. */
+    void setExitCode(int exitCode);
+
+    /** Returns the exit code held by this output. */
+    int getExitCode();
+
+    /** Resets this output; {@code ShellCmd} calls this before each command run. */
+    void reset();
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/core-job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java b/core-job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java
new file mode 100644
index 0000000..5a47173
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.cmd;
+
+import org.apache.kylin.job.exception.JobException;
+
+/**
+ * A job command that can be executed and cancelled.
+ *
+ * @author xjiang
+ */
+public interface IJobCommand {
+
+    /**
+     * Runs the command.
+     *
+     * @return the output object describing the run
+     * @throws JobException if the command cannot be executed
+     */
+    ICommandOutput execute() throws JobException;
+
+    /**
+     * Requests cancellation of the command.
+     *
+     * @throws JobException if cancellation fails
+     */
+    void cancel() throws JobException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java b/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java
new file mode 100644
index 0000000..6a718fc
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.cmd;
+
+import org.apache.kylin.common.util.CliCommandExecutor;
+import org.apache.kylin.job.constant.JobStepStatusEnum;
+import org.apache.kylin.job.exception.JobException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.*;
+
+/**
+ * Runs a command line through {@link CliCommandExecutor} on a background thread and
+ * reports status, output and exit code through an {@link ICommandOutput}.
+ *
+ * @author xjiang
+ */
+public class ShellCmd implements IJobCommand {
+
+    private static final Logger log = LoggerFactory.getLogger(ShellCmd.class);
+
+    /** Exit code used until the command produces one of its own. */
+    private static final int EXIT_CODE_UNKNOWN = -1;
+    /** Exit code recorded when the command was cancelled. */
+    private static final int EXIT_CODE_CANCELLED = -2;
+
+    private final String executeCommand;
+    private final ICommandOutput output;
+    private final boolean isAsync;
+    private final CliCommandExecutor cliCommandExecutor;
+
+    // Written by execute() and read by cancel(), possibly on another thread.
+    private volatile FutureTask<Integer> future;
+
+    private ShellCmd(String executeCmd, ICommandOutput out, String host, int port, String user, String password, boolean async) {
+        this.executeCommand = executeCmd;
+        this.output = out;
+        this.cliCommandExecutor = new CliCommandExecutor();
+        this.cliCommandExecutor.setRunAtRemote(host, port, user, password);
+        this.isAsync = async;
+    }
+
+    /**
+     * Creates a command whose results are collected in a new {@link ShellCmdOutput}.
+     *
+     * @param executeCmd command line to run
+     * @param host       passed to {@code CliCommandExecutor.setRunAtRemote}
+     * @param port       passed to {@code CliCommandExecutor.setRunAtRemote}
+     * @param user       passed to {@code CliCommandExecutor.setRunAtRemote}
+     * @param password   passed to {@code CliCommandExecutor.setRunAtRemote}
+     * @param async      if {@code true}, {@link #execute()} returns without waiting for the command
+     */
+    public ShellCmd(String executeCmd, String host, int port, String user, String password, boolean async) {
+        this(executeCmd, new ShellCmdOutput(), host, port, user, password, async);
+    }
+
+    /**
+     * Starts the command on a single worker thread. In synchronous mode this blocks until the
+     * command ends and records status and exit code on the output; in asynchronous mode it
+     * returns right after the command has been handed to the worker.
+     *
+     * @return the output object of this command
+     * @throws JobException if waiting for the command fails or is interrupted (synchronous mode only)
+     */
+    @Override
+    public ICommandOutput execute() throws JobException {
+        final ExecutorService executor = Executors.newSingleThreadExecutor();
+        final FutureTask<Integer> task = new FutureTask<Integer>(new Callable<Integer>() {
+            @Override
+            public Integer call() throws JobException, IOException {
+                return executeCommand(executeCommand);
+            }
+        });
+        future = task;
+        try {
+            executor.execute(task);
+        } finally {
+            // Shut down here rather than inside the task: a task cancelled before it starts
+            // never runs its body, so a shutdown placed there could leave the worker thread
+            // alive. shutdown() still lets the already submitted task run to completion.
+            executor.shutdown();
+        }
+
+        if (!isAsync) {
+            awaitResult(task);
+        }
+        return output;
+    }
+
+    /** Blocks until the task ends, then records the matching status and exit code on the output. */
+    private void awaitResult(FutureTask<Integer> task) throws JobException {
+        int exitCode = EXIT_CODE_UNKNOWN;
+        try {
+            exitCode = task.get();
+            log.info("finish executing");
+        } catch (CancellationException e) {
+            log.debug("Command is cancelled");
+            exitCode = EXIT_CODE_CANCELLED;
+        } catch (InterruptedException e) {
+            // Keep the interrupt visible to callers further up the stack.
+            Thread.currentThread().interrupt();
+            throw new JobException("Error when execute job " + executeCommand, e);
+        } catch (Exception e) {
+            throw new JobException("Error when execute job " + executeCommand, e);
+        } finally {
+            if (exitCode == 0) {
+                output.setStatus(JobStepStatusEnum.FINISHED);
+            } else if (exitCode == EXIT_CODE_CANCELLED) {
+                output.setStatus(JobStepStatusEnum.DISCARDED);
+            } else {
+                output.setStatus(JobStepStatusEnum.ERROR);
+            }
+            output.setExitCode(exitCode);
+        }
+    }
+
+    /**
+     * Resets the output, marks it {@code RUNNING} and runs the command.
+     *
+     * @param command command line to run
+     * @return the {@code getFirst()} value of what {@code cliCommandExecutor.execute(command, output)} returns
+     */
+    protected int executeCommand(String command) throws JobException, IOException {
+        output.reset();
+        output.setStatus(JobStepStatusEnum.RUNNING);
+        return cliCommandExecutor.execute(command, output).getFirst();
+    }
+
+    /** Cancels the command, allowing its worker thread to be interrupted; a no-op before {@link #execute()}. */
+    @Override
+    public void cancel() throws JobException {
+        final FutureTask<Integer> task = future;
+        if (task != null) {
+            task.cancel(true);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java b/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java
new file mode 100644
index 0000000..ebcad47
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.cmd;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kylin.job.constant.JobStepStatusEnum;
+
+/**
+ * In-memory {@link ICommandOutput}: keeps the output text in a {@link StringBuilder}
+ * next to the exit code and step status, and echoes every appended message to the debug log.
+ *
+ * @author xjiang
+ */
+public class ShellCmdOutput extends BaseCommandOutput implements ICommandOutput {
+
+    protected static final Logger log = LoggerFactory.getLogger(ShellCmdOutput.class);
+
+    protected StringBuilder output;
+    protected int exitCode;
+    protected JobStepStatusEnum status;
+
+    /** Creates an output in its initial state: no text, exit code -1, status {@code NEW}. */
+    public ShellCmdOutput() {
+        restoreDefaults();
+    }
+
+    /** Puts every field back to its initial value; shared by the constructor and {@link #reset()}. */
+    private void restoreDefaults() {
+        this.status = JobStepStatusEnum.NEW;
+        this.exitCode = -1;
+        this.output = new StringBuilder();
+    }
+
+    /** Returns the current step status. */
+    @Override
+    public JobStepStatusEnum getStatus() {
+        return this.status;
+    }
+
+    /** Replaces the current step status. */
+    @Override
+    public void setStatus(JobStepStatusEnum s) {
+        status = s;
+    }
+
+    /** Returns everything appended since construction or the last reset, as one string. */
+    @Override
+    public String getOutput() {
+        return this.output.toString();
+    }
+
+    /** Appends the message followed by the {@code line.separator} property, then logs it at debug level. */
+    @Override
+    public void appendOutput(String message) {
+        output.append(message);
+        output.append(System.getProperty("line.separator"));
+        log.debug(message);
+    }
+
+    /** Returns the stored exit code. */
+    @Override
+    public int getExitCode() {
+        return this.exitCode;
+    }
+
+    /** Stores the exit code. */
+    @Override
+    public void setExitCode(int code) {
+        this.exitCode = code;
+    }
+
+    /** Discards the collected text and returns exit code and status to their initial values. */
+    @Override
+    public void reset() {
+        restoreDefaults();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/core-job/src/main/java/org/apache/kylin/job/common/OptionsHelper.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/common/OptionsHelper.java b/core-job/src/main/java/org/apache/kylin/job/common/OptionsHelper.java
new file mode 100644
index 0000000..1cda348
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/common/OptionsHelper.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.common;
+
+import java.io.File;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+
+/**
+ * Thin wrapper around commons-cli: parses arguments with {@link GnuParser} and
+ * answers queries against the resulting {@link CommandLine}. The query methods
+ * need {@link #parseOptions(Options, String[])} to have been called first.
+ */
+public class OptionsHelper {
+    private CommandLine commandLine;
+
+    /**
+     * Parses {@code args} against {@code options} and keeps the result for later queries.
+     *
+     * @throws ParseException if the parser rejects the arguments
+     */
+    public void parseOptions(Options options, String[] args) throws ParseException {
+        CommandLineParser gnuParser = new GnuParser();
+        this.commandLine = gnuParser.parse(options, args);
+    }
+
+    /** Returns the options of the parsed command line. */
+    public Option[] getOptions() {
+        return this.commandLine.getOptions();
+    }
+
+    /**
+     * Renders the parsed options as {@code " opt"} or {@code " opt=value"} fragments,
+     * concatenated in the order the command line reports them.
+     */
+    public String getOptionsAsString() {
+        StringBuilder rendered = new StringBuilder();
+        Option[] parsed = commandLine.getOptions();
+        for (int i = 0; i < parsed.length; i++) {
+            Option current = parsed[i];
+            rendered.append(" ").append(current.getOpt());
+            if (!current.hasArg()) {
+                continue;
+            }
+            rendered.append("=").append(current.getValue());
+        }
+        return rendered.toString();
+    }
+
+    /** Returns the parsed value of the given option, looked up by its {@code getOpt()} name. */
+    public String getOptionValue(Option option) {
+        String optName = option.getOpt();
+        return commandLine.getOptionValue(optName);
+    }
+
+    /** Tells whether the given option, looked up by its {@code getOpt()} name, was parsed. */
+    public boolean hasOption(Option option) {
+        String optName = option.getOpt();
+        return commandLine.hasOption(optName);
+    }
+
+    /** Prints the help text for {@code options} under the given program name. */
+    public void printUsage(String programName, Options options) {
+        new HelpFormatter().printHelp(programName, options);
+    }
+
+    /**
+     * Replaces the platform file separator with {@code '/'}. Despite the name, no URL
+     * scheme is added; on platforms whose separator is already {@code '/'} the path
+     * is returned as is.
+     */
+    public static String convertToFileURL(String path) {
+        if (File.separatorChar == '/') {
+            return path;
+        }
+        return path.replace(File.separatorChar, '/');
+    }
+
+}