You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/02/11 13:49:48 UTC
[09/51] [partial] kylin git commit: KYLIN-1416 keep only website in
document branch
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
deleted file mode 100644
index 56c9659..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author yangli9
- */
-public class FactDistinctColumnsJob extends AbstractHadoopJob {
- protected static final Logger log = LoggerFactory.getLogger(FactDistinctColumnsJob.class);
-
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
-
- try {
- options.addOption(OPTION_JOB_NAME);
- options.addOption(OPTION_CUBE_NAME);
- options.addOption(OPTION_OUTPUT_PATH);
- options.addOption(OPTION_TABLE_NAME);
- parseOptions(options, args);
-
- job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
- Configuration jobConf = job.getConfiguration();
-
- String cubeName = getOptionValue(OPTION_CUBE_NAME);
- Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
- String intermediateTable = getOptionValue(OPTION_TABLE_NAME);
-
- // ----------------------------------------------------------------------------
- // add metadata to distributed cache
- CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
- CubeInstance cubeInstance = cubeMgr.getCube(cubeName);
-
- jobConf.set(BatchConstants.CFG_CUBE_NAME, cubeName);
- System.out.println("Starting: " + job.getJobName());
-
- setJobClasspath(job);
-
- setupMapper(intermediateTable);
- setupReducer(output);
-
- // CubeSegment seg = cubeMgr.getCube(cubeName).getTheOnlySegment();
- attachKylinPropsAndMetadata(cubeInstance, job.getConfiguration());
-
- return waitForCompletion(job);
-
- } catch (Exception e) {
- logger.error("error in FactDistinctColumnsJob", e);
- printUsage(options);
- throw e;
- } finally {
- if (job != null) {
- cleanupTempConfFile(job.getConfiguration());
- }
- }
-
- }
-
- private void setupMapper(String intermediateTable) throws IOException {
- // FileInputFormat.setInputPaths(job, input);
-
- String[] dbTableNames = HadoopUtil.parseHiveTableName(intermediateTable);
- HCatInputFormat.setInput(job, dbTableNames[0], dbTableNames[1]);
-
- job.setInputFormatClass(HCatInputFormat.class);
- job.setMapperClass(FactDistinctColumnsMapper.class);
- job.setCombinerClass(FactDistinctColumnsCombiner.class);
- job.setMapOutputKeyClass(ShortWritable.class);
- job.setMapOutputValueClass(Text.class);
- }
-
- private void setupReducer(Path output) throws IOException {
- job.setReducerClass(FactDistinctColumnsReducer.class);
- job.setOutputFormatClass(SequenceFileOutputFormat.class);
- job.setOutputKeyClass(NullWritable.class);
- job.setOutputValueClass(Text.class);
-
- FileOutputFormat.setOutputPath(job, output);
- job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString());
-
- job.setNumReduceTasks(1);
-
- deletePath(job.getConfiguration(), output);
- }
-
- public static void main(String[] args) throws Exception {
- FactDistinctColumnsJob job = new FactDistinctColumnsJob();
- int exitCode = ToolRunner.run(job, args);
- System.exit(exitCode);
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapper.java
deleted file mode 100644
index 72802aa..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapper.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hive.hcatalog.data.HCatRecord;
-import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
-import org.apache.hive.hcatalog.data.schema.HCatSchema;
-import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.mr.KylinMapper;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.RowKeyDesc;
-import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.job.hadoop.hive.CubeJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-
-/**
- * @author yangli9
- */
-public class FactDistinctColumnsMapper<KEYIN> extends KylinMapper<KEYIN, HCatRecord, ShortWritable, Text> {
-
- private String cubeName;
- private CubeInstance cube;
- private CubeDesc cubeDesc;
- private int[] factDictCols;
-
- private CubeJoinedFlatTableDesc intermediateTableDesc;
-
- private ShortWritable outputKey = new ShortWritable();
- private Text outputValue = new Text();
- private int errorRecordCounter;
-
- private HCatSchema schema = null;
-
- @Override
- protected void setup(Context context) throws IOException {
- super.publishConfiguration(context.getConfiguration());
-
- Configuration conf = context.getConfiguration();
-
- KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(conf);
- cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
- cube = CubeManager.getInstance(config).getCube(cubeName);
- cubeDesc = cube.getDescriptor();
- intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
-
- long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
- Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
- List<TblColRef> columns = baseCuboid.getColumns();
-
- ArrayList<Integer> factDictCols = new ArrayList<Integer>();
- RowKeyDesc rowkey = cubeDesc.getRowkey();
- DictionaryManager dictMgr = DictionaryManager.getInstance(config);
- for (int i = 0; i < columns.size(); i++) {
- TblColRef col = columns.get(i);
- if (rowkey.isUseDictionary(col) == false)
- continue;
-
- String scanTable = (String) dictMgr.decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, null)[0];
- if (cubeDesc.getModel().isFactTable(scanTable)) {
- factDictCols.add(i);
- }
- }
- this.factDictCols = new int[factDictCols.size()];
- for (int i = 0; i < factDictCols.size(); i++)
- this.factDictCols[i] = factDictCols.get(i);
-
- schema = HCatInputFormat.getTableSchema(context.getConfiguration());
- }
-
- @Override
- public void map(KEYIN key, HCatRecord record, Context context) throws IOException, InterruptedException {
-
- try {
-
- int[] flatTableIndexes = intermediateTableDesc.getRowKeyColumnIndexes();
- HCatFieldSchema fieldSchema = null;
- for (int i : factDictCols) {
- outputKey.set((short) i);
- fieldSchema = schema.get(flatTableIndexes[i]);
- Object fieldValue = record.get(fieldSchema.getName(), schema);
- if (fieldValue == null)
- continue;
- byte[] bytes = Bytes.toBytes(fieldValue.toString());
- outputValue.set(bytes, 0, bytes.length);
- context.write(outputKey, outputValue);
- }
- } catch (Exception ex) {
- handleErrorRecord(record, ex);
- }
-
- }
-
- private void handleErrorRecord(HCatRecord record, Exception ex) throws IOException {
-
- System.err.println("Insane record: " + record.getAll());
- ex.printStackTrace(System.err);
-
- errorRecordCounter++;
- if (errorRecordCounter > BatchConstants.ERROR_RECORD_THRESHOLD) {
- if (ex instanceof IOException)
- throw (IOException) ex;
- else if (ex instanceof RuntimeException)
- throw (RuntimeException) ex;
- else
- throw new RuntimeException("", ex);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
deleted file mode 100644
index 89f90ba..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.mr.KylinReducer;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-
-/**
- * @author yangli9
- */
-public class FactDistinctColumnsReducer extends KylinReducer<ShortWritable, Text, NullWritable, Text> {
-
- private List<TblColRef> columnList = new ArrayList<TblColRef>();
-
- @Override
- protected void setup(Context context) throws IOException {
- super.publishConfiguration(context.getConfiguration());
-
- Configuration conf = context.getConfiguration();
- KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(conf);
- String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
- CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
- CubeDesc cubeDesc = cube.getDescriptor();
-
- long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
- Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
- columnList = baseCuboid.getColumns();
- }
-
- @Override
- public void reduce(ShortWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
- TblColRef col = columnList.get(key.get());
-
- HashSet<ByteArray> set = new HashSet<ByteArray>();
- for (Text textValue : values) {
- ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength()));
- set.add(value);
- }
-
- Configuration conf = context.getConfiguration();
- FileSystem fs = FileSystem.get(conf);
- String outputPath = conf.get(BatchConstants.OUTPUT_PATH);
- FSDataOutputStream out = fs.create(new Path(outputPath, col.getName()));
-
- try {
- for (ByteArray value : set) {
- out.write(value.data);
- out.write('\n');
- }
- } finally {
- out.close();
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionJob.java
deleted file mode 100644
index befa16f..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionJob.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-///*
-// * Copyright 2013-2014 eBay Software Foundation
-// *
-// * Licensed under the Apache License, Version 2.0 (the "License");
-// * you may not use this file except in compliance with the License.
-// * You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.kylin.index.cube;
-//
-//import org.apache.commons.cli.Options;
-//import org.apache.hadoop.fs.FileSystem;
-//import org.apache.hadoop.fs.Path;
-//import org.apache.hadoop.io.LongWritable;
-//import org.apache.hadoop.io.Text;
-//import org.apache.hadoop.mapreduce.Job;
-//import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-//import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-//import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-//import org.apache.hadoop.util.ToolRunner;
-//
-//import org.apache.kylin.cube.CubeInstance;
-//import org.apache.kylin.cube.CubeManager;
-//import org.apache.kylin.cube.cuboid.Cuboid;
-//import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
-//import org.apache.kylin.cube.kv.RowKeyEncoder;
-//import org.apache.kylin.index.AbstractHadoopJob;
-//import org.apache.kylin.metadata.model.cube.CubeDesc;
-//
-///**
-// * @author xjiang
-// *
-// */
-//
-//public class KeyDistributionJob extends AbstractHadoopJob {
-//
-// public static final String JOB_TITLE = "Kylin Row Key Distribution Job";
-// public static final String KEY_HEADER_LENGTH = "key_header_length";
-// public static final String KEY_COLUMN_PERCENTAGE = "key_column_percentage";
-// public static final String KEY_SPLIT_NUMBER = "key_split_number";
-//
-// /* (non-Javadoc)
-// * @see org.apache.hadoop.util.Tool#run(java.lang.String[])
-// */
-// @Override
-// public int run(String[] args) throws Exception {
-// Options options = new Options();
-//
-// try {
-// options.addOption(OPTION_INPUT_PATH);
-// options.addOption(OPTION_OUTPUT_PATH);
-// options.addOption(OPTION_METADATA_URL);
-// options.addOption(OPTION_CUBE_NAME);
-// options.addOption(OPTION_KEY_COLUMN_PERCENTAGE);
-// options.addOption(OPTION_KEY_SPLIT_NUMBER);
-// parseOptions(options, args);
-//
-// // start job
-// String jobName = JOB_TITLE + getOptionsAsString();
-// System.out.println("Starting: " + jobName);
-// Job job = Job.getInstanceFromEnv(getConf(), jobName);
-//
-// // set job configuration - basic
-// setJobClasspath(job);
-// addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
-//
-// Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-// FileOutputFormat.setOutputPath(job, output);
-// //job.getConfiguration().set("dfs.block.size", "67108864");
-//
-// // set job configuration - key prefix size & key split number
-// String keyColumnPercentage = getOptionValue(OPTION_KEY_COLUMN_PERCENTAGE);
-// job.getConfiguration().set(KEY_COLUMN_PERCENTAGE, keyColumnPercentage);
-// String metadataUrl = validateMetadataUrl(getOptionValue(OPTION_METADATA_URL));
-// String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
-// int keyHeaderLen = getKeyHeaderLength(metadataUrl, cubeName);
-// job.getConfiguration().set(KEY_HEADER_LENGTH, String.valueOf(keyHeaderLen));
-// job.getConfiguration().set(KEY_SPLIT_NUMBER, getOptionValue(OPTION_KEY_SPLIT_NUMBER));
-//
-// // Mapper
-// job.setInputFormatClass(SequenceFileInputFormat.class);
-// job.setMapperClass(KeyDistributionMapper.class);
-// job.setMapOutputKeyClass(Text.class);
-// job.setMapOutputValueClass(LongWritable.class);
-//
-// // Combiner, not needed any more as mapper now does the groping
-// //job.setCombinerClass(KeyDistributionCombiner.class);
-//
-// // Reducer - only one
-// job.setReducerClass(KeyDistributionReducer.class);
-// // use sequence file as output
-// job.setOutputFormatClass(SequenceFileOutputFormat.class);
-// // key is text
-// job.setOutputKeyClass(Text.class);
-// // value is long
-// job.setOutputValueClass(LongWritable.class);
-// job.setNumReduceTasks(1);
-//
-// FileSystem fs = FileSystem.get(job.getConfiguration());
-// if (fs.exists(output))
-// fs.delete(output, true);
-//
-// return waitForCompletion(job);
-// } catch (Exception e) {
-// printUsage(options);
-// e.printStackTrace(System.err);
-// return 2;
-// }
-// }
-//
-// private int getKeyHeaderLength(String metadataUrl, String cubeName) {
-// CubeManager cubeMgr = CubeManager.getInstanceFromEnv(metadataUrl);
-// CubeInstance cubeInstance = cubeMgr.getCube(cubeName);
-// CubeDesc cubeDesc = cubeInstance.getDescriptor();
-// long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-// Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
-// RowKeyEncoder rowKeyEncoder =
-// (RowKeyEncoder) AbstractRowKeyEncoder.createInstance(cubeInstance.getTheOnlySegment(),
-// baseCuboid);
-//
-// return rowKeyEncoder.getHeaderLength();
-//
-// }
-//
-// public static void main(String[] args) throws Exception {
-// int exitCode = ToolRunner.run(new KeyDistributionJob(), args);
-// System.exit(exitCode);
-// }
-// }
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionMapper.java
deleted file mode 100644
index f145bde..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionMapper.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-///*
-// * Copyright 2013-2014 eBay Software Foundation
-// *
-// * Licensed under the Apache License, Version 2.0 (the "License");
-// * you may not use this file except in compliance with the License.
-// * You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.kylin.index.cube;
-//
-//import java.io.IOException;
-//
-//import org.apache.hadoop.io.LongWritable;
-//import org.apache.hadoop.io.Text;
-//import org.apache.hadoop.mapreduce.Mapper;
-//
-///**
-// * @author xjiang
-// *
-// */
-//public class KeyDistributionMapper extends KylinMapper<Text, Text, Text, LongWritable> {
-//
-// private int headerLength;
-//
-// private Text currentKey;
-// private long outputLong;
-// private Text outputKey;
-// private LongWritable outputValue;
-// private int columnPercentage;
-// private int allRowCount;
-//
-// @Override
-// protected void setup(Context context) throws IOException {
-//super.publishConfiguration(context.getConfiguration());
-
-// String percentStr = context.getConfiguration().get(KeyDistributionJob.KEY_COLUMN_PERCENTAGE);
-// this.columnPercentage = Integer.valueOf(percentStr).intValue();
-// if (this.columnPercentage <= 0 || this.columnPercentage >= 100) {
-// this.columnPercentage = 20;
-// }
-// String headerLenStr = context.getConfiguration().get(KeyDistributionJob.KEY_HEADER_LENGTH);
-// this.headerLength = Integer.valueOf(headerLenStr).intValue();
-//
-// currentKey = new Text();
-// outputLong = 0;
-// outputKey = new Text();
-// outputValue = new LongWritable(1);
-// allRowCount = 0;
-// }
-//
-// @Override
-// protected void cleanup(Context context) throws IOException, InterruptedException {
-// emit(context); // emit the last holding record
-//
-// byte[] zerokey = new byte[] { 0 };
-// outputKey.set(zerokey);
-// outputValue.set(allRowCount);
-// context.write(outputKey, outputValue);
-// }
-//
-// @Override
-// public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
-// byte[] bytes = key.getBytes();
-// int columnLength = bytes.length - this.headerLength;
-// int columnPrefixLen = columnLength * this.columnPercentage / 100;
-// if (columnPrefixLen == 0 && columnLength > 0) {
-// columnPrefixLen = 1;
-// }
-// if (columnPrefixLen > 0) {
-// currentKey.set(bytes, 0, this.headerLength + columnPrefixLen);
-// } else {
-// currentKey.set(bytes);
-// }
-//
-// allRowCount++;
-//
-// if (outputKey.getLength() == 0) { // first record
-// outputKey.set(currentKey);
-// outputLong = 1;
-// } else if (outputKey.equals(currentKey)) { // same key, note input is sorted
-// outputLong++;
-// } else { // the next key
-// emit(context);
-// outputKey.set(currentKey);
-// outputLong = 1;
-// }
-// }
-//
-// private void emit(Context context) throws IOException, InterruptedException {
-// if (outputLong == 0)
-// return;
-//
-// outputValue.set(outputLong);
-// context.write(outputKey, outputValue);
-// }
-// }
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionReducer.java
deleted file mode 100644
index dd02910..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionReducer.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-///*
-// * Copyright 2013-2014 eBay Software Foundation
-// *
-// * Licensed under the Apache License, Version 2.0 (the "License");
-// * you may not use this file except in compliance with the License.
-// * You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.kylin.index.cube;
-//
-//import java.io.IOException;
-//
-//import org.apache.hadoop.io.LongWritable;
-//import org.apache.hadoop.io.Text;
-//import org.apache.hadoop.mapreduce.Reducer;
-//import org.apache.hadoop.util.StringUtils;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-//
-///**
-// * @author xjiang
-// *
-// */
-//public class KeyDistributionReducer extends KylinReducer<Text, LongWritable, Text, LongWritable> {
-//
-// private static final Logger logger = LoggerFactory.getLogger(KeyDistributionReducer.class);
-//
-// private LongWritable outputValue;
-// private boolean isTotalCount;
-// private long totalCount;
-// private int splitNumber;
-// private long splitQuota;
-// private long splitRemain;
-//
-// @Override
-// protected void setup(Context context) throws IOException, InterruptedException {
-// super.publishConfiguration(context.getConfiguration());
-
-// String splitStr = context.getConfiguration().get(KeyDistributionJob.KEY_SPLIT_NUMBER);
-// splitNumber = Integer.valueOf(splitStr).intValue();
-// outputValue = new LongWritable();
-// isTotalCount = true;
-// totalCount = 0;
-// splitQuota = 0;
-// splitRemain = 0;
-// }
-//
-// @Override
-// protected void cleanup(Context context) throws IOException, InterruptedException {
-// logger.info("---------------");
-// long splitCount = splitQuota - splitRemain;
-// logger.info("Total Count = " + totalCount + ", Left Count = " + splitCount);
-// }
-//
-// @Override
-// public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException,
-// InterruptedException {
-//
-// // calculate split quota
-// if (isTotalCount) {
-// for (LongWritable count : values) {
-// totalCount += count.get();
-// }
-// splitQuota = totalCount / splitNumber;
-// splitRemain = splitQuota;
-// isTotalCount = false;
-// return;
-// }
-//
-// // output key when split quota is used up
-// for (LongWritable count : values) {
-// splitRemain -= count.get();
-// }
-// if (splitRemain <= 0) {
-// long splitCount = splitQuota - splitRemain;
-// String hexKey = StringUtils.byteToHexString(key.getBytes());
-// logger.info(hexKey + "\t\t" + splitCount);
-//
-// outputValue.set(splitCount);
-// context.write(key, outputValue);
-// splitRemain = splitQuota;
-// }
-//
-// }
-// }
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidJob.java
deleted file mode 100644
index 73faa6c..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidJob.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.job.constant.BatchConstants;
-
-/**
- * @author ysong1
- */
-public class MergeCuboidJob extends CuboidJob {
-
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
-
- try {
- options.addOption(OPTION_JOB_NAME);
- options.addOption(OPTION_CUBE_NAME);
- options.addOption(OPTION_SEGMENT_NAME);
- options.addOption(OPTION_INPUT_PATH);
- options.addOption(OPTION_OUTPUT_PATH);
- parseOptions(options, args);
-
- String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
- String segmentName = getOptionValue(OPTION_SEGMENT_NAME).toUpperCase();
- KylinConfig config = KylinConfig.getInstanceFromEnv();
- CubeManager cubeMgr = CubeManager.getInstance(config);
- CubeInstance cube = cubeMgr.getCube(cubeName);
-
- // start job
- String jobName = getOptionValue(OPTION_JOB_NAME);
- System.out.println("Starting: " + jobName);
- job = Job.getInstance(getConf(), jobName);
-
- setJobClasspath(job);
-
- // set inputs
- addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
-
- Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
- FileOutputFormat.setOutputPath(job, output);
-
- // Mapper
- job.setInputFormatClass(SequenceFileInputFormat.class);
- job.setMapperClass(MergeCuboidMapper.class);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(Text.class);
-
- // Reducer - only one
- job.setReducerClass(CuboidReducer.class);
- job.setOutputFormatClass(SequenceFileOutputFormat.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
-
- // set job configuration
- job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
- job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
-
- // add metadata to distributed cache
- attachKylinPropsAndMetadata(cube, job.getConfiguration());
-
- setReduceTaskNum(job, config, cubeName, 0);
-
- this.deletePath(job.getConfiguration(), output);
-
- return waitForCompletion(job);
- } catch (Exception e) {
- logger.error("error in MergeCuboidJob", e);
- printUsage(options);
- throw e;
- } finally {
- if (job != null)
- cleanupTempConfFile(job.getConfiguration());
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapper.java
deleted file mode 100644
index 2528e07..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapper.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import com.google.common.collect.Lists;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.mr.KylinMapper;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.common.util.SplittedBytes;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.common.RowKeySplitter;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.measure.MeasureCodec;
-import org.apache.kylin.measure.MeasureIngester;
-import org.apache.kylin.measure.MeasureType;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-
-/**
- * @author ysong1, honma
- */
-public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
-
- private KylinConfig config;
- private String cubeName;
- private String segmentName;
- private CubeManager cubeManager;
- private CubeInstance cube;
- private CubeDesc cubeDesc;
- private CubeSegment mergedCubeSegment;
- private CubeSegment sourceCubeSegment;// Must be unique during a mapper's
- // life cycle
-
-
- // for re-encode measures that use dictionary
- private List<Pair<Integer, MeasureIngester>> dictMeasures;
- private Map<TblColRef, Dictionary<String>> oldDicts;
- private Map<TblColRef, Dictionary<String>> newDicts;
- private List<MeasureDesc> measureDescs;
- private MeasureCodec codec;
- private Object[] measureObjs;
- private ByteBuffer valueBuf;
- private Text outputValue;
-
- private Text outputKey = new Text();
-
- private byte[] newKeyBuf;
- private RowKeySplitter rowKeySplitter;
-
- private HashMap<TblColRef, Boolean> dictsNeedMerging = new HashMap<TblColRef, Boolean>();
-
- private static final Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");
-
- private Boolean checkNeedMerging(TblColRef col) throws IOException {
- Boolean ret = dictsNeedMerging.get(col);
- if (ret != null)
- return ret;
- else {
- ret = cubeDesc.getRowkey().isUseDictionary(col);
- if (ret) {
- String dictTable = (String) DictionaryManager.getInstance(config).decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, null)[0];
- ret = cubeDesc.getFactTable().equalsIgnoreCase(dictTable);
- }
- dictsNeedMerging.put(col, ret);
- return ret;
- }
- }
-
- private String extractJobIDFromPath(String path) {
- Matcher matcher = JOB_NAME_PATTERN.matcher(path);
- // check the first occurance
- if (matcher.find()) {
- return matcher.group(1);
- } else {
- throw new IllegalStateException("Can not extract job ID from file path : " + path);
- }
- }
-
- private CubeSegment findSegmentWithUuid(String jobID, CubeInstance cubeInstance) {
- for (CubeSegment segment : cubeInstance.getSegments()) {
- String lastBuildJobID = segment.getLastBuildJobID();
- if (lastBuildJobID != null && lastBuildJobID.equalsIgnoreCase(jobID)) {
- return segment;
- }
- }
-
- throw new IllegalStateException("No merging segment's last build job ID equals " + jobID);
-
- }
-
- @Override
- protected void setup(Context context) throws IOException, InterruptedException {
- super.publishConfiguration(context.getConfiguration());
-
- cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
- segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME).toUpperCase();
-
- config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration());
-
- cubeManager = CubeManager.getInstance(config);
- cube = cubeManager.getCube(cubeName);
- cubeDesc = cube.getDescriptor();
- mergedCubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
-
- // int colCount = cubeDesc.getRowkey().getRowKeyColumns().length;
- newKeyBuf = new byte[256];// size will auto-grow
-
- // decide which source segment
- InputSplit inputSplit = context.getInputSplit();
- String filePath = ((FileSplit) inputSplit).getPath().toString();
- System.out.println("filePath:" + filePath);
- String jobID = extractJobIDFromPath(filePath);
- System.out.println("jobID:" + jobID);
- sourceCubeSegment = findSegmentWithUuid(jobID, cube);
- System.out.println(sourceCubeSegment);
-
- this.rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
-
-
- measureDescs = cubeDesc.getMeasures();
- codec = new MeasureCodec(measureDescs);
- measureObjs = new Object[measureDescs.size()];
- valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
- outputValue = new Text();
-
- dictMeasures = Lists.newArrayList();
- for (int i = 0; i < measureDescs.size(); i++) {
- MeasureDesc measureDesc = measureDescs.get(i);
- MeasureType measureType = measureDesc.getFunction().getMeasureType();
- if (measureType.getColumnsNeedDictionary(measureDesc.getFunction()).isEmpty() == false) {
- dictMeasures.add(Pair.newPair(i, measureType.newIngester()));
- }
- }
- if (dictMeasures.size() > 0) {
- oldDicts = sourceCubeSegment.buildDictionaryMap();
- newDicts = mergedCubeSegment.buildDictionaryMap();
- }
- }
-
- @Override
- public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
- long cuboidID = rowKeySplitter.split(key.getBytes(), key.getBytes().length);
- Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidID);
-
- SplittedBytes[] splittedByteses = rowKeySplitter.getSplitBuffers();
- int bufOffset = 0;
- BytesUtil.writeLong(cuboidID, newKeyBuf, bufOffset, RowConstants.ROWKEY_CUBOIDID_LEN);
- bufOffset += RowConstants.ROWKEY_CUBOIDID_LEN;
-
- for (int i = 0; i < cuboid.getColumns().size(); ++i) {
- TblColRef col = cuboid.getColumns().get(i);
-
- if (this.checkNeedMerging(col)) {
- // if dictionary on fact table column, needs rewrite
- DictionaryManager dictMgr = DictionaryManager.getInstance(config);
- Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(col));
- Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(col));
-
- while (sourceDict.getSizeOfValue() > newKeyBuf.length - bufOffset || mergedDict.getSizeOfValue() > newKeyBuf.length - bufOffset) {
- byte[] oldBuf = newKeyBuf;
- newKeyBuf = new byte[2 * newKeyBuf.length];
- System.arraycopy(oldBuf, 0, newKeyBuf, 0, oldBuf.length);
- }
-
- int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[i + 1].value, 0, splittedByteses[i + 1].length);
- int idInMergedDict;
-
- int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBuf, bufOffset);
- if (size < 0) {
- idInMergedDict = mergedDict.nullId();
- } else {
- idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBuf, bufOffset, size);
- }
-
- BytesUtil.writeUnsigned(idInMergedDict, newKeyBuf, bufOffset, mergedDict.getSizeOfId());
- bufOffset += mergedDict.getSizeOfId();
- } else {
- // keep as it is
- while (splittedByteses[i + 1].length > newKeyBuf.length - bufOffset) {
- byte[] oldBuf = newKeyBuf;
- newKeyBuf = new byte[2 * newKeyBuf.length];
- System.arraycopy(oldBuf, 0, newKeyBuf, 0, oldBuf.length);
- }
-
- System.arraycopy(splittedByteses[i + 1].value, 0, newKeyBuf, bufOffset, splittedByteses[i + 1].length);
- bufOffset += splittedByteses[i + 1].length;
- }
- }
- byte[] newKey = Arrays.copyOf(newKeyBuf, bufOffset);
- outputKey.set(newKey, 0, newKey.length);
-
-
- // re-encode measures if dictionary is used
- if (dictMeasures.size() > 0) {
- codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), measureObjs);
- for (Pair<Integer, MeasureIngester> pair : dictMeasures) {
- int i = pair.getFirst();
- MeasureIngester ingester = pair.getSecond();
- measureObjs[i] = ingester.reEncodeDictionary(measureObjs[i], measureDescs.get(i), oldDicts, newDicts);
- }
- valueBuf.clear();
- codec.encode(measureObjs, valueBuf);
- outputValue.set(valueBuf.array(), 0, valueBuf.position());
- value = outputValue;
- }
-
- context.write(outputKey, value);
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java
deleted file mode 100644
index e46093c..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.job.constant.JobStatusEnum;
-import org.apache.kylin.job.dao.ExecutableDao;
-import org.apache.kylin.job.dao.ExecutableOutputPO;
-import org.apache.kylin.job.dao.ExecutablePO;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-
-public class MetadataCleanupJob extends AbstractHadoopJob {
-
- @SuppressWarnings("static-access")
- private static final Option OPTION_DELETE = OptionBuilder.withArgName("delete").hasArg().isRequired(false).withDescription("Delete the unused metadata").create("delete");
-
- protected static final Logger log = LoggerFactory.getLogger(MetadataCleanupJob.class);
-
- boolean delete = false;
-
- private KylinConfig config = null;
-
- public static final long TIME_THREADSHOLD = 2 * 24 * 3600 * 1000l; // 2 days
- public static final long TIME_THREADSHOLD_FOR_JOB = 30 * 24 * 3600 * 1000l; // 30 days
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.hadoop.util.Tool#run(java.lang.String[])
- */
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
-
- log.info("----- jobs args: " + Arrays.toString(args));
- try {
- options.addOption(OPTION_DELETE);
- parseOptions(options, args);
-
- log.info("options: '" + getOptionsAsString() + "'");
- log.info("delete option value: '" + getOptionValue(OPTION_DELETE) + "'");
- delete = Boolean.parseBoolean(getOptionValue(OPTION_DELETE));
-
- config = KylinConfig.getInstanceFromEnv();
-
- cleanup();
-
- return 0;
- } catch (Exception e) {
- e.printStackTrace(System.err);
- throw e;
- }
- }
-
- private ResourceStore getStore() {
- return ResourceStore.getStore(config);
- }
-
- private boolean isOlderThanThreshold(long resourceTime) {
- long currentTime = System.currentTimeMillis();
-
- if (currentTime - resourceTime > TIME_THREADSHOLD)
- return true;
- return false;
- }
-
- public void cleanup() throws Exception {
- CubeManager cubeManager = CubeManager.getInstance(config);
-
- Set<String> activeResourceList = Sets.newHashSet();
- for (org.apache.kylin.cube.CubeInstance cube : cubeManager.listAllCubes()) {
- for (org.apache.kylin.cube.CubeSegment segment : cube.getSegments()) {
- activeResourceList.addAll(segment.getSnapshotPaths());
- activeResourceList.addAll(segment.getDictionaryPaths());
- }
- }
-
- List<String> toDeleteResource = Lists.newArrayList();
-
- // two level resources, snapshot tables and cube statistics
- for (String resourceRoot : new String[] { ResourceStore.SNAPSHOT_RESOURCE_ROOT }) {
- ArrayList<String> snapshotTables = getStore().listResources(resourceRoot);
-
- if (snapshotTables != null) {
- for (String snapshotTable : snapshotTables) {
- ArrayList<String> snapshotNames = getStore().listResources(snapshotTable);
- if (snapshotNames != null)
- for (String snapshot : snapshotNames) {
- if (!activeResourceList.contains(snapshot)) {
- long ts = getStore().getResourceTimestamp(snapshot);
- if (isOlderThanThreshold(ts))
- toDeleteResource.add(snapshot);
- }
- }
- }
- }
- }
-
- // three level resources, only dictionaries
- ArrayList<String> dictTables = getStore().listResources(ResourceStore.DICT_RESOURCE_ROOT);
-
- if (dictTables != null) {
- for (String table : dictTables) {
- ArrayList<String> tableColNames = getStore().listResources(table);
- if (tableColNames != null)
- for (String tableCol : tableColNames) {
- ArrayList<String> dictionaries = getStore().listResources(tableCol);
- if (dictionaries != null)
- for (String dict : dictionaries)
- if (!activeResourceList.contains(dict)) {
- long ts = getStore().getResourceTimestamp(dict);
- if (isOlderThanThreshold(ts))
- toDeleteResource.add(dict);
- }
- }
- }
- }
-
- // delete old and completed jobs
- ExecutableDao executableDao = ExecutableDao.getInstance(KylinConfig.getInstanceFromEnv());
- List<ExecutablePO> allExecutable = executableDao.getJobs();
- for (ExecutablePO executable : allExecutable) {
- long lastModified = executable.getLastModified();
- ExecutableOutputPO output = executableDao.getJobOutput(executable.getUuid());
- if (System.currentTimeMillis() - lastModified > TIME_THREADSHOLD_FOR_JOB && (output.getStatus().equals(JobStatusEnum.FINISHED.toString()) || output.getStatus().equals(JobStatusEnum.DISCARDED.toString()))) {
- toDeleteResource.add(ResourceStore.EXECUTE_PATH_ROOT + "/" + executable.getUuid());
- toDeleteResource.add(ResourceStore.EXECUTE_OUTPUT_ROOT + "/" + executable.getUuid());
-
- for (ExecutablePO task : executable.getTasks()) {
- toDeleteResource.add(ResourceStore.EXECUTE_PATH_ROOT + "/" + task.getUuid());
- toDeleteResource.add(ResourceStore.EXECUTE_OUTPUT_ROOT + "/" + task.getUuid());
- }
- }
- }
-
- if (toDeleteResource.size() > 0) {
- logger.info("The following resources have no reference or is too old, will be cleaned from metadata store: \n");
-
- for (String s : toDeleteResource) {
- logger.info(s);
- if (delete == true) {
- getStore().deleteResource(s);
- }
- }
- } else {
- logger.info("No resource to be cleaned up from metadata store;");
- }
-
- }
-
- public static void main(String[] args) throws Exception {
- int exitCode = ToolRunner.run(new MetadataCleanupJob(), args);
- System.exit(exitCode);
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidJob.java
deleted file mode 100644
index feb4dc4..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidJob.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import org.apache.hadoop.util.ToolRunner;
-
-/**
- * @author George Song (ysong1)
- *
- */
-
-public class NDCuboidJob extends CuboidJob {
-
- public NDCuboidJob() {
- this.setMapperClass(NDCuboidMapper.class);
- }
-
- public static void main(String[] args) throws Exception {
- CuboidJob job = new NDCuboidJob();
- int exitCode = ToolRunner.run(job, args);
- System.exit(exitCode);
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidMapper.java
deleted file mode 100644
index 8725ed8..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidMapper.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.mr.KylinMapper;
-import org.apache.kylin.common.util.SplittedBytes;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.common.RowKeySplitter;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.cuboid.CuboidScheduler;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author George Song (ysong1)
- *
- */
-public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
-
- private static final Logger logger = LoggerFactory.getLogger(NDCuboidMapper.class);
-
- private Text outputKey = new Text();
- private String cubeName;
- private String segmentName;
- private CubeDesc cubeDesc;
- private CuboidScheduler cuboidScheduler;
-
- private int handleCounter;
- private int skipCounter;
-
- private byte[] keyBuf = new byte[4096];
- private RowKeySplitter rowKeySplitter;
-
- @Override
- protected void setup(Context context) throws IOException {
- super.publishConfiguration(context.getConfiguration());
-
- cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
- segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME).toUpperCase();
-
- KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration());
-
- CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
- CubeSegment cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
- cubeDesc = cube.getDescriptor();
-
- // initialize CubiodScheduler
- cuboidScheduler = new CuboidScheduler(cubeDesc);
-
- rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 256);
- }
-
- private int buildKey(Cuboid parentCuboid, Cuboid childCuboid, SplittedBytes[] splitBuffers) {
- int offset = 0;
-
- // cuboid id
- System.arraycopy(childCuboid.getBytes(), 0, keyBuf, offset, childCuboid.getBytes().length);
- offset += childCuboid.getBytes().length;
-
- // rowkey columns
- long mask = Long.highestOneBit(parentCuboid.getId());
- long parentCuboidId = parentCuboid.getId();
- long childCuboidId = childCuboid.getId();
- long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(parentCuboid.getId());
- int index = 1; // skip cuboidId
- for (int i = 0; i < parentCuboidIdActualLength; i++) {
- if ((mask & parentCuboidId) > 0) {// if the this bit position equals
- // 1
- if ((mask & childCuboidId) > 0) {// if the child cuboid has this
- // column
- System.arraycopy(splitBuffers[index].value, 0, keyBuf, offset, splitBuffers[index].length);
- offset += splitBuffers[index].length;
- }
- index++;
- }
- mask = mask >> 1;
- }
-
- return offset;
- }
-
- @Override
- public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
- long cuboidId = rowKeySplitter.split(key.getBytes(), key.getLength());
- Cuboid parentCuboid = Cuboid.findById(cubeDesc, cuboidId);
-
- Collection<Long> myChildren = cuboidScheduler.getSpanningCuboid(cuboidId);
-
- // if still empty or null
- if (myChildren == null || myChildren.size() == 0) {
- context.getCounter(BatchConstants.MAPREDUCE_COUNTER_GROUP_NAME, "Skipped records").increment(1L);
- skipCounter++;
- if (skipCounter % BatchConstants.COUNTER_MAX == 0) {
- logger.info("Skipped " + skipCounter + " records!");
- }
- return;
- }
-
- context.getCounter(BatchConstants.MAPREDUCE_COUNTER_GROUP_NAME, "Processed records").increment(1L);
-
- handleCounter++;
- if (handleCounter % BatchConstants.COUNTER_MAX == 0) {
- logger.info("Handled " + handleCounter + " records!");
- }
-
- for (Long child : myChildren) {
- Cuboid childCuboid = Cuboid.findById(cubeDesc, child);
- int keyLength = buildKey(parentCuboid, childCuboid, rowKeySplitter.getSplitBuffers());
- outputKey.set(keyBuf, 0, keyLength);
- context.write(outputKey, value);
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java
deleted file mode 100644
index 9c50122..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.DataModelDesc.RealizationCapacity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author xjiang, ysong1
- *
- */
-
-public class RangeKeyDistributionJob extends AbstractHadoopJob {
- protected static final Logger log = LoggerFactory.getLogger(RangeKeyDistributionJob.class);
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.hadoop.util.Tool#run(java.lang.String[])
- */
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
-
- try {
- options.addOption(OPTION_INPUT_PATH);
- options.addOption(OPTION_OUTPUT_PATH);
- options.addOption(OPTION_JOB_NAME);
- options.addOption(OPTION_CUBE_NAME);
-
- parseOptions(options, args);
-
- // start job
- String jobName = getOptionValue(OPTION_JOB_NAME);
- job = Job.getInstance(getConf(), jobName);
-
- setJobClasspath(job);
-
- addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
-
- Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
- FileOutputFormat.setOutputPath(job, output);
- // job.getConfiguration().set("dfs.block.size", "67108864");
-
- // Mapper
- job.setInputFormatClass(SequenceFileInputFormat.class);
- job.setMapperClass(RangeKeyDistributionMapper.class);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(LongWritable.class);
-
- // Reducer - only one
- job.setReducerClass(RangeKeyDistributionReducer.class);
- job.setOutputFormatClass(SequenceFileOutputFormat.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(LongWritable.class);
- job.setNumReduceTasks(1);
-
- this.deletePath(job.getConfiguration(), output);
-
- String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
- CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
- CubeInstance cube = cubeMgr.getCube(cubeName);
- DataModelDesc.RealizationCapacity cubeCapacity = cube.getDescriptor().getModel().getCapacity();
- int regionSplitSize = KylinConfig.getInstanceFromEnv().getHBaseRegionCut(cubeCapacity.toString());
- int maxRegionCount = KylinConfig.getInstanceFromEnv().getHBaseRegionCountMax();
- int minRegionCount = KylinConfig.getInstanceFromEnv().getHBaseRegionCountMin();
- job.getConfiguration().set(BatchConstants.REGION_SPLIT_SIZE, String.valueOf(regionSplitSize));
- job.getConfiguration().set(BatchConstants.REGION_NUMBER_MAX, String.valueOf(maxRegionCount));
- job.getConfiguration().set(BatchConstants.REGION_NUMBER_MIN, String.valueOf(minRegionCount));
-
- return waitForCompletion(job);
- } catch (Exception e) {
- printUsage(options);
- throw e;
- }
- }
-
- public static void main(String[] args) throws Exception {
- int exitCode = ToolRunner.run(new RangeKeyDistributionJob(), args);
- System.exit(exitCode);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapper.java
deleted file mode 100644
index 33baf45..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapper.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.mr.KylinMapper;
-
-/**
- * @author ysong1
- *
- */
-public class RangeKeyDistributionMapper extends KylinMapper<Text, Text, Text, LongWritable> {
-
- private static final long ONE_MEGA_BYTES = 1L * 1024L * 1024L;
-
- private LongWritable outputValue = new LongWritable(0);
-
- private long bytesRead = 0;
-
- private Text lastKey;
-
- @Override
- protected void setup(Context context) throws IOException {
- super.publishConfiguration(context.getConfiguration());
- }
-
- @Override
- public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
- lastKey = key;
-
- int bytesLength = key.getLength() + value.getLength();
- bytesRead += bytesLength;
-
- if (bytesRead >= ONE_MEGA_BYTES) {
- outputValue.set(bytesRead);
- context.write(key, outputValue);
-
- // reset bytesRead
- bytesRead = 0;
- }
-
- }
-
- @Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- if (lastKey != null) {
- outputValue.set(bytesRead);
- context.write(lastKey, outputValue);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
deleted file mode 100644
index b3ab4db..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.kylin.common.mr.KylinReducer;
-import org.apache.kylin.cube.model.v1.CubeDesc.CubeCapacity;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author ysong1
- *
- */
-public class RangeKeyDistributionReducer extends KylinReducer<Text, LongWritable, Text, LongWritable> {
-
- public static final long ONE_GIGA_BYTES = 1024L * 1024L * 1024L;
-
- private static final Logger logger = LoggerFactory.getLogger(RangeKeyDistributionReducer.class);
-
- private LongWritable outputValue = new LongWritable(0);
-
- private int minRegionCount = 1;
- private int maxRegionCount = 500;
- private int cut = 10;
- private long bytesRead = 0;
- private List<Text> gbPoints = new ArrayList<Text>();
-
- @Override
- protected void setup(Context context) throws IOException {
- super.publishConfiguration(context.getConfiguration());
-
- if (context.getConfiguration().get(BatchConstants.REGION_SPLIT_SIZE) != null) {
- cut = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_SPLIT_SIZE));
- }
-
- if (context.getConfiguration().get(BatchConstants.REGION_NUMBER_MIN) != null) {
- minRegionCount = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_NUMBER_MIN));
- }
-
- if (context.getConfiguration().get(BatchConstants.REGION_NUMBER_MAX) != null) {
- maxRegionCount = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_NUMBER_MAX));
- }
-
- logger.info("Chosen cut for htable is " + cut + ", max region count=" + maxRegionCount + ", min region count =" + minRegionCount);
- }
-
- @Override
- public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
- for (LongWritable v : values) {
- bytesRead += v.get();
- }
-
- if (bytesRead >= ONE_GIGA_BYTES) {
- gbPoints.add(new Text(key));
- bytesRead = 0; // reset bytesRead
- }
- }
-
- @Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- int nRegion = Math.round((float) gbPoints.size() / (float) cut);
- nRegion = Math.max(minRegionCount, nRegion);
- nRegion = Math.min(maxRegionCount, nRegion);
-
- int gbPerRegion = gbPoints.size() / nRegion;
- gbPerRegion = Math.max(1, gbPerRegion);
-
- System.out.println(nRegion + " regions");
- System.out.println(gbPerRegion + " GB per region");
-
- for (int i = gbPerRegion; i < gbPoints.size(); i += gbPerRegion) {
- Text key = gbPoints.get(i);
- outputValue.set(i);
- System.out.println(StringUtils.byteToHexString(key.getBytes()) + "\t" + outputValue.get());
- context.write(key, outputValue);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerJob.java
deleted file mode 100644
index 9372f18..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerJob.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-
-/**
- * @author ysong1
- *
- */
-public class RowKeyDistributionCheckerJob extends AbstractHadoopJob {
-
- @SuppressWarnings("static-access")
- protected static final Option rowKeyStatsFilePath = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("rowKeyStatsFilePath").create("rowKeyStatsFilePath");
-
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
-
- try {
- options.addOption(OPTION_INPUT_PATH);
- options.addOption(OPTION_OUTPUT_PATH);
- options.addOption(OPTION_JOB_NAME);
- options.addOption(rowKeyStatsFilePath);
-
- parseOptions(options, args);
-
- String statsFilePath = getOptionValue(rowKeyStatsFilePath);
-
- // start job
- String jobName = getOptionValue(OPTION_JOB_NAME);
- job = Job.getInstance(getConf(), jobName);
-
- setJobClasspath(job);
-
- addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
-
- Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
- FileOutputFormat.setOutputPath(job, output);
-
- // Mapper
- job.setInputFormatClass(SequenceFileInputFormat.class);
- job.setMapperClass(RowKeyDistributionCheckerMapper.class);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(LongWritable.class);
-
- // Reducer - only one
- job.setReducerClass(RowKeyDistributionCheckerReducer.class);
- job.setOutputFormatClass(SequenceFileOutputFormat.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(LongWritable.class);
- job.setNumReduceTasks(1);
-
- job.getConfiguration().set("rowKeyStatsFilePath", statsFilePath);
-
- this.deletePath(job.getConfiguration(), output);
-
- return waitForCompletion(job);
- } catch (Exception e) {
- printUsage(options);
- throw e;
- }
- }
-
- public static void main(String[] args) throws Exception {
- int exitCode = ToolRunner.run(new RowKeyDistributionCheckerJob(), args);
- System.exit(exitCode);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerMapper.java
deleted file mode 100644
index 50b0499..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerMapper.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.kylin.common.mr.KylinMapper;
-
-/**
- * @author ysong1
- *
- */
-public class RowKeyDistributionCheckerMapper extends KylinMapper<Text, Text, Text, LongWritable> {
-
- String rowKeyStatsFilePath;
- byte[][] splitKeys;
- Map<Text, Long> resultMap;
- List<Text> keyList;
-
- @Override
- protected void setup(Context context) throws IOException {
- super.publishConfiguration(context.getConfiguration());
-
- rowKeyStatsFilePath = context.getConfiguration().get("rowKeyStatsFilePath");
- splitKeys = this.getSplits(context.getConfiguration(), new Path(rowKeyStatsFilePath));
-
- resultMap = new HashMap<Text, Long>();
- keyList = new ArrayList<Text>();
- for (int i = 0; i < splitKeys.length; i++) {
- Text key = new Text(splitKeys[i]);
- resultMap.put(key, 0L);
- keyList.add(new Text(splitKeys[i]));
- }
- }
-
- @Override
- public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
- for (Text t : keyList) {
- if (key.compareTo(t) < 0) {
- Long v = resultMap.get(t);
- long length = key.getLength() + value.getLength();
- v += length;
- resultMap.put(t, v);
- break;
- }
- }
- }
-
- @Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- LongWritable outputValue = new LongWritable();
- for (Entry<Text, Long> kv : resultMap.entrySet()) {
- outputValue.set(kv.getValue());
- context.write(kv.getKey(), outputValue);
- }
- }
-
- @SuppressWarnings("deprecation")
- public byte[][] getSplits(Configuration conf, Path path) {
- List<byte[]> rowkeyList = new ArrayList<byte[]>();
- SequenceFile.Reader reader = null;
- try {
- reader = new SequenceFile.Reader(path.getFileSystem(conf), path, conf);
- Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
- Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
- while (reader.next(key, value)) {
- byte[] tmp = ((Text) key).copyBytes();
- if (rowkeyList.contains(tmp) == false) {
- rowkeyList.add(tmp);
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- IOUtils.closeStream(reader);
- }
-
- byte[][] retValue = rowkeyList.toArray(new byte[rowkeyList.size()][]);
-
- return retValue;
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerReducer.java
deleted file mode 100644
index 83e503e..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerReducer.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.mr.KylinReducer;
-
-/**
- * @author ysong1
- *
- */
-public class RowKeyDistributionCheckerReducer extends KylinReducer<Text, LongWritable, Text, LongWritable> {
-
- LongWritable outputKey = new LongWritable(0L);
-
- @Override
- protected void setup(Context context) throws IOException {
- super.publishConfiguration(context.getConfiguration());
- }
-
- @Override
- public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
-
- long length = 0;
- for (LongWritable v : values) {
- length += v.get();
- }
-
- outputKey.set(length);
- context.write(key, outputKey);
- }
-}