You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/24 01:20:24 UTC
[16/28] incubator-kylin git commit: KYLIN-875 Split job module into
'core-job', 'engine-mr', 'source-hive',
'storage-hbase'. The old job remains as an assembly project.
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeSamplingTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeSamplingTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeSamplingTest.java
new file mode 100644
index 0000000..e3397c0
--- /dev/null
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeSamplingTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import com.google.common.collect.Lists;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+/**
+ * Micro-benchmark for per-row cuboid sampling. Each random row is hashed column by column. Then,
+ * for every cuboid, the hashes of that cuboid's columns are hashed again and added to the cuboid's
+ * HyperLogLog counter. The test only prints elapsed time; it makes no assertions.
+ */
+public class CubeSamplingTest {
+
+    /** Number of columns per generated row; also the number of bits in the base cuboid id. */
+    private static final int ROW_LENGTH = 10;
+
+    /** Number of random rows pushed through the counters by {@link #test()}. */
+    private static final int ROW_COUNT = 10000;
+
+    /** Reusable buffer for the current random row; cleared and refilled by getRandomRow(). */
+    private final List<String> row = new ArrayList<String>(ROW_LENGTH);
+    /** Per-column hash of the current row, one reusable holder per column. */
+    private final ByteArray[] rowIndex = new ByteArray[ROW_LENGTH];
+
+    /** For each cuboid, the row column positions that belong to it. */
+    private Integer[][] allCuboidsBitSet;
+    private HashFunction hf = null;
+    private long baseCuboidId;
+    /** One HLL counter per cuboid, parallel to {@link #allCuboidsBitSet}. */
+    private HyperLogLogPlusCounter[] allCuboidsHLL = null;
+    /** Bytes appended after each column hash when building a cuboid's combined hash. */
+    private final byte[] separator = Bytes.toBytes(",");
+
+    @Before
+    public void setup() {
+        baseCuboidId = (1L << ROW_LENGTH) - 1;
+
+        // Enumerate cuboid ids 1 .. baseCuboidId-1. The loop bound excludes the base cuboid itself,
+        // so the count is 2^ROW_LENGTH - 2.
+        List<Integer[]> allCuboidsBitSetList = Lists.newArrayList();
+        for (long cuboidId = 1; cuboidId < baseCuboidId; cuboidId++) {
+            addCuboidBitSet(cuboidId, allCuboidsBitSetList);
+        }
+
+        allCuboidsBitSet = allCuboidsBitSetList.toArray(new Integer[allCuboidsBitSetList.size()][]);
+        System.out.println("Totally have " + allCuboidsBitSet.length + " cuboids.");
+
+        // One counter per cuboid. The constructor argument 14 is presumably the HLL precision;
+        // confirm against HyperLogLogPlusCounter.
+        allCuboidsHLL = new HyperLogLogPlusCounter[allCuboidsBitSet.length];
+        for (int i = 0; i < allCuboidsHLL.length; i++) {
+            allCuboidsHLL[i] = new HyperLogLogPlusCounter(14);
+        }
+
+        // Other hash functions tried: Hashing.goodFastHash(32), Hashing.md5()
+        hf = Hashing.murmur3_32();
+
+        for (int i = 0; i < ROW_LENGTH; i++) {
+            rowIndex[i] = new ByteArray();
+        }
+    }
+
+    /**
+     * Converts a cuboid id into the row column positions it covers and appends them to
+     * {@code cuboidColumns}. The highest bit of the base cuboid id maps to column 0, the next lower
+     * bit to column 1, and so on.
+     */
+    private void addCuboidBitSet(long cuboidId, List<Integer[]> cuboidColumns) {
+        BitSet bitSet = BitSet.valueOf(new long[] { cuboidId });
+        Integer[] indice = new Integer[bitSet.cardinality()];
+
+        long mask = Long.highestOneBit(baseCuboidId);
+        int position = 0;
+        for (int i = 0; i < ROW_LENGTH; i++) {
+            if ((mask & cuboidId) > 0) {
+                indice[position] = i;
+                position++;
+            }
+            mask = mask >> 1;
+        }
+
+        cuboidColumns.add(indice);
+    }
+
+    @Test
+    public void test() {
+        long start = System.currentTimeMillis();
+        for (int i = 0; i < ROW_COUNT; i++) {
+            putRowKeyToHLL(getRandomRow());
+        }
+
+        // Report milliseconds: integer seconds truncated sub-second runs to "0".
+        long duration = System.currentTimeMillis() - start;
+        System.out.println("The test takes " + duration + " ms.");
+    }
+
+    /** Hashes each column of {@code row}, then feeds every cuboid's combined hash into its HLL counter. */
+    private void putRowKeyToHLL(List<String> row) {
+        int x = 0;
+        for (String field : row) {
+            Hasher hc = hf.newHasher();
+            rowIndex[x++].set(hc.putString(field).hash().asBytes());
+        }
+
+        for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
+            Hasher hc = hf.newHasher();
+            for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
+                hc.putBytes(rowIndex[allCuboidsBitSet[i][position]].array());
+                hc.putBytes(separator);
+            }
+            allCuboidsHLL[i].add(hc.hash().asBytes());
+        }
+    }
+
+    /** Refills the shared {@link #row} buffer with ROW_LENGTH random 10-character strings and returns it. */
+    private List<String> getRandomRow() {
+        row.clear();
+        for (int i = 0; i < ROW_LENGTH; i++) {
+            row.add(RandomStringUtils.random(10));
+        }
+        return row;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java
new file mode 100644
index 0000000..ae75c61
--- /dev/null
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java
@@ -0,0 +1,37 @@
+package org.apache.kylin.engine.mr.steps;
+
+import com.google.common.collect.Maps;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducer;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Smoke test for {@link FactDistinctColumnsReducer#writeCuboidStatistics}. It writes statistics for
+ * an empty cuboid map to a throw-away local path and only checks that no exception is raised.
+ */
+public class FactDistinctColumnsReducerTest {
+
+    @Test
+    public void testWriteCuboidStatistics() throws IOException {
+        final Configuration conf = HadoopUtil.getCurrentConfiguration();
+        final Path outputPath = new Path("file:///tmp/kylin/cuboidstatistics/" + UUID.randomUUID().toString());
+        final FileSystem localFs = FileSystem.getLocal(conf);
+
+        Map<Long, HyperLogLogPlusCounter> cuboidHLLMap = Maps.newHashMap();
+        try {
+            // The literal 100 is passed through unchanged; its meaning is defined by writeCuboidStatistics.
+            FactDistinctColumnsReducer.writeCuboidStatistics(conf, outputPath, cuboidHLLMap, 100);
+        } finally {
+            // Every run uses a fresh UUID path, so remove it instead of accumulating junk under /tmp.
+            localFs.delete(outputPath, true);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapperPerformanceTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapperPerformanceTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapperPerformanceTest.java
new file mode 100644
index 0000000..fad521f
--- /dev/null
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapperPerformanceTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Manual profiling harness. It replays a local sequence file through
+ * {@link HiveToBaseCuboidMapper} using a context from {@link MockupMapContext}. The test is
+ * ignored by default; the hard-coded fields below name the cube and input file and must be
+ * adjusted for your environment before running.
+ *
+ * @author yangli9
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class HiveToBaseCuboidMapperPerformanceTest {
+
+    String metadataUrl = "hbase:yadesk00:2181:/hbase-unsecure";
+    String cubeName = "test_kylin_cube_with_slr";
+    Path srcPath = new Path("/download/test_kylin_cube_with_slr_intermediate_table_64mb.seq");
+
+    @Ignore("convenient trial tool for dev")
+    @Test
+    public void test() throws IOException, InterruptedException {
+        Configuration hconf = new Configuration();
+        HiveToBaseCuboidMapper mapper = new HiveToBaseCuboidMapper();
+        Context context = MockupMapContext.create(hconf, metadataUrl, cubeName, null);
+
+        mapper.setup(context);
+
+        Reader reader = new Reader(hconf, SequenceFile.Reader.file(srcPath));
+        try {
+            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), hconf);
+            Text value = new Text();
+
+            while (reader.next(key, value)) {
+                mapper.map(key, value, context);
+            }
+        } finally {
+            // Close even when map() throws, so the file handle is not leaked.
+            reader.close();
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidJobTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidJobTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidJobTest.java
new file mode 100644
index 0000000..45a2dcc
--- /dev/null
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidJobTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.engine.mr.steps.MergeCuboidJob;
+
+/**
+ * Runs {@link MergeCuboidJob} end to end with the local job runner against the test metadata. It
+ * merges two temporary copies of the sample cuboid data into
+ * {@code target/test-output/merged_cuboid} and asserts a zero exit code.
+ *
+ * @author ysong1
+ */
+public class MergeCuboidJobTest extends LocalFileMetadataTestCase {
+
+    private Configuration conf;
+
+    @Before
+    public void setup() throws Exception {
+        conf = new Configuration();
+        conf.set("fs.default.name", "file:///");
+        conf.set("mapred.job.tracker", "local");
+
+        // for local runner out-of-memory issue
+        conf.set("mapreduce.task.io.sort.mb", "10");
+        createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        cleanupTestMetadata();
+    }
+
+    @Test
+    public void test() throws Exception {
+        String output = "target/test-output/merged_cuboid";
+        String cubeName = "test_kylin_cube_with_slr_ready";
+        String jobname = "merge_cuboid";
+
+        File baseFolder = copyToTempDir("src/test/resources/data/base_cuboid", "base");
+        // NOTE(review): the "6d" input is also copied from data/base_cuboid, not data/6d_cuboid.
+        // Confirm whether that is intentional before changing it.
+        File sixDFolder = copyToTempDir("src/test/resources/data/base_cuboid", "6d");
+
+        FileUtil.fullyDelete(new File(output));
+
+        String[] args = { "-input", baseFolder.getAbsolutePath() + "," + sixDFolder.getAbsolutePath(), "-cubename", cubeName, "-segmentname", "20130331080000_20131212080000", "-output", output, "-jobname", jobname };
+        assertEquals("Job failed", 0, ToolRunner.run(conf, new MergeCuboidJob(), args));
+    }
+
+    /**
+     * Copies {@code sourceDir} into a fresh temporary directory whose name ends with {@code suffix}.
+     * The directory and its contents are scheduled for deletion when the JVM exits.
+     *
+     * @return the temporary directory
+     * @throws java.io.IOException if the directory cannot be created or the copy fails
+     */
+    private static File copyToTempDir(String sourceDir, String suffix) throws java.io.IOException {
+        File dir = File.createTempFile("kylin-f24668f6-dcff-4cb6-a89b-77f1119df8fa-", suffix);
+        // createTempFile reserves a unique name as a plain file; replace it with a directory of that name.
+        if (!dir.delete() || !dir.mkdir()) {
+            throw new java.io.IOException("Cannot create temp directory " + dir);
+        }
+        FileUtils.copyDirectory(new File(sourceDir), dir);
+        // File.deleteOnExit() silently skips non-empty directories; this variant deletes recursively.
+        FileUtils.forceDeleteOnExit(dir);
+        return dir;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java
new file mode 100644
index 0000000..a8d9670
--- /dev/null
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mrunit.mapreduce.MapDriver;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.dict.TrieDictionary;
+import org.apache.kylin.engine.mr.steps.MergeCuboidMapper;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.project.ProjectManager;
+import org.apache.kylin.source.ReadableTable.TableSignature;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Prepares a two-segment test cube with per-segment and shared dictionaries. It then asserts that
+ * no dictionary is resolved for LSTG_FORMAT_NAME on a segment produced by
+ * {@code CubeManager.mergeSegments}.
+ *
+ * @author honma
+ */
+@SuppressWarnings("rawtypes")
+public class MergeCuboidMapperTest extends LocalFileMetadataTestCase {
+
+    private static final Logger logger = LoggerFactory.getLogger(MergeCuboidMapperTest.class);
+
+    // NOTE(review): mapDriver is built in setUp() but not exercised by test().
+    MapDriver<Text, Text, Text, Text> mapDriver;
+    CubeManager cubeManager;
+    CubeInstance cube;
+    DictionaryManager dictionaryManager;
+
+    TblColRef lfn;
+    TblColRef lsi;
+    TblColRef ssc;
+
+    /** Creates a fake table signature with size 100, modified "now", at the given path. */
+    private static TableSignature newSignature(String path) {
+        TableSignature signature = new TableSignature();
+        signature.setSize(100);
+        signature.setLastModifiedTime(System.currentTimeMillis());
+        signature.setPath(path);
+        return signature;
+    }
+
+    /**
+     * Builds a dictionary of {@code info}'s data type from {@code values}, records its cardinality on
+     * {@code info}, saves it through the dictionary manager and dumps it to stdout.
+     */
+    private void buildAndSaveDict(DictionaryInfo info, List<byte[]> values) throws IOException {
+        Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueList(DataType.getInstance(info.getDataType()), values);
+        info.setCardinality(dict.getSize());
+        dictionaryManager.trySaveNewDict(dict, info);
+        ((TrieDictionary) dict).dump(System.out);
+    }
+
+    /** Builds and saves the string dictionary {"eee", "fff"} that setUp() assigns to several columns. */
+    private DictionaryInfo makeSharedDict() throws IOException {
+        DictionaryInfo newDictInfo = new DictionaryInfo("", "", 0, "string", newSignature("fake_common_dict"));
+
+        List<byte[]> values = new ArrayList<byte[]>();
+        values.add(new byte[] { 101, 101, 101 });
+        values.add(new byte[] { 102, 102, 102 });
+        buildAndSaveDict(newDictInfo, values);
+
+        return newDictInfo;
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        createTestMetadata();
+
+        logger.info("The metadataUrl is : {}", getTestConfig());
+
+        MetadataManager.clearCache();
+        CubeManager.clearCache();
+        ProjectManager.clearCache();
+        DictionaryManager.clearCache();
+
+        // Hack for distributed cache: remove any stale ../job/meta copy so the mapper sees the
+        // latest metadata.
+        FileUtils.deleteDirectory(new File("../job/meta"));
+
+        MergeCuboidMapper mapper = new MergeCuboidMapper();
+        mapDriver = MapDriver.newMapDriver(mapper);
+
+        cubeManager = CubeManager.getInstance(getTestConfig());
+        cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_ready_2_segments");
+        dictionaryManager = DictionaryManager.getInstance(getTestConfig());
+        lfn = cube.getDescriptor().findColumnRef("DEFAULT.TEST_KYLIN_FACT", "LSTG_FORMAT_NAME");
+        lsi = cube.getDescriptor().findColumnRef("DEFAULT.TEST_KYLIN_FACT", "CAL_DT");
+        ssc = cube.getDescriptor().findColumnRef("DEFAULT.TEST_CATEGORY_GROUPINGS", "META_CATEG_NAME");
+
+        DictionaryInfo sharedDict = makeSharedDict();
+
+        // LSTG_FORMAT_NAME gets a different dictionary per segment: {aaa, ccc} for the first
+        // segment and {aaa, bbb} for the rest. CAL_DT and META_CATEG_NAME share one dictionary.
+        boolean isFirstSegment = true;
+        for (CubeSegment segment : cube.getSegments()) {
+            DictionaryInfo newDictInfo = new DictionaryInfo(lfn.getTable(), lfn.getColumnDesc().getName(), lfn.getColumnDesc().getZeroBasedIndex(), "string", newSignature("fake_dict_for" + lfn.getName() + segment.getName()));
+
+            List<byte[]> values = new ArrayList<byte[]>();
+            values.add(new byte[] { 97, 97, 97 });
+            values.add(isFirstSegment ? new byte[] { 99, 99, 99 } : new byte[] { 98, 98, 98 });
+            buildAndSaveDict(newDictInfo, values);
+
+            segment.putDictResPath(lfn, newDictInfo.getResourcePath());
+            segment.putDictResPath(lsi, sharedDict.getResourcePath());
+            segment.putDictResPath(ssc, sharedDict.getResourcePath());
+
+            isFirstSegment = false;
+        }
+
+        // Persist the modified segments.
+        CubeUpdate cubeBuilder = new CubeUpdate(cube);
+        cubeBuilder.setToUpdateSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()]));
+        cube = cubeManager.updateCube(cubeBuilder);
+    }
+
+    @After
+    public void after() throws Exception {
+        cleanupTestMetadata();
+        FileUtils.deleteDirectory(new File("../job/meta"));
+    }
+
+    @Test
+    public void test() throws IOException, ParseException {
+        CubeSegment newSeg = cubeManager.mergeSegments(cube, 0L, 1386835200000L, false);
+
+        // No dictionary is expected to be resolved for LSTG_FORMAT_NAME on the freshly merged segment.
+        final Dictionary<?> dictionary = cubeManager.getDictionary(newSeg, lfn);
+        assertNull(dictionary);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MockupMapContext.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MockupMapContext.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MockupMapContext.java
new file mode 100644
index 0000000..847071d
--- /dev/null
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MockupMapContext.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configuration.IntegerRanges;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
+import org.apache.hadoop.security.Credentials;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+
+/**
+ * @author yangli9
+ *
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class MockupMapContext {
+
+ public static Context create(final Configuration hconf, String metadataUrl, String cubeName, final Object[] outKV) {
+
+ hconf.set(BatchConstants.CFG_CUBE_NAME, cubeName);
+
+ return new WrappedMapper().getMapContext(new MapContext() {
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Object getCurrentKey() throws IOException, InterruptedException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Object getCurrentValue() throws IOException, InterruptedException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public void write(Object key, Object value) throws IOException, InterruptedException {
+ System.out.println("Write -- k:" + key + ", v:" + value);
+ if (outKV != null) {
+ outKV[0] = key;
+ outKV[1] = value;
+ }
+ }
+
+ @Override
+ public OutputCommitter getOutputCommitter() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public TaskAttemptID getTaskAttemptID() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public void setStatus(String msg) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public String getStatus() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public float getProgress() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Counter getCounter(Enum<?> counterName) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Counter getCounter(String groupName, String counterName) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return hconf;
+ }
+
+ @Override
+ public Credentials getCredentials() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public JobID getJobID() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public int getNumReduceTasks() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Path getWorkingDirectory() throws IOException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Class<?> getOutputKeyClass() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Class<?> getOutputValueClass() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Class<?> getMapOutputKeyClass() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Class<?> getMapOutputValueClass() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public String getJobName() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Class<? extends InputFormat<?, ?>> getInputFormatClass() throws ClassNotFoundException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Class<? extends Mapper<?, ?, ?, ?>> getMapperClass() throws ClassNotFoundException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Class<? extends Reducer<?, ?, ?, ?>> getCombinerClass() throws ClassNotFoundException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Class<? extends Reducer<?, ?, ?, ?>> getReducerClass() throws ClassNotFoundException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Class<? extends OutputFormat<?, ?>> getOutputFormatClass() throws ClassNotFoundException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Class<? extends Partitioner<?, ?>> getPartitionerClass() throws ClassNotFoundException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public RawComparator<?> getSortComparator() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public String getJar() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public RawComparator<?> getGroupingComparator() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public boolean getJobSetupCleanupNeeded() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public boolean getTaskCleanupNeeded() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public boolean getProfileEnabled() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public String getProfileParams() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public IntegerRanges getProfileTaskRange(boolean isMap) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public String getUser() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public boolean getSymlink() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Path[] getArchiveClassPaths() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public URI[] getCacheArchives() throws IOException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public URI[] getCacheFiles() throws IOException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Path[] getLocalCacheArchives() throws IOException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Path[] getLocalCacheFiles() throws IOException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Path[] getFileClassPaths() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public String[] getArchiveTimestamps() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public String[] getFileTimestamps() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public int getMaxMapAttempts() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public int getMaxReduceAttempts() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public void progress() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public InputSplit getInputSplit() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public RawComparator<?> getCombinerKeyGroupingComparator() {
+ throw new NotImplementedException();
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidJobTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidJobTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidJobTest.java
new file mode 100644
index 0000000..6ed9010
--- /dev/null
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidJobTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.engine.mr.steps.NDCuboidJob;
+
+/**
+ * Runs {@link NDCuboidJob} with the local job runner to build one cuboid level from the sample
+ * cuboid data under {@code src/test/resources/data}, asserting a zero exit code.
+ *
+ * @author George Song (ysong1)
+ */
+public class NDCuboidJobTest extends LocalFileMetadataTestCase {
+
+    private static final String CUBE_NAME = "test_kylin_cube_with_slr_1_new_segment";
+    private static final String SEGMENT_NAME = "20130331080000_20131212080000";
+
+    private Configuration conf;
+
+    @Before
+    public void setup() throws Exception {
+        Configuration localConf = new Configuration();
+        localConf.set("fs.default.name", "file:///");
+        localConf.set("mapred.job.tracker", "local");
+        // a small sort buffer avoids out-of-memory errors in the local runner
+        localConf.set("mapreduce.task.io.sort.mb", "10");
+        conf = localConf;
+
+        createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        cleanupTestMetadata();
+    }
+
+    @Test
+    public void testJob6D() throws Exception {
+        runLevel("src/test/resources/data/base_cuboid/", "target/test-output/6d_cuboid", "6d_cuboid", "1");
+    }
+
+    @Test
+    public void testJob5D() throws Exception {
+        runLevel("src/test/resources/data/6d_cuboid/", "target/test-output/5d_cuboid", "5d_cuboid", "2");
+    }
+
+    /** Clears {@code output}, runs NDCuboidJob at {@code level} and asserts it exits with 0. */
+    private void runLevel(String input, String output, String jobname, String level) throws Exception {
+        FileUtil.fullyDelete(new File(output));
+
+        String[] args = { "-input", input, "-cubename", CUBE_NAME, "-segmentname", SEGMENT_NAME, "-output", output, "-jobname", jobname, "-level", level };
+        assertEquals("Job failed", 0, ToolRunner.run(conf, new NDCuboidJob(), args));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java
new file mode 100644
index 0000000..9f50cc6
--- /dev/null
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
+import org.apache.hadoop.mrunit.types.Pair;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.steps.CuboidReducer;
+import org.apache.kylin.engine.mr.steps.NDCuboidMapper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+/**
+ * MRUnit test that pushes a single record of cube test_kylin_cube_with_slr_1_new_segment
+ * through {@link NDCuboidMapper} and {@link CuboidReducer}. It checks which cuboid ids
+ * appear in the output keys.
+ *
+ * @author George Song (ysong1)
+ */
+public class NDCuboidMapperTest extends LocalFileMetadataTestCase {
+    MapReduceDriver<Text, Text, Text, Text, Text, Text> mapReduceDriver;
+
+    @Before
+    public void setUp() throws Exception {
+        createTestMetadata();
+
+        // hack for distributed cache: mirror the local test metadata into ../job/meta.
+        // NOTE(review): this path is relative to the working directory and points into the
+        // sibling 'job' module; confirm it is still the intended location now that this
+        // test lives in engine-mr.
+        FileUtils.deleteDirectory(new File("../job/meta"));
+        FileUtils.copyDirectory(new File(getTestConfig().getMetadataUrl()), new File("../job/meta"));
+
+        NDCuboidMapper mapper = new NDCuboidMapper();
+        CuboidReducer reducer = new CuboidReducer();
+        mapReduceDriver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
+    }
+
+    @After
+    public void after() throws Exception {
+        cleanupTestMetadata();
+        FileUtils.deleteDirectory(new File("../job/meta"));
+    }
+
+    @Test
+    public void testMapReduceWithSlr() throws IOException {
+        String cubeName = "test_kylin_cube_with_slr_1_new_segment";
+        String segmentName = "20130331080000_20131212080000";
+        mapReduceDriver.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+        mapReduceDriver.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
+
+        // the leading 8 key bytes {0,0,0,0,0,0,1,-1} read as a long give 511 (0x1FF)
+        byte[] key = { 0, 0, 0, 0, 0, 0, 1, -1, 49, 48, 48, 48, 48, 48, 48, 48, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 11, 54, -105, 55, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 };
+        byte[] value = { 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 1 };
+        mapReduceDriver.addInput(new Pair<Text, Text>(new Text(key), new Text(value)));
+
+        List<Pair<Text, Text>> result = mapReduceDriver.run();
+
+        assertEquals(4, result.size());
+
+        byte[] resultKey = { 0, 0, 0, 0, 0, 0, 1, 127, 49, 48, 48, 48, 48, 48, 48, 48, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 55, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 };
+        byte[] resultValue = { 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 1 };
+        Pair<Text, Text> output1 = new Pair<Text, Text>(new Text(resultKey), new Text(resultValue));
+
+        // Decimals are truncated (KYLIN-766), so the output value no longer equals resultValue.
+        // Expect the key to be present, but not the exact (key, value) pair.
+        Collection<Text> keys = Collections2.transform(result, new Function<Pair<Text, Text>, Text>() {
+            @Nullable
+            @Override
+            public Text apply(Pair<Text, Text> input) {
+                return input.getFirst();
+            }
+        });
+        assertTrue(keys.contains(output1.getFirst()));
+        assertFalse(result.contains(output1));
+
+        // decode the leading 8 bytes of each output key as a long cuboid id;
+        // copyBytes() trims the Text backing array to its logical length
+        long[] keySet = new long[result.size()];
+        for (int i = 0; i < result.size(); i++) {
+            keySet[i] = Bytes.toLong(result.get(i).getFirst().copyBytes());
+        }
+
+        // refer to CuboidSchedulerTest.testGetSpanningCuboid()
+        assertArrayEquals(new long[] { 383, 447, 503, 504 }, keySet);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/pom.xml
----------------------------------------------------------------------
diff --git a/job/pom.xml b/job/pom.xml
index cbb0772..3042778 100644
--- a/job/pom.xml
+++ b/job/pom.xml
@@ -104,6 +104,13 @@
</dependency>
<dependency>
<groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-core-job</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
<artifactId>kylin-storage-hbase</artifactId>
<type>test-jar</type>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java b/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
deleted file mode 100644
index d24c99c..0000000
--- a/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.MRBatchCubingEngine;
-import org.apache.kylin.engine.mr.MRBatchCubingEngine2;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-
-/**
- * Static entry point for creating batch cubing and merge jobs. Calls are delegated to a lazily
- * chosen {@link IBatchCubingEngine}.
- */
-public class BuildEngineFactory {
-
-    private static IBatchCubingEngine defaultBatchEngine;
-
-    /**
-     * Returns the cached batch engine, choosing it on first call.
-     * MRBatchCubingEngine2 is chosen when KylinConfig.isCubingInMem() is true; otherwise
-     * MRBatchCubingEngine. NOTE(review): the lazy initialization is not synchronized.
-     */
-    public static IBatchCubingEngine defaultBatchEngine() {
-        if (defaultBatchEngine != null) {
-            return defaultBatchEngine;
-        }
-        final IBatchCubingEngine engine;
-        if (KylinConfig.getInstanceFromEnv().isCubingInMem()) {
-            engine = new MRBatchCubingEngine2();
-        } else {
-            engine = new MRBatchCubingEngine();
-        }
-        defaultBatchEngine = engine;
-        return engine;
-    }
-
-    /** Build a new cube segment, typically its time range appends to the end of current cube. */
-    public static DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
-        final IBatchCubingEngine engine = defaultBatchEngine();
-        return engine.createBatchCubingJob(newSegment, submitter);
-    }
-
-    /** Merge multiple small segments into a big one. */
-    public static DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
-        final IBatchCubingEngine engine = defaultBatchEngine();
-        return engine.createBatchMergeJob(mergeSegment, submitter);
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java b/job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
deleted file mode 100644
index 904f557..0000000
--- a/job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine;
-
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-
-/** A batch cubing engine: creates the chained jobs that build and merge cube segments. */
-public interface IBatchCubingEngine {
-
-    /** Build a new cube segment, typically its time range appends to the end of current cube. */
-    DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter);
-
-    /** Merge multiple small segments into a big one. */
-    DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter);
-
-    /** @return the source-side interface type this engine works with */
-    Class<?> getSourceInterface();
-
-    /** @return the storage-side interface type this engine works with */
-    Class<?> getStorageInterface();
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/engine/IStreamingCubingEngine.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/IStreamingCubingEngine.java b/job/src/main/java/org/apache/kylin/engine/IStreamingCubingEngine.java
deleted file mode 100644
index 0359ce9..0000000
--- a/job/src/main/java/org/apache/kylin/engine/IStreamingCubingEngine.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package org.apache.kylin.engine;
-
-import org.apache.kylin.cube.CubeSegment;
-
-/** A streaming cubing engine: hands out a task that builds a cube segment. */
-public interface IStreamingCubingEngine {
-
-    /** Creates the builder task for {@code seg}; the caller decides how and where to run it. */
-    Runnable createStreamingCubingBuilder(CubeSegment seg);
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
deleted file mode 100644
index a39ac74..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr;
-
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
-import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
-import org.apache.kylin.job.common.MapReduceExecutable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.hadoop.cube.BaseCuboidJob;
-import org.apache.kylin.job.hadoop.cube.NDCuboidJob;
-
-/** Builds the layered (level-by-level) MapReduce cubing job for one new segment. */
-public class BatchCubingJobBuilder extends JobBuilderSupport {
-
-    private final IMRBatchCubingInputSide inputSide;
-    private final IMRBatchCubingOutputSide outputSide;
-
-    public BatchCubingJobBuilder(CubeSegment newSegment, String submitter) {
-        super(newSegment, submitter);
-        this.inputSide = MRUtil.getBatchCubingInputSide(seg);
-        this.outputSide = MRUtil.getBatchCubingOutputSide(seg);
-    }
-
-    /**
-     * Assembles the job in four phases:
-     * <ol>
-     * <li>create the flat table;</li>
-     * <li>compute fact distinct columns and build dictionaries;</li>
-     * <li>build the base cuboid plus one N-dimension cuboid step per build level;</li>
-     * <li>update cube metadata and clean up.</li>
-     * </ol>
-     */
-    public CubingJob build() {
-        final CubingJob job = CubingJob.createBuildJob(seg, submitter, config);
-        final String jobId = job.getId();
-        final String cuboidRootPath = getCuboidRootPath(jobId);
-
-        // Phase 1: Create Flat Table
-        inputSide.addStepPhase1_CreateFlatTable(job);
-
-        // Phase 2: Build Dictionary
-        job.addTask(createFactDistinctColumnsStep(jobId));
-        job.addTask(createBuildDictionaryStep(jobId));
-
-        // Phase 3: Build Cube
-        final int buildLevels = seg.getCubeDesc().getRowkey().getNCuboidBuildLevels();
-        final int rowkeyColumns = seg.getCubeDesc().getRowkey().getRowKeyColumns().length;
-        final String[] levelPaths = getCuboidOutputPaths(cuboidRootPath, rowkeyColumns, buildLevels);
-        job.addTask(createBaseCuboidStep(levelPaths, jobId));
-        // level N drops N dimensions relative to the base cuboid
-        for (int level = 1; level <= buildLevels; level++) {
-            job.addTask(createNDimensionCuboidStep(levelPaths, rowkeyColumns - level, rowkeyColumns));
-        }
-        outputSide.addStepPhase3_BuildCube(job, cuboidRootPath);
-
-        // Phase 4: Update Metadata & Cleanup
-        job.addTask(createUpdateCubeInfoAfterBuildStep(jobId));
-        inputSide.addStepPhase4_Cleanup(job);
-        outputSide.addStepPhase4_Cleanup(job);
-
-        return job;
-    }
-
-    /** MR step running BaseCuboidJob from the flat table into cuboidOutputTempPath[0]. */
-    private MapReduceExecutable createBaseCuboidStep(String[] cuboidOutputTempPath, String jobId) {
-        final String cubeName = seg.getCubeInstance().getName();
-        final StringBuilder cmd = new StringBuilder();
-        appendMapReduceParameters(cmd, seg);
-        appendExecCmdParameters(cmd, "cubename", cubeName);
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", ""); // an empty input marks the flat table as source
-        appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[0]);
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Base_Cuboid_Builder_" + cubeName);
-        appendExecCmdParameters(cmd, "level", "0");
-
-        final MapReduceExecutable step = new MapReduceExecutable();
-        step.setName(ExecutableConstants.STEP_NAME_BUILD_BASE_CUBOID);
-        step.setMapReduceParams(cmd.toString());
-        step.setMapReduceJobClass(BaseCuboidJob.class);
-        step.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
-        return step;
-    }
-
-    /**
-     * MR step running NDCuboidJob for the level (totalRowkeyColumnCount - dimNum).
-     * It reads the previous level's path and writes this level's path.
-     */
-    private MapReduceExecutable createNDimensionCuboidStep(String[] cuboidOutputTempPath, int dimNum, int totalRowkeyColumnCount) {
-        final int level = totalRowkeyColumnCount - dimNum;
-        final String cubeName = seg.getCubeInstance().getName();
-        final StringBuilder cmd = new StringBuilder();
-        appendMapReduceParameters(cmd, seg);
-        appendExecCmdParameters(cmd, "cubename", cubeName);
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", cuboidOutputTempPath[level - 1]);
-        appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[level]);
-        appendExecCmdParameters(cmd, "jobname", "Kylin_ND-Cuboid_Builder_" + cubeName + "_Step");
-        appendExecCmdParameters(cmd, "level", String.valueOf(level));
-
-        final MapReduceExecutable step = new MapReduceExecutable();
-        step.setName(ExecutableConstants.STEP_NAME_BUILD_N_D_CUBOID + " : " + dimNum + "-Dimension");
-        step.setMapReduceParams(cmd.toString());
-        step.setMapReduceJobClass(NDCuboidJob.class);
-        return step;
-    }
-
-    /**
-     * Computes one output path per level. Index 0 is "base_cuboid"; index i is
-     * "{total - i}d_cuboid". All paths are under cuboidRootPath.
-     */
-    private String[] getCuboidOutputPaths(String cuboidRootPath, int totalRowkeyColumnCount, int groupRowkeyColumnsCount) {
-        final String[] paths = new String[groupRowkeyColumnsCount + 1];
-        for (int level = 0; level < paths.length; level++) {
-            paths[level] = (level == 0)
-                    ? cuboidRootPath + "base_cuboid"
-                    : cuboidRootPath + (totalRowkeyColumnCount - level) + "d_cuboid";
-        }
-        return paths;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
deleted file mode 100644
index b6f264e..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr;
-
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
-import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
-import org.apache.kylin.engine.mr.steps.InMemCuboidJob;
-import org.apache.kylin.engine.mr.steps.SaveStatisticsStep;
-import org.apache.kylin.job.common.MapReduceExecutable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-
-/** Builds the in-memory MapReduce cubing job (with statistics) for one new segment. */
-public class BatchCubingJobBuilder2 extends JobBuilderSupport {
-
-    private final IMRBatchCubingInputSide inputSide;
-    private final IMRBatchCubingOutputSide2 outputSide;
-
-    public BatchCubingJobBuilder2(CubeSegment newSegment, String submitter) {
-        super(newSegment, submitter);
-        this.inputSide = MRUtil.getBatchCubingInputSide(seg);
-        this.outputSide = MRUtil.getBatchCubingOutputSide2(seg);
-    }
-
-    /**
-     * Assembles the job in four phases:
-     * <ol>
-     * <li>create the flat table;</li>
-     * <li>compute fact distinct columns with statistics, build dictionaries, save statistics;</li>
-     * <li>run a single in-memory cubing MR step;</li>
-     * <li>update cube metadata and clean up.</li>
-     * </ol>
-     */
-    public CubingJob build() {
-        final CubingJob job = CubingJob.createBuildJob(seg, submitter, config);
-        final String jobId = job.getId();
-
-        // Phase 1: Create Flat Table
-        inputSide.addStepPhase1_CreateFlatTable(job);
-
-        // Phase 2: Build Dictionary
-        job.addTask(createFactDistinctColumnsStepWithStats(jobId));
-        job.addTask(createBuildDictionaryStep(jobId));
-        job.addTask(createSaveStatisticsStep(jobId));
-        outputSide.addStepPhase2_BuildDictionary(job);
-
-        // Phase 3: Build Cube
-        job.addTask(createInMemCubingStep(jobId));
-        outputSide.addStepPhase3_BuildCube(job);
-
-        // Phase 4: Update Metadata & Cleanup
-        job.addTask(createUpdateCubeInfoAfterBuildStep(jobId));
-        inputSide.addStepPhase4_Cleanup(job);
-        outputSide.addStepPhase4_Cleanup(job);
-
-        return job;
-    }
-
-    /** SaveStatisticsStep for this segment, pointed at this job's statistics path. */
-    private SaveStatisticsStep createSaveStatisticsStep(String jobId) {
-        final SaveStatisticsStep step = new SaveStatisticsStep();
-        step.setName(ExecutableConstants.STEP_NAME_SAVE_STATISTICS);
-        step.setCubeName(seg.getCubeInstance().getName());
-        step.setSegmentId(seg.getUuid());
-        step.setStatisticsPath(getStatisticsPath(jobId));
-        return step;
-    }
-
-    /** MR step that runs InMemCuboidJob for this segment and saves its size/record counters. */
-    private MapReduceExecutable createInMemCubingStep(String jobId) {
-        final String cubeName = seg.getCubeInstance().getName();
-        final StringBuilder cmd = new StringBuilder();
-        appendMapReduceParameters(cmd, seg);
-        appendExecCmdParameters(cmd, "cubename", cubeName);
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Cube_Builder_" + cubeName);
-        appendExecCmdParameters(cmd, "jobflowid", jobId);
-
-        final MapReduceExecutable step = new MapReduceExecutable();
-        step.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE);
-        step.setMapReduceParams(cmd.toString());
-        step.setMapReduceJobClass(InMemCuboidJob.class);
-        step.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES + "," + CubingJob.CUBE_SIZE_BYTES);
-        return step;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java b/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
deleted file mode 100644
index 6264ebd..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr;
-
-import java.util.List;
-
-import org.apache.kylin.common.util.StringUtil;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.IMROutput.IMRBatchMergeOutputSide;
-import org.apache.kylin.job.common.MapReduceExecutable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.hadoop.cube.MergeCuboidJob;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/** Builds the MapReduce job that merges several cube segments into {@code seg}. */
-public class BatchMergeJobBuilder extends JobBuilderSupport {
-
-    private final IMRBatchMergeOutputSide outputSide;
-
-    public BatchMergeJobBuilder(CubeSegment mergeSegment, String submitter) {
-        super(mergeSegment, submitter);
-        this.outputSide = MRUtil.getBatchMergeOutputSide(seg);
-    }
-
-    /**
-     * Assembles the merge job in three phases:
-     * <ol>
-     * <li>merge the dictionaries of the merging segments;</li>
-     * <li>merge their cuboid files into this job's cuboid root path;</li>
-     * <li>update cube metadata and clean up.</li>
-     * </ol>
-     *
-     * @throws IllegalStateException if fewer than two segments are being merged
-     */
-    public CubingJob build() {
-        final CubingJob result = CubingJob.createMergeJob(seg, submitter, config);
-        final String jobId = result.getId();
-        final String cuboidRootPath = getCuboidRootPath(jobId);
-
-        final List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
-        // the condition is "> 1": a merge needs at least two source segments
-        Preconditions.checkState(mergingSegments.size() > 1, "there should be at least 2 segments to merge");
-        final List<String> mergingSegmentIds = Lists.newArrayList();
-        final List<String> mergingCuboidPaths = Lists.newArrayList();
-        for (CubeSegment merging : mergingSegments) {
-            mergingSegmentIds.add(merging.getUuid());
-            // trailing "*" so the path covers everything under that segment's cuboid root
-            // (presumably glob-expanded by MergeCuboidJob)
-            mergingCuboidPaths.add(getCuboidRootPath(merging) + "*");
-        }
-
-        // Phase 1: Merge Dictionary
-        result.addTask(createMergeDictionaryStep(mergingSegmentIds));
-
-        // Phase 2: Merge Cube Files
-        String formattedPath = StringUtil.join(mergingCuboidPaths, ",");
-        result.addTask(createMergeCuboidDataStep(seg, formattedPath, cuboidRootPath));
-        outputSide.addStepPhase2_BuildCube(result, cuboidRootPath);
-
-        // Phase 3: Update Metadata & Cleanup
-        result.addTask(createUpdateCubeInfoAfterMergeStep(mergingSegmentIds, jobId));
-        outputSide.addStepPhase3_Cleanup(result);
-
-        return result;
-    }
-
-    /**
-     * MR step running MergeCuboidJob over inputPath (comma-separated) into outputPath.
-     *
-     * @param segment the segment being produced by the merge
-     * @param inputPath comma-separated cuboid paths of the merging segments
-     * @param outputPath cuboid root path of the merged segment
-     */
-    private MapReduceExecutable createMergeCuboidDataStep(CubeSegment segment, String inputPath, String outputPath) {
-        MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
-        mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
-        StringBuilder cmd = new StringBuilder();
-
-        appendMapReduceParameters(cmd, segment);
-        appendExecCmdParameters(cmd, "cubename", segment.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "segmentname", segment.getName());
-        appendExecCmdParameters(cmd, "input", inputPath);
-        appendExecCmdParameters(cmd, "output", outputPath);
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + segment.getCubeInstance().getName() + "_Step");
-
-        mergeCuboidDataStep.setMapReduceParams(cmd.toString());
-        mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidJob.class);
-        return mergeCuboidDataStep;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
deleted file mode 100644
index e0fc438..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.kylin.common.util.StringUtil;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.steps.MergeCuboidFromStorageJob;
-import org.apache.kylin.engine.mr.steps.MergeStatisticsStep;
-import org.apache.kylin.job.common.MapReduceExecutable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-
-import java.util.List;
-
-/** Builds the job that merges several cube segments (and their statistics) into {@code seg}. */
-public class BatchMergeJobBuilder2 extends JobBuilderSupport {
-
-    private final IMROutput2.IMRBatchMergeOutputSide2 outputSide;
-
-    public BatchMergeJobBuilder2(CubeSegment mergeSegment, String submitter) {
-        super(mergeSegment, submitter);
-        this.outputSide = MRUtil.getBatchMergeOutputSide2(seg);
-    }
-
-    /**
-     * Assembles the merge job in three phases:
-     * <ol>
-     * <li>merge the dictionaries and statistics of the merging segments;</li>
-     * <li>merge the cube data read from the merging segments' storage locations;</li>
-     * <li>update cube metadata and clean up.</li>
-     * </ol>
-     *
-     * @throws IllegalStateException if fewer than two segments are being merged
-     */
-    public CubingJob build() {
-        final CubingJob result = CubingJob.createMergeJob(seg, submitter, config);
-        final String jobId = result.getId();
-
-        final List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
-        // the condition is "> 1": a merge needs at least two source segments
-        Preconditions.checkState(mergingSegments.size() > 1, "there should be at least 2 segments to merge");
-        final List<String> mergingSegmentIds = Lists.newArrayList();
-        final List<String> mergingHTables = Lists.newArrayList();
-        for (CubeSegment merging : mergingSegments) {
-            mergingSegmentIds.add(merging.getUuid());
-            mergingHTables.add(merging.getStorageLocationIdentifier());
-        }
-
-        // Phase 1: Merge Dictionary
-        result.addTask(createMergeDictionaryStep(mergingSegmentIds));
-        result.addTask(createMergeStatisticsStep(seg, mergingSegmentIds, getStatisticsPath(jobId)));
-        outputSide.addStepPhase1_MergeDictionary(result);
-
-        // Phase 2: Merge Cube
-        String formattedTables = StringUtil.join(mergingHTables, ",");
-        result.addTask(createMergeCuboidDataFromStorageStep(formattedTables, jobId));
-        outputSide.addStepPhase2_BuildCube(result);
-
-        // Phase 3: Update Metadata & Cleanup
-        result.addTask(createUpdateCubeInfoAfterMergeStep(mergingSegmentIds, jobId));
-        outputSide.addStepPhase3_Cleanup(result);
-
-        return result;
-    }
-
-    /** MergeStatisticsStep combining the statistics of mergingSegmentIds into mergedStatisticsFolder. */
-    private MergeStatisticsStep createMergeStatisticsStep(CubeSegment segment, List<String> mergingSegmentIds, String mergedStatisticsFolder) {
-        MergeStatisticsStep result = new MergeStatisticsStep();
-        result.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS);
-        result.setCubeName(segment.getCubeInstance().getName());
-        result.setSegmentId(segment.getUuid());
-        result.setMergingSegmentIds(mergingSegmentIds);
-        result.setMergedStatisticsPath(mergedStatisticsFolder);
-        return result;
-    }
-
-    /**
-     * MR step running MergeCuboidFromStorageJob for this segment.
-     * NOTE(review): inputTableNames is not passed to the job command here; confirm the job
-     * derives its inputs on its own.
-     */
-    private MapReduceExecutable createMergeCuboidDataFromStorageStep(String inputTableNames, String jobId) {
-        MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
-        mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
-        StringBuilder cmd = new StringBuilder();
-
-        appendMapReduceParameters(cmd, seg);
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
-        appendExecCmdParameters(cmd, "jobflowid", jobId);
-
-        mergeCuboidDataStep.setMapReduceParams(cmd.toString());
-        mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidFromStorageJob.class);
-        // the two leading empty slots mean only the third counter (cube size) is saved
-        mergeCuboidDataStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
-        return mergeCuboidDataStep;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java b/job/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java
deleted file mode 100644
index 37a8841..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java
+++ /dev/null
@@ -1,166 +0,0 @@
-package org.apache.kylin.engine.mr;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.kylin.common.util.Bytes;
-
-public class ByteArrayWritable implements WritableComparable<ByteArrayWritable> {
-
- private byte[] data;
- private int offset;
- private int length;
-
- public ByteArrayWritable() {
- this(null, 0, 0);
- }
-
- public ByteArrayWritable(int capacity) {
- this(new byte[capacity], 0, capacity);
- }
-
- public ByteArrayWritable(byte[] data) {
- this(data, 0, data == null ? 0 : data.length);
- }
-
- public ByteArrayWritable(byte[] data, int offset, int length) {
- this.data = data;
- this.offset = offset;
- this.length = length;
- }
-
- public byte[] array() {
- return data;
- }
-
- public int offset() {
- return offset;
- }
-
- public int length() {
- return length;
- }
-
- public void set(byte[] array) {
- set(array, 0, array.length);
- }
-
- public void set(byte[] array, int offset, int length) {
- this.data = array;
- this.offset = offset;
- this.length = length;
- }
-
- public ByteBuffer asBuffer() {
- if (data == null)
- return null;
- else if (offset == 0 && length == data.length)
- return ByteBuffer.wrap(data);
- else
- return ByteBuffer.wrap(data, offset, length).slice();
- }
-
- @Override
- public int hashCode() {
- if (data == null)
- return 0;
- else
- return Bytes.hashCode(data, offset, length);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeInt(this.length);
- out.write(this.data, this.offset, this.length);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- this.length = in.readInt();
- this.data = new byte[this.length];
- in.readFully(this.data, 0, this.length);
- this.offset = 0;
- }
-
- // Below methods copied from BytesWritable
- /**
- * Define the sort order of the BytesWritable.
- * @param that The other bytes writable
- * @return Positive if left is bigger than right, 0 if they are equal, and
- * negative if left is smaller than right.
- */
- public int compareTo(ByteArrayWritable that) {
- return WritableComparator.compareBytes(this.data, this.offset, this.length, that.data, that.offset, that.length);
- }
-
- /**
- * Compares the bytes in this object to the specified byte array
- * @param that
- * @return Positive if left is bigger than right, 0 if they are equal, and
- * negative if left is smaller than right.
- */
- public int compareTo(final byte[] that) {
- return WritableComparator.compareBytes(this.data, this.offset, this.length, that, 0, that.length);
- }
-
- /**
- * @see java.lang.Object#equals(java.lang.Object)
- */
- @Override
- public boolean equals(Object right_obj) {
- if (right_obj instanceof byte[]) {
- return compareTo((byte[]) right_obj) == 0;
- }
- if (right_obj instanceof ByteArrayWritable) {
- return compareTo((ByteArrayWritable) right_obj) == 0;
- }
- return false;
- }
-
- /**
- * @see java.lang.Object#toString()
- */
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder(3 * this.length);
- final int endIdx = this.offset + this.length;
- for (int idx = this.offset; idx < endIdx; idx++) {
- sb.append(' ');
- String num = Integer.toHexString(0xff & this.data[idx]);
- // if it is only one digit, add a leading 0.
- if (num.length() < 2) {
- sb.append('0');
- }
- sb.append(num);
- }
- return sb.length() > 0 ? sb.substring(1) : "";
- }
-
- /** A Comparator optimized for ByteArrayWritable.
- */
- public static class Comparator extends WritableComparator {
- private BytesWritable.Comparator comparator = new BytesWritable.Comparator();
-
- /** constructor */
- public Comparator() {
- super(ByteArrayWritable.class);
- }
-
- /**
- * @see org.apache.hadoop.io.WritableComparator#compare(byte[], int, int, byte[], int, int)
- */
- @Override
- public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
- return comparator.compare(b1, s1, l1, b2, s2, l2);
- }
- }
-
- static { // register this comparator
- WritableComparator.define(ByteArrayWritable.class, new Comparator());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/job/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
deleted file mode 100644
index 77cbab7..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
+++ /dev/null
@@ -1,240 +0,0 @@
- /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
- package org.apache.kylin.engine.mr;
-
- import java.net.InetAddress;
- import java.net.UnknownHostException;
- import java.text.SimpleDateFormat;
- import java.util.Date;
- import java.util.Locale;
- import java.util.TimeZone;
-
- import org.apache.commons.lang3.tuple.Pair;
- import org.apache.kylin.cube.CubeInstance;
- import org.apache.kylin.cube.CubeManager;
- import org.apache.kylin.cube.CubeSegment;
- import org.apache.kylin.job.common.MapReduceExecutable;
- import org.apache.kylin.job.constant.ExecutableConstants;
- import org.apache.kylin.job.engine.JobEngineConfig;
- import org.apache.kylin.job.execution.AbstractExecutable;
- import org.apache.kylin.job.execution.DefaultChainedExecutable;
- import org.apache.kylin.job.execution.ExecutableContext;
- import org.apache.kylin.job.execution.ExecutableState;
- import org.apache.kylin.job.execution.ExecuteResult;
- import org.apache.kylin.job.execution.Output;
-
- /**
-  * A {@link DefaultChainedExecutable} that builds ("BUILD") or merges ("MERGE") a single cube segment.
-  *
-  * <p>Besides chaining its steps, it sums up statistics reported by the steps (source record count and
-  * size, cube size, MapReduce wait time) and formats the user notification for the job's final state.
-  */
- public class CubingJob extends DefaultChainedExecutable {
-
-     // Keys of the Output.extraInfo map, used to pass information across job steps.
-     public static final String SOURCE_RECORD_COUNT = "sourceRecordCount";
-     public static final String SOURCE_SIZE_BYTES = "sourceSizeBytes";
-     public static final String CUBE_SIZE_BYTES = "byteSizeBytes";
-     public static final String MAP_REDUCE_WAIT_TIME = "mapReduceWaitTime";
-
-     // Keys of the job parameters that identify the target cube and segment.
-     private static final String CUBE_INSTANCE_NAME = "cubeName";
-     private static final String SEGMENT_ID = "segmentId";
-
-     /** Creates a BUILD job for {@code seg}, with no steps added yet. */
-     public static CubingJob createBuildJob(CubeSegment seg, String submitter, JobEngineConfig config) {
-         return initCubingJob(seg, "BUILD", submitter, config);
-     }
-
-     /** Creates a MERGE job for {@code seg}, with no steps added yet. */
-     public static CubingJob createMergeJob(CubeSegment seg, String submitter, JobEngineConfig config) {
-         return initCubingJob(seg, "MERGE", submitter, config);
-     }
-
-     /**
-      * Creates a job bound to {@code seg}, named
-      * {@code "<cube> - <segment> - <jobType> - <submit time in the configured time zone>"}.
-      */
-     private static CubingJob initCubingJob(CubeSegment seg, String jobType, String submitter, JobEngineConfig config) {
-         final CubeInstance cube = seg.getCubeInstance();
-         // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
-         final SimpleDateFormat format = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss");
-         format.setTimeZone(TimeZone.getTimeZone(config.getTimeZone()));
-
-         final CubingJob result = new CubingJob();
-         result.setCubeName(cube.getName());
-         result.setSegmentId(seg.getUuid());
-         result.setName(cube.getName() + " - " + seg.getName() + " - " + jobType + " - " + format.format(new Date()));
-         result.setSubmitter(submitter);
-         result.setNotifyList(cube.getDescriptor().getNotifyList());
-         return result;
-     }
-
-     public CubingJob() {
-         super();
-     }
-
-     void setCubeName(String name) {
-         setParam(CUBE_INSTANCE_NAME, name);
-     }
-
-     /** Returns the name of the cube this job works on. */
-     public String getCubeName() {
-         return getParam(CUBE_INSTANCE_NAME);
-     }
-
-     void setSegmentId(String segmentId) {
-         setParam(SEGMENT_ID, segmentId);
-     }
-
-     /** Returns the UUID of the segment this job builds or merges. */
-     public String getSegmentId() {
-         return getParam(SEGMENT_ID);
-     }
-
-     /**
-      * Builds the (title, body) of the notification for this job, or returns {@code null} when no
-      * notification should be sent.
-      *
-      * <p>The {@code state} argument is not used: the state recorded in the job output decides. ERROR always
-      * notifies; other states only when the cube descriptor lists them in its status-need-notify setting.
-      */
-     @Override
-     protected Pair<String, String> formatNotifications(ExecutableContext context, ExecutableState state) {
-         final CubeInstance cubeInstance = CubeManager.getInstance(context.getConfig()).getCube(getCubeName());
-         final Output output = jobService.getOutput(getId());
-         final ExecutableState currentState = output.getState();
-         if (currentState != ExecutableState.ERROR && !isNotificationWanted(cubeInstance, currentState)) {
-             logger.info("state:" + currentState + " no need to notify users");
-             return null;
-         }
-
-         String logMsg;
-         switch (currentState) {
-             case ERROR:
-                 logMsg = output.getVerboseMsg();
-                 break;
-             case DISCARDED:
-                 logMsg = "job has been discarded";
-                 break;
-             case SUCCEED:
-                 logMsg = "job has succeeded";
-                 break;
-             default:
-                 return null;
-         }
-         if (logMsg == null) {
-             logMsg = "no error message";
-         }
-
-         // Substitute literally: error logs (e.g. stack traces naming inner classes like Foo$Bar) and names
-         // may contain '$' or '\', which String.replaceAll would treat as group references and fail on.
-         String content = ExecutableConstants.NOTIFY_EMAIL_TEMPLATE;
-         content = fillPlaceholder(content, "job_name", getName());
-         content = fillPlaceholder(content, "result", currentState.toString());
-         content = fillPlaceholder(content, "cube_name", getCubeName());
-         content = fillPlaceholder(content, "start_time", new Date(getStartTime()).toString());
-         content = fillPlaceholder(content, "duration", getDuration() / 60000 + "mins");
-         content = fillPlaceholder(content, "mr_waiting", getMapReduceWaitTime() / 60000 + "mins");
-         content = fillPlaceholder(content, "last_update_time", new Date(getLastModified()).toString());
-         content = fillPlaceholder(content, "submitter", getSubmitter());
-         content = fillPlaceholder(content, "error_log", logMsg);
-
-         try {
-             content = fillPlaceholder(content, "job_engine", InetAddress.getLocalHost().getCanonicalHostName());
-         } catch (UnknownHostException e) {
-             logger.warn(e.getLocalizedMessage(), e);
-         }
-
-         final String title = "[" + currentState.toString() + "] - [Kylin Cube Build Job]-" + getCubeName();
-         return Pair.of(title, content);
-     }
-
-     /**
-      * Returns whether the cube descriptor asks for notifications in {@code state}. A cube that cannot be
-      * found yields {@code false} (no notification) instead of a NullPointerException.
-      */
-     private boolean isNotificationWanted(CubeInstance cube, ExecutableState state) {
-         if (cube == null) {
-             logger.warn("cube " + getCubeName() + " not found, skip notification for state " + state);
-             return false;
-         }
-         // Locale.ROOT keeps the lower-casing stable regardless of the default locale (e.g. Turkish 'I').
-         return cube.getDescriptor().getStatusNeedNotify().contains(state.toString().toLowerCase(Locale.ROOT));
-     }
-
-     /** Replaces every literal {@code ${key}} in {@code template} with {@code value} ("null" if value is null). */
-     private static String fillPlaceholder(String template, String key, String value) {
-         return template.replace("${" + key + "}", String.valueOf(value));
-     }
-
-     /** Records the summed MapReduce wait time of the leading run of successful steps, then finishes. */
-     @Override
-     protected void onExecuteFinished(ExecuteResult result, ExecutableContext executableContext) {
-         long time = 0L;
-         for (AbstractExecutable task : getTasks()) {
-             // Counting stops at the first step that did not succeed.
-             if (task.getStatus() != ExecutableState.SUCCEED) {
-                 break;
-             }
-             if (task instanceof MapReduceExecutable) {
-                 time += ((MapReduceExecutable) task).getMapReduceWaitTime();
-             }
-         }
-         setMapReduceWaitTime(time);
-         super.onExecuteFinished(result, executableContext);
-     }
-
-     /** Returns the total MapReduce wait time recorded for this job (milliseconds), or 0 if none. */
-     public long getMapReduceWaitTime() {
-         return getExtraInfoAsLong(MAP_REDUCE_WAIT_TIME, 0L);
-     }
-
-     /** Stores the total MapReduce wait time in this job's extra info. */
-     public void setMapReduceWaitTime(long t) {
-         addExtraInfo(MAP_REDUCE_WAIT_TIME, Long.toString(t));
-     }
-
-     /** Returns the source record count reported by the first step that recorded one, or 0. */
-     public long findSourceRecordCount() {
-         return Long.parseLong(findExtraInfo(SOURCE_RECORD_COUNT, "0"));
-     }
-
-     /** Returns the source size in bytes reported by the first step that recorded one, or 0. */
-     public long findSourceSizeBytes() {
-         return Long.parseLong(findExtraInfo(SOURCE_SIZE_BYTES, "0"));
-     }
-
-     /** Returns the cube size in bytes reported by the first step that recorded one, or 0. */
-     public long findCubeSizeBytes() {
-         return Long.parseLong(findExtraInfo(CUBE_SIZE_BYTES, "0"));
-     }
-
-     /** Returns {@code key}'s value from the first step whose output carries it, else {@code dft}. */
-     private String findExtraInfo(String key, String dft) {
-         for (AbstractExecutable child : getTasks()) {
-             final Output output = executableManager.getOutput(child.getId());
-             final String value = output.getExtra().get(key);
-             if (value != null) {
-                 return value;
-             }
-         }
-         return dft;
-     }
- }
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java b/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
deleted file mode 100644
index 0c39398..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr;
-
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.metadata.model.TableDesc;
-
-/**
- * Any ITableSource that wishes to serve as input of MapReduce build engine must adapt to this interface.
- */
-public interface IMRInput {
-
- /** Return a helper to participate in batch cubing job flow. */
- public IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg);
-
- /** Return an InputFormat that reads from specified table. */
- public IMRTableInputFormat getTableInputFormat(TableDesc table);
-
- /**
- * Utility that configures mapper to read from a table.
- */
- public interface IMRTableInputFormat {
-
- /** Configure the InputFormat of given job. */
- public void configureJob(Job job);
-
- /** Parse a mapper input object into column values. */
- public String[] parseMapperInput(Object mapperInput);
- }
-
- /**
- * Participate the batch cubing flow as the input side. Responsible for creating
- * intermediate flat table (Phase 1) and clean up any leftover (Phase 4).
- *
- * - Phase 1: Create Flat Table
- * - Phase 2: Build Dictionary (with FlatTableInputFormat)
- * - Phase 3: Build Cube (with FlatTableInputFormat)
- * - Phase 4: Update Metadata & Cleanup
- */
- public interface IMRBatchCubingInputSide {
-
- /** Return an InputFormat that reads from the intermediate flat table */
- public IMRTableInputFormat getFlatTableInputFormat();
-
- /** Add step that creates an intermediate flat table as defined by CubeJoinedFlatTableDesc */
- public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow);
-
- /** Add step that does necessary clean up, like delete the intermediate flat table */
- public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
- }
-}