You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/07/27 11:21:44 UTC
[42/52] [abbrv] incubator-kylin git commit: KYLIN-875 Split job
module into 'core-job', 'engine-mr', 'source-hive',
'storage-hbase'. The old job remains as an assembly project.
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderTest.java
new file mode 100644
index 0000000..0b812a6
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cube.inmemcubing;
+
+import static org.junit.Assert.*;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder;
+import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
+import org.apache.kylin.cube.inmemcubing.InMemCubeBuilder;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Cross-checks {@link DoggedCubeBuilder} against {@link InMemCubeBuilder}: both builders are fed
+ * rows generated from the same random seed, each builder's output is dumped to a temp file, and
+ * the two files must match line by line.
+ */
+public class DoggedCubeBuilderTest extends LocalFileMetadataTestCase {
+
+    @SuppressWarnings("unused")
+    private static final Logger logger = LoggerFactory.getLogger(DoggedCubeBuilderTest.class);
+
+    private static final int INPUT_ROWS = 10000;
+    private static final int SPLIT_ROWS = 5000;
+    private static final int THREADS = 4;
+
+    private static CubeInstance cube;
+    private static String flatTable;
+    private static Map<TblColRef, Dictionary<?>> dictionaryMap;
+
+    /** Loads the local test metadata, the cube under test and the dictionaries built from the flat table. */
+    @BeforeClass
+    public static void before() throws IOException {
+        staticCreateTestMetadata();
+
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
+
+        cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty");
+        flatTable = "../examples/test_case_data/localmeta/data/flatten_data_for_without_slr_left_join.csv";
+        dictionaryMap = InMemCubeBuilderTest.getDictionaryMap(cube, flatTable);
+    }
+
+    @AfterClass
+    public static void after() throws Exception {
+        staticCleanupTestMetadata();
+    }
+
+    /**
+     * Runs the dogged builder and the plain in-memory builder one after the other on the same
+     * input and compares their recorded output.
+     *
+     * @throws Exception if feeding, building or comparing fails
+     */
+    @Test
+    public void test() throws Exception {
+
+        ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        // the same seed is passed to both feedData() calls so both builders see identical rows
+        long randSeed = System.currentTimeMillis();
+
+        FileRecordWriter doggedResult = null;
+        FileRecordWriter inmemResult = null;
+        try {
+            DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
+            doggedBuilder.setConcurrentThreads(THREADS);
+            doggedBuilder.setSplitRowThreshold(SPLIT_ROWS);
+            doggedResult = new FileRecordWriter();
+
+            {
+                Future<?> future = executorService.submit(doggedBuilder.buildAsRunnable(queue, doggedResult));
+                InMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
+                future.get();
+                doggedResult.close();
+            }
+
+            InMemCubeBuilder inmemBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap);
+            inmemBuilder.setConcurrentThreads(THREADS);
+            inmemResult = new FileRecordWriter();
+
+            {
+                Future<?> future = executorService.submit(inmemBuilder.buildAsRunnable(queue, inmemResult));
+                InMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
+                future.get();
+                inmemResult.close();
+            }
+
+            fileCompare(doggedResult.file, inmemResult.file);
+        } finally {
+            // Always release the worker thread and the temp files, also when a build or the
+            // comparison fails. On the success path no task is running, so shutdownNow() only
+            // terminates the idle worker.
+            executorService.shutdownNow();
+            discard(doggedResult);
+            discard(inmemResult);
+        }
+    }
+
+    /** Closes the writer (closing twice is harmless) and removes its temp file; tolerates null. */
+    private static void discard(FileRecordWriter result) {
+        if (result == null) {
+            return;
+        }
+        result.close();
+        result.file.delete();
+    }
+
+    /**
+     * Asserts that two UTF-8 text files have the same lines in the same order.
+     *
+     * @param file  first file to read
+     * @param file2 second file to read
+     * @throws IOException if either file cannot be opened or read
+     */
+    private void fileCompare(File file, File file2) throws IOException {
+        BufferedReader r1 = null;
+        BufferedReader r2 = null;
+        try {
+            r1 = new BufferedReader(new InputStreamReader(new FileInputStream(file), "UTF-8"));
+            r2 = new BufferedReader(new InputStreamReader(new FileInputStream(file2), "UTF-8"));
+
+            String line1, line2;
+            int lineNo = 0;
+            do {
+                line1 = r1.readLine();
+                line2 = r2.readLine();
+                lineNo++;
+
+                // a null on one side only means one file is shorter; that fails here as well
+                assertEquals("Output differs at line " + lineNo, line1, line2);
+
+            } while (line1 != null || line2 != null);
+        } finally {
+            closeQuietly(r1);
+            closeQuietly(r2);
+        }
+    }
+
+    /** Closes a reader without letting a close failure mask the assertion error being propagated. */
+    private static void closeQuietly(BufferedReader reader) {
+        if (reader == null) {
+            return;
+        }
+        try {
+            reader.close();
+        } catch (IOException ignored) {
+            // read-only stream; nothing is lost if close fails
+        }
+    }
+
+    /** Cuboid writer that records every record as one "cuboidId, record" text line in a temp file. */
+    class FileRecordWriter implements ICuboidWriter {
+
+        File file;
+        PrintWriter writer;
+
+        FileRecordWriter() throws IOException {
+            file = File.createTempFile("DoggedCubeBuilderTest_", ".data");
+            writer = new PrintWriter(file, "UTF-8");
+        }
+
+        @Override
+        public void write(long cuboidId, GTRecord record) throws IOException {
+            writer.print(cuboidId);
+            writer.print(", ");
+            writer.print(record.toString());
+            writer.println();
+        }
+
+        /** Flushes and closes the underlying file; must be called before the file is read back. */
+        public void close() {
+            writer.close();
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java
new file mode 100644
index 0000000..1487dff
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cube.inmemcubing;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Drives {@link InMemCubeBuilder} through three consecutive builds (full input, zero input, full
+ * input) on one builder instance. Also hosts the static input-generation helpers that
+ * DoggedCubeBuilderTest reuses.
+ */
+public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
+
+    private static final Logger logger = LoggerFactory.getLogger(InMemCubeBuilderTest.class);
+
+    private static final int INPUT_ROWS = 70000;
+    private static final int THREADS = 4;
+
+    private static CubeInstance cube;
+    private static String flatTable;
+    private static Map<TblColRef, Dictionary<?>> dictionaryMap;
+
+    /** Loads the local test metadata, the cube under test and the dictionaries built from the flat table. */
+    @BeforeClass
+    public static void before() throws IOException {
+        staticCreateTestMetadata();
+
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
+
+        cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty");
+        flatTable = "../examples/test_case_data/localmeta/data/flatten_data_for_without_slr_left_join.csv";
+        dictionaryMap = getDictionaryMap(cube, flatTable);
+    }
+
+    @AfterClass
+    public static void after() throws Exception {
+        staticCleanupTestMetadata();
+    }
+
+    /**
+     * Builds three times in a row with the same builder; the test passes when no round throws.
+     *
+     * @throws Exception an IOException wrapping whatever made a round fail
+     */
+    @Test
+    public void test() throws Exception {
+
+        InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap);
+        //DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
+        cubeBuilder.setConcurrentThreads(THREADS);
+
+        ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+        try {
+            // round 1
+            {
+                Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter()));
+                feedData(cube, flatTable, queue, INPUT_ROWS);
+                future.get();
+            }
+
+            // round 2, zero input
+            {
+                Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter()));
+                feedData(cube, flatTable, queue, 0);
+                future.get();
+            }
+
+            // round 3
+            {
+                Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter()));
+                feedData(cube, flatTable, queue, INPUT_ROWS);
+                future.get();
+            }
+
+        } catch (Exception e) {
+            logger.error("stream build failed", e);
+            throw new IOException("Failed to build cube ", e);
+        } finally {
+            // Release the worker thread even when a round fails; on the success path no task is
+            // running, so this only terminates the idle worker.
+            executorService.shutdownNow();
+        }
+    }
+
+    /**
+     * Same as {@link #feedData(CubeInstance, String, ArrayBlockingQueue, int, long)} with a seed
+     * of 0, i.e. the random generator is left with its default seed.
+     */
+    static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<List<String>> queue, int count) throws IOException, InterruptedException {
+        feedData(cube, flatTable, queue, count, 0);
+    }
+
+    /**
+     * Queues {@code count} random rows followed by one empty row.
+     * Each cell is picked at random from the distinct values found in the matching column of the
+     * flat table file. The trailing empty row is presumably the end-of-input marker expected by
+     * the builders (their code is not visible here).
+     *
+     * @param cube      cube whose descriptor defines the flat table columns
+     * @param flatTable path of the comma-separated flat table file
+     * @param queue     destination queue; this call blocks while the queue is full
+     * @param count     number of data rows to generate; 0 queues only the empty row
+     * @param randSeed  seed for the random generator; 0 leaves the generator with its default seed
+     * @throws IOException          if the flat table file cannot be read
+     * @throws InterruptedException if interrupted while waiting for queue space
+     */
+    static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<List<String>> queue, int count, long randSeed) throws IOException, InterruptedException {
+        CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor(), null);
+        int nColumns = flatTableDesc.getColumnList().size();
+
+        @SuppressWarnings("unchecked")
+        Set<String>[] distinctSets = new Set[nColumns];
+        for (int i = 0; i < nColumns; i++) {
+            distinctSets[i] = new TreeSet<String>();
+        }
+
+        // get distinct values on each column
+        List<String> lines = FileUtils.readLines(new File(flatTable), "UTF-8");
+        for (String line : lines) {
+            String[] row = line.trim().split(",");
+            assert row.length == nColumns;
+            for (int i = 0; i < nColumns; i++) {
+                distinctSets[i].add(row[i]);
+            }
+        }
+
+        // TreeSet iteration is sorted, so the candidate arrays are in a stable order and a
+        // given seed always produces the same rows
+        List<String[]> distincts = new ArrayList<String[]>();
+        for (int i = 0; i < nColumns; i++) {
+            distincts.add(distinctSets[i].toArray(new String[distinctSets[i].size()]));
+        }
+
+        Random rand = new Random();
+        if (randSeed != 0) {
+            rand.setSeed(randSeed);
+        }
+
+        // output with random data
+        for (; count > 0; count--) {
+            ArrayList<String> row = new ArrayList<String>(nColumns);
+            for (int i = 0; i < nColumns; i++) {
+                String[] candidates = distincts.get(i);
+                row.add(candidates[rand.nextInt(candidates.length)]);
+            }
+            queue.put(row);
+        }
+        queue.put(new ArrayList<String>(0));
+    }
+
+    /**
+     * Builds a dictionary for every base-cuboid column that the row key marks as
+     * dictionary-encoded, using the values of that column in the flat table file.
+     *
+     * @param cube      cube whose descriptor supplies columns and row key settings
+     * @param flatTable path of the comma-separated flat table file
+     * @return map from column to its dictionary; columns not using a dictionary are absent
+     * @throws IOException if the flat table file cannot be read
+     */
+    static Map<TblColRef, Dictionary<?>> getDictionaryMap(CubeInstance cube, String flatTable) throws IOException {
+        Map<TblColRef, Dictionary<?>> result = Maps.newHashMap();
+        CubeDesc desc = cube.getDescriptor();
+        CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(desc, null);
+        int nColumns = flatTableDesc.getColumnList().size();
+
+        List<TblColRef> columns = Cuboid.getBaseCuboid(desc).getColumns();
+        for (int c = 0; c < columns.size(); c++) {
+            TblColRef col = columns.get(c);
+            if (desc.getRowkey().isUseDictionary(col)) {
+                logger.info("Building dictionary for " + col);
+                // translate the cuboid column position into its position within a flat table row
+                List<byte[]> valueList = readValueList(flatTable, nColumns, flatTableDesc.getRowKeyColumnIndexes()[c]);
+                Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueList(col.getType(), valueList);
+                result.put(col, dict);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Reads column {@code c} of every line of the flat table file as bytes.
+     *
+     * @param flatTable path of the comma-separated flat table file
+     * @param nColumns  expected number of columns per line (checked with {@code assert} only)
+     * @param c         zero-based column index to extract
+     * @return one byte array per line, in file order
+     * @throws IOException if the file cannot be read
+     */
+    private static List<byte[]> readValueList(String flatTable, int nColumns, int c) throws IOException {
+        List<byte[]> result = Lists.newArrayList();
+        List<String> lines = FileUtils.readLines(new File(flatTable), "UTF-8");
+        for (String line : lines) {
+            String[] row = line.trim().split(",");
+            assert row.length == nColumns;
+            // String.split never yields null elements, so no null check is needed here
+            result.add(Bytes.toBytes(row[c]));
+        }
+        return result;
+    }
+
+    /** Cuboid writer that discards records, or prints them to stdout when {@code verbose} is set. */
+    class ConsoleGTRecordWriter implements ICuboidWriter {
+
+        boolean verbose = false;
+
+        @Override
+        public void write(long cuboidId, GTRecord record) throws IOException {
+            if (verbose) {
+                System.out.println(record.toString());
+            }
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java
new file mode 100644
index 0000000..645908d
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cube.inmemcubing;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.kylin.common.util.MemoryBudgetController;
+import org.apache.kylin.cube.inmemcubing.MemDiskStore;
+import org.apache.kylin.gridtable.GTBuilder;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GridTable;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.gridtable.UnitTestSupport;
+import org.junit.Test;
+
+/**
+ * Writes a mocked-up record set into a {@link GridTable} backed by {@link MemDiskStore} and
+ * reads it back, once on the test thread and once from several threads that share one
+ * {@link MemoryBudgetController}.
+ */
+public class MemDiskStoreTest {
+
+    // NOTE(review): 20 is presumably a budget in MB, well below the data size noted below - confirm
+    final MemoryBudgetController budgetCtrl = new MemoryBudgetController(20);
+    final GTInfo info = UnitTestSupport.advancedInfo();
+    final List<GTRecord> data = UnitTestSupport.mockupData(info, 1000000); // converts to about 34 MB data
+
+    @Test
+    public void testSingleThreadWriteRead() throws IOException {
+        long start = System.currentTimeMillis();
+        verifyOneTableWriteAndRead();
+        long end = System.currentTimeMillis();
+        System.out.println("Cost " + (end - start) + " millis");
+    }
+
+    /**
+     * Runs the write/read verification on five threads at once and fails if any of them failed.
+     *
+     * @throws InterruptedException if interrupted while waiting for the worker threads
+     */
+    @Test
+    public void testMultiThreadWriteRead() throws IOException, InterruptedException {
+        long start = System.currentTimeMillis();
+
+        int nThreads = 5;
+        // One slot per worker: written only by that worker and read only after join(), so no
+        // extra synchronization is required.
+        final Throwable[] failures = new Throwable[nThreads];
+        Thread[] t = new Thread[nThreads];
+        for (int i = 0; i < nThreads; i++) {
+            final int slot = i;
+            t[i] = new Thread() {
+                @Override
+                public void run() {
+                    try {
+                        verifyOneTableWriteAndRead();
+                    } catch (Throwable ex) {
+                        // Throwable on purpose: a failed assertEquals raises AssertionError, which is
+                        // an Error and would otherwise die silently with this thread
+                        failures[slot] = ex;
+                    }
+                }
+            };
+            t[i].start();
+        }
+        for (int i = 0; i < nThreads; i++) {
+            t[i].join();
+        }
+
+        // surface the first worker failure on the test thread so the test result reflects it
+        for (int i = 0; i < nThreads; i++) {
+            if (failures[i] != null) {
+                AssertionError error = new AssertionError("Worker thread " + i + " failed: " + failures[i]);
+                error.initCause(failures[i]);
+                throw error;
+            }
+        }
+
+        long end = System.currentTimeMillis();
+        System.out.println("Cost " + (end - start) + " millis");
+    }
+
+    /** Creates a fresh store and table on the shared budget controller and verifies one round trip. */
+    private void verifyOneTableWriteAndRead() throws IOException {
+        MemDiskStore store = new MemDiskStore(info, budgetCtrl);
+        GridTable table = new GridTable(info, store);
+        verifyWriteAndRead(table);
+    }
+
+    /**
+     * Rebuilds the table from {@code data}, scans it back and checks that the scan returns the
+     * same records in the same order, and the same number of them.
+     *
+     * @param table table to write to and read from
+     * @throws IOException if writing, scanning or closing fails
+     */
+    private void verifyWriteAndRead(GridTable table) throws IOException {
+        GTInfo info = table.getInfo();
+
+        GTBuilder builder = table.rebuild();
+        for (GTRecord r : data) {
+            builder.write(r);
+        }
+        builder.close();
+
+        IGTScanner scanner = table.scan(new GTScanRequest(info));
+        int i = 0;
+        try {
+            for (GTRecord r : scanner) {
+                assertEquals(data.get(i++), r);
+            }
+        } finally {
+            scanner.close();
+        }
+        // without this check a scan that stops early (or returns nothing) would pass unnoticed
+        assertEquals(data.size(), i);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemoryBudgetControllerTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemoryBudgetControllerTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemoryBudgetControllerTest.java
new file mode 100644
index 0000000..f7bf432
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemoryBudgetControllerTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cube.inmemcubing;
+
+import org.apache.kylin.common.util.MemoryBudgetController;
+import org.apache.kylin.common.util.MemoryBudgetController.NotEnoughBudgetException;
+import org.junit.Test;
+
+import java.util.ArrayList;
+
+import static org.junit.Assert.*;
+
+/**
+ * Exercises reservation, forced free-up, blocking reservation and the over-budget error of
+ * {@link MemoryBudgetController}.
+ */
+public class MemoryBudgetControllerTest {
+
+    @Test
+    public void test() {
+        final int budgetMB = MemoryBudgetController.getSystemAvailMB() / 2;
+        final MemoryBudgetController controller = new MemoryBudgetController(budgetMB);
+
+        // use up the whole budget, 1 MB per consumer
+        ArrayList<Consumer> smallConsumers = new ArrayList<Consumer>();
+        for (int created = 0; created < budgetMB; created++) {
+            smallConsumers.add(new Consumer(controller));
+            assertEquals(smallConsumers.size(), controller.getTotalReservedMB());
+        }
+
+        // reserving the full budget for a must make every earlier consumer drop its data
+        final Consumer a = new Consumer();
+        controller.reserve(a, budgetMB);
+        for (Consumer small : smallConsumers) {
+            assertEquals(null, small.data);
+        }
+
+        // give back a's reservation after 2 seconds, from another thread
+        Runnable delayedRelease = new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    Thread.sleep(2000);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+                controller.reserve(a, 0);
+            }
+        };
+        new Thread(delayedRelease).start();
+
+        // b only gets the budget once a has let go, so the call is expected to take a while
+        long bWaitStart = System.currentTimeMillis();
+        final Consumer b = new Consumer();
+        controller.reserveInsist(b, budgetMB);
+        long bWaited = System.currentTimeMillis() - bWaitStart;
+        assertTrue(bWaited > 1000);
+
+        // b now holds everything and releases nothing, so even 1 MB for a must be refused
+        try {
+            controller.reserve(a, 1);
+            fail();
+        } catch (NotEnoughBudgetException ex) {
+            // expected
+        }
+    }
+
+    /** Memory consumer that optionally holds about 1 MB and gives it up on the first free-up request. */
+    class Consumer implements MemoryBudgetController.MemoryConsumer {
+
+        byte[] data;
+
+        /** Creates a consumer that holds no data and has reserved nothing. */
+        Consumer() {
+        }
+
+        /** Reserves 1 MB from the controller, then allocates the matching payload. */
+        Consumer(MemoryBudgetController mbc) {
+            mbc.reserve(this, 1);
+            // the 24 bytes taken off account for this object's header, the array's header and the reference to it
+            data = new byte[MemoryBudgetController.ONE_MB - 24];
+        }
+
+        /** Drops the payload if still held; reports 1 MB freed the first time and 0 afterwards. */
+        @Override
+        public int freeUp(int mb) {
+            if (data == null) {
+                return 0;
+            }
+            data = null;
+            return 1;
+        }
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java
new file mode 100644
index 0000000..f53f11f
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+import java.math.BigDecimal;
+import java.util.Comparator;
+import java.util.Random;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.gridtable.GTAggregateScanner;
+import org.apache.kylin.metadata.measure.BigDecimalSumAggregator;
+import org.apache.kylin.metadata.measure.DoubleSumAggregator;
+import org.apache.kylin.metadata.measure.DoubleMutable;
+import org.apache.kylin.metadata.measure.HLLCAggregator;
+import org.apache.kylin.metadata.measure.LongSumAggregator;
+import org.apache.kylin.metadata.measure.LongMutable;
+import org.apache.kylin.metadata.measure.MeasureAggregator;
+import org.junit.Test;
+
+/**
+ * Print-only measurements of the heap taken by several aggregator types, arrays and TreeMap
+ * entries, plus a comparison of a populated map against
+ * {@code GTAggregateScanner.estimateSizeOfAggrCache}. Nothing is asserted; all figures go to stdout.
+ */
+public class AggregationCacheMemSizeTest {
+
+    public static final int NUM_OF_OBJS = 1000000 / 2;
+
+    /** Factory invoked once per sample object by {@link #estimateObjectSize(CreateAnObject)}. */
+    interface CreateAnObject {
+        Object create();
+    }
+
+    @Test
+    public void testHLLCAggregatorSize() throws InterruptedException {
+        CreateAnObject hllcFactory = new CreateAnObject() {
+            @Override
+            public Object create() {
+                HLLCAggregator aggr = new HLLCAggregator(10);
+                aggr.aggregate(new HyperLogLogPlusCounter(10));
+                return aggr;
+            }
+        };
+        int est = estimateObjectSize(hllcFactory);
+        System.out.println("HLLC: " + est);
+    }
+
+    @Test
+    public void testBigDecimalAggregatorSize() throws InterruptedException {
+        CreateAnObject bigDecimalFactory = new CreateAnObject() {
+            @Override
+            public Object create() {
+                return newBigDecimalAggr();
+            }
+        };
+        int est = estimateObjectSize(bigDecimalFactory);
+        System.out.println("BigDecimal: " + est);
+    }
+
+    /** Returns a BigDecimal sum aggregator that has already aggregated one value. */
+    private BigDecimalSumAggregator newBigDecimalAggr() {
+        BigDecimalSumAggregator aggr = new BigDecimalSumAggregator();
+        aggr.aggregate(new BigDecimal("12345678901234567890.123456789"));
+        return aggr;
+    }
+
+    @Test
+    public void testLongAggregatorSize() throws InterruptedException {
+        CreateAnObject longFactory = new CreateAnObject() {
+            @Override
+            public Object create() {
+                return newLongAggr();
+            }
+        };
+        int est = estimateObjectSize(longFactory);
+        System.out.println("Long: " + est);
+    }
+
+    /** Returns a long sum aggregator that has already aggregated one value. */
+    private LongSumAggregator newLongAggr() {
+        LongSumAggregator aggr = new LongSumAggregator();
+        aggr.aggregate(new LongMutable(10));
+        return aggr;
+    }
+
+    @Test
+    public void testDoubleAggregatorSize() throws InterruptedException {
+        CreateAnObject doubleFactory = new CreateAnObject() {
+            @Override
+            public Object create() {
+                return newDoubleAggr();
+            }
+        };
+        int est = estimateObjectSize(doubleFactory);
+        System.out.println("Double: " + est);
+    }
+
+    /** Returns a double sum aggregator that has already aggregated one value. */
+    private DoubleSumAggregator newDoubleAggr() {
+        DoubleSumAggregator aggr = new DoubleSumAggregator();
+        aggr.aggregate(new DoubleMutable(10));
+        return aggr;
+    }
+
+    @Test
+    public void testByteArraySize() throws InterruptedException {
+        CreateAnObject byteArrayFactory = new CreateAnObject() {
+            @Override
+            public Object create() {
+                return new byte[10];
+            }
+        };
+        int est = estimateObjectSize(byteArrayFactory);
+        System.out.println("byte[10]: " + est);
+    }
+
+    @Test
+    public void testAggregatorArraySize() throws InterruptedException {
+        CreateAnObject aggrArrayFactory = new CreateAnObject() {
+            @Override
+            public Object create() {
+                return new MeasureAggregator[7];
+            }
+        };
+        int est = estimateObjectSize(aggrArrayFactory);
+        System.out.println("MeasureAggregator[7]: " + est);
+    }
+
+    @Test
+    public void testTreeMapSize() throws InterruptedException {
+        final SortedMap<byte[], Object> map = newByteKeyedMap();
+        final Random rand = new Random();
+        CreateAnObject entryFactory = new CreateAnObject() {
+            @Override
+            public Object create() {
+                // the map itself retains the entry, so nothing needs to be handed back
+                byte[] key = new byte[10];
+                rand.nextBytes(key);
+                map.put(key, null);
+                return null;
+            }
+        };
+        int est = estimateObjectSize(entryFactory);
+        System.out.println("TreeMap entry: " + (est - 20)); // -20 is to exclude byte[10]
+    }
+
+    @Test
+    public void testAggregationCacheSize() throws InterruptedException {
+        final SortedMap<byte[], Object> map = newByteKeyedMap();
+        final Random rand = new Random();
+
+        long bytesBefore = memLeft();
+        byte[] key = null;
+        MeasureAggregator<?>[] aggrs = null;
+        for (int n = 0; n < NUM_OF_OBJS; n++) {
+            key = new byte[10];
+            rand.nextBytes(key);
+            aggrs = new MeasureAggregator[4];
+            aggrs[0] = newBigDecimalAggr();
+            aggrs[1] = newLongAggr();
+            aggrs[2] = newDoubleAggr();
+            aggrs[3] = newDoubleAggr();
+            map.put(key, aggrs);
+        }
+
+        long bytesAfter = memLeft();
+
+        // the last key/aggregator pair serves as the sample row for the estimate
+        long mapActualSize = bytesBefore - bytesAfter;
+        long mapExpectSize = GTAggregateScanner.estimateSizeOfAggrCache(key, aggrs, map.size());
+        System.out.println("Actual cache size: " + mapActualSize);
+        System.out.println("Expect cache size: " + mapExpectSize);
+    }
+
+    /** Creates an empty sorted map whose byte[] keys are ordered by {@code Bytes.compareTo}. */
+    private static SortedMap<byte[], Object> newByteKeyedMap() {
+        Comparator<byte[]> byBytes = new Comparator<byte[]>() {
+            @Override
+            public int compare(byte[] o1, byte[] o2) {
+                return Bytes.compareTo(o1, o2);
+            }
+        };
+        return new TreeMap<byte[], Object>(byBytes);
+    }
+
+    /**
+     * Creates {@code NUM_OF_OBJS} objects through the factory while keeping them referenced, and
+     * returns the drop in available heap divided by the number of objects.
+     */
+    private int estimateObjectSize(CreateAnObject factory) throws InterruptedException {
+        Object[] retained = new Object[NUM_OF_OBJS];
+        long availBefore = memLeft();
+
+        int slot = 0;
+        while (slot < retained.length) {
+            retained[slot] = factory.create();
+            slot++;
+        }
+
+        long availAfter = memLeft();
+        long consumed = availBefore - availAfter;
+        return (int) (consumed / retained.length);
+    }
+
+    /** Requests a GC, waits half a second, then reports the heap still available to the VM. */
+    private long memLeft() throws InterruptedException {
+        Runtime.getRuntime().gc();
+        Thread.sleep(500);
+        return getSystemAvailBytes();
+    }
+
+    /** Returns the maximum heap size minus the heap currently in use. */
+    private long getSystemAvailBytes() {
+        Runtime rt = Runtime.getRuntime();
+        // in use = heap currently allocated to the VM minus the free part of it
+        long inUse = rt.totalMemory() - rt.freeMemory();
+        // maxMemory() is the ceiling the heap may grow to, e.g. the Xmx setting
+        return rt.maxMemory() - inUse;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
new file mode 100644
index 0000000..c90c5d3
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.cube.gridtable.CubeCodeSystem;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.dict.NumberDictionaryBuilder;
+import org.apache.kylin.dict.StringBytesConverter;
+import org.apache.kylin.dict.TrieDictionaryBuilder;
+import org.apache.kylin.gridtable.GTInfo.Builder;
+import org.apache.kylin.gridtable.memstore.GTSimpleMemStore;
+import org.apache.kylin.metadata.filter.ColumnTupleFilter;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.ConstantTupleFilter;
+import org.apache.kylin.metadata.filter.ExtractTupleFilter;
+import org.apache.kylin.metadata.filter.LogicalTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.measure.LongMutable;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class DictGridTableTest {
+
+ /**
+  * Builds the test table once and runs every verification step against it, in order.
+  */
+ @Test
+ public void test() throws IOException {
+     final GridTable gridTable = newTestTable();
+
+     // scan range planning
+     verifyScanRangePlanner(gridTable);
+
+     // scanning
+     verifyFirstRow(gridTable);
+     verifyScanWithUnevaluatableFilter(gridTable);
+     verifyScanWithEvaluatableFilter(gridTable);
+
+     // filter constant conversion
+     verifyConvertFilterConstants1(gridTable);
+     verifyConvertFilterConstants2(gridTable);
+     verifyConvertFilterConstants3(gridTable);
+     verifyConvertFilterConstants4(gridTable);
+ }
+
+ private void verifyScanRangePlanner(GridTable table) {
+ GTInfo info = table.getInfo();
+ GTScanRangePlanner planner = new GTScanRangePlanner(info);
+
+ CompareTupleFilter timeComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
+ CompareTupleFilter timeComp2 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-13"));
+ CompareTupleFilter timeComp3 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-15"));
+ CompareTupleFilter timeComp4 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "2015-01-15"));
+ CompareTupleFilter ageComp1 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "10"));
+ CompareTupleFilter ageComp2 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "20"));
+ CompareTupleFilter ageComp3 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "30"));
+ CompareTupleFilter ageComp4 = compare(info.colRef(1), FilterOperatorEnum.NEQ, enc(info, 1, "30"));
+
+ // flatten or-and & hbase fuzzy value
+ {
+ LogicalTupleFilter filter = and(timeComp1, or(ageComp1, ageComp2));
+ List<GTScanRange> r = planner.planScanRanges(filter);
+ assertEquals(1, r.size());
+ assertEquals("[1421193600000, 10]-[null, null]", r.get(0).toString());
+ assertEquals("[[10], [20]]", r.get(0).hbaseFuzzyKeys.toString());
+ }
+
+ // pre-evaluate ever false
+ {
+ LogicalTupleFilter filter = and(timeComp1, timeComp2);
+ List<GTScanRange> r = planner.planScanRanges(filter);
+ assertEquals(0, r.size());
+ }
+
+ // pre-evaluate ever true
+ {
+ LogicalTupleFilter filter = or(timeComp1, ageComp4);
+ List<GTScanRange> r = planner.planScanRanges(filter);
+ assertEquals("[[null, null]-[null, null]]", r.toString());
+ }
+
+ // merge overlap range
+ {
+ LogicalTupleFilter filter = or(timeComp1, timeComp3);
+ List<GTScanRange> r = planner.planScanRanges(filter);
+ assertEquals("[[null, null]-[null, null]]", r.toString());
+ }
+
+ // merge too many ranges
+ {
+ LogicalTupleFilter filter = or(and(timeComp4, ageComp1), and(timeComp4, ageComp2), and(timeComp4, ageComp3));
+ List<GTScanRange> r = planner.planScanRanges(filter);
+ assertEquals(3, r.size());
+ assertEquals("[1421280000000, 10]-[1421280000000, 10]", r.get(0).toString());
+ assertEquals("[1421280000000, 20]-[1421280000000, 20]", r.get(1).toString());
+ assertEquals("[1421280000000, 30]-[1421280000000, 30]", r.get(2).toString());
+ List<GTScanRange> r2 = planner.planScanRanges(filter, 2);
+ assertEquals("[[1421280000000, 10]-[1421280000000, 30]]", r2.toString());
+ }
+ }
+
+ private void verifyFirstRow(GridTable table) throws IOException {
+ doScanAndVerify(table, new GTScanRequest(table.getInfo()), "[1421193600000, 30, Yang, 10, 10.5]");
+ }
+
+ private void verifyScanWithUnevaluatableFilter(GridTable table) throws IOException {
+ GTInfo info = table.getInfo();
+
+ CompareTupleFilter fComp = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
+ ExtractTupleFilter fUnevaluatable = unevaluatable(info.colRef(1));
+ LogicalTupleFilter fNotPlusUnevaluatable = not(unevaluatable(info.colRef(1)));
+ LogicalTupleFilter filter = and(fComp, fUnevaluatable, fNotPlusUnevaluatable);
+
+ GTScanRequest req = new GTScanRequest(info, null, setOf(0), setOf(3), new String[] { "sum" }, filter);
+
+ // note the unEvaluatable column 1 in filter is added to group by
+ assertEquals("GTScanRequest [range=[null, null]-[null, null], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], [null], [null]], aggrGroupBy={0, 1}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString());
+
+ doScanAndVerify(table, req, "[1421280000000, 20, null, 20, null]");
+ }
+
+ private void verifyScanWithEvaluatableFilter(GridTable table) throws IOException {
+ GTInfo info = table.getInfo();
+
+ CompareTupleFilter fComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
+ CompareTupleFilter fComp2 = compare(info.colRef(1), FilterOperatorEnum.GT, enc(info, 1, "10"));
+ LogicalTupleFilter filter = and(fComp1, fComp2);
+
+ GTScanRequest req = new GTScanRequest(info, null, setOf(0), setOf(3), new String[] { "sum" }, filter);
+
+ // note the evaluatable column 1 in filter is added to returned columns but not in group by
+ assertEquals("GTScanRequest [range=[null, null]-[null, null], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 GT [\\x00]], aggrGroupBy={0}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString());
+
+ doScanAndVerify(table, req, "[1421280000000, 20, null, 30, null]", "[1421366400000, 20, null, 40, null]");
+ }
+
+ private void verifyConvertFilterConstants1(GridTable table) {
+ GTInfo info = table.getInfo();
+
+ TableDesc extTable = TableDesc.mockup("ext");
+ TblColRef extColA = new TblColRef(ColumnDesc.mockup(extTable, 1, "A", "timestamp"));
+ TblColRef extColB = new TblColRef(ColumnDesc.mockup(extTable, 2, "B", "integer"));
+
+ CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
+ CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.EQ, "10");
+ LogicalTupleFilter filter = and(fComp1, fComp2);
+
+ List<TblColRef> colMapping = Lists.newArrayList();
+ colMapping.add(extColA);
+ colMapping.add(extColB);
+
+ TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+ assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 EQ [\\x00]]", newFilter.toString());
+ }
+
+ private void verifyConvertFilterConstants2(GridTable table) {
+ GTInfo info = table.getInfo();
+
+ TableDesc extTable = TableDesc.mockup("ext");
+ TblColRef extColA = new TblColRef(ColumnDesc.mockup(extTable, 1, "A", "timestamp"));
+ TblColRef extColB = new TblColRef(ColumnDesc.mockup(extTable, 2, "B", "integer"));
+
+ CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
+ CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.LT, "9");
+ LogicalTupleFilter filter = and(fComp1, fComp2);
+
+ List<TblColRef> colMapping = Lists.newArrayList();
+ colMapping.add(extColA);
+ colMapping.add(extColB);
+
+ // $1<"9" round up to $1<"10"
+ TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+ assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 LT [\\x00]]", newFilter.toString());
+ }
+
+ private void verifyConvertFilterConstants3(GridTable table) {
+ GTInfo info = table.getInfo();
+
+ TableDesc extTable = TableDesc.mockup("ext");
+ TblColRef extColA = new TblColRef(ColumnDesc.mockup(extTable, 1, "A", "timestamp"));
+ TblColRef extColB = new TblColRef(ColumnDesc.mockup(extTable, 2, "B", "integer"));
+
+ CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
+ CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.LTE, "9");
+ LogicalTupleFilter filter = and(fComp1, fComp2);
+
+ List<TblColRef> colMapping = Lists.newArrayList();
+ colMapping.add(extColA);
+ colMapping.add(extColB);
+
+ // $1<="9" round down to FALSE
+ TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+ assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], []]", newFilter.toString());
+ }
+
+ private void verifyConvertFilterConstants4(GridTable table) {
+ GTInfo info = table.getInfo();
+
+ TableDesc extTable = TableDesc.mockup("ext");
+ TblColRef extColA = new TblColRef(ColumnDesc.mockup(extTable, 1, "A", "timestamp"));
+ TblColRef extColB = new TblColRef(ColumnDesc.mockup(extTable, 2, "B", "integer"));
+
+ CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
+ CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.IN, "9", "10", "15");
+ LogicalTupleFilter filter = and(fComp1, fComp2);
+
+ List<TblColRef> colMapping = Lists.newArrayList();
+ colMapping.add(extColA);
+ colMapping.add(extColB);
+
+ // $1 in ("9", "10", "15") has only "10" left
+ TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+ assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 IN [\\x00]]", newFilter.toString());
+ }
+
+ /**
+  * Runs the scan request, prints every returned record, and compares the leading
+  * records (by their toString() form) with the expected rows.
+  *
+  * The scanner is closed even when an assertion fails, and the check fails when the
+  * scan returns fewer records than there are expected rows. Records beyond the
+  * expected rows are printed but not verified.
+  *
+  * @param table      the table to scan
+  * @param req        the scan request
+  * @param verifyRows expected string forms of the first records, in scan order; may be null
+  * @throws IOException if scanning or closing the scanner fails
+  */
+ private void doScanAndVerify(GridTable table, GTScanRequest req, String... verifyRows) throws IOException {
+     System.out.println(req);
+     IGTScanner scanner = table.scan(req);
+     int i = 0;
+     try {
+         for (GTRecord r : scanner) {
+             System.out.println(r);
+             if (verifyRows != null && i < verifyRows.length) {
+                 assertEquals(verifyRows[i], r.toString());
+             }
+             i++;
+         }
+     } finally {
+         // release the scanner also when an assertion above has failed
+         scanner.close();
+     }
+     if (verifyRows != null) {
+         // without this, a scan that returns too few rows would pass unnoticed
+         assertTrue("scan returned " + i + " rows, expected at least " + verifyRows.length, i >= verifyRows.length);
+     }
+ }
+
+ private Object enc(GTInfo info, int col, String value) {
+ ByteBuffer buf = ByteBuffer.allocate(info.getMaxColumnLength());
+ info.codeSystem.encodeColumnValue(col, value, buf);
+ return ByteArray.copyOf(buf.array(), buf.arrayOffset(), buf.position());
+ }
+
+ private ExtractTupleFilter unevaluatable(TblColRef col) {
+ ExtractTupleFilter r = new ExtractTupleFilter(FilterOperatorEnum.EXTRACT);
+ r.addChild(new ColumnTupleFilter(col));
+ return r;
+ }
+
+ private CompareTupleFilter compare(TblColRef col, FilterOperatorEnum op, Object... value) {
+ CompareTupleFilter result = new CompareTupleFilter(op);
+ result.addChild(new ColumnTupleFilter(col));
+ result.addChild(new ConstantTupleFilter(Arrays.asList(value)));
+ return result;
+ }
+
+ private LogicalTupleFilter and(TupleFilter... children) {
+ return logic(FilterOperatorEnum.AND, children);
+ }
+
+ private LogicalTupleFilter or(TupleFilter... children) {
+ return logic(FilterOperatorEnum.OR, children);
+ }
+
+ private LogicalTupleFilter not(TupleFilter child) {
+ return logic(FilterOperatorEnum.NOT, child);
+ }
+
+ private LogicalTupleFilter logic(FilterOperatorEnum op, TupleFilter... children) {
+ LogicalTupleFilter result = new LogicalTupleFilter(op);
+ for (TupleFilter c : children) {
+ result.addChild(c);
+ }
+ return result;
+ }
+
+ static GridTable newTestTable() throws IOException {
+ GTInfo info = newInfo();
+ GTSimpleMemStore store = new GTSimpleMemStore(info);
+ GridTable table = new GridTable(info, store);
+
+ GTRecord r = new GTRecord(table.getInfo());
+ GTBuilder builder = table.rebuild();
+
+ builder.write(r.setValues("2015-01-14", "30", "Yang", new LongMutable(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-14", "30", "Luke", new LongMutable(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-15", "20", "Dong", new LongMutable(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-15", "20", "Jason", new LongMutable(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-15", "30", "Xu", new LongMutable(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-16", "20", "Mahone", new LongMutable(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-16", "20", "Qianhao", new LongMutable(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-16", "30", "George", new LongMutable(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-16", "30", "Shaofeng", new LongMutable(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-17", "10", "Kejia", new LongMutable(10), new BigDecimal("10.5")));
+ builder.close();
+
+ return table;
+ }
+
+ static GTInfo newInfo() {
+ Builder builder = GTInfo.builder();
+ builder.setCodeSystem(newDictCodeSystem());
+ builder.setColumns( //
+ DataType.getInstance("timestamp"), //
+ DataType.getInstance("integer"), //
+ DataType.getInstance("varchar(10)"), //
+ DataType.getInstance("bigint"), //
+ DataType.getInstance("decimal") //
+ );
+ builder.setPrimaryKey(setOf(0, 1));
+ builder.setColumnPreferIndex(setOf(0));
+ builder.enableColumnBlock(new ImmutableBitSet[] { setOf(0, 1), setOf(2), setOf(3, 4) });
+ builder.enableRowBlock(4);
+ GTInfo info = builder.build();
+ return info;
+ }
+
+ @SuppressWarnings("rawtypes")
+ private static CubeCodeSystem newDictCodeSystem() {
+ Map<Integer, Dictionary> dictionaryMap = Maps.newHashMap();
+ dictionaryMap.put(1, newDictionaryOfInteger());
+ dictionaryMap.put(2, newDictionaryOfString());
+ return new CubeCodeSystem(dictionaryMap);
+ }
+
+ @SuppressWarnings("rawtypes")
+ private static Dictionary newDictionaryOfString() {
+ TrieDictionaryBuilder<String> builder = new TrieDictionaryBuilder<>(new StringBytesConverter());
+ builder.addValue("Dong");
+ builder.addValue("George");
+ builder.addValue("Jason");
+ builder.addValue("Kejia");
+ builder.addValue("Luke");
+ builder.addValue("Mahone");
+ builder.addValue("Qianhao");
+ builder.addValue("Shaofeng");
+ builder.addValue("Xu");
+ builder.addValue("Yang");
+ return builder.build(0);
+ }
+
+ @SuppressWarnings("rawtypes")
+ private static Dictionary newDictionaryOfInteger() {
+ NumberDictionaryBuilder<String> builder = new NumberDictionaryBuilder<>(new StringBytesConverter());
+ builder.addValue("10");
+ builder.addValue("20");
+ builder.addValue("30");
+ builder.addValue("40");
+ builder.addValue("50");
+ builder.addValue("60");
+ builder.addValue("70");
+ builder.addValue("80");
+ builder.addValue("90");
+ builder.addValue("100");
+ return builder.build(0);
+ }
+
+ private static ImmutableBitSet setOf(int... values) {
+ BitSet set = new BitSet();
+ for (int i : values)
+ set.set(i);
+ return new ImmutableBitSet(set);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
new file mode 100644
index 0000000..c87e970
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.gridtable.GTBuilder;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GridTable;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.gridtable.UnitTestSupport;
+import org.apache.kylin.gridtable.memstore.GTSimpleMemStore;
+import org.apache.kylin.metadata.measure.LongMutable;
+import org.junit.Test;
+
+public class SimpleGridTableTest {
+
+ /**
+  * Writes mock data into a table built from UnitTestSupport.basicInfo(), scans it back,
+  * and checks that the scanned row and row block counts equal the written ones.
+  */
+ @Test
+ public void testBasics() throws IOException {
+     final GTInfo basicInfo = UnitTestSupport.basicInfo();
+     final GridTable gridTable = new GridTable(basicInfo, new GTSimpleMemStore(basicInfo));
+
+     final GTBuilder written = rebuild(gridTable);
+     final IGTScanner scanned = scan(gridTable);
+
+     assertEquals(written.getWrittenRowBlockCount(), scanned.getScannedRowBlockCount());
+     assertEquals(written.getWrittenRowCount(), scanned.getScannedRowCount());
+ }
+
+ /**
+  * Same round trip as testBasics, but on a table built from UnitTestSupport.advancedInfo().
+  */
+ @Test
+ public void testAdvanced() throws IOException {
+     final GTInfo advancedInfo = UnitTestSupport.advancedInfo();
+     final GridTable gridTable = new GridTable(advancedInfo, new GTSimpleMemStore(advancedInfo));
+
+     final GTBuilder written = rebuild(gridTable);
+     final IGTScanner scanned = scan(gridTable);
+
+     assertEquals(written.getWrittenRowBlockCount(), scanned.getScannedRowBlockCount());
+     assertEquals(written.getWrittenRowCount(), scanned.getScannedRowCount());
+ }
+
+ /**
+  * Writes mock data into an advancedInfo() table, runs the aggregating scan, and checks
+  * that the scanner's row and row block counts equal the written ones.
+  */
+ @Test
+ public void testAggregate() throws IOException {
+     final GTInfo advancedInfo = UnitTestSupport.advancedInfo();
+     final GridTable gridTable = new GridTable(advancedInfo, new GTSimpleMemStore(advancedInfo));
+
+     final GTBuilder written = rebuild(gridTable);
+     final IGTScanner aggregated = scanAndAggregate(gridTable);
+
+     assertEquals(written.getWrittenRowBlockCount(), aggregated.getScannedRowBlockCount());
+     assertEquals(written.getWrittenRowCount(), aggregated.getScannedRowCount());
+ }
+
+ /**
+  * Fills an advancedInfo() table through several append batches and checks that a scan
+  * reports 3 row blocks and 10 rows.
+  */
+ @Test
+ public void testAppend() throws IOException {
+     final GTInfo advancedInfo = UnitTestSupport.advancedInfo();
+     final GridTable gridTable = new GridTable(advancedInfo, new GTSimpleMemStore(advancedInfo));
+
+     rebuildViaAppend(gridTable);
+     final IGTScanner scanned = scan(gridTable);
+
+     assertEquals(3, scanned.getScannedRowBlockCount());
+     assertEquals(10, scanned.getScannedRowCount());
+ }
+
+ /**
+  * Scans the whole table, checks the values of every record, prints the records and
+  * the scan counters, and returns the (already closed) scanner so callers can read
+  * its counters.
+  *
+  * The scanner is closed in a finally block so that a failing assertion does not leak it.
+  *
+  * @param table the table to scan
+  * @return the closed scanner
+  * @throws IOException if scanning or closing fails
+  */
+ private IGTScanner scan(GridTable table) throws IOException {
+     GTScanRequest req = new GTScanRequest(table.getInfo());
+     IGTScanner scanner = table.scan(req);
+     try {
+         for (GTRecord r : scanner) {
+             Object[] v = r.getValues();
+             assertTrue(((String) v[0]).startsWith("2015-"));
+             // assertEquals reports the actual value on failure, unlike assertTrue(equals)
+             assertEquals("Food", v[2]);
+             assertTrue(((LongMutable) v[3]).get() == 10);
+             assertTrue(((BigDecimal) v[4]).doubleValue() == 10.5);
+             System.out.println(r);
+         }
+     } finally {
+         scanner.close();
+     }
+     System.out.println("Scanned Row Block Count: " + scanner.getScannedRowBlockCount());
+     System.out.println("Scanned Row Count: " + scanner.getScannedRowCount());
+     return scanner;
+ }
+
+ private IGTScanner scanAndAggregate(GridTable table) throws IOException {
+ GTScanRequest req = new GTScanRequest(table.getInfo(), null, setOf(0, 2), setOf(3, 4), new String[] { "count", "sum" }, null);
+ IGTScanner scanner = table.scan(req);
+ int i = 0;
+ for (GTRecord r : scanner) {
+ Object[] v = r.getValues();
+ switch (i) {
+ case 0:
+ assertTrue(((LongMutable) v[3]).get() == 20);
+ assertTrue(((BigDecimal) v[4]).doubleValue() == 21.0);
+ break;
+ case 1:
+ assertTrue(((LongMutable) v[3]).get() == 30);
+ assertTrue(((BigDecimal) v[4]).doubleValue() == 31.5);
+ break;
+ case 2:
+ assertTrue(((LongMutable) v[3]).get() == 40);
+ assertTrue(((BigDecimal) v[4]).doubleValue() == 42.0);
+ break;
+ case 3:
+ assertTrue(((LongMutable) v[3]).get() == 10);
+ assertTrue(((BigDecimal) v[4]).doubleValue() == 10.5);
+ break;
+ default:
+ fail();
+ }
+ i++;
+ System.out.println(r);
+ }
+ scanner.close();
+ System.out.println("Scanned Row Block Count: " + scanner.getScannedRowBlockCount());
+ System.out.println("Scanned Row Count: " + scanner.getScannedRowCount());
+ return scanner;
+ }
+
+ static GTBuilder rebuild(GridTable table) throws IOException {
+ GTBuilder builder = table.rebuild();
+ for (GTRecord rec : UnitTestSupport.mockupData(table.getInfo(), 10)) {
+ builder.write(rec);
+ }
+ builder.close();
+
+ System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount());
+ System.out.println("Written Row Count: " + builder.getWrittenRowCount());
+ return builder;
+ }
+
+ static void rebuildViaAppend(GridTable table) throws IOException {
+ List<GTRecord> data = UnitTestSupport.mockupData(table.getInfo(), 10);
+ GTBuilder builder;
+ int i = 0;
+
+ builder = table.append();
+ builder.write(data.get(i++));
+ builder.write(data.get(i++));
+ builder.write(data.get(i++));
+ builder.write(data.get(i++));
+ builder.close();
+ System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount());
+ System.out.println("Written Row Count: " + builder.getWrittenRowCount());
+
+ builder = table.append();
+ builder.write(data.get(i++));
+ builder.write(data.get(i++));
+ builder.write(data.get(i++));
+ builder.close();
+ System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount());
+ System.out.println("Written Row Count: " + builder.getWrittenRowCount());
+
+ builder = table.append();
+ builder.write(data.get(i++));
+ builder.write(data.get(i++));
+ builder.close();
+ System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount());
+ System.out.println("Written Row Count: " + builder.getWrittenRowCount());
+
+ builder = table.append();
+ builder.write(data.get(i++));
+ builder.close();
+ System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount());
+ System.out.println("Written Row Count: " + builder.getWrittenRowCount());
+ }
+
+ private static ImmutableBitSet setOf(int... values) {
+ BitSet set = new BitSet();
+ for (int i : values)
+ set.set(i);
+ return new ImmutableBitSet(set);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleInvertedIndexTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleInvertedIndexTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleInvertedIndexTest.java
new file mode 100644
index 0000000..f5c9645
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleInvertedIndexTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+import static org.junit.Assert.*;
+import it.uniroma3.mat.extendedset.intset.ConciseSet;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTInvertedIndex;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTRowBlock;
+import org.apache.kylin.gridtable.UnitTestSupport;
+import org.apache.kylin.metadata.filter.ColumnTupleFilter;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.ConstantTupleFilter;
+import org.apache.kylin.metadata.filter.LogicalTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.measure.LongMutable;
+import org.apache.kylin.metadata.measure.serializer.StringSerializer;
+import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class SimpleInvertedIndexTest {
+
+ GTInfo info;
+ GTInvertedIndex index;
+ ArrayList<CompareTupleFilter> basicFilters = Lists.newArrayList();
+ ArrayList<ConciseSet> basicResults = Lists.newArrayList();
+
+ public SimpleInvertedIndexTest() {
+
+ info = UnitTestSupport.advancedInfo();
+ TblColRef colA = info.colRef(0);
+
+ // block i contains value "i", the last is NULL
+ index = new GTInvertedIndex(info);
+ GTRowBlock mockBlock = GTRowBlock.allocate(info);
+ GTRowBlock.Writer writer = mockBlock.getWriter();
+ GTRecord record = new GTRecord(info);
+ for (int i = 0; i < 10; i++) {
+ record.setValues(i < 9 ? "" + i : null, "", "", new LongMutable(0), new BigDecimal(0));
+ for (int j = 0; j < info.getRowBlockSize(); j++) {
+ writer.append(record);
+ }
+ writer.readyForFlush();
+ index.add(mockBlock);
+
+ writer.clearForNext();
+ }
+
+ basicFilters.add(compare(colA, FilterOperatorEnum.ISNULL));
+ basicResults.add(set(9));
+
+ basicFilters.add(compare(colA, FilterOperatorEnum.ISNOTNULL));
+ basicResults.add(set(0, 1, 2, 3, 4, 5, 6, 7, 8, 9));
+
+ basicFilters.add(compare(colA, FilterOperatorEnum.EQ, 0));
+ basicResults.add(set(0));
+
+ basicFilters.add(compare(colA, FilterOperatorEnum.NEQ, 0));
+ basicResults.add(set(0, 1, 2, 3, 4, 5, 6, 7, 8, 9));
+
+ basicFilters.add(compare(colA, FilterOperatorEnum.IN, 0, 5));
+ basicResults.add(set(0, 5));
+
+ basicFilters.add(compare(colA, FilterOperatorEnum.NOTIN, 0, 5));
+ basicResults.add(set(0, 1, 2, 3, 4, 5, 6, 7, 8, 9));
+
+ basicFilters.add(compare(colA, FilterOperatorEnum.LT, 3));
+ basicResults.add(set(0, 1, 2));
+
+ basicFilters.add(compare(colA, FilterOperatorEnum.LTE, 3));
+ basicResults.add(set(0, 1, 2, 3));
+
+ basicFilters.add(compare(colA, FilterOperatorEnum.GT, 3));
+ basicResults.add(set(4, 5, 6, 7, 8));
+
+ basicFilters.add(compare(colA, FilterOperatorEnum.GTE, 3));
+ basicResults.add(set(3, 4, 5, 6, 7, 8));
+ }
+
+ @Test
+ public void testBasics() {
+ for (int i = 0; i < basicFilters.size(); i++) {
+ assertEquals(basicResults.get(i), index.filter(basicFilters.get(i)));
+ }
+ }
+
+ /**
+  * For every ordered pair of basic filters, checks that the AND of the two filters
+  * returns the intersection of their expected results.
+  */
+ @Test
+ public void testLogicalAnd() {
+     final int caseCount = basicFilters.size();
+     for (int left = 0; left < caseCount; left++) {
+         for (int right = 0; right < caseCount; right++) {
+             final LogicalTupleFilter both = logical(FilterOperatorEnum.AND, basicFilters.get(left), basicFilters.get(right));
+
+             // work on a clone so the stored expectation stays untouched
+             final ConciseSet expected = basicResults.get(left).clone();
+             expected.retainAll(basicResults.get(right));
+
+             assertEquals(expected, index.filter(both));
+         }
+     }
+ }
+
+ /**
+  * For every ordered pair of basic filters, checks that the OR of the two filters
+  * returns the union of their expected results.
+  */
+ @Test
+ public void testLogicalOr() {
+     final int caseCount = basicFilters.size();
+     for (int left = 0; left < caseCount; left++) {
+         for (int right = 0; right < caseCount; right++) {
+             final LogicalTupleFilter either = logical(FilterOperatorEnum.OR, basicFilters.get(left), basicFilters.get(right));
+
+             // work on a clone so the stored expectation stays untouched
+             final ConciseSet expected = basicResults.get(left).clone();
+             expected.addAll(basicResults.get(right));
+
+             assertEquals(expected, index.filter(either));
+         }
+     }
+ }
+
+ /**
+  * Checks how a filter on column 1 behaves: alone and OR-ed with the first basic
+  * filter it is expected to return all ten blocks, AND-ed it is expected to return
+  * just the first basic filter's result.
+  */
+ @Test
+ public void testNotEvaluable() {
+     final ConciseSet allBlocks = set(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+     final CompareTupleFilter onSecondCol = compare(info.colRef(1), FilterOperatorEnum.EQ, 0);
+
+     // alone
+     assertEquals(allBlocks, index.filter(onSecondCol));
+
+     // OR-ed with the first basic filter
+     final LogicalTupleFilter either = logical(FilterOperatorEnum.OR, basicFilters.get(0), onSecondCol);
+     assertEquals(allBlocks, index.filter(either));
+
+     // AND-ed with the first basic filter
+     final LogicalTupleFilter both = logical(FilterOperatorEnum.AND, basicFilters.get(0), onSecondCol);
+     assertEquals(basicResults.get(0), index.filter(both));
+ }
+
+ /**
+  * Builds a compare filter with a column child followed by one constant child per id.
+  *
+  * @param col the column to compare
+  * @param op  the comparison operator
+  * @param ids the constants, each serialized by constFilter(int); may be empty
+  */
+ public static CompareTupleFilter compare(TblColRef col, TupleFilter.FilterOperatorEnum op, int... ids) {
+     final CompareTupleFilter comparison = new CompareTupleFilter(op);
+     comparison.addChild(columnFilter(col));
+     for (int idx = 0; idx < ids.length; idx++) {
+         comparison.addChild(constFilter(ids[idx]));
+     }
+     return comparison;
+ }
+
+ /** Creates a logical filter with the given operator and adds the filters in order. */
+ public static LogicalTupleFilter logical(TupleFilter.FilterOperatorEnum op, TupleFilter... filters) {
+     final LogicalTupleFilter node = new LogicalTupleFilter(op);
+     for (int idx = 0; idx < filters.length; idx++) {
+         node.addChild(filters[idx]);
+     }
+     return node;
+ }
+
+ /** Wraps the column reference in a ColumnTupleFilter. */
+ public static ColumnTupleFilter columnFilter(TblColRef col) {
+     final ColumnTupleFilter wrapped = new ColumnTupleFilter(col);
+     return wrapped;
+ }
+
+ /**
+  * Serializes the decimal string form of the id with a StringSerializer and wraps the
+  * written bytes in a constant filter.
+  *
+  * @param id the constant; its string form is written into a 10-byte buffer
+  */
+ public static ConstantTupleFilter constFilter(int id) {
+     final ByteBuffer buffer = ByteBuffer.wrap(new byte[10]);
+     final StringSerializer serializer = new StringSerializer(DataType.getInstance("string"));
+     serializer.serialize("" + id, buffer);
+
+     // the bytes up to the buffer position are what the serializer wrote
+     final ByteArray encoded = new ByteArray(buffer.array(), buffer.arrayOffset(), buffer.position());
+     return new ConstantTupleFilter(encoded);
+ }
+
+ /** Returns a ConciseSet that contains the given integers. */
+ public static ConciseSet set(int... ints) {
+     final ConciseSet result = new ConciseSet();
+     for (int idx = 0; idx < ints.length; idx++) {
+         result.add(ints[idx]);
+     }
+     return result;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/pom.xml
----------------------------------------------------------------------
diff --git a/core-job/pom.xml b/core-job/pom.xml
index b51c1cf..b619f43 100644
--- a/core-job/pom.xml
+++ b/core-job/pom.xml
@@ -41,8 +41,20 @@
<version>${project.parent.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ </dependency>
+
<!-- Env & Test -->
<dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-core-common</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java b/core-job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
new file mode 100644
index 0000000..6b814ef
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+ /**
+  * Static facade over the default {@link IBatchCubingEngine}. The engine is created reflectively
+  * on first use and cached in a static field; no synchronization is performed here.
+  */
+ public class BuildEngineFactory {
+
+     private static IBatchCubingEngine defaultBatchEngine;
+
+     /**
+      * Returns the cached default engine, instantiating it on the first call. The implementation
+      * class is chosen by {@code KylinConfig.isCubingInMem()}.
+      */
+     public static IBatchCubingEngine defaultBatchEngine() {
+         if (defaultBatchEngine != null) {
+             return defaultBatchEngine;
+         }
+         KylinConfig conf = KylinConfig.getInstanceFromEnv();
+         String engineClassName = conf.isCubingInMem() //
+                 ? "org.apache.kylin.engine.mr.MRBatchCubingEngine2" //
+                 : "org.apache.kylin.engine.mr.MRBatchCubingEngine";
+         defaultBatchEngine = (IBatchCubingEngine) ClassUtil.newInstance(engineClassName);
+         return defaultBatchEngine;
+     }
+
+     /** Build a new cube segment, typically its time range appends to the end of current cube. */
+     public static DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
+         IBatchCubingEngine engine = defaultBatchEngine();
+         return engine.createBatchCubingJob(newSegment, submitter);
+     }
+
+     /** Merge multiple small segments into a big one. */
+     public static DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
+         IBatchCubingEngine engine = defaultBatchEngine();
+         return engine.createBatchMergeJob(mergeSegment, submitter);
+     }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java b/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
new file mode 100644
index 0000000..904f557
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+ /** Contract for an engine that creates batch cubing and merge jobs for cube segments. */
+ public interface IBatchCubingEngine {
+
+     /** Build a new cube segment, typically its time range appends to the end of current cube. */
+     DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter);
+
+     /** Merge multiple small segments into a big one. */
+     DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter);
+
+     /** NOTE(review): presumably the interface a data source must provide to work with this engine - confirm. */
+     Class<?> getSourceInterface();
+
+     /** NOTE(review): presumably the interface a storage must provide to work with this engine - confirm. */
+     Class<?> getStorageInterface();
+ }
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/engine/IStreamingCubingEngine.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/engine/IStreamingCubingEngine.java b/core-job/src/main/java/org/apache/kylin/engine/IStreamingCubingEngine.java
new file mode 100644
index 0000000..0359ce9
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/engine/IStreamingCubingEngine.java
@@ -0,0 +1,8 @@
+package org.apache.kylin.engine;
+
+import org.apache.kylin.cube.CubeSegment;
+
+ /** Contract for an engine that supplies a runnable cubing builder for a cube segment. */
+ public interface IStreamingCubingEngine {
+
+     /** Creates a {@link Runnable} builder for the given segment. */
+     Runnable createStreamingCubingBuilder(CubeSegment seg);
+ }
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/JobInstance.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/JobInstance.java b/core-job/src/main/java/org/apache/kylin/job/JobInstance.java
new file mode 100644
index 0000000..e7f5772
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/JobInstance.java
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonBackReference;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonManagedReference;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Lists;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.cube.model.CubeBuildTypeEnum;
+import org.apache.kylin.job.constant.JobStatusEnum;
+import org.apache.kylin.job.constant.JobStepCmdTypeEnum;
+import org.apache.kylin.job.constant.JobStepStatusEnum;
+import org.apache.kylin.job.engine.JobEngineConfig;
+
+@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
+public class JobInstance extends RootPersistentEntity implements Comparable<JobInstance> {
+
+ public static final String JOB_WORKING_DIR_PREFIX = "kylin-";
+
+ public static final String YARN_APP_ID = "yarn_application_id";
+ public static final String YARN_APP_URL = "yarn_application_tracking_url";
+ public static final String MR_JOB_ID = "mr_job_id";
+ public static final String HDFS_BYTES_WRITTEN = "hdfs_bytes_written";
+ public static final String SOURCE_RECORDS_COUNT = "source_records_count";
+ public static final String SOURCE_RECORDS_SIZE = "source_records_size";
+
+ public static String getStepIdentity(JobInstance jobInstance, JobStep jobStep) {
+ return jobInstance.getRelatedCube() + "." + jobInstance.getUuid() + "." + jobStep.getSequenceID();
+ }
+
+ public static String getJobIdentity(JobInstance jobInstance) {
+ return jobInstance.getRelatedCube() + "." + jobInstance.getUuid();
+ }
+
+ public static String getJobWorkingDir(JobInstance jobInstance, JobEngineConfig engineConfig) {
+ return getJobWorkingDir(jobInstance.getUuid(), engineConfig.getHdfsWorkingDirectory());
+ }
+
+ public static String getJobWorkingDir(String jobUuid, String hdfsWorkdingDir) {
+ if (jobUuid == null || jobUuid.equals("")) {
+ throw new IllegalArgumentException("jobUuid can't be null or empty");
+ }
+ return hdfsWorkdingDir + "/" + JOB_WORKING_DIR_PREFIX + jobUuid;
+ }
+
+ @JsonProperty("name")
+ private String name;
+
+ @JsonProperty("type")
+ private CubeBuildTypeEnum type; // java implementation
+ @JsonProperty("duration")
+ private long duration;
+ @JsonProperty("related_cube")
+ private String relatedCube;
+ @JsonProperty("related_segment")
+ private String relatedSegment;
+ @JsonProperty("exec_start_time")
+ private long execStartTime;
+ @JsonProperty("exec_end_time")
+ private long execEndTime;
+ @JsonProperty("mr_waiting")
+ private long mrWaiting = 0;
+ @JsonManagedReference
+ @JsonProperty("steps")
+ private List<JobStep> steps;
+ @JsonProperty("submitter")
+ private String submitter;
+ @JsonProperty("job_status")
+ private JobStatusEnum status;
+
+ public JobStep getRunningStep() {
+ for (JobStep step : this.getSteps()) {
+ if (step.getStatus().equals(JobStepStatusEnum.RUNNING) || step.getStatus().equals(JobStepStatusEnum.WAITING)) {
+ return step;
+ }
+ }
+
+ return null;
+ }
+
+ /**
+  * Computes the percentage of steps whose status is FINISHED; serialized as JSON property "progress".
+  *
+  * @return a value from 0 to 100; 0 when the job has no steps
+  */
+ @JsonProperty("progress")
+ public double getProgress() {
+     List<JobStep> allSteps = this.getSteps();
+     // Guard the division: with no steps, 100.0 * 0 / 0 would evaluate to NaN.
+     if (allSteps.isEmpty()) {
+         return 0.0;
+     }
+
+     int completedStepCount = 0;
+     for (JobStep step : allSteps) {
+         // Constant-side equals is null-safe: a step without a status counts as not finished.
+         if (JobStepStatusEnum.FINISHED.equals(step.getStatus())) {
+             completedStepCount++;
+         }
+     }
+
+     return 100.0 * completedStepCount / allSteps.size();
+ }
+
+ /** Returns the stored job status (JSON property "job_status"). */
+ public JobStatusEnum getStatus() {
+     return status;
+ }
+
+ /** Stores the job status; it is not derived from the step statuses here. */
+ public void setStatus(JobStatusEnum status) {
+ this.status = status;
+ }
+
+// @JsonProperty("job_status")
+// public JobStatusEnum getStatus() {
+//
+// // JobStatusEnum finalJobStatus;
+// int compositResult = 0;
+//
+// // if steps status are all NEW, then job status is NEW
+// // if steps status are all FINISHED, then job status is FINISHED
+// // if steps status are all PENDING, then job status is PENDING
+// // if steps status are FINISHED and PENDING, the job status is PENDING
+// // if one of steps status is RUNNING, then job status is RUNNING
+// // if one of steps status is ERROR, then job status is ERROR
+// // if one of steps status is KILLED, then job status is KILLED
+// // default status is RUNNING
+//
+// System.out.println(this.getName());
+//
+// for (JobStep step : this.getSteps()) {
+// //System.out.println("step: " + step.getSequenceID() + "'s status:" + step.getStatus());
+// compositResult = compositResult | step.getStatus().getCode();
+// }
+//
+// System.out.println();
+//
+// if (compositResult == JobStatusEnum.FINISHED.getCode()) {
+// return JobStatusEnum.FINISHED;
+// } else if (compositResult == JobStatusEnum.NEW.getCode()) {
+// return JobStatusEnum.NEW;
+// } else if (compositResult == JobStatusEnum.PENDING.getCode()) {
+// return JobStatusEnum.PENDING;
+// } else if (compositResult == (JobStatusEnum.FINISHED.getCode() | JobStatusEnum.PENDING.getCode())) {
+// return JobStatusEnum.PENDING;
+// } else if ((compositResult & JobStatusEnum.ERROR.getCode()) == JobStatusEnum.ERROR.getCode()) {
+// return JobStatusEnum.ERROR;
+// } else if ((compositResult & JobStatusEnum.DISCARDED.getCode()) == JobStatusEnum.DISCARDED.getCode()) {
+// return JobStatusEnum.DISCARDED;
+// } else if ((compositResult & JobStatusEnum.RUNNING.getCode()) == JobStatusEnum.RUNNING.getCode()) {
+// return JobStatusEnum.RUNNING;
+// }
+//
+// return JobStatusEnum.RUNNING;
+// }
+
+ /** Returns the job name (JSON property "name"). */
+ public String getName() {
+     return this.name;
+ }
+
+ /** Sets the job name (JSON property "name"). */
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ /** Returns the cube build type of this job (JSON property "type"). */
+ public CubeBuildTypeEnum getType() {
+     return this.type;
+ }
+
+ /** Sets the cube build type of this job (JSON property "type"). */
+ public void setType(CubeBuildTypeEnum type) {
+ this.type = type;
+ }
+
+ /** Returns the stored duration (JSON property "duration"); the unit is not defined in this class. */
+ public long getDuration() {
+     return this.duration;
+ }
+
+ /** Sets the stored duration (JSON property "duration"); the unit is not defined in this class. */
+ public void setDuration(long duration) {
+ this.duration = duration;
+ }
+
+ /** Returns the related cube value (JSON property "related_cube"). */
+ public String getRelatedCube() {
+     return this.relatedCube;
+ }
+
+ /** Sets the related cube value (JSON property "related_cube"). */
+ public void setRelatedCube(String relatedCube) {
+ this.relatedCube = relatedCube;
+ }
+
+ /** Returns the related segment value (JSON property "related_segment"). */
+ public String getRelatedSegment() {
+     return this.relatedSegment;
+ }
+
+ /** Sets the related segment value (JSON property "related_segment"). */
+ public void setRelatedSegment(String relatedSegment) {
+ this.relatedSegment = relatedSegment;
+ }
+
+ /**
+  * Returns the job's execution start time (JSON property "exec_start_time").
+  * NOTE(review): presumably epoch milliseconds - confirm against callers.
+  *
+  * @return the execStartTime
+  */
+ public long getExecStartTime() {
+     return this.execStartTime;
+ }
+
+ /**
+ * Sets the job's execution start time (JSON property "exec_start_time").
+ *
+ * @param execStartTime the execStartTime to set
+ */
+ public void setExecStartTime(long execStartTime) {
+ this.execStartTime = execStartTime;
+ }
+
+ /**
+  * Returns the job's execution end time (JSON property "exec_end_time").
+  * NOTE(review): presumably epoch milliseconds - confirm against callers.
+  *
+  * @return the execEndTime
+  */
+ public long getExecEndTime() {
+     return this.execEndTime;
+ }
+
+ /**
+ * Sets the job's execution end time (JSON property "exec_end_time").
+ *
+ * @param execEndTime the execEndTime to set
+ */
+ public void setExecEndTime(long execEndTime) {
+ this.execEndTime = execEndTime;
+ }
+
+ /** Returns the stored "mr_waiting" value; it defaults to 0. */
+ public long getMrWaiting() {
+     return mrWaiting;
+ }
+
+ /** Sets the stored "mr_waiting" value. */
+ public void setMrWaiting(long mrWaiting) {
+ this.mrWaiting = mrWaiting;
+ }
+
+ /**
+  * Returns the step list, creating an empty list on first access if none has been set.
+  * The returned list is the internal list itself, not a copy.
+  */
+ public List<JobStep> getSteps() {
+     if (steps != null) {
+         return steps;
+     }
+     steps = Lists.newArrayList();
+     return steps;
+ }
+
+ /** Removes every step from this job. */
+ public void clearSteps() {
+     List<JobStep> current = getSteps();
+     current.clear();
+ }
+
+ /** Appends all of the given steps to the end of this job's step list. */
+ public void addSteps(Collection<JobStep> steps) {
+     // The parameter shadows the field, so go through the getter to reach the job's own list.
+     List<JobStep> current = this.getSteps();
+     current.addAll(steps);
+ }
+
+ /** Appends a single step to the end of this job's step list. */
+ public void addStep(JobStep step) {
+     List<JobStep> current = getSteps();
+     current.add(step);
+ }
+
+ /** Inserts a step at the given position in this job's step list. */
+ public void addStep(int index, JobStep step) {
+     List<JobStep> current = getSteps();
+     current.add(index, step);
+ }
+
+ public JobStep findStep(String stepName) {
+ for (JobStep step : getSteps()) {
+ if (stepName.equals(step.getName())) {
+ return step;
+ }
+ }
+ return null;
+ }
+
+
+ /** Returns the submitter of this job (JSON property "submitter"). */
+ public String getSubmitter() {
+     return this.submitter;
+ }
+
+ /** Sets the submitter of this job (JSON property "submitter"). */
+ public void setSubmitter(String submitter) {
+ this.submitter = submitter;
+ }
+
+
+
+
+ /**
+  * One step of a {@link JobInstance}. Equality and hashing use the step name and sequence id;
+  * natural ordering uses the sequence id only.
+  */
+ @JsonIgnoreProperties(ignoreUnknown = true)
+ public static class JobStep implements Comparable<JobStep> {
+
+     @JsonBackReference
+     private JobInstance jobInstance;
+
+     @JsonProperty("id")
+     private String id;
+
+     @JsonProperty("name")
+     private String name;
+
+     @JsonProperty("sequence_id")
+     private int sequenceID;
+
+     @JsonProperty("exec_cmd")
+     private String execCmd;
+
+     // Field name kept as-is: renaming it could change how the JSON mapper pairs it with the accessors.
+     @JsonProperty("interrupt_cmd")
+     private String InterruptCmd;
+
+     @JsonProperty("exec_start_time")
+     private long execStartTime;
+     @JsonProperty("exec_end_time")
+     private long execEndTime;
+     @JsonProperty("exec_wait_time")
+     private long execWaitTime;
+
+     @JsonProperty("step_status")
+     private JobStepStatusEnum status;
+
+     @JsonProperty("cmd_type")
+     private JobStepCmdTypeEnum cmdType = JobStepCmdTypeEnum.SHELL_CMD_HADOOP;
+
+     @JsonProperty("info")
+     private ConcurrentHashMap<String, String> info = new ConcurrentHashMap<String, String>();
+
+     @JsonProperty("run_async")
+     private boolean runAsync = false;
+
+     /** Internal access to the info map itself. */
+     private ConcurrentHashMap<String, String> getInfo() {
+         return this.info;
+     }
+
+     /** Stores a key/value pair in this step's info map. */
+     public void putInfo(String key, String value) {
+         this.info.put(key, value);
+     }
+
+     /** Returns the info value stored under {@code key}, or {@code null} if there is none. */
+     public String getInfo(String key) {
+         return this.info.get(key);
+     }
+
+     /** Removes every entry from this step's info map. */
+     public void clearInfo() {
+         this.info.clear();
+     }
+
+     /** Returns the step name. */
+     public String getName() {
+         return this.name;
+     }
+
+     /** Sets the step name. */
+     public void setName(String name) {
+         this.name = name;
+     }
+
+     /** Returns the sequence id of this step. */
+     public int getSequenceID() {
+         return this.sequenceID;
+     }
+
+     /** Sets the sequence id of this step. */
+     public void setSequenceID(int sequenceID) {
+         this.sequenceID = sequenceID;
+     }
+
+     /** Returns the execution command of this step. */
+     public String getExecCmd() {
+         return this.execCmd;
+     }
+
+     /** Sets the execution command of this step. */
+     public void setExecCmd(String execCmd) {
+         this.execCmd = execCmd;
+     }
+
+     /** Returns the step status; it is {@code null} until one is set. */
+     public JobStepStatusEnum getStatus() {
+         return this.status;
+     }
+
+     /** Sets the step status. */
+     public void setStatus(JobStepStatusEnum status) {
+         this.status = status;
+     }
+
+     /** Returns the step id. */
+     public String getId() {
+         return this.id;
+     }
+
+     /** Sets the step id. */
+     public void setId(String id) {
+         this.id = id;
+     }
+
+     /**
+      * @return the execStartTime
+      */
+     public long getExecStartTime() {
+         return this.execStartTime;
+     }
+
+     /**
+      * @param execStartTime the execStartTime to set
+      */
+     public void setExecStartTime(long execStartTime) {
+         this.execStartTime = execStartTime;
+     }
+
+     /**
+      * @return the execEndTime
+      */
+     public long getExecEndTime() {
+         return this.execEndTime;
+     }
+
+     /**
+      * @param execEndTime the execEndTime to set
+      */
+     public void setExecEndTime(long execEndTime) {
+         this.execEndTime = execEndTime;
+     }
+
+     /** Returns the stored execution wait time. */
+     public long getExecWaitTime() {
+         return this.execWaitTime;
+     }
+
+     /** Sets the stored execution wait time. */
+     public void setExecWaitTime(long execWaitTime) {
+         this.execWaitTime = execWaitTime;
+     }
+
+     /** Returns the interrupt command of this step. */
+     public String getInterruptCmd() {
+         return this.InterruptCmd;
+     }
+
+     /** Sets the interrupt command of this step. */
+     public void setInterruptCmd(String interruptCmd) {
+         this.InterruptCmd = interruptCmd;
+     }
+
+     /** Returns the command type; it defaults to SHELL_CMD_HADOOP. */
+     public JobStepCmdTypeEnum getCmdType() {
+         return this.cmdType;
+     }
+
+     /** Sets the command type. */
+     public void setCmdType(JobStepCmdTypeEnum cmdType) {
+         this.cmdType = cmdType;
+     }
+
+     /**
+      * @return the runAsync
+      */
+     public boolean isRunAsync() {
+         return this.runAsync;
+     }
+
+     /**
+      * @param runAsync the runAsync to set
+      */
+     public void setRunAsync(boolean runAsync) {
+         this.runAsync = runAsync;
+     }
+
+     /**
+      * @return the jobInstance
+      */
+     public JobInstance getJobInstance() {
+         return this.jobInstance;
+     }
+
+     /** Hash over the name (0 when null) and the sequence id, using the usual 31-multiplier scheme. */
+     @Override
+     public int hashCode() {
+         int nameHash = (name == null) ? 0 : name.hashCode();
+         return 31 * (31 + nameHash) + sequenceID;
+     }
+
+     /** Two steps are equal when they are of the same class and share both name and sequence id. */
+     @Override
+     public boolean equals(Object obj) {
+         if (this == obj) {
+             return true;
+         }
+         if (obj == null || getClass() != obj.getClass()) {
+             return false;
+         }
+         JobStep that = (JobStep) obj;
+         boolean sameName = (name == null) ? (that.name == null) : name.equals(that.name);
+         return sameName && sequenceID == that.sequenceID;
+     }
+
+     /** Orders steps by ascending sequence id. */
+     @Override
+     public int compareTo(JobStep o) {
+         return (this.sequenceID < o.sequenceID) ? -1 : ((this.sequenceID > o.sequenceID) ? 1 : 0);
+     }
+ }
+
+ /** Orders jobs by descending {@code lastModified}: the more recently modified job sorts first. */
+ @Override
+ public int compareTo(JobInstance o) {
+     if (o.lastModified < this.lastModified) {
+         return -1;
+     }
+     if (o.lastModified > this.lastModified) {
+         return 1;
+     }
+     return 0;
+ }
+
+}