You are viewing a plain-text version of this content. The canonical link to the original page was a hyperlink and is not reproduced in this plain-text rendering.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/24 01:20:14 UTC
[06/28] incubator-kylin git commit: KYLIN-875 Split job module into
'core-job', 'engine-mr', 'source-hive', and
'storage-hbase'. The old 'job' module remains as an assembly project.
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/test/java/org/apache/kylin/job/hadoop/cube/RandomKeyDistributionReducerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cube/RandomKeyDistributionReducerTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cube/RandomKeyDistributionReducerTest.java
deleted file mode 100644
index 8e85a27..0000000
--- a/job/src/test/java/org/apache/kylin/job/hadoop/cube/RandomKeyDistributionReducerTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import static org.junit.Assert.*;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
-import org.apache.hadoop.mrunit.types.Pair;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.invertedindex.RandomKeyDistributionReducer;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * @author ysong1
- *
- */
-public class RandomKeyDistributionReducerTest {
- ReduceDriver<Text, LongWritable, Text, LongWritable> reduceDriver;
-
- @Before
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public void setUp() {
- RandomKeyDistributionReducer reducer = new RandomKeyDistributionReducer();
- reduceDriver = ReduceDriver.newReduceDriver(reducer);
- }
-
- @Test
- public void test() throws IOException {
- List<Text> data = new ArrayList<Text>();
- for (int i = 0; i < 1001; i++) {
- data.add(new Text(String.valueOf(i)));
- }
- for (Text t : data) {
- reduceDriver.addInput(t, new ArrayList<LongWritable>());
- }
-
- reduceDriver.getConfiguration().set(BatchConstants.REGION_NUMBER, "2");
- List<Pair<Text, LongWritable>> result = reduceDriver.run();
-
- assertEquals(2, result.size());
-
- for (Pair<Text, LongWritable> p : result) {
- System.out.println(p.getFirst());
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/test/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJobTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJobTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJobTest.java
deleted file mode 100644
index b3bcc30..0000000
--- a/job/src/test/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJobTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import static org.junit.Assert.*;
-
-import java.io.File;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.util.ToolRunner;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-
-/**
- * @author ysong1
- *
- */
-public class RangeKeyDistributionJobTest extends LocalFileMetadataTestCase {
-
- private Configuration conf;
-
- @Before
- public void setup() throws Exception {
- conf = new Configuration();
- conf.set("fs.default.name", "file:///");
- conf.set("mapred.job.tracker", "local");
-
- // for local runner out-of-memory issue
- conf.set("mapreduce.task.io.sort.mb", "10");
- createTestMetadata();
- }
-
- @After
- public void after() throws Exception {
- cleanupTestMetadata();
- }
-
- @Test
- public void testJob() throws Exception {
- String input = "src/test/resources/data/base_cuboid/,src/test/resources/data/6d_cuboid/";
- String output = "target/test-output/key_distribution_range/";
- String jobname = "calculate_splits";
- String cubename = "test_kylin_cube_with_slr_ready";
-
- FileUtil.fullyDelete(new File(output));
-
- String[] args = { "-input", input, "-output", output, "-jobname", jobname, "-cubename", cubename };
- assertEquals("Job failed", 0, ToolRunner.run(conf, new RangeKeyDistributionJob(), args));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/test/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapperTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapperTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapperTest.java
deleted file mode 100644
index 550fd6b..0000000
--- a/job/src/test/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapperTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import static org.junit.Assert.*;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mrunit.mapreduce.MapDriver;
-import org.apache.hadoop.mrunit.types.Pair;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * @author ysong1
- *
- */
-public class RangeKeyDistributionMapperTest {
-
- @SuppressWarnings("rawtypes")
- MapDriver mapDriver;
- String localTempDir = System.getProperty("java.io.tmpdir") + File.separator;
-
- @Before
- public void setUp() {
- RangeKeyDistributionMapper mapper = new RangeKeyDistributionMapper();
- mapDriver = MapDriver.newMapDriver(mapper);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testMapperWithoutHeader() throws IOException {
-
- Text inputKey1 = new Text(new byte[] { 0, 0, 0, 0, 0, 0, 0, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
- Text inputKey2 = new Text(new byte[] { 0, 0, 0, 0, 0, 0, 0, 127, 11, 122, 1, 0, 22, 98, 1, 0, 121, 7 });
- Text inputKey3 = new Text(new byte[] { 2, 2, 2, 2, 2, 2, 2, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
- Text inputKey4 = new Text(new byte[] { 3, 3, 3, 3, 3, 3, 3, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
- Text inputKey5 = new Text(new byte[] { 4, 4, 4, 4, 4, 4, 4, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
- Text inputKey6 = new Text(new byte[] { 5, 5, 5, 5, 5, 5, 5, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
- Text inputKey7 = new Text(new byte[] { 6, 6, 6, 6, 6, 6, 6, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
-
- mapDriver.addInput(inputKey1, new Text("abc"));
- mapDriver.addInput(inputKey2, new Text("abc"));
- mapDriver.addInput(inputKey3, new Text("abc"));
- mapDriver.addInput(inputKey4, new Text("abc"));
- mapDriver.addInput(inputKey5, new Text("abc"));
- mapDriver.addInput(inputKey6, new Text("abc"));
- mapDriver.addInput(inputKey7, new Text("abc"));
-
- List<Pair<Text, LongWritable>> result = mapDriver.run();
-
- assertEquals(1, result.size());
-
- byte[] key1 = result.get(0).getFirst().getBytes();
- LongWritable value1 = result.get(0).getSecond();
- assertArrayEquals(new byte[] { 6, 6, 6, 6, 6, 6, 6, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 }, key1);
- assertEquals(147, value1.get());
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testMapperWithHeader() throws IOException {
-
- Text inputKey1 = new Text(new byte[] { 0, 0, 0, 0, 0, 0, 0, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7, 0, 0, 0, 0, 0, 0, 0, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
- Text inputKey2 = new Text(new byte[] { 0, 0, 0, 0, 0, 0, 0, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7, 0, 0, 0, 0, 0, 0, 0, 127, 11, 122, 1, 0, 22, 98, 1, 0, 121, 7 });
- Text inputKey3 = new Text(new byte[] { 0, 0, 0, 0, 0, 0, 0, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7, 2, 2, 2, 2, 2, 2, 2, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
- Text inputKey4 = new Text(new byte[] { 0, 0, 0, 0, 0, 0, 0, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7, 3, 3, 3, 3, 3, 3, 3, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
- Text inputKey5 = new Text(new byte[] { 0, 0, 0, 0, 0, 0, 0, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7, 4, 4, 4, 4, 4, 4, 4, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
- Text inputKey6 = new Text(new byte[] { 0, 0, 0, 0, 0, 0, 0, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7, 5, 5, 5, 5, 5, 5, 5, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
- Text inputKey7 = new Text(new byte[] { 0, 0, 0, 0, 0, 0, 0, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7, 6, 6, 6, 6, 6, 6, 6, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
-
- mapDriver.addInput(inputKey1, new Text("abc"));
- mapDriver.addInput(inputKey2, new Text("abc"));
- mapDriver.addInput(inputKey3, new Text("abc"));
- mapDriver.addInput(inputKey4, new Text("abc"));
- mapDriver.addInput(inputKey5, new Text("abc"));
- mapDriver.addInput(inputKey6, new Text("abc"));
- mapDriver.addInput(inputKey7, new Text("abc"));
-
- List<Pair<Text, LongWritable>> result = mapDriver.run();
-
- assertEquals(1, result.size());
-
- byte[] key1 = result.get(0).getFirst().getBytes();
- LongWritable value1 = result.get(0).getSecond();
- assertArrayEquals(new byte[] { 0, 0, 0, 0, 0, 0, 0, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7, 6, 6, 6, 6, 6, 6, 6, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 }, key1);
- assertEquals(273, value1.get());
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/test/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducerTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducerTest.java
deleted file mode 100644
index 38fb132..0000000
--- a/job/src/test/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducerTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * @author ysong1
- *
- */
-public class RangeKeyDistributionReducerTest {
-
- ReduceDriver<Text, LongWritable, Text, LongWritable> reduceDriver;
- String localTempDir = System.getProperty("java.io.tmpdir") + File.separator;
-
- @Before
- public void setUp() {
- RangeKeyDistributionReducer reducer = new RangeKeyDistributionReducer();
- reduceDriver = ReduceDriver.newReduceDriver(reducer);
- }
-
- @Test
- public void testReducer() throws IOException {
- // TODO
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/CubeSamplingTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/CubeSamplingTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/CubeSamplingTest.java
deleted file mode 100644
index 386d858..0000000
--- a/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/CubeSamplingTest.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cubev2;
-
-import com.google.common.collect.Lists;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
-import org.apache.commons.lang.RandomStringUtils;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.List;
-
-/**
- */
-public class CubeSamplingTest {
-
- private static final int ROW_LENGTH = 10;
-
- private final List<String> row = new ArrayList<String>(ROW_LENGTH);
- private final ByteArray[] row_index = new ByteArray[ROW_LENGTH];
-
- private Integer[][] allCuboidsBitSet;
- private HashFunction hf = null;
- private long baseCuboidId;
- private HyperLogLogPlusCounter[] allCuboidsHLL = null;
- private final byte[] seperator = Bytes.toBytes(",");
-
- @Before
- public void setup() {
-
- baseCuboidId = (1l << ROW_LENGTH) - 1;
- List<Long> allCuboids = Lists.newArrayList();
- List<Integer[]> allCuboidsBitSetList = Lists.newArrayList();
- for (long i = 1; i < baseCuboidId; i++) {
- allCuboids.add(i);
- addCuboidBitSet(i, allCuboidsBitSetList);
- }
-
- allCuboidsBitSet = allCuboidsBitSetList.toArray(new Integer[allCuboidsBitSetList.size()][]);
- System.out.println("Totally have " + allCuboidsBitSet.length + " cuboids.");
- allCuboidsHLL = new HyperLogLogPlusCounter[allCuboids.size()];
- for (int i = 0; i < allCuboids.size(); i++) {
- allCuboidsHLL[i] = new HyperLogLogPlusCounter(14);
- }
-
- // hf = Hashing.goodFastHash(32);
-// hf = Hashing.md5();
- hf = Hashing.murmur3_32();
-
- for (int i = 0; i < ROW_LENGTH; i++) {
- row_index[i] = new ByteArray();
- }
- }
-
- private void addCuboidBitSet(long cuboidId, List<Integer[]> allCuboidsBitSet) {
- BitSet bitSet = BitSet.valueOf(new long[]{cuboidId});
- Integer[] indice = new Integer[bitSet.cardinality()];
-
- long mask = Long.highestOneBit(baseCuboidId);
- int position = 0;
- for (int i = 0; i < ROW_LENGTH; i++) {
- if ((mask & cuboidId) > 0) {
- indice[position] = i;
- position++;
- }
- mask = mask >> 1;
- }
-
- allCuboidsBitSet.add(indice);
-
- }
-
- @Test
- public void test() {
-
- long start = System.currentTimeMillis();
- List<String> row;
- for (int i = 0; i < 10000; i++) {
- row = getRandomRow();
- putRowKeyToHLL(row);
- }
-
- long duration = System.currentTimeMillis() - start;
- System.out.println("The test takes " + duration / 1000 + "seconds.");
- }
-
- private void putRowKeyToHLL(List<String> row) {
- int x = 0;
- for (String field : row) {
- Hasher hc = hf.newHasher();
- row_index[x++].set(hc.putString(field).hash().asBytes());
- }
-
- for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
- Hasher hc = hf.newHasher();
- for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
- hc.putBytes(row_index[allCuboidsBitSet[i][position]].array());
- hc.putBytes(seperator);
- }
- allCuboidsHLL[i].add(hc.hash().asBytes());
- }
- }
-
- private List<String> getRandomRow() {
- row.clear();
- for (int i = 0; i < ROW_LENGTH; i++) {
- row.add(RandomStringUtils.random(10));
- }
- return row;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/FactDistinctColumnsReducerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/FactDistinctColumnsReducerTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/FactDistinctColumnsReducerTest.java
deleted file mode 100644
index b85afcc..0000000
--- a/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/FactDistinctColumnsReducerTest.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package org.apache.kylin.job.hadoop.cubev2;
-
-import com.google.common.collect.Maps;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsReducer;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.UUID;
-
-/**
- */
-public class FactDistinctColumnsReducerTest {
-
-
- @Test
- public void testWriteCuboidStatistics() throws IOException {
-
- final Configuration conf = HadoopUtil.getCurrentConfiguration();
- final Path outputPath = new Path("file:///tmp/kylin/cuboidstatistics/" + UUID.randomUUID().toString());
- if (!FileSystem.getLocal(conf).exists(outputPath)) {
-// FileSystem.getLocal(conf).create(outputPath);
- }
-
- System.out.println(outputPath);
- Map<Long, HyperLogLogPlusCounter> cuboidHLLMap = Maps.newHashMap();
- FactDistinctColumnsReducer.writeCuboidStatistics(conf, outputPath, cuboidHLLMap, 100);
-
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/test/java/org/apache/kylin/job/hadoop/hbase/CreateHTableTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/hbase/CreateHTableTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/hbase/CreateHTableTest.java
deleted file mode 100644
index 5f43c2a..0000000
--- a/job/src/test/java/org/apache/kylin/job/hadoop/hbase/CreateHTableTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.hbase;
-
-import static org.junit.Assert.*;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-
-/**
- * @author George Song (ysong1)
- *
- */
-public class CreateHTableTest extends LocalFileMetadataTestCase {
-
- private Configuration conf;
-
- @Before
- public void setup() throws Exception {
- conf = new Configuration();
- conf.set("fs.default.name", "file:///");
- conf.set("mapred.job.tracker", "local");
- this.createTestMetadata();
-
- }
-
- @After
- public void after() throws Exception {
- this.cleanupTestMetadata();
- }
-
- @Test
- public void testGetSplits() throws IllegalArgumentException, Exception {
- CreateHTableJob c = new CreateHTableJob();
-
- String input = "src/test/resources/partition_list/part-r-00000";
-
- byte[][] splits = c.getSplits(conf, new Path(input));
-
- assertEquals(497, splits.length);
- assertArrayEquals(new byte[] { 0, 0, 0, 0, 0, 0, 15, -1, 11, 51, -45, 2 }, splits[0]);
- assertArrayEquals(new byte[] { 0, 0, 0, 0, 0, 3, -1, -1, -54, -61, 109, -44, 1 }, splits[496]);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/test/java/org/apache/kylin/job/hadoop/hbase/TestHbaseClient.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/hbase/TestHbaseClient.java b/job/src/test/java/org/apache/kylin/job/hadoop/hbase/TestHbaseClient.java
deleted file mode 100644
index 3232a80..0000000
--- a/job/src/test/java/org/apache/kylin/job/hadoop/hbase/TestHbaseClient.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.hbase;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.kylin.common.util.Bytes;
-
-/**
- */
-public class TestHbaseClient {
-
- private static boolean reverse = false;
-
- public static void foo(int n, int k) {
- int t = k;
- if (n - k < k) {
- t = n - k;
- reverse = true;
- }
- boolean[] flags = new boolean[n];
- inner(flags, 0, t);
- }
-
- private static void print(boolean[] flags) {
- for (int i = 0; i < flags.length; i++) {
- if (!reverse) {
- if (flags[i])
- System.out.print("0");
- else
- System.out.print("1");
- } else {
- if (flags[i])
- System.out.print("1");
- else
- System.out.print("0");
-
- }
- }
- System.out.println();
-
- }
-
- private static void inner(boolean[] flags, int start, int remaining) {
- if (remaining <= 0) {
- print(flags);
- return;
- }
-
- if (flags.length - start < remaining) {
- return;
- }
-
- // write at flags[start]
- flags[start] = true;
- inner(flags, start + 1, remaining - 1);
-
- // not write at flags[start]
- flags[start] = false;
- inner(flags, start + 1, remaining);
- }
-
- public static void main(String[] args) throws IOException {
- foo(6, 5);
- foo(5, 2);
- foo(3, 0);
-
- Configuration conf = HBaseConfiguration.create();
- conf.set("hbase.zookeeper.quorum", "hbase_host");
- conf.set("zookeeper.znode.parent", "/hbase-unsecure");
-
- HTable table = new HTable(conf, "test1");
- Put put = new Put(Bytes.toBytes("row1"));
-
- put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val1"));
- put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"), Bytes.toBytes("val2"));
-
- table.put(put);
- table.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/test/java/org/apache/kylin/job/hadoop/hdfs/ITHdfsOpsTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/hdfs/ITHdfsOpsTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/hdfs/ITHdfsOpsTest.java
deleted file mode 100644
index 69519e0..0000000
--- a/job/src/test/java/org/apache/kylin/job/hadoop/hdfs/ITHdfsOpsTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.hdfs;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.storage.hbase.HBaseMetadataTestCase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-
-/**
- */
-public class ITHdfsOpsTest extends HBaseMetadataTestCase {
-
- FileSystem fileSystem;
-
- @Before
- public void setup() throws Exception {
-
- this.createTestMetadata();
-
- Configuration hconf = new Configuration();
-
- fileSystem = FileSystem.get(hconf);
- }
-
- @Test
- public void TestPath() throws IOException {
- String hdfsWorkingDirectory = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory();
- Path coprocessorDir = new Path(hdfsWorkingDirectory, "test");
- fileSystem.mkdirs(coprocessorDir);
-
- Path newFile = new Path(coprocessorDir, "test_file");
- newFile = newFile.makeQualified(fileSystem.getUri(), null);
- FSDataOutputStream stream = fileSystem.create(newFile);
- stream.write(new byte[] { 0, 1, 2 });
- stream.close();
- }
-
- @After
- public void after() throws Exception {
- this.cleanupTestMetadata();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/test/java/org/apache/kylin/job/hadoop/hive/JoinedFlatTableTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/hive/JoinedFlatTableTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/hive/JoinedFlatTableTest.java
deleted file mode 100644
index 21f6a71..0000000
--- a/job/src/test/java/org/apache/kylin/job/hadoop/hive/JoinedFlatTableTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.hive;
-
-import static org.junit.Assert.*;
-
-import java.io.IOException;
-
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.job.JoinedFlatTable;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-
-/**
- * @author George Song (ysong1)
- *
- */
-@Ignore("This test case doesn't have much value, ignore it.")
-public class JoinedFlatTableTest extends LocalFileMetadataTestCase {
-
- CubeInstance cube = null;
- CubeJoinedFlatTableDesc intermediateTableDesc = null;
- String fakeJobUUID = "abc-def";
- CubeSegment cubeSegment = null;
-
- @Before
- public void setUp() throws Exception {
- this.createTestMetadata();
- cube = CubeManager.getInstance(getTestConfig()).getCube("test_kylin_cube_with_slr_ready");
- cubeSegment = cube.getSegments().get(0);
- intermediateTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor(), cubeSegment);
- }
-
- @After
- public void after() throws Exception {
- this.cleanupTestMetadata();
- }
-
- @Test
- public void testGenCreateTableDDL() {
- String ddl = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, "/tmp");
- System.out.println(ddl);
-
- System.out.println("The length for the ddl is " + ddl.length());
- }
-
- @Test
- public void testGenDropTableDDL() {
- String ddl = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc);
- System.out.println(ddl);
- assertEquals(107, ddl.length());
- }
-
- @Test
- public void testGenerateInsertSql() throws IOException {
- String sqls = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, new JobEngineConfig(KylinConfig.getInstanceFromEnv()));
- System.out.println(sqls);
-
- int length = sqls.length();
- assertEquals(1155, length);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java b/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
index 20208f2..1a503e1 100644
--- a/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
+++ b/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
@@ -26,7 +26,7 @@ import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.manager.ExecutableManager;
import org.apache.kylin.storage.hbase.HBaseMetadataTestCase;
-import org.apache.kylin.storage.hbase.ZookeeperJobLock;
+import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/test/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStoreTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStoreTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStoreTest.java
deleted file mode 100644
index 2fbcd94..0000000
--- a/job/src/test/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStoreTest.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.inmemcubing;
-
-import static org.junit.Assert.*;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.kylin.storage.gridtable.GTBuilder;
-import org.apache.kylin.storage.gridtable.GTInfo;
-import org.apache.kylin.storage.gridtable.GTRecord;
-import org.apache.kylin.storage.gridtable.GTScanRequest;
-import org.apache.kylin.storage.gridtable.GridTable;
-import org.apache.kylin.storage.gridtable.IGTScanner;
-import org.apache.kylin.storage.gridtable.UnitTestSupport;
-import org.junit.Test;
-
-public class ConcurrentDiskStoreTest {
-
- final GTInfo info = UnitTestSupport.advancedInfo();
- final List<GTRecord> data = UnitTestSupport.mockupData(info, 1000000); // converts to about 34 MB data
-
- @Test
- public void testSingleThreadRead() throws IOException, InterruptedException {
- long start = System.currentTimeMillis();
- verifyOneTableWriteAndRead(1);
- long end = System.currentTimeMillis();
- System.out.println("Cost " + (end - start) + " millis");
- }
-
- @Test
- public void testMultiThreadRead() throws IOException, InterruptedException {
- long start = System.currentTimeMillis();
- verifyOneTableWriteAndRead(20);
- long end = System.currentTimeMillis();
- System.out.println("Cost " + (end - start) + " millis");
- }
-
- private void verifyOneTableWriteAndRead(int readThreads) throws IOException, InterruptedException {
- ConcurrentDiskStore store = new ConcurrentDiskStore(info);
- GridTable table = new GridTable(info, store);
- verifyWriteAndRead(table, readThreads);
- }
-
- private void verifyWriteAndRead(final GridTable table, int readThreads) throws IOException, InterruptedException {
- GTBuilder builder = table.rebuild();
- for (GTRecord r : data) {
- builder.write(r);
- }
- builder.close();
-
- int nThreads = readThreads;
- Thread[] t = new Thread[nThreads];
- for (int i = 0; i < nThreads; i++) {
- t[i] = new Thread() {
- public void run() {
- try {
- IGTScanner scanner = table.scan(new GTScanRequest(table.getInfo()));
- int i = 0;
- for (GTRecord r : scanner) {
- assertEquals(data.get(i++), r);
- }
- scanner.close();
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
- };
- t[i].start();
- }
- for (int i = 0; i < nThreads; i++) {
- t[i].join();
- }
-
- ((ConcurrentDiskStore) table.getStore()).close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderStressTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderStressTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderStressTest.java
deleted file mode 100644
index d5563b7..0000000
--- a/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderStressTest.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.inmemcubing;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.gridtable.GTRecord;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class DoggedCubeBuilderStressTest extends LocalFileMetadataTestCase {
-
- @SuppressWarnings("unused")
- private static final Logger logger = LoggerFactory.getLogger(DoggedCubeBuilderStressTest.class);
-
- // CI sandbox memory is no more than 512MB, this many input should hit memory threshold
- private static final int INPUT_ROWS = 200000;
- private static final int THREADS = 4;
-
- private static CubeInstance cube;
- private static String flatTable;
- private static Map<TblColRef, Dictionary<?>> dictionaryMap;
-
- @BeforeClass
- public static void before() throws IOException {
- staticCreateTestMetadata();
-
- KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
-
- cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty");
- flatTable = "../examples/test_case_data/localmeta/data/flatten_data_for_without_slr_left_join.csv";
- dictionaryMap = InMemCubeBuilderTest.getDictionaryMap(cube, flatTable);
- }
-
- @AfterClass
- public static void after() throws Exception {
- staticCleanupTestMetadata();
- }
-
- @Test
- public void test() throws Exception {
-
- ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
- ExecutorService executorService = Executors.newSingleThreadExecutor();
- long randSeed = System.currentTimeMillis();
-
- DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
- doggedBuilder.setConcurrentThreads(THREADS);
-
- {
- Future<?> future = executorService.submit(doggedBuilder.buildAsRunnable(queue, new NoopWriter()));
- InMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
- future.get();
- }
- }
-
- class NoopWriter implements ICuboidWriter {
- @Override
- public void write(long cuboidId, GTRecord record) throws IOException {
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderTest.java
deleted file mode 100644
index a87f950..0000000
--- a/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderTest.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.inmemcubing;
-
-import static org.junit.Assert.*;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.PrintWriter;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.gridtable.GTRecord;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class DoggedCubeBuilderTest extends LocalFileMetadataTestCase {
-
- @SuppressWarnings("unused")
- private static final Logger logger = LoggerFactory.getLogger(DoggedCubeBuilderTest.class);
-
- private static final int INPUT_ROWS = 10000;
- private static final int SPLIT_ROWS = 5000;
- private static final int THREADS = 4;
-
- private static CubeInstance cube;
- private static String flatTable;
- private static Map<TblColRef, Dictionary<?>> dictionaryMap;
-
- @BeforeClass
- public static void before() throws IOException {
- staticCreateTestMetadata();
-
- KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
-
- cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty");
- flatTable = "../examples/test_case_data/localmeta/data/flatten_data_for_without_slr_left_join.csv";
- dictionaryMap = InMemCubeBuilderTest.getDictionaryMap(cube, flatTable);
- }
-
- @AfterClass
- public static void after() throws Exception {
- staticCleanupTestMetadata();
- }
-
- @Test
- public void test() throws Exception {
-
- ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
- ExecutorService executorService = Executors.newSingleThreadExecutor();
- long randSeed = System.currentTimeMillis();
-
- DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
- doggedBuilder.setConcurrentThreads(THREADS);
- doggedBuilder.setSplitRowThreshold(SPLIT_ROWS);
- FileRecordWriter doggedResult = new FileRecordWriter();
-
- {
- Future<?> future = executorService.submit(doggedBuilder.buildAsRunnable(queue, doggedResult));
- InMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
- future.get();
- doggedResult.close();
- }
-
- InMemCubeBuilder inmemBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap);
- inmemBuilder.setConcurrentThreads(THREADS);
- FileRecordWriter inmemResult = new FileRecordWriter();
-
- {
- Future<?> future = executorService.submit(inmemBuilder.buildAsRunnable(queue, inmemResult));
- InMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
- future.get();
- inmemResult.close();
- }
-
- fileCompare(doggedResult.file, inmemResult.file);
- doggedResult.file.delete();
- inmemResult.file.delete();
- }
-
- private void fileCompare(File file, File file2) throws IOException {
- BufferedReader r1 = new BufferedReader(new InputStreamReader(new FileInputStream(file), "UTF-8"));
- BufferedReader r2 = new BufferedReader(new InputStreamReader(new FileInputStream(file2), "UTF-8"));
-
- String line1, line2;
- do {
- line1 = r1.readLine();
- line2 = r2.readLine();
-
- assertEquals(line1, line2);
-
- } while (line1 != null || line2 != null);
-
- r1.close();
- r2.close();
- }
-
- class FileRecordWriter implements ICuboidWriter {
-
- File file;
- PrintWriter writer;
-
- FileRecordWriter() throws IOException {
- file = File.createTempFile("DoggedCubeBuilderTest_", ".data");
- writer = new PrintWriter(file, "UTF-8");
- }
-
- @Override
- public void write(long cuboidId, GTRecord record) throws IOException {
- writer.print(cuboidId);
- writer.print(", ");
- writer.print(record.toString());
- writer.println();
- }
-
- public void close() {
- writer.close();
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
deleted file mode 100644
index 3caa1b0..0000000
--- a/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.inmemcubing;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.dict.DictionaryGenerator;
-import org.apache.kylin.engine.mr.DFSFileTableReader;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.gridtable.GTRecord;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- */
-public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
-
- private static final Logger logger = LoggerFactory.getLogger(InMemCubeBuilderTest.class);
-
- private static final int INPUT_ROWS = 70000;
- private static final int THREADS = 4;
-
- private static CubeInstance cube;
- private static String flatTable;
- private static Map<TblColRef, Dictionary<?>> dictionaryMap;
-
- @BeforeClass
- public static void before() throws IOException {
- staticCreateTestMetadata();
-
- KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
-
- cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty");
- flatTable = "../examples/test_case_data/localmeta/data/flatten_data_for_without_slr_left_join.csv";
- dictionaryMap = getDictionaryMap(cube, flatTable);
- }
-
- @AfterClass
- public static void after() throws Exception {
- staticCleanupTestMetadata();
- }
-
- @Test
- public void test() throws Exception {
-
- InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap);
- //DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
- cubeBuilder.setConcurrentThreads(THREADS);
-
- ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
- ExecutorService executorService = Executors.newSingleThreadExecutor();
-
- try {
- // round 1
- {
- Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter()));
- feedData(cube, flatTable, queue, INPUT_ROWS);
- future.get();
- }
-
- // round 2, zero input
- {
- Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter()));
- feedData(cube, flatTable, queue, 0);
- future.get();
- }
-
- // round 3
- {
- Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter()));
- feedData(cube, flatTable, queue, INPUT_ROWS);
- future.get();
- }
-
- } catch (Exception e) {
- logger.error("stream build failed", e);
- throw new IOException("Failed to build cube ", e);
- }
- }
-
- static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<List<String>> queue, int count) throws IOException, InterruptedException {
- feedData(cube, flatTable, queue, count, 0);
- }
-
- static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<List<String>> queue, int count, long randSeed) throws IOException, InterruptedException {
- CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor(), null);
- int nColumns = flatTableDesc.getColumnList().size();
-
- @SuppressWarnings("unchecked")
- Set<String>[] distinctSets = new Set[nColumns];
- for (int i = 0; i < nColumns; i++)
- distinctSets[i] = new TreeSet<String>();
-
- // get distinct values on each column
- DFSFileTableReader reader = new DFSFileTableReader(flatTable, nColumns);
- while (count > 0 && reader.next()) {
- String[] row = reader.getRow();
- for (int i = 0; i < nColumns; i++)
- distinctSets[i].add(row[i]);
- }
- reader.close();
-
- List<String[]> distincts = new ArrayList<String[]>();
- for (int i = 0; i < nColumns; i++) {
- distincts.add((String[]) distinctSets[i].toArray(new String[distinctSets[i].size()]));
- }
-
- Random rand = new Random();
- if (randSeed != 0)
- rand.setSeed(randSeed);
-
- // output with random data
- for (; count > 0; count--) {
- ArrayList<String> row = new ArrayList<String>(nColumns);
- for (int i = 0; i < nColumns; i++) {
- String[] candidates = distincts.get(i);
- row.add(candidates[rand.nextInt(candidates.length)]);
- }
- queue.put(row);
- }
- queue.put(new ArrayList<String>(0));
- }
-
- static Map<TblColRef, Dictionary<?>> getDictionaryMap(CubeInstance cube, String flatTable) throws IOException {
- Map<TblColRef, Dictionary<?>> result = Maps.newHashMap();
- CubeDesc desc = cube.getDescriptor();
- CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(desc, null);
- int nColumns = flatTableDesc.getColumnList().size();
-
- List<TblColRef> columns = Cuboid.getBaseCuboid(desc).getColumns();
- for (int c = 0; c < columns.size(); c++) {
- TblColRef col = columns.get(c);
- if (desc.getRowkey().isUseDictionary(col)) {
- logger.info("Building dictionary for " + col);
- List<byte[]> valueList = readValueList(flatTable, nColumns, flatTableDesc.getRowKeyColumnIndexes()[c]);
- Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueList(col.getType(), valueList);
- result.put(col, dict);
- }
- }
- return result;
- }
-
- private static List<byte[]> readValueList(String flatTable, int nColumns, int c) throws IOException {
- List<byte[]> result = Lists.newArrayList();
- DFSFileTableReader reader = new DFSFileTableReader(flatTable, nColumns);
- while (reader.next()) {
- String[] row = reader.getRow();
- if (row[c] != null) {
- result.add(Bytes.toBytes(row[c]));
- }
- }
- reader.close();
- return result;
- }
-
- class ConsoleGTRecordWriter implements ICuboidWriter {
-
- boolean verbose = false;
-
- @Override
- public void write(long cuboidId, GTRecord record) throws IOException {
- if (verbose)
- System.out.println(record.toString());
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/test/java/org/apache/kylin/job/inmemcubing/MemDiskStoreTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/MemDiskStoreTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/MemDiskStoreTest.java
deleted file mode 100644
index fd848f2..0000000
--- a/job/src/test/java/org/apache/kylin/job/inmemcubing/MemDiskStoreTest.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.inmemcubing;
-
-import static org.junit.Assert.*;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.kylin.common.util.MemoryBudgetController;
-import org.apache.kylin.storage.gridtable.GTBuilder;
-import org.apache.kylin.storage.gridtable.GTInfo;
-import org.apache.kylin.storage.gridtable.GTRecord;
-import org.apache.kylin.storage.gridtable.GTScanRequest;
-import org.apache.kylin.storage.gridtable.GridTable;
-import org.apache.kylin.storage.gridtable.IGTScanner;
-import org.apache.kylin.storage.gridtable.UnitTestSupport;
-import org.junit.Test;
-
-public class MemDiskStoreTest {
-
- final MemoryBudgetController budgetCtrl = new MemoryBudgetController(20);
- final GTInfo info = UnitTestSupport.advancedInfo();
- final List<GTRecord> data = UnitTestSupport.mockupData(info, 1000000); // converts to about 34 MB data
-
- @Test
- public void testSingleThreadWriteRead() throws IOException {
- long start = System.currentTimeMillis();
- verifyOneTableWriteAndRead();
- long end = System.currentTimeMillis();
- System.out.println("Cost " + (end - start) + " millis");
- }
-
- @Test
- public void testMultiThreadWriteRead() throws IOException, InterruptedException {
- long start = System.currentTimeMillis();
-
- int nThreads = 5;
- Thread[] t = new Thread[nThreads];
- for (int i = 0; i < nThreads; i++) {
- t[i] = new Thread() {
- public void run() {
- try {
- verifyOneTableWriteAndRead();
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
- };
- t[i].start();
- }
- for (int i = 0; i < nThreads; i++) {
- t[i].join();
- }
-
- long end = System.currentTimeMillis();
- System.out.println("Cost " + (end - start) + " millis");
- }
-
- private void verifyOneTableWriteAndRead() throws IOException {
- MemDiskStore store = new MemDiskStore(info, budgetCtrl);
- GridTable table = new GridTable(info, store);
- verifyWriteAndRead(table);
- }
-
- private void verifyWriteAndRead(GridTable table) throws IOException {
- GTInfo info = table.getInfo();
-
- GTBuilder builder = table.rebuild();
- for (GTRecord r : data) {
- builder.write(r);
- }
- builder.close();
-
- IGTScanner scanner = table.scan(new GTScanRequest(info));
- int i = 0;
- for (GTRecord r : scanner) {
- assertEquals(data.get(i++), r);
- }
- scanner.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/test/java/org/apache/kylin/job/inmemcubing/MemoryBudgetControllerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/MemoryBudgetControllerTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/MemoryBudgetControllerTest.java
deleted file mode 100644
index 6f86c95..0000000
--- a/job/src/test/java/org/apache/kylin/job/inmemcubing/MemoryBudgetControllerTest.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.inmemcubing;
-
-import org.apache.kylin.common.util.MemoryBudgetController;
-import org.apache.kylin.common.util.MemoryBudgetController.NotEnoughBudgetException;
-import org.junit.Test;
-
-import java.util.ArrayList;
-
-import static org.junit.Assert.*;
-
-public class MemoryBudgetControllerTest {
-
- @Test
- public void test() {
- final int n = MemoryBudgetController.getSystemAvailMB() / 2;
- final MemoryBudgetController mbc = new MemoryBudgetController(n);
-
- ArrayList<Consumer> mbList = new ArrayList<Consumer>();
- for (int i = 0; i < n; i++) {
- mbList.add(new Consumer(mbc));
- assertEquals(mbList.size(), mbc.getTotalReservedMB());
- }
-
- // a's reservation will free up all the previous
- final Consumer a = new Consumer();
- mbc.reserve(a, n);
- for (int i = 0; i < n; i++) {
- assertEquals(null, mbList.get(i).data);
- }
-
- // cancel a in 2 seconds
- new Thread() {
- @Override
- public void run() {
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- mbc.reserve(a, 0);
- }
- }.start();
-
- // b will success after some wait
- long bWaitStart = System.currentTimeMillis();
- final Consumer b = new Consumer();
- mbc.reserveInsist(b, n);
- assertTrue(System.currentTimeMillis() - bWaitStart > 1000);
-
- try {
- mbc.reserve(a, 1);
- fail();
- } catch (NotEnoughBudgetException ex) {
- // expected
- }
- }
-
- class Consumer implements MemoryBudgetController.MemoryConsumer {
-
- byte[] data;
-
- Consumer() {
- }
-
- Consumer(MemoryBudgetController mbc) {
- mbc.reserve(this, 1);
- data = new byte[MemoryBudgetController.ONE_MB - 24]; // 24 is object shell of this + object shell of data + reference of data
- }
-
- @Override
- public int freeUp(int mb) {
- if (data != null) {
- data = null;
- return 1;
- } else {
- return 0;
- }
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/test/java/org/apache/kylin/job/manager/ExecutableManagerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/manager/ExecutableManagerTest.java b/job/src/test/java/org/apache/kylin/job/manager/ExecutableManagerTest.java
deleted file mode 100644
index 1d28fc8..0000000
--- a/job/src/test/java/org/apache/kylin/job/manager/ExecutableManagerTest.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.manager;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.job.BaseTestExecutable;
-import org.apache.kylin.job.SucceedTestExecutable;
-import org.apache.kylin.job.exception.IllegalStateTranferException;
-import org.apache.kylin.job.execution.ChainedExecutable;
-import org.apache.kylin.job.execution.Executable;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.List;
-
-import static org.junit.Assert.*;
-
-/**
- */
-public class ExecutableManagerTest extends LocalFileMetadataTestCase {
-
- private ExecutableManager service;
-
- @Before
- public void setup() throws Exception {
- createTestMetadata();
- service = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
-
- for (String jobId: service.getAllJobIds()) {
- System.out.println("deleting " + jobId);
- service.deleteJob(jobId);
- }
-
- }
-
- @After
- public void after() throws Exception {
- cleanupTestMetadata();
- }
-
- @Test
- public void test() throws Exception {
- assertNotNull(service);
- BaseTestExecutable executable = new SucceedTestExecutable();
- executable.setParam("test1", "test1");
- executable.setParam("test2", "test2");
- executable.setParam("test3", "test3");
- service.addJob(executable);
- List<AbstractExecutable> result = service.getAllExecutables();
- assertEquals(1, result.size());
- AbstractExecutable another = service.getJob(executable.getId());
- assertJobEqual(executable, another);
-
- service.updateJobOutput(executable.getId(), ExecutableState.RUNNING, null, "test output");
- assertJobEqual(executable, service.getJob(executable.getId()));
- }
-
- @Test
- public void testDefaultChainedExecutable() throws Exception {
- DefaultChainedExecutable job = new DefaultChainedExecutable();
- job.addTask(new SucceedTestExecutable());
- job.addTask(new SucceedTestExecutable());
-
- service.addJob(job);
- assertEquals(2, job.getTasks().size());
- AbstractExecutable anotherJob = service.getJob(job.getId());
- assertEquals(DefaultChainedExecutable.class, anotherJob.getClass());
- assertEquals(2, ((DefaultChainedExecutable) anotherJob).getTasks().size());
- assertJobEqual(job, anotherJob);
- }
-
- @Test
- public void testValidStateTransfer() throws Exception {
- SucceedTestExecutable job = new SucceedTestExecutable();
- String id = job.getId();
- service.addJob(job);
- service.updateJobOutput(id, ExecutableState.RUNNING, null, null);
- service.updateJobOutput(id, ExecutableState.ERROR, null, null);
- service.updateJobOutput(id, ExecutableState.READY, null, null);
- service.updateJobOutput(id, ExecutableState.RUNNING, null, null);
- service.updateJobOutput(id, ExecutableState.READY, null, null);
- service.updateJobOutput(id, ExecutableState.RUNNING, null, null);
- service.updateJobOutput(id, ExecutableState.SUCCEED, null, null);
- }
-
- @Test(expected = IllegalStateTranferException.class)
- public void testInvalidStateTransfer(){
- SucceedTestExecutable job = new SucceedTestExecutable();
- service.addJob(job);
- service.updateJobOutput(job.getId(), ExecutableState.RUNNING, null, null);
- service.updateJobOutput(job.getId(), ExecutableState.STOPPED, null, null);
- }
-
-
-
- private static void assertJobEqual(Executable one, Executable another) {
- assertEquals(one.getClass(), another.getClass());
- assertEquals(one.getId(), another.getId());
- assertEquals(one.getStatus(), another.getStatus());
- assertEquals(one.isRunnable(), another.isRunnable());
- assertEquals(one.getOutput(), another.getOutput());
- assertTrue((one.getParams() == null && another.getParams() == null) || (one.getParams() != null && another.getParams() != null));
- if (one.getParams() != null) {
- assertEquals(one.getParams().size(), another.getParams().size());
- for (String key : one.getParams().keySet()) {
- assertEquals(one.getParams().get(key), another.getParams().get(key));
- }
- }
- if (one instanceof ChainedExecutable) {
- assertTrue(another instanceof ChainedExecutable);
- List<? extends Executable> onesSubs = ((ChainedExecutable) one).getTasks();
- List<? extends Executable> anotherSubs = ((ChainedExecutable) another).getTasks();
- assertTrue((onesSubs == null && anotherSubs == null) || (onesSubs != null && anotherSubs != null));
- if (onesSubs != null) {
- assertEquals(onesSubs.size(), anotherSubs.size());
- for (int i = 0; i < onesSubs.size(); ++i) {
- assertJobEqual(onesSubs.get(i), anotherSubs.get(i));
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
index 1e55b2c..325cbee 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -42,7 +42,7 @@ import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.BuildEngineFactory;
import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.job.common.HadoopShellExecutable;
+import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
import org.apache.kylin.job.exception.JobException;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.job.execution.ExecutableState;
@@ -65,7 +65,7 @@ import org.apache.kylin.rest.response.MetricsResponse;
import org.apache.kylin.rest.security.AclPermission;
import org.apache.kylin.source.hive.HiveSourceTableLoader;
import org.apache.kylin.storage.hbase.HBaseConnection;
-import org.apache.kylin.storage.hbase.HBaseRegionSizeCalculator;
+import org.apache.kylin.storage.hbase.util.HBaseRegionSizeCalculator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/server/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/JobService.java b/server/src/main/java/org/apache/kylin/rest/service/JobService.java
index 96b615f..bb17bd2 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -31,9 +31,9 @@ import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.CubeBuildTypeEnum;
import org.apache.kylin.engine.BuildEngineFactory;
import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
import org.apache.kylin.job.JobInstance;
-import org.apache.kylin.job.common.HadoopShellExecutable;
-import org.apache.kylin.job.common.MapReduceExecutable;
import org.apache.kylin.job.common.ShellExecutable;
import org.apache.kylin.job.constant.JobStatusEnum;
import org.apache.kylin.job.constant.JobStepStatusEnum;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/source-hive/pom.xml
----------------------------------------------------------------------
diff --git a/source-hive/pom.xml b/source-hive/pom.xml
index 17a918e..8b4a973 100644
--- a/source-hive/pom.xml
+++ b/source-hive/pom.xml
@@ -48,6 +48,11 @@
<scope>provided</scope>
</dependency>
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hive-hcatalog-core</artifactId>
<version>${hive-hcatalog.version}</version>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
new file mode 100644
index 0000000..c40924e
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import java.io.IOException;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.job.cmd.ShellCmdOutput;
+import org.apache.kylin.job.common.ShellExecutable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+
+public class HiveMRInput implements IMRInput {
+
+ @Override
+ public IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
+ return new BatchCubingInputSide(seg);
+ }
+
+ @Override
+ public IMRTableInputFormat getTableInputFormat(TableDesc table) {
+ return new HiveTableInputFormat(table.getIdentity());
+ }
+
+ public static class HiveTableInputFormat implements IMRTableInputFormat {
+ final String dbName;
+ final String tableName;
+
+ public HiveTableInputFormat(String hiveTable) {
+ String[] parts = HadoopUtil.parseHiveTableName(hiveTable);
+ dbName = parts[0];
+ tableName = parts[1];
+ }
+
+ @Override
+ public void configureJob(Job job) {
+ try {
+ HCatInputFormat.setInput(job, dbName, tableName);
+ job.setInputFormatClass(HCatInputFormat.class);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public String[] parseMapperInput(Object mapperInput) {
+ return HiveTableReader.getRowAsStringArray((HCatRecord) mapperInput);
+ }
+
+ }
+
+ public static class BatchCubingInputSide implements IMRBatchCubingInputSide {
+
+ final JobEngineConfig conf;
+ final CubeSegment seg;
+ final CubeJoinedFlatTableDesc flatHiveTableDesc;
+
+ public BatchCubingInputSide(CubeSegment seg) {
+ this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
+ this.seg = seg;
+ this.flatHiveTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
+ }
+
+ @Override
+ public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
+ jobFlow.addTask(createFlatHiveTableStep(conf, flatHiveTableDesc, jobFlow.getId()));
+ }
+
+ public static AbstractExecutable createFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId) {
+
+ final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatTableDesc);
+ final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId));
+ String insertDataHqls;
+ try {
+ insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatTableDesc, conf);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to generate insert data SQL for intermediate table.", e);
+ }
+
+ ShellExecutable step = new ShellExecutable();
+ StringBuffer buf = new StringBuffer();
+ buf.append("hive -e \"");
+ buf.append(dropTableHql + "\n");
+ buf.append(createTableHql + "\n");
+ buf.append(insertDataHqls + "\n");
+ buf.append("\"");
+
+ step.setCmd(buf.toString());
+ step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
+
+ return step;
+ }
+
+ @Override
+ public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+ GarbageCollectionStep step = new GarbageCollectionStep();
+ step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
+ step.setOldHiveTable(flatHiveTableDesc.getTableName());
+ jobFlow.addTask(step);
+ }
+
+ @Override
+ public IMRTableInputFormat getFlatTableInputFormat() {
+ return new HiveTableInputFormat(flatHiveTableDesc.getTableName());
+ }
+
+ }
+
+ public static class GarbageCollectionStep extends AbstractExecutable {
+
+ @Override
+ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ StringBuffer output = new StringBuffer();
+
+ final String hiveTable = this.getOldHiveTable();
+ if (StringUtils.isNotEmpty(hiveTable)) {
+ final String dropHiveCMD = "hive -e \"DROP TABLE IF EXISTS " + hiveTable + ";\"";
+ ShellCmdOutput shellCmdOutput = new ShellCmdOutput();
+ try {
+ context.getConfig().getCliCommandExecutor().execute(dropHiveCMD, shellCmdOutput);
+ output.append("Hive table " + hiveTable + " is dropped. \n");
+ } catch (IOException e) {
+ logger.error("job:" + getId() + " execute finished with exception", e);
+ output.append(shellCmdOutput.getOutput()).append("\n").append(e.getLocalizedMessage());
+ return new ExecuteResult(ExecuteResult.State.ERROR, output.toString());
+ }
+ }
+
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
+ }
+
+ public void setOldHiveTable(String hiveTable) {
+ setParam("oldHiveTable", hiveTable);
+ }
+
+ private String getOldHiveTable() {
+ return getParam("oldHiveTable");
+ }
+ }
+
+}