You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/23 13:21:46 UTC
[09/23] incubator-kylin git commit: KYLIN-875 half way
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/hadoop/cube/OrphanHBaseCleanJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/OrphanHBaseCleanJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/OrphanHBaseCleanJob.java
index 277624a..b259e1a 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/OrphanHBaseCleanJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/OrphanHBaseCleanJob.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.metadata.realization.IRealizationConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java
deleted file mode 100644
index 7c5e5db..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.model.DataModelDesc.RealizationCapacity;
-
-/**
- * @author xjiang, ysong1
- *
- */
-
-public class RangeKeyDistributionJob extends AbstractHadoopJob {
- protected static final Logger log = LoggerFactory.getLogger(RangeKeyDistributionJob.class);
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.hadoop.util.Tool#run(java.lang.String[])
- */
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
-
- try {
- options.addOption(OPTION_INPUT_PATH);
- options.addOption(OPTION_OUTPUT_PATH);
- options.addOption(OPTION_JOB_NAME);
- options.addOption(OPTION_CUBE_NAME);
-
- parseOptions(options, args);
-
- // start job
- String jobName = getOptionValue(OPTION_JOB_NAME);
- job = Job.getInstance(getConf(), jobName);
-
- setJobClasspath(job);
-
- addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
-
- Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
- FileOutputFormat.setOutputPath(job, output);
- // job.getConfiguration().set("dfs.block.size", "67108864");
-
- // Mapper
- job.setInputFormatClass(SequenceFileInputFormat.class);
- job.setMapperClass(RangeKeyDistributionMapper.class);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(LongWritable.class);
-
- // Reducer - only one
- job.setReducerClass(RangeKeyDistributionReducer.class);
- job.setOutputFormatClass(SequenceFileOutputFormat.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(LongWritable.class);
- job.setNumReduceTasks(1);
-
- this.deletePath(job.getConfiguration(), output);
-
- String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
- CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
- CubeInstance cube = cubeMgr.getCube(cubeName);
- RealizationCapacity realizationCapacity = cube.getDescriptor().getModel().getCapacity();
- job.getConfiguration().set(BatchConstants.CUBE_CAPACITY, realizationCapacity.toString());
-
- return waitForCompletion(job);
- } catch (Exception e) {
- printUsage(options);
- throw e;
- }
- }
-
- public static void main(String[] args) throws Exception {
- int exitCode = ToolRunner.run(new RangeKeyDistributionJob(), args);
- System.exit(exitCode);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapper.java
deleted file mode 100644
index e49f090..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapper.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-
-import org.apache.kylin.engine.mr.KylinMapper;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-
-/**
- * @author ysong1
- *
- */
-public class RangeKeyDistributionMapper extends KylinMapper<Text, Text, Text, LongWritable> {
-
- private static final long ONE_MEGA_BYTES = 1L * 1024L * 1024L;
-
- private LongWritable outputValue = new LongWritable(0);
-
- private long bytesRead = 0;
-
- private Text lastKey;
-
- @Override
- protected void setup(Context context) throws IOException {
- super.bindCurrentConfiguration(context.getConfiguration());
- }
-
- @Override
- public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
- lastKey = key;
-
- int bytesLength = key.getLength() + value.getLength();
- bytesRead += bytesLength;
-
- if (bytesRead >= ONE_MEGA_BYTES) {
- outputValue.set(bytesRead);
- context.write(key, outputValue);
-
- // reset bytesRead
- bytesRead = 0;
- }
-
- }
-
- @Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- if (lastKey != null) {
- outputValue.set(bytesRead);
- context.write(lastKey, outputValue);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
deleted file mode 100644
index a580a9e..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.kylin.cube.model.v1.CubeDesc.CubeCapacity;
-import org.apache.kylin.engine.mr.KylinReducer;
-import org.apache.kylin.job.constant.BatchConstants;
-
-/**
- * @author ysong1
- *
- */
-public class RangeKeyDistributionReducer extends KylinReducer<Text, LongWritable, Text, LongWritable> {
-
- public static final long ONE_GIGA_BYTES = 1024L * 1024L * 1024L;
- public static final int SMALL_CUT = 5; // 5 GB per region
- public static final int MEDIUM_CUT = 10; // 10 GB per region
- public static final int LARGE_CUT = 50; // 50 GB per region
-
- public static final int MAX_REGION = 1000;
-
- private static final Logger logger = LoggerFactory.getLogger(RangeKeyDistributionReducer.class);
-
- private LongWritable outputValue = new LongWritable(0);
-
- private int cut;
- private long bytesRead = 0;
- private List<Text> gbPoints = new ArrayList<Text>();
-
- @Override
- protected void setup(Context context) throws IOException {
- super.bindCurrentConfiguration(context.getConfiguration());
-
- CubeCapacity cubeCapacity = CubeCapacity.valueOf(context.getConfiguration().get(BatchConstants.CUBE_CAPACITY));
- switch (cubeCapacity) {
- case SMALL:
- cut = SMALL_CUT;
- break;
- case MEDIUM:
- cut = MEDIUM_CUT;
- break;
- case LARGE:
- cut = LARGE_CUT;
- break;
- }
-
- logger.info("Chosen cut for htable is " + cut);
- }
-
- @Override
- public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
- for (LongWritable v : values) {
- bytesRead += v.get();
- }
-
- if (bytesRead >= ONE_GIGA_BYTES) {
- gbPoints.add(new Text(key));
- bytesRead = 0; // reset bytesRead
- }
- }
-
- @Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- int nRegion = Math.round((float) gbPoints.size() / (float) cut);
- nRegion = Math.max(1, nRegion);
- nRegion = Math.min(MAX_REGION, nRegion);
-
- int gbPerRegion = gbPoints.size() / nRegion;
- gbPerRegion = Math.max(1, gbPerRegion);
-
- System.out.println(nRegion + " regions");
- System.out.println(gbPerRegion + " GB per region");
-
- for (int i = gbPerRegion; i < gbPoints.size(); i += gbPerRegion) {
- Text key = gbPoints.get(i);
- outputValue.set(i);
- System.out.println(StringUtils.byteToHexString(key.getBytes()) + "\t" + outputValue.get());
- context.write(key, outputValue);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerJob.java
deleted file mode 100644
index faf6675..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerJob.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-
-/**
- * @author ysong1
- *
- */
-public class RowKeyDistributionCheckerJob extends AbstractHadoopJob {
-
- @SuppressWarnings("static-access")
- protected static final Option rowKeyStatsFilePath = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("rowKeyStatsFilePath").create("rowKeyStatsFilePath");
-
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
-
- try {
- options.addOption(OPTION_INPUT_PATH);
- options.addOption(OPTION_OUTPUT_PATH);
- options.addOption(OPTION_JOB_NAME);
- options.addOption(rowKeyStatsFilePath);
-
- parseOptions(options, args);
-
- String statsFilePath = getOptionValue(rowKeyStatsFilePath);
-
- // start job
- String jobName = getOptionValue(OPTION_JOB_NAME);
- job = Job.getInstance(getConf(), jobName);
-
- setJobClasspath(job);
-
- addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
-
- Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
- FileOutputFormat.setOutputPath(job, output);
-
- // Mapper
- job.setInputFormatClass(SequenceFileInputFormat.class);
- job.setMapperClass(RowKeyDistributionCheckerMapper.class);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(LongWritable.class);
-
- // Reducer - only one
- job.setReducerClass(RowKeyDistributionCheckerReducer.class);
- job.setOutputFormatClass(SequenceFileOutputFormat.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(LongWritable.class);
- job.setNumReduceTasks(1);
-
- job.getConfiguration().set("rowKeyStatsFilePath", statsFilePath);
-
- this.deletePath(job.getConfiguration(), output);
-
- return waitForCompletion(job);
- } catch (Exception e) {
- printUsage(options);
- throw e;
- }
- }
-
- public static void main(String[] args) throws Exception {
- int exitCode = ToolRunner.run(new RowKeyDistributionCheckerJob(), args);
- System.exit(exitCode);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerMapper.java
deleted file mode 100644
index de57562..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerMapper.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.kylin.engine.mr.KylinMapper;
-
-/**
- * @author ysong1
- *
- */
-public class RowKeyDistributionCheckerMapper extends KylinMapper<Text, Text, Text, LongWritable> {
-
- String rowKeyStatsFilePath;
- byte[][] splitKeys;
- Map<Text, Long> resultMap;
- List<Text> keyList;
-
- @Override
- protected void setup(Context context) throws IOException {
- super.bindCurrentConfiguration(context.getConfiguration());
-
- rowKeyStatsFilePath = context.getConfiguration().get("rowKeyStatsFilePath");
- splitKeys = this.getSplits(context.getConfiguration(), new Path(rowKeyStatsFilePath));
-
- resultMap = new HashMap<Text, Long>();
- keyList = new ArrayList<Text>();
- for (int i = 0; i < splitKeys.length; i++) {
- Text key = new Text(splitKeys[i]);
- resultMap.put(key, 0L);
- keyList.add(new Text(splitKeys[i]));
- }
- }
-
- @Override
- public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
- for (Text t : keyList) {
- if (key.compareTo(t) < 0) {
- Long v = resultMap.get(t);
- long length = key.getLength() + value.getLength();
- v += length;
- resultMap.put(t, v);
- break;
- }
- }
- }
-
- @Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- LongWritable outputValue = new LongWritable();
- for (Entry<Text, Long> kv : resultMap.entrySet()) {
- outputValue.set(kv.getValue());
- context.write(kv.getKey(), outputValue);
- }
- }
-
- @SuppressWarnings("deprecation")
- public byte[][] getSplits(Configuration conf, Path path) {
- List<byte[]> rowkeyList = new ArrayList<byte[]>();
- SequenceFile.Reader reader = null;
- try {
- reader = new SequenceFile.Reader(path.getFileSystem(conf), path, conf);
- Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
- Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
- while (reader.next(key, value)) {
- byte[] tmp = ((Text) key).copyBytes();
- if (rowkeyList.contains(tmp) == false) {
- rowkeyList.add(tmp);
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- IOUtils.closeStream(reader);
- }
-
- byte[][] retValue = rowkeyList.toArray(new byte[rowkeyList.size()][]);
-
- return retValue;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerReducer.java
deleted file mode 100644
index c010a33..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerReducer.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.engine.mr.KylinReducer;
-
-/**
- * @author ysong1
- *
- */
-public class RowKeyDistributionCheckerReducer extends KylinReducer<Text, LongWritable, Text, LongWritable> {
-
- LongWritable outputKey = new LongWritable(0L);
-
- @Override
- protected void setup(Context context) throws IOException {
- super.bindCurrentConfiguration(context.getConfiguration());
- }
-
- @Override
- public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
-
- long length = 0;
- for (LongWritable v : values) {
- length += v.get();
- }
-
- outputKey.set(length);
- context.write(key, outputKey);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
index d88b116..1247cd3 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
@@ -35,13 +35,13 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.invertedindex.IISegment;
import org.apache.kylin.job.JobInstance;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
import org.apache.kylin.job.manager.ExecutableManager;
import org.apache.kylin.metadata.realization.IRealizationConstants;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/hadoop/dict/CreateDictionaryJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/dict/CreateDictionaryJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/dict/CreateDictionaryJob.java
deleted file mode 100644
index 1a5e690..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/dict/CreateDictionaryJob.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.dict;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.cli.DictionaryGeneratorCLI;
-import org.apache.kylin.dict.DistinctColumnValuesProvider;
-import org.apache.kylin.engine.mr.DFSFileTable;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.source.ReadableTable;
-
-/**
- * @author ysong1
- *
- */
-
-public class CreateDictionaryJob extends AbstractHadoopJob {
-
- private int returnCode = 0;
-
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
-
- try {
- options.addOption(OPTION_CUBE_NAME);
- options.addOption(OPTION_SEGMENT_NAME);
- options.addOption(OPTION_INPUT_PATH);
- parseOptions(options, args);
-
- final String cubeName = getOptionValue(OPTION_CUBE_NAME);
- final String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
- final String factColumnsInputPath = getOptionValue(OPTION_INPUT_PATH);
-
- KylinConfig config = KylinConfig.getInstanceFromEnv();
-
- DictionaryGeneratorCLI.processSegment(config, cubeName, segmentName, new DistinctColumnValuesProvider() {
- @Override
- public ReadableTable getDistinctValuesFor(TblColRef col) {
- return new DFSFileTable(factColumnsInputPath + "/" + col.getName(), -1);
- }
- });
- } catch (Exception e) {
- printUsage(options);
- throw e;
- }
-
- return returnCode;
- }
-
- public static void main(String[] args) throws Exception {
- int exitCode = ToolRunner.run(new CreateDictionaryJob(), args);
- System.exit(exitCode);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java
index 66a1106..87ee70e 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java
@@ -23,9 +23,9 @@ import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.dict.DistinctColumnValuesProvider;
import org.apache.kylin.engine.mr.DFSFileTable;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.source.ReadableTable;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/hadoop/hbase/BulkLoadJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/BulkLoadJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/BulkLoadJob.java
deleted file mode 100644
index 7a4702e..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/BulkLoadJob.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.hbase;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
-import org.apache.hadoop.util.ToolRunner;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author ysong1
- *
- */
-public class BulkLoadJob extends AbstractHadoopJob {
-
- protected static final Logger log = LoggerFactory.getLogger(BulkLoadJob.class);
-
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
-
- try {
- options.addOption(OPTION_INPUT_PATH);
- options.addOption(OPTION_HTABLE_NAME);
- options.addOption(OPTION_CUBE_NAME);
- parseOptions(options, args);
-
- String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase();
- // e.g
- // /tmp/kylin-3f150b00-3332-41ca-9d3d-652f67f044d7/test_kylin_cube_with_slr_ready_2_segments/hfile/
- // end with "/"
- String input = getOptionValue(OPTION_INPUT_PATH);
-
- Configuration conf = HBaseConfiguration.create(getConf());
- FileSystem fs = FileSystem.get(conf);
-
- String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
- KylinConfig config = KylinConfig.getInstanceFromEnv();
- CubeManager cubeMgr = CubeManager.getInstance(config);
- CubeInstance cube = cubeMgr.getCube(cubeName);
- CubeDesc cubeDesc = cube.getDescriptor();
- FsPermission permission = new FsPermission((short) 0777);
- for (HBaseColumnFamilyDesc cf : cubeDesc.getHBaseMapping().getColumnFamily()) {
- String cfName = cf.getName();
- Path columnFamilyPath = new Path(input + cfName);
-
- // File may have already been auto-loaded (in the case of MapR DB)
- if(fs.exists(columnFamilyPath)) {
- fs.setPermission(columnFamilyPath, permission);
- }
- }
-
- String[] newArgs = new String[2];
- newArgs[0] = input;
- newArgs[1] = tableName;
-
- log.debug("Start to run LoadIncrementalHFiles");
- int ret = ToolRunner.run(new LoadIncrementalHFiles(conf), newArgs);
- log.debug("End to run LoadIncrementalHFiles");
- return ret;
- } catch (Exception e) {
- printUsage(options);
- throw e;
- }
- }
-
- public static void main(String[] args) throws Exception {
- int exitCode = ToolRunner.run(new BulkLoadJob(), args);
- System.exit(exitCode);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
deleted file mode 100644
index 304fa04..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.hbase;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.DataType;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- */
-public class CreateHTableJob extends AbstractHadoopJob {
-
- protected static final Logger logger = LoggerFactory.getLogger(CreateHTableJob.class);
-
- CubeInstance cube = null;
- CubeDesc cubeDesc = null;
- String segmentName = null;
- KylinConfig kylinConfig;
-
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
-
- options.addOption(OPTION_CUBE_NAME);
- options.addOption(OPTION_SEGMENT_NAME);
- options.addOption(OPTION_PARTITION_FILE_PATH);
- options.addOption(OPTION_HTABLE_NAME);
- options.addOption(OPTION_STATISTICS_ENABLED);
- parseOptions(options, args);
-
- Path partitionFilePath = new Path(getOptionValue(OPTION_PARTITION_FILE_PATH));
- boolean statistics_enabled = Boolean.parseBoolean(getOptionValue(OPTION_STATISTICS_ENABLED));
-
- String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
- kylinConfig = KylinConfig.getInstanceFromEnv();
- CubeManager cubeMgr = CubeManager.getInstance(kylinConfig);
- cube = cubeMgr.getCube(cubeName);
- cubeDesc = cube.getDescriptor();
- segmentName = getOptionValue(OPTION_SEGMENT_NAME);
- CubeSegment cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
-
- String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase();
- Configuration conf = HBaseConfiguration.create(getConf());
-
- try {
-
- byte[][] splitKeys;
- if (statistics_enabled) {
- List<Integer> rowkeyColumnSize = Lists.newArrayList();
- long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
- Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
- List<TblColRef> columnList = baseCuboid.getColumns();
-
- for (int i = 0; i < columnList.size(); i++) {
- logger.info("Rowkey column " + i + " length " + cubeSegment.getColumnLength(columnList.get(i)));
- rowkeyColumnSize.add(cubeSegment.getColumnLength(columnList.get(i)));
- }
-
- splitKeys = getSplitsFromCuboidStatistics(conf, kylinConfig, rowkeyColumnSize, cubeSegment);
- } else {
- splitKeys = getSplits(conf, partitionFilePath);
- }
-
- CubeHTableUtil.createHTable(cubeDesc, tableName, splitKeys);
- return 0;
- } catch (Exception e) {
- printUsage(options);
- e.printStackTrace(System.err);
- logger.error(e.getLocalizedMessage(), e);
- return 2;
- }
- }
-
- @SuppressWarnings("deprecation")
- public byte[][] getSplits(Configuration conf, Path path) throws Exception {
- FileSystem fs = path.getFileSystem(conf);
- if (fs.exists(path) == false) {
- System.err.println("Path " + path + " not found, no region split, HTable will be one region");
- return null;
- }
-
- List<byte[]> rowkeyList = new ArrayList<byte[]>();
- SequenceFile.Reader reader = null;
- try {
- reader = new SequenceFile.Reader(fs, path, conf);
- Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
- Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
- while (reader.next(key, value)) {
- rowkeyList.add(((Text) key).copyBytes());
- }
- } catch (Exception e) {
- e.printStackTrace();
- throw e;
- } finally {
- IOUtils.closeStream(reader);
- }
-
- logger.info((rowkeyList.size() + 1) + " regions");
- logger.info(rowkeyList.size() + " splits");
- for (byte[] split : rowkeyList) {
- logger.info(StringUtils.byteToHexString(split));
- }
-
- byte[][] retValue = rowkeyList.toArray(new byte[rowkeyList.size()][]);
- return retValue.length == 0 ? null : retValue;
- }
-
-
- @SuppressWarnings("deprecation")
- public static byte[][] getSplitsFromCuboidStatistics(Configuration conf, KylinConfig kylinConfig, List<Integer> rowkeyColumnSize, CubeSegment cubeSegment) throws IOException {
-
- CubeDesc cubeDesc = cubeSegment.getCubeDesc();
- DataModelDesc.RealizationCapacity cubeCapacity = cubeDesc.getModel().getCapacity();
- int cut = kylinConfig.getHBaseRegionCut(cubeCapacity.toString());
-
- logger.info("Cube capacity " + cubeCapacity.toString() + ", chosen cut for HTable is " + cut + "GB");
-
- Map<Long, Long> cuboidSizeMap = Maps.newHashMap();
- long totalSizeInM = 0;
-
- ResourceStore rs = ResourceStore.getStore(kylinConfig);
- String fileKey = cubeSegment.getStatisticsResourcePath();
- InputStream is = rs.getResource(fileKey);
- File tempFile = null;
- FileOutputStream tempFileStream = null;
- try {
- tempFile = File.createTempFile(cubeSegment.getUuid(), ".seq");
- tempFileStream = new FileOutputStream(tempFile);
- org.apache.commons.io.IOUtils.copy(is, tempFileStream);
- } finally {
- IOUtils.closeStream(is);
- IOUtils.closeStream(tempFileStream);
- }
-
- FileSystem fs = HadoopUtil.getFileSystem("file:///" + tempFile.getAbsolutePath());
- SequenceFile.Reader reader = null;
- try {
- reader = new SequenceFile.Reader(fs, new Path(tempFile.getAbsolutePath()), conf);
- LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
- BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
- int samplingPercentage = 25;
- while (reader.next(key, value)) {
- if (key.get() == 0l) {
- samplingPercentage = Bytes.toInt(value.getBytes());
- } else {
- HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(14);
- ByteArray byteArray = new ByteArray(value.getBytes());
- hll.readRegisters(byteArray.asBuffer());
-
- cuboidSizeMap.put(key.get(), hll.getCountEstimate() * 100 / samplingPercentage);
- }
-
- }
- } catch (Exception e) {
- e.printStackTrace();
- throw e;
- } finally {
- IOUtils.closeStream(reader);
- }
-
- List<Long> allCuboids = Lists.newArrayList();
- allCuboids.addAll(cuboidSizeMap.keySet());
- Collections.sort(allCuboids);
-
- long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
- for (long cuboidId : allCuboids) {
- long cuboidSize = estimateCuboidStorageSize(cubeDesc, cuboidId, cuboidSizeMap.get(cuboidId), baseCuboidId, rowkeyColumnSize);
- cuboidSizeMap.put(cuboidId, cuboidSize);
- totalSizeInM += cuboidSize;
- }
-
- int nRegion = Math.round((float) totalSizeInM / ((float) cut * 1024l));
- nRegion = Math.max(kylinConfig.getHBaseRegionCutMin(), nRegion);
- nRegion = Math.min(kylinConfig.getHBaseRegionCutMax(), nRegion);
-
- int mbPerRegion = (int) (totalSizeInM / (nRegion));
- mbPerRegion = Math.max(1, mbPerRegion);
-
- logger.info("Total size " + totalSizeInM + "M (estimated)");
- logger.info(nRegion + " regions (estimated)");
- logger.info(mbPerRegion + " MB per region (estimated)");
-
- List<Long> regionSplit = Lists.newArrayList();
-
-
- long size = 0;
- int regionIndex = 0;
- int cuboidCount = 0;
- for (int i = 0; i < allCuboids.size(); i++) {
- long cuboidId = allCuboids.get(i);
- if (size >= mbPerRegion || (size + cuboidSizeMap.get(cuboidId)) >= mbPerRegion * 1.2) {
- // if the size already bigger than threshold, or it will exceed by 20%, cut for next region
- regionSplit.add(cuboidId);
- logger.info("Region " + regionIndex + " will be " + size + " MB, contains cuboids < " + cuboidId + " (" + cuboidCount + ") cuboids");
- size = 0;
- cuboidCount = 0;
- regionIndex++;
- }
- size += cuboidSizeMap.get(cuboidId);
- cuboidCount++;
- }
-
-
- byte[][] result = new byte[regionSplit.size()][];
- for (int i = 0; i < regionSplit.size(); i++) {
- result[i] = Bytes.toBytes(regionSplit.get(i));
- }
-
- return result;
- }
-
- /**
- * Estimate the cuboid's size
- *
- * @param cubeDesc
- * @param cuboidId
- * @param rowCount
- * @return the cuboid size in M bytes
- */
- private static long estimateCuboidStorageSize(CubeDesc cubeDesc, long cuboidId, long rowCount, long baseCuboidId, List<Integer> rowKeyColumnLength) {
-
- int bytesLength = RowConstants.ROWKEY_CUBOIDID_LEN;
-
- long mask = Long.highestOneBit(baseCuboidId);
- long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(baseCuboidId);
- for (int i = 0; i < parentCuboidIdActualLength; i++) {
- if ((mask & cuboidId) > 0) {
- bytesLength += rowKeyColumnLength.get(i); //colIO.getColumnLength(columnList.get(i));
- }
- mask = mask >> 1;
- }
-
- // add the measure length
- int space = 0;
- for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
- DataType returnType = measureDesc.getFunction().getReturnDataType();
- if (returnType.isHLLC()) {
- // for HLL, it will be compressed when export to bytes
- space += returnType.getSpaceEstimate() * 0.75;
- } else {
- space += returnType.getSpaceEstimate();
- }
- }
- bytesLength += space;
-
- logger.info("Cuboid " + cuboidId + " has " + rowCount + " rows, each row size is " + bytesLength + " bytes.");
- logger.info("Cuboid " + cuboidId + " total size is " + (bytesLength * rowCount / (1024L * 1024L)) + "M.");
- return bytesLength * rowCount / (1024L * 1024L);
- }
-
- public static void main(String[] args) throws Exception {
- int exitCode = ToolRunner.run(new CreateHTableJob(), args);
- System.exit(exitCode);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CubeHTableUtil.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CubeHTableUtil.java b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CubeHTableUtil.java
deleted file mode 100644
index 88b345b..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CubeHTableUtil.java
+++ /dev/null
@@ -1,84 +0,0 @@
-package org.apache.kylin.job.hadoop.hbase;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
-import org.apache.kylin.job.tools.DeployCoprocessorCLI;
-import org.apache.kylin.job.tools.LZOSupportnessChecker;
-import org.apache.kylin.metadata.realization.IRealizationConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- */
-public class CubeHTableUtil {
-
- private static final Logger logger = LoggerFactory.getLogger(CubeHTableUtil.class);
-
- public static void createHTable(CubeDesc cubeDesc, String tableName, byte[][] splitKeys) throws IOException {
- KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-
- HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
- // https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/regionserver/ConstantSizeRegionSplitPolicy.html
- tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, ConstantSizeRegionSplitPolicy.class.getName());
- tableDesc.setValue(IRealizationConstants.HTableTag, kylinConfig.getMetadataUrlPrefix());
-
- Configuration conf = HBaseConfiguration.create();
- HBaseAdmin admin = new HBaseAdmin(conf);
-
- try {
- if (User.isHBaseSecurityEnabled(conf)) {
- // add coprocessor for bulk load
- tableDesc.addCoprocessor("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
- }
-
- for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
- HColumnDescriptor cf = new HColumnDescriptor(cfDesc.getName());
- cf.setMaxVersions(1);
-
- if (LZOSupportnessChecker.getSupportness()) {
- logger.info("hbase will use lzo to compress cube data");
- cf.setCompressionType(Compression.Algorithm.LZO);
- } else {
- logger.info("hbase will not use lzo to compress cube data");
- }
-
- cf.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
- cf.setInMemory(false);
- cf.setBlocksize(4 * 1024 * 1024); // set to 4MB
- tableDesc.addFamily(cf);
- }
-
- if (admin.tableExists(tableName)) {
- // admin.disableTable(tableName);
- // admin.deleteTable(tableName);
- throw new RuntimeException("HBase table " + tableName + " exists!");
- }
-
- DeployCoprocessorCLI.deployCoprocessor(tableDesc);
-
- admin.createTable(tableDesc, splitKeys);
- Preconditions.checkArgument(admin.isTableAvailable(tableName), "table " + tableName + " created, but is not available due to some reasons");
- logger.info("create hbase table " + tableName + " done.");
- } catch (Exception e) {
- logger.error("Failed to create HTable", e);
- throw e;
- } finally {
- admin.close();
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/hadoop/hive/SqlHiveDataTypeMapping.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hive/SqlHiveDataTypeMapping.java b/job/src/main/java/org/apache/kylin/job/hadoop/hive/SqlHiveDataTypeMapping.java
deleted file mode 100644
index f710d34..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/hive/SqlHiveDataTypeMapping.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.hive;
-
-/**
- * @author George Song (ysong1)
- *
- */
-public class SqlHiveDataTypeMapping {
-
- public static String getHiveDataType(String javaDataType) {
- String hiveDataType = javaDataType.toLowerCase().startsWith("varchar") ? "string" : javaDataType;
- hiveDataType = javaDataType.toLowerCase().startsWith("integer") ? "int" : hiveDataType;
-
- return hiveDataType.toLowerCase();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIBulkLoadJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIBulkLoadJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIBulkLoadJob.java
index 8575f89..d3f6d68 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIBulkLoadJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIBulkLoadJob.java
@@ -25,11 +25,11 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.invertedindex.IISegment;
import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.storage.hbase.HBaseConnection;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java
index d55900b..08b7a46 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java
index 3690e48..6149d27 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java
@@ -32,15 +32,15 @@ import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.invertedindex.model.IIDesc;
import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.job.tools.DeployCoprocessorCLI;
-import org.apache.kylin.job.tools.LZOSupportnessChecker;
import org.apache.kylin.metadata.realization.IRealizationConstants;
import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI;
+import org.apache.kylin.storage.hbase.util.LZOSupportnessChecker;
/**
* @author George Song (ysong1)
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsJob.java
index b1ff6f1..1ed6b89 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsJob.java
@@ -32,10 +32,10 @@ import org.apache.hadoop.util.ToolRunner;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.IntermediateColumnDesc;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsReducer.java
index 1676429..2fb28c0 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsReducer.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.io.ShortWritable;
import org.apache.hadoop.io.Text;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.engine.mr.KylinReducer;
-import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.engine.mr.common.BatchConstants;
/**
* @author yangli9
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java
index 16b8ca1..741dd62 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java
@@ -19,6 +19,7 @@
package org.apache.kylin.job.hadoop.invertedindex;
import java.io.IOException;
+import java.util.ArrayList;
import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
@@ -32,11 +33,13 @@ import org.apache.hadoop.util.ToolRunner;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -102,6 +105,26 @@ public class InvertedIndexJob extends AbstractHadoopJob {
conf.set(BatchConstants.CFG_II_SEGMENT_NAME, seg.getName());
}
+ protected void attachKylinPropsAndMetadata(IIInstance ii, Configuration conf) throws IOException {
+ MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+
+ // write II / model_desc / II_desc / dict / table
+ ArrayList<String> dumpList = new ArrayList<String>();
+ dumpList.add(ii.getResourcePath());
+ dumpList.add(ii.getDescriptor().getModel().getResourcePath());
+ dumpList.add(ii.getDescriptor().getResourcePath());
+
+ for (String tableName : ii.getDescriptor().getModel().getAllTables()) {
+ TableDesc table = metaMgr.getTableDesc(tableName);
+ dumpList.add(table.getResourcePath());
+ }
+ for (IISegment segment : ii.getSegments()) {
+ dumpList.addAll(segment.getDictionaryPaths());
+ }
+
+ attachKylinPropsAndMetadata(dumpList, conf);
+ }
+
private void setupMapper(String intermediateTable) throws IOException {
String[] dbTableNames = HadoopUtil.parseHiveTableName(intermediateTable);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java
index 1d30ee7..0efd585 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java
@@ -30,13 +30,13 @@ import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.invertedindex.IISegment;
import org.apache.kylin.invertedindex.index.TableRecord;
import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
/**
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java
index fa4dccf..584c96b 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java
@@ -27,13 +27,13 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.invertedindex.IISegment;
import org.apache.kylin.invertedindex.index.TableRecord;
import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
/**
* @author yangli9
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
index d9b5aee..9476428 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.engine.mr.KylinReducer;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.invertedindex.IISegment;
@@ -32,8 +34,6 @@ import org.apache.kylin.invertedindex.index.TableRecord;
import org.apache.kylin.invertedindex.index.TableRecordInfo;
import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
import org.apache.kylin.invertedindex.model.IIRow;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import java.io.IOException;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionJob.java
index 0f94d32..649f650 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionJob.java
@@ -28,12 +28,11 @@ import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.kylin.job.constant.BatchConstants;
-
/**
* @author ysong1
*
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionMapper.java
index e3e743e..ba43f71 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionMapper.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.kylin.common.util.RandomSampler;
import org.apache.kylin.engine.mr.KylinMapper;
-import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.engine.mr.common.BatchConstants;
/**
* @author ysong1
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionReducer.java
index bc6d379..dbb14fc 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionReducer.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.kylin.engine.mr.KylinReducer;
-import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.engine.mr.common.BatchConstants;
/**
* @author ysong1
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultContext.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultContext.java b/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultContext.java
deleted file mode 100644
index e959ae2..0000000
--- a/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultContext.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.impl.threadpool;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.job.execution.Executable;
-import org.apache.kylin.job.execution.ExecutableContext;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- */
-public class DefaultContext implements ExecutableContext {
-
- private final ConcurrentMap<String, Executable> runningJobs;
- private final KylinConfig kylinConfig;
-
- public DefaultContext(ConcurrentMap<String, Executable> runningJobs, KylinConfig kylinConfig) {
- this.runningJobs = runningJobs;
- this.kylinConfig = kylinConfig;
- }
- @Override
- public Object getSchedulerContext() {
- return null;
- }
-
- @Override
- public KylinConfig getConfig() {
- return kylinConfig;
- }
-
- void addRunningJob(Executable executable) {
- runningJobs.put(executable.getId(), executable);
- }
-
- void removeRunningJob(Executable executable) {
- runningJobs.remove(executable.getId());
- }
-
- public Map<String, Executable> getRunningJobs() {
- return Collections.unmodifiableMap(runningJobs);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
deleted file mode 100644
index 8a83870..0000000
--- a/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.impl.threadpool;
-
-import com.google.common.collect.Maps;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.kylin.job.Scheduler;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.exception.SchedulerException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.Executable;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.execution.Output;
-import org.apache.kylin.job.lock.JobLock;
-import org.apache.kylin.job.manager.ExecutableManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.*;
-
-/**
- */
-public class DefaultScheduler implements Scheduler<AbstractExecutable>, ConnectionStateListener {
-
-
- private ExecutableManager executableManager;
- private FetcherRunner fetcher;
- private ScheduledExecutorService fetcherPool;
- private ExecutorService jobPool;
- private DefaultContext context;
-
- private Logger logger = LoggerFactory.getLogger(DefaultScheduler.class);
- private volatile boolean initialized = false;
- private volatile boolean hasStarted = false;
- private JobEngineConfig jobEngineConfig;
-
- private static final DefaultScheduler INSTANCE = new DefaultScheduler();
-
- private DefaultScheduler() {
- }
-
- private class FetcherRunner implements Runnable {
-
- @Override
- synchronized public void run() {
- // logger.debug("Job Fetcher is running...");
- Map<String, Executable> runningJobs = context.getRunningJobs();
- if (runningJobs.size() >= jobEngineConfig.getMaxConcurrentJobLimit()) {
- logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time");
- return;
- }
-
- int nRunning = 0, nReady = 0, nOthers = 0;
- for (final String id : executableManager.getAllJobIds()) {
- if (runningJobs.containsKey(id)) {
- // logger.debug("Job id:" + id + " is already running");
- nRunning++;
- continue;
- }
- final Output output = executableManager.getOutput(id);
- if ((output.getState() != ExecutableState.READY)) {
- // logger.debug("Job id:" + id + " not runnable");
- nOthers++;
- continue;
- }
- nReady++;
- AbstractExecutable executable = executableManager.getJob(id);
- String jobDesc = executable.toString();
- logger.info(jobDesc + " prepare to schedule");
- try {
- context.addRunningJob(executable);
- jobPool.execute(new JobRunner(executable));
- logger.info(jobDesc + " scheduled");
- } catch (Exception ex) {
- context.removeRunningJob(executable);
- logger.warn(jobDesc + " fail to schedule", ex);
- }
- }
- logger.info("Job Fetcher: " + nRunning + " running, " + runningJobs.size() + " actual running, " + nReady + " ready, " + nOthers + " others");
- }
- }
-
- private class JobRunner implements Runnable {
-
- private final AbstractExecutable executable;
-
- public JobRunner(AbstractExecutable executable) {
- this.executable = executable;
- }
-
- @Override
- public void run() {
- try {
- executable.execute(context);
- // trigger the next step asap
- fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS);
- } catch (ExecuteException e) {
- logger.error("ExecuteException job:" + executable.getId(), e);
- } catch (Exception e) {
- logger.error("unknown error execute job:" + executable.getId(), e);
- } finally {
- context.removeRunningJob(executable);
- }
- }
- }
-
- public static DefaultScheduler getInstance() {
- return INSTANCE;
- }
-
- @Override
- public void stateChanged(CuratorFramework client, ConnectionState newState) {
- if ((newState == ConnectionState.SUSPENDED) || (newState == ConnectionState.LOST)) {
- try {
- shutdown();
- } catch (SchedulerException e) {
- throw new RuntimeException("failed to shutdown scheduler", e);
- }
- }
- }
-
- @Override
- public synchronized void init(JobEngineConfig jobEngineConfig, final JobLock jobLock) throws SchedulerException {
- if (!initialized) {
- initialized = true;
- } else {
- return;
- }
-
- this.jobEngineConfig = jobEngineConfig;
-
- if (jobLock.lock() == false) {
- throw new IllegalStateException("Cannot start job scheduler due to lack of job lock");
- }
-
- executableManager = ExecutableManager.getInstance(jobEngineConfig.getConfig());
- //load all executable, set them to a consistent status
- fetcherPool = Executors.newScheduledThreadPool(1);
- int corePoolSize = jobEngineConfig.getMaxConcurrentJobLimit();
- jobPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, Long.MAX_VALUE, TimeUnit.DAYS, new SynchronousQueue<Runnable>());
- context = new DefaultContext(Maps.<String, Executable> newConcurrentMap(), jobEngineConfig.getConfig());
-
- for (AbstractExecutable executable : executableManager.getAllExecutables()) {
- if (executable.getStatus() == ExecutableState.READY) {
- executableManager.updateJobOutput(executable.getId(), ExecutableState.ERROR, null, "scheduler initializing work to reset job to ERROR status");
- }
- }
- executableManager.updateAllRunningJobsToError();
-
- Runtime.getRuntime().addShutdownHook(new Thread() {
- public void run() {
- logger.debug("Closing zk connection");
- try {
- shutdown();
- jobLock.unlock();
- } catch (SchedulerException e) {
- logger.error("error shutdown scheduler", e);
- }
- }
- });
-
- fetcher = new FetcherRunner();
- fetcherPool.scheduleAtFixedRate(fetcher, 10, ExecutableConstants.DEFAULT_SCHEDULER_INTERVAL_SECONDS, TimeUnit.SECONDS);
- hasStarted = true;
- }
-
- @Override
- public void shutdown() throws SchedulerException {
- fetcherPool.shutdown();
- jobPool.shutdown();
- }
-
- @Override
- public boolean stop(AbstractExecutable executable) throws SchedulerException {
- if (hasStarted) {
- return true;
- } else {
- //TODO should try to stop this executable
- return true;
- }
- }
-
- public boolean hasStarted() {
- return this.hasStarted;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java
deleted file mode 100644
index 2a72064..0000000
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kylin.job.inmemcubing;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.gridtable.GTRecord;
-import org.apache.kylin.storage.gridtable.GTScanRequest;
-import org.apache.kylin.storage.gridtable.GridTable;
-import org.apache.kylin.storage.gridtable.IGTScanner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * An interface alike abstract class. Hold common tunable parameters and nothing more.
- */
-abstract public class AbstractInMemCubeBuilder {
-
- private static Logger logger = LoggerFactory.getLogger(AbstractInMemCubeBuilder.class);
-
- final protected CubeDesc cubeDesc;
- final protected Map<TblColRef, Dictionary<?>> dictionaryMap;
-
- protected int taskThreadCount = 4;
- protected int reserveMemoryMB = 100;
-
- public AbstractInMemCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
- if(cubeDesc == null)
- throw new NullPointerException();
- if (dictionaryMap == null)
- throw new IllegalArgumentException("dictionary cannot be null");
-
- this.cubeDesc = cubeDesc;
- this.dictionaryMap = dictionaryMap;
- }
-
- public void setConcurrentThreads(int n) {
- this.taskThreadCount = n;
- }
-
- public void setReserveMemoryMB(int mb) {
- this.reserveMemoryMB = mb;
- }
-
- public Runnable buildAsRunnable(final BlockingQueue<List<String>> input, final ICuboidWriter output) {
- return new Runnable() {
- @Override
- public void run() {
- try {
- build(input, output);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- };
- }
-
- abstract public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException;
-
- protected void outputCuboid(long cuboidId, GridTable gridTable, ICuboidWriter output) throws IOException {
- long startTime = System.currentTimeMillis();
- GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, null, null);
- IGTScanner scanner = gridTable.scan(req);
- for (GTRecord record : scanner) {
- output.write(cuboidId, record);
- }
- scanner.close();
- logger.info("Cuboid " + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
- }
-
-
-}