Posted to commits@kylin.apache.org by li...@apache.org on 2015/02/10 07:48:13 UTC
[20/54] [abbrv] [partial] incubator-kylin git commit: cleanup for migration from github.com
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityReducer.java b/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityReducer.java
deleted file mode 100644
index 06c98ee..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityReducer.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.hadoop.cardinality;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.Reducer;
-
-import com.kylinolap.common.hll.HyperLogLogPlusCounter;
-import com.kylinolap.cube.kv.RowConstants;
-
-/**
- * @author Jack
- *
- */
-public class ColumnCardinalityReducer extends Reducer<IntWritable, BytesWritable, IntWritable, LongWritable> {
-
- public static final int ONE = 1;
- private Map<Integer, HyperLogLogPlusCounter> hllcMap = new HashMap<Integer, HyperLogLogPlusCounter>();
-
- @Override
- public void reduce(IntWritable key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
- for (BytesWritable v : values) {
- int skey = key.get();
- ByteBuffer buffer = ByteBuffer.wrap(v.getBytes());
- HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter();
- hll.readRegisters(buffer);
- getHllc(skey).merge(hll);
- hll.clear();
- }
- }
-
- private HyperLogLogPlusCounter getHllc(Integer key) {
- if (!hllcMap.containsKey(key)) {
- hllcMap.put(key, new HyperLogLogPlusCounter());
- }
- return hllcMap.get(key);
- }
-
- @Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- List<Integer> keys = new ArrayList<Integer>();
- Iterator<Integer> it = hllcMap.keySet().iterator();
- while (it.hasNext()) {
- keys.add(it.next());
- }
- Collections.sort(keys);
- it = keys.iterator();
- while (it.hasNext()) {
- int key = it.next();
- HyperLogLogPlusCounter hllc = hllcMap.get(key);
- ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
- buf.clear();
- hllc.writeRegisters(buf);
- buf.flip();
- context.write(new IntWritable(key), new LongWritable(hllc.getCountEstimate()));
- // context.write(new Text("ErrorRate_" + key), new
- // LongWritable((long)hllc.getErrorRate()));
- }
-
- }
-}
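
The reducer above follows the standard HyperLogLog merge pattern: each incoming BytesWritable holds one serialized counter, which is deserialized and merged into a per-column accumulator, and cleanup() emits one cardinality estimate per column. A minimal sketch of that serialize/merge round trip, assuming the com.kylinolap classes from the deleted file are still on the classpath:

    import java.nio.ByteBuffer;
    import com.kylinolap.common.hll.HyperLogLogPlusCounter;
    import com.kylinolap.cube.kv.RowConstants;

    public class HllMergeSketch {
        public static void main(String[] args) throws Exception {
            HyperLogLogPlusCounter incoming = new HyperLogLogPlusCounter();
            HyperLogLogPlusCounter total = new HyperLogLogPlusCounter();

            // serialize one counter's registers, as the mapper side would
            ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
            incoming.writeRegisters(buf);
            buf.flip();

            // deserialize and merge, exactly as reduce() does for every value
            HyperLogLogPlusCounter copy = new HyperLogLogPlusCounter();
            copy.readRegisters(buf);
            total.merge(copy);

            System.out.println(total.getCountEstimate());
        }
    }
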
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cardinality/HiveColumnCardinalityJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/HiveColumnCardinalityJob.java b/job/src/main/java/com/kylinolap/job/hadoop/cardinality/HiveColumnCardinalityJob.java
deleted file mode 100644
index b6ea002..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/HiveColumnCardinalityJob.java
+++ /dev/null
@@ -1,254 +0,0 @@
-package com.kylinolap.job.hadoop.cardinality;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.ToolRunner;
-
-import com.kylinolap.job.hadoop.AbstractHadoopJob;
-
-public class HiveColumnCardinalityJob extends AbstractHadoopJob {
- public static final String JOB_TITLE = "Kylin Hive Column Cardinality Job";
-
- @SuppressWarnings("static-access")
- protected static final Option OPTION_FORMAT = OptionBuilder.withArgName("input format").hasArg().isRequired(true).withDescription("The file format").create("iformat");
-
- @SuppressWarnings("static-access")
- protected static final Option OPTION_INPUT_DELIM = OptionBuilder.withArgName("input_delim").hasArg().isRequired(false).withDescription("Input delimiter").create("idelim");
-
- public static final String KEY_INPUT_DELIM = "INPUT_DELIM";
- public static final String OUTPUT_PATH = "/tmp/cardinality";
-
- /**
- * This is the jar path
- */
- private String jarPath;
- private Configuration conf;
-
- /**
- * MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY
- */
- private String tokenPath;
-
- public HiveColumnCardinalityJob() {
-
- }
-
- public HiveColumnCardinalityJob(String path, String tokenPath) {
- this.jarPath = path;
- this.tokenPath = tokenPath;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.hadoop.conf.Configured#getConf()
- */
- @Override
- public Configuration getConf() {
- if (conf != null) {
- return conf;
- }
- conf = new JobConf();
- String path = "/apache/hadoop/conf/";
- File file = new File(path);
- if (file.isDirectory()) {
- File[] files = file.listFiles();
- for (int i = 0; i < files.length; i++) {
- File tmp = files[i];
- if (tmp.getName().endsWith(".xml")) {
- try {
- conf.addResource(new FileInputStream(tmp));
- } catch (FileNotFoundException e) {
- e.printStackTrace();
- }
- }
- }
- }
-
- // conf.addResource("/apache/hadoop/conf/mapred-site.xml");
- if (tokenPath != null) {
- conf.set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, tokenPath);
- conf.set("hadoop.security.authentication", "kerberos");
- UserGroupInformation.setConfiguration(conf);
- try {
- UserGroupInformation.loginUserFromKeytab("b_kylin@CORP.EBAY.COM", "~/.keytabs/b_kylin.keytab");
- System.out.println("###" + UserGroupInformation.getLoginUser());
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- return conf;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public int run(String[] args) throws Exception {
-
- Options options = new Options();
-
- try {
- options.addOption(OPTION_INPUT_PATH);
- options.addOption(OPTION_OUTPUT_PATH);
- options.addOption(OPTION_FORMAT);
- options.addOption(OPTION_INPUT_DELIM);
-
- parseOptions(options, args);
-
- // start job
- String jobName = JOB_TITLE + getOptionsAsString();
- System.out.println("Starting: " + jobName);
- Configuration conf = getConf();
- job = Job.getInstance(conf, jobName);
-
- // set job configuration - basic
- if (jarPath == null || !new File(jarPath).exists()) {
- job.setJarByClass(getClass());
- } else {
- job.setJar(jarPath);
- }
- FileInputFormat.setInputDirRecursive(job, true);
- addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
-
- Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
- FileOutputFormat.setOutputPath(job, output);
- job.getConfiguration().set("dfs.block.size", "67108864");
-
- String format = getOptionValue(OPTION_FORMAT);
- @SuppressWarnings("rawtypes")
- Class cformat = getFormat(format);
- String delim = getOptionValue(OPTION_INPUT_DELIM);
- if (delim != null) {
- if (delim.equals("t")) {
- delim = "\t";
- } else if (delim.equals("177")) {
- delim = "\177";
- }
- job.getConfiguration().set(KEY_INPUT_DELIM, delim);
- }
-
- // Mapper
- job.setInputFormatClass(cformat);
- job.setMapperClass(ColumnCardinalityMapper.class);
- job.setMapOutputKeyClass(IntWritable.class);
- job.setMapOutputValueClass(BytesWritable.class);
-
- // Reducer - only one
- job.setReducerClass(ColumnCardinalityReducer.class);
- job.setOutputFormatClass(TextOutputFormat.class);
- job.setOutputKeyClass(IntWritable.class);
- job.setOutputValueClass(LongWritable.class);
- job.setNumReduceTasks(1);
-
- this.deletePath(job.getConfiguration(), output);
-
- int result = waitForCompletion(job);
- return result;
- } catch (Exception e) {
- printUsage(options);
- e.printStackTrace(System.err);
- log.error(e.getLocalizedMessage(), e);
- return 2;
- }
-
- }
-
- /**
- * @param format the input format name, either a full class name or a ".TextInputFormat"/".SequenceFileInputFormat" suffix
- * @return the resolved input format class
- * @throws ClassNotFoundException if the named class cannot be loaded
- */
- @SuppressWarnings("rawtypes")
- private Class getFormat(String format) throws ClassNotFoundException {
- if (format.endsWith(".TextInputFormat")) {
- return Class.forName("org.apache.hadoop.mapreduce.lib.input.TextInputFormat");
- } else if (format.endsWith(".SequenceFileInputFormat")) {
- return Class.forName("org.apache.hadoop.mapreduce.lib.input.SequenceFileAsTextInputFormat");
- } else {
- return Class.forName(format);
- }
-
- }
-
- public static void main1(String[] args) throws Exception {
- int exitCode = ToolRunner.run(new HiveColumnCardinalityJob(), args);
- System.exit(exitCode);
- }
-
- public static void main(String[] args) {
-
- String location = "hdfs://apollo-phx-nn.vip.ebay.com:8020/tmp/f1a98d8a-26b9-452e-ab7b-9f01e5a6459b/shipping_sisense_cube_desc_intermediate_table";
- String tempName = "test";
- String inputFormat = "org.apache.hadoop.mapred.SequenceFileInputFormat";
- String delim = "177";
- String jarPath = "/usr/lib/kylin/kylin-index-latest.jar";
-
- args = new String[] { "-input", location, "-output", "/tmp/cardinality/" + tempName, "-iformat", inputFormat, "-idelim", delim };
- HiveColumnCardinalityJob job = new HiveColumnCardinalityJob(jarPath, "/tmp/krb5cc_882");
- try {
- ToolRunner.run(job, args);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- public List<String> readLines(Path location, Configuration conf) throws Exception {
- FileSystem fileSystem = FileSystem.get(location.toUri(), conf);
- CompressionCodecFactory factory = new CompressionCodecFactory(conf);
- FileStatus[] items = fileSystem.listStatus(location);
- if (items == null)
- return new ArrayList<String>();
- List<String> results = new ArrayList<String>();
- for (FileStatus item : items) {
-
- // ignoring files like _SUCCESS
- if (item.getPath().getName().startsWith("_")) {
- continue;
- }
-
- CompressionCodec codec = factory.getCodec(item.getPath());
- InputStream stream = null;
-
- // check if we have a compression codec we need to use
- if (codec != null) {
- stream = codec.createInputStream(fileSystem.open(item.getPath()));
- } else {
- stream = fileSystem.open(item.getPath());
- }
-
- StringWriter writer = new StringWriter();
- IOUtils.copy(stream, writer, "UTF-8");
- String raw = writer.toString();
- for (String str : raw.split("\n")) {
- results.add(str);
- }
- }
- return results;
- }
-
-}
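
Once the job finishes, its single reducer leaves tab-separated "columnIndex<TAB>estimate" lines under the output directory (TextOutputFormat over IntWritable/LongWritable), and readLines() above collects them, skipping "_SUCCESS"-style files and transparently decompressing any codec-suffixed part files. A usage sketch, assuming the deleted HiveColumnCardinalityJob class is available; the path is illustrative:

    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    public class ReadCardinalitySketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            HiveColumnCardinalityJob job = new HiveColumnCardinalityJob();
            // "/tmp/cardinality/test" is a hypothetical output directory
            List<String> lines = job.readLines(new Path("/tmp/cardinality/test"), conf);
            for (String line : lines) {
                if (line.isEmpty())
                    continue;
                String[] parts = line.split("\t"); // default TextOutputFormat separator
                System.out.println("column " + parts[0] + " ~ " + parts[1] + " distinct values");
            }
        }
    }
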
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cube/BaseCuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/BaseCuboidJob.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/BaseCuboidJob.java
deleted file mode 100644
index cc25b13..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/BaseCuboidJob.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.job.hadoop.cube;
-
-import org.apache.hadoop.util.ToolRunner;
-
-/**
- * @author honma
- *
- */
-
-public class BaseCuboidJob extends CuboidJob {
- public BaseCuboidJob() {
- this.setMapperClass(BaseCuboidMapper.class);
- }
-
- public static void main(String[] args) throws Exception {
- CuboidJob job = new BaseCuboidJob();
- int exitCode = ToolRunner.run(job, args);
- System.exit(exitCode);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cube/BaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/BaseCuboidMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/BaseCuboidMapper.java
deleted file mode 100644
index 39b3918..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/BaseCuboidMapper.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.job.hadoop.cube;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.cube.CubeInstance;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.cube.CubeSegment;
-import com.kylinolap.cube.CubeSegmentStatusEnum;
-import com.kylinolap.cube.common.BytesSplitter;
-import com.kylinolap.cube.common.SplittedBytes;
-import com.kylinolap.cube.cuboid.Cuboid;
-import com.kylinolap.cube.kv.AbstractRowKeyEncoder;
-import com.kylinolap.cube.kv.RowConstants;
-import com.kylinolap.cube.measure.MeasureCodec;
-import com.kylinolap.job.constant.BatchConstants;
-import com.kylinolap.job.hadoop.AbstractHadoopJob;
-import com.kylinolap.job.hadoop.hive.JoinedFlatTableDesc;
-import com.kylinolap.metadata.model.cube.CubeDesc;
-import com.kylinolap.metadata.model.cube.FunctionDesc;
-import com.kylinolap.metadata.model.cube.MeasureDesc;
-import com.kylinolap.metadata.model.cube.ParameterDesc;
-
-/**
- * @author George Song (ysong1)
- */
-public class BaseCuboidMapper<KEYIN> extends Mapper<KEYIN, Text, Text, Text> {
-
- private static final Logger logger = LoggerFactory.getLogger(BaseCuboidMapper.class);
-
- public static final byte[] HIVE_NULL = Bytes.toBytes("\\N");
- public static final byte[] ONE = Bytes.toBytes("1");
-
- private String cubeName;
- private String segmentName;
- private Cuboid baseCuboid;
- private CubeInstance cube;
- private CubeDesc cubeDesc;
- private CubeSegment cubeSegment;
- private List<byte[]> nullBytes;
-
- private JoinedFlatTableDesc intermediateTableDesc;
- private String intermediateTableRowDelimiter;
- private byte byteRowDelimiter;
-
- private int counter;
- private int errorRecordCounter;
- private Text outputKey = new Text();
- private Text outputValue = new Text();
- private Object[] measures;
- private byte[][] keyBytesBuf;
- private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-
- private BytesSplitter bytesSplitter;
- private AbstractRowKeyEncoder rowKeyEncoder;
- private MeasureCodec measureCodec;
-
- @Override
- protected void setup(Context context) throws IOException {
- cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
- segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
- intermediateTableRowDelimiter = context.getConfiguration().get(BatchConstants.CFG_CUBE_INTERMEDIATE_TABLE_ROW_DELIMITER, Character.toString(BatchConstants.INTERMEDIATE_TABLE_ROW_DELIMITER));
- if (Bytes.toBytes(intermediateTableRowDelimiter).length > 1) {
- throw new RuntimeException("Expected delimiter byte length is 1, but got " + Bytes.toBytes(intermediateTableRowDelimiter).length);
- }
-
- byteRowDelimiter = Bytes.toBytes(intermediateTableRowDelimiter)[0];
-
- KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration());
-
- cube = CubeManager.getInstance(config).getCube(cubeName);
- cubeDesc = cube.getDescriptor();
- cubeSegment = cube.getSegment(segmentName, CubeSegmentStatusEnum.NEW);
-
- long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
- baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
-
- intermediateTableDesc = new JoinedFlatTableDesc(cube.getDescriptor(), cubeSegment);
-
- bytesSplitter = new BytesSplitter(200, 4096);
- rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid);
-
- measureCodec = new MeasureCodec(cubeDesc.getMeasures());
- measures = new Object[cubeDesc.getMeasures().size()];
-
- int colCount = cubeDesc.getRowkey().getRowKeyColumns().length;
- keyBytesBuf = new byte[colCount][];
-
- initNullBytes();
- }
-
- private void initNullBytes() {
- nullBytes = Lists.newArrayList();
- nullBytes.add(HIVE_NULL);
- String[] nullStrings = cubeDesc.getNullStrings();
- if (nullStrings != null) {
- for (String s : nullStrings) {
- nullBytes.add(Bytes.toBytes(s));
- }
- }
- }
-
- private boolean isNull(byte[] v) {
- for (byte[] nullByte : nullBytes) {
- if (Bytes.equals(v, nullByte))
- return true;
- }
- return false;
- }
-
- private byte[] buildKey(SplittedBytes[] splitBuffers) {
- int[] rowKeyColumnIndexes = intermediateTableDesc.getRowKeyColumnIndexes();
- for (int i = 0; i < baseCuboid.getColumns().size(); i++) {
- int index = rowKeyColumnIndexes[i];
- keyBytesBuf[i] = Arrays.copyOf(splitBuffers[index].value, splitBuffers[index].length);
- if (isNull(keyBytesBuf[i])) {
- keyBytesBuf[i] = null;
- }
- }
- return rowKeyEncoder.encode(keyBytesBuf);
- }
-
- private void buildValue(SplittedBytes[] splitBuffers) {
-
- for (int i = 0; i < measures.length; i++) {
- byte[] valueBytes = getValueBytes(splitBuffers, i);
- measures[i] = measureCodec.getSerializer(i).valueOf(valueBytes);
- }
-
- valueBuf.clear();
- measureCodec.encode(measures, valueBuf);
- }
-
- private byte[] getValueBytes(SplittedBytes[] splitBuffers, int measureIdx) {
- MeasureDesc desc = cubeDesc.getMeasures().get(measureIdx);
- FunctionDesc func = desc.getFunction();
- ParameterDesc paramDesc = func.getParameter();
- int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[measureIdx];
-
- byte[] result = null;
-
- // constant
- if (flatTableIdx == null) {
- result = Bytes.toBytes(paramDesc.getValue());
- }
- // column values
- else {
- // for multiple columns, their values are joined
- for (int i = 0; i < flatTableIdx.length; i++) {
- SplittedBytes split = splitBuffers[flatTableIdx[i]];
- if (result == null) {
- result = Arrays.copyOf(split.value, split.length);
- } else {
- byte[] newResult = new byte[result.length + split.length];
- System.arraycopy(result, 0, newResult, 0, result.length);
- System.arraycopy(split.value, 0, newResult, result.length, split.length);
- result = newResult;
- }
- }
- }
-
- if (func.isCount() || func.isHolisticCountDistinct()) {
- // note for holistic count distinct, this value will be ignored
- result = ONE;
- }
-
- if (isNull(result)) {
- result = null;
- }
-
- return result;
- }
-
- @Override
- public void map(KEYIN key, Text value, Context context) throws IOException, InterruptedException {
- counter++;
- if (counter % BatchConstants.COUNTER_MAX == 0) {
- logger.info("Handled " + counter + " records!");
- }
-
- bytesSplitter.split(value.getBytes(), value.getLength(), byteRowDelimiter);
-
- try {
- byte[] rowKey = buildKey(bytesSplitter.getSplitBuffers());
- outputKey.set(rowKey, 0, rowKey.length);
-
- buildValue(bytesSplitter.getSplitBuffers());
- outputValue.set(valueBuf.array(), 0, valueBuf.position());
-
- context.write(outputKey, outputValue);
- } catch (Exception ex) {
- handleErrorRecord(bytesSplitter.getSplitBuffers(), ex);
- }
- }
-
- private void handleErrorRecord(SplittedBytes[] splitBuffers, Exception ex) throws IOException {
-
- StringBuilder buf = new StringBuilder();
- buf.append("Error record: [");
- for (int i = 0; i < splitBuffers.length; i++) {
- if (i > 0)
- buf.append(", ");
-
- buf.append(Bytes.toString(splitBuffers[i].value, 0, splitBuffers[i].length));
- }
- buf.append("] -- ");
- buf.append(ex.toString());
- System.err.println(buf.toString());
- ex.printStackTrace(System.err);
-
- errorRecordCounter++;
- if (errorRecordCounter > BatchConstants.ERROR_RECORD_THRESHOLD) {
- if (ex instanceof IOException)
- throw (IOException) ex;
- else if (ex instanceof RuntimeException)
- throw (RuntimeException) ex;
- else
- throw new RuntimeException("", ex);
- }
- }
-}
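
In getValueBytes() above, a measure that spans several flat-table columns gets its input by concatenating the raw bytes of each column in order. A self-contained equivalent of that grow-and-copy loop, with a plain byte[][] standing in for the SplittedBytes slices selected by flatTableIdx:

    import java.util.Arrays;

    public class JoinBytesSketch {
        static byte[] join(byte[][] columns) {
            byte[] result = null; // mirrors the loop in getValueBytes()
            for (byte[] c : columns) {
                if (result == null) {
                    result = Arrays.copyOf(c, c.length);
                } else {
                    byte[] merged = new byte[result.length + c.length];
                    System.arraycopy(result, 0, merged, 0, result.length);
                    System.arraycopy(c, 0, merged, result.length, c.length);
                    result = merged;
                }
            }
            return result;
        }

        public static void main(String[] args) {
            byte[] joined = join(new byte[][] { "foo".getBytes(), "bar".getBytes() });
            System.out.println(new String(joined)); // prints foobar
        }
    }
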
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cube/CubeHFileJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/CubeHFileJob.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/CubeHFileJob.java
deleted file mode 100644
index 1f939d1..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/CubeHFileJob.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.job.hadoop.cube;
-
-import java.io.File;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
-import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.cube.CubeInstance;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.job.constant.BatchConstants;
-import com.kylinolap.job.hadoop.AbstractHadoopJob;
-
-/**
- * @author George Song (ysong1)
- *
- */
-
-public class CubeHFileJob extends AbstractHadoopJob {
-
- protected static final Logger log = LoggerFactory.getLogger(CubeHFileJob.class);
-
- public int run(String[] args) throws Exception {
- Options options = new Options();
-
- try {
- options.addOption(OPTION_JOB_NAME);
- options.addOption(OPTION_CUBE_NAME);
- options.addOption(OPTION_INPUT_PATH);
- options.addOption(OPTION_OUTPUT_PATH);
- options.addOption(OPTION_HTABLE_NAME);
- parseOptions(options, args);
-
- Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
- String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
-
- CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-
- CubeInstance cube = cubeMgr.getCube(cubeName);
- job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
-
- File jarFile = new File(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
- if (jarFile.exists()) {
- job.setJar(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
- } else {
- job.setJarByClass(this.getClass());
- }
-
- addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
- FileOutputFormat.setOutputPath(job, output);
-
- job.setInputFormatClass(SequenceFileInputFormat.class);
- job.setMapperClass(CubeHFileMapper.class);
- job.setReducerClass(KeyValueSortReducer.class);
-
- // set job configuration
- job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
- Configuration conf = HBaseConfiguration.create(getConf());
- // add metadata to distributed cache
- attachKylinPropsAndMetadata(cube, job.getConfiguration());
-
- String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase();
- HTable htable = new HTable(conf, tableName);
-
- //Automatic config !
- HFileOutputFormat.configureIncrementalLoad(job, htable);
-
- // set block replication to 3 for hfiles
- conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, "3");
-
- this.deletePath(job.getConfiguration(), output);
-
- return waitForCompletion(job);
- } catch (Exception e) {
- printUsage(options);
- log.error(e.getLocalizedMessage(), e);
- return 2;
- }
- }
-
- public static void main(String[] args) throws Exception {
- int exitCode = ToolRunner.run(new CubeHFileJob(), args);
- System.exit(exitCode);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cube/CubeHFileMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/CubeHFileMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/CubeHFileMapper.java
deleted file mode 100644
index 4f9b5c8..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/CubeHFileMapper.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.job.hadoop.cube;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.Type;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Mapper;
-
-import com.google.common.collect.Lists;
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.cube.kv.RowConstants;
-import com.kylinolap.cube.measure.MeasureCodec;
-import com.kylinolap.job.constant.BatchConstants;
-import com.kylinolap.job.hadoop.AbstractHadoopJob;
-import com.kylinolap.metadata.model.cube.CubeDesc;
-import com.kylinolap.metadata.model.cube.HBaseColumnDesc;
-import com.kylinolap.metadata.model.cube.HBaseColumnFamilyDesc;
-import com.kylinolap.metadata.model.cube.MeasureDesc;
-
-/**
- * @author George Song (ysong1)
- *
- */
-public class CubeHFileMapper extends Mapper<Text, Text, ImmutableBytesWritable, KeyValue> {
-
- ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
-
- String cubeName;
- CubeDesc cubeDesc;
-
- MeasureCodec inputCodec;
- Object[] inputMeasures;
- List<KeyValueCreator> keyValueCreators;
-
- @Override
- protected void setup(Context context) throws IOException {
- cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
-
- KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration());
-
- CubeManager cubeMgr = CubeManager.getInstance(config);
- cubeDesc = cubeMgr.getCube(cubeName).getDescriptor();
-
- inputCodec = new MeasureCodec(cubeDesc.getMeasures());
- inputMeasures = new Object[cubeDesc.getMeasures().size()];
- keyValueCreators = Lists.newArrayList();
-
- for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
- for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
- keyValueCreators.add(new KeyValueCreator(cubeDesc, colDesc));
- }
- }
- }
-
- @Override
- public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
- outputKey.set(key.getBytes(), 0, key.getLength());
- KeyValue outputValue;
-
- int n = keyValueCreators.size();
- if (n == 1 && keyValueCreators.get(0).isFullCopy) { // shortcut for
- // simple full copy
-
- outputValue = keyValueCreators.get(0).create(key, value.getBytes(), 0, value.getLength());
- context.write(outputKey, outputValue);
-
- } else { // normal (complex) case that distributes measures to multiple
- // HBase columns
-
- inputCodec.decode(value, inputMeasures);
-
- for (int i = 0; i < n; i++) {
- outputValue = keyValueCreators.get(i).create(key, inputMeasures);
- context.write(outputKey, outputValue);
- }
- }
- }
-
- class KeyValueCreator {
- byte[] cfBytes;
- byte[] qBytes;
- long timestamp;
-
- int[] refIndex;
- MeasureDesc[] refMeasures;
-
- MeasureCodec codec;
- Object[] colValues;
- ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-
- boolean isFullCopy;
-
- public KeyValueCreator(CubeDesc cubeDesc, HBaseColumnDesc colDesc) {
-
- cfBytes = Bytes.toBytes(colDesc.getColumnFamilyName());
- qBytes = Bytes.toBytes(colDesc.getQualifier());
- timestamp = System.currentTimeMillis();
-
- List<MeasureDesc> measures = cubeDesc.getMeasures();
- String[] measureNames = getMeasureNames(cubeDesc);
- String[] refs = colDesc.getMeasureRefs();
-
- refIndex = new int[refs.length];
- refMeasures = new MeasureDesc[refs.length];
- for (int i = 0; i < refs.length; i++) {
- refIndex[i] = indexOf(measureNames, refs[i]);
- refMeasures[i] = measures.get(refIndex[i]);
- }
-
- codec = new MeasureCodec(refMeasures);
- colValues = new Object[refs.length];
-
- isFullCopy = true;
- for (int i = 0; i < measures.size(); i++) {
- if (refIndex.length <= i || refIndex[i] != i)
- isFullCopy = false;
- }
- }
-
- public KeyValue create(Text key, Object[] measureValues) {
- for (int i = 0; i < colValues.length; i++) {
- colValues[i] = measureValues[refIndex[i]];
- }
-
- valueBuf.clear();
- codec.encode(colValues, valueBuf);
-
- return create(key, valueBuf.array(), 0, valueBuf.position());
- }
-
- public KeyValue create(Text key, byte[] value, int voffset, int vlen) {
- return new KeyValue(key.getBytes(), 0, key.getLength(), //
- cfBytes, 0, cfBytes.length, //
- qBytes, 0, qBytes.length, //
- timestamp, Type.Put, //
- value, voffset, vlen);
- }
-
- private int indexOf(String[] measureNames, String ref) {
- for (int i = 0; i < measureNames.length; i++)
- if (measureNames[i].equalsIgnoreCase(ref))
- return i;
-
- throw new IllegalArgumentException("Measure '" + ref + "' not found in " + Arrays.toString(measureNames));
- }
-
- private String[] getMeasureNames(CubeDesc cubeDesc) {
- List<MeasureDesc> measures = cubeDesc.getMeasures();
- String[] result = new String[measures.size()];
- for (int i = 0; i < measures.size(); i++)
- result[i] = measures.get(i).getName();
- return result;
- }
-
- }
-}
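
The isFullCopy flag computed in the KeyValueCreator constructor above decides whether a cell can reuse the encoded measure row verbatim: that shortcut is only safe when the HBase column references measures 0..n-1 in their original order. The same check extracted as a standalone method, preserving the constructor loop's semantics:

    public class FullCopySketch {
        // refIndex[i] is the measure position referenced by the i-th column ref
        static boolean isFullCopy(int[] refIndex, int measureCount) {
            boolean full = true;
            for (int i = 0; i < measureCount; i++) {
                if (refIndex.length <= i || refIndex[i] != i)
                    full = false;
            }
            return full;
        }

        public static void main(String[] args) {
            System.out.println(isFullCopy(new int[] { 0, 1, 2 }, 3)); // true  -> verbatim copy path
            System.out.println(isFullCopy(new int[] { 1, 0, 2 }, 3)); // false -> re-encode path
        }
    }
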
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cube/CuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/CuboidJob.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/CuboidJob.java
deleted file mode 100644
index 687fe10..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/CuboidJob.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.hadoop.cube;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.cube.CubeInstance;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.cube.cuboid.CuboidCLI;
-import com.kylinolap.job.constant.BatchConstants;
-import com.kylinolap.job.exception.JobException;
-import com.kylinolap.job.hadoop.AbstractHadoopJob;
-import com.kylinolap.metadata.model.cube.CubeDesc;
-
-/**
- * @author ysong1
- */
-public class CuboidJob extends AbstractHadoopJob {
-
- protected static final Logger log = LoggerFactory.getLogger(CuboidJob.class);
- private static final String MAPRED_REDUCE_TASKS = "mapred.reduce.tasks";
-
- @SuppressWarnings("rawtypes")
- private Class<? extends Mapper> mapperClass;
-
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
-
- try {
- options.addOption(OPTION_JOB_NAME);
- options.addOption(OPTION_CUBE_NAME);
- options.addOption(OPTION_SEGMENT_NAME);
- options.addOption(OPTION_INPUT_PATH);
- options.addOption(OPTION_OUTPUT_PATH);
- options.addOption(OPTION_NCUBOID_LEVEL);
- options.addOption(OPTION_INPUT_FORMAT);
- parseOptions(options, args);
-
- Path input = new Path(getOptionValue(OPTION_INPUT_PATH));
- Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
- String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
- int nCuboidLevel = Integer.parseInt(getOptionValue(OPTION_NCUBOID_LEVEL));
- String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
-
- KylinConfig config = KylinConfig.getInstanceFromEnv();
- CubeManager cubeMgr = CubeManager.getInstance(config);
- CubeInstance cube = cubeMgr.getCube(cubeName);
-
- job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
- System.out.println("Starting: " + job.getJobName());
- FileInputFormat.setInputPaths(job, input);
-
- File jarFile = new File(config.getKylinJobJarPath());
- if (jarFile.exists()) {
- job.setJar(config.getKylinJobJarPath());
- } else {
- job.setJarByClass(this.getClass());
- }
-
- // Mapper
- if (this.mapperClass == null) {
- throw new Exception("Mapper class is not set!");
- }
-
- boolean isInputTextFormat = false;
- if (hasOption(OPTION_INPUT_FORMAT) && ("textinputformat".equalsIgnoreCase(getOptionValue(OPTION_INPUT_FORMAT)))) {
- isInputTextFormat = true;
- }
-
- if (isInputTextFormat) {
- job.setInputFormatClass(TextInputFormat.class);
-
- } else {
- job.setInputFormatClass(SequenceFileInputFormat.class);
- }
- job.setMapperClass(this.mapperClass);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(Text.class);
- job.setCombinerClass(CuboidReducer.class); // for base cuboid shuffle skew, some rowkey aggregates far more records than others
-
- // Reducer
- job.setReducerClass(CuboidReducer.class);
- job.setOutputFormatClass(SequenceFileOutputFormat.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
-
- FileOutputFormat.setOutputPath(job, output);
-
- // set job configuration
- job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
- job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
- // add metadata to distributed cache
- attachKylinPropsAndMetadata(cube, job.getConfiguration());
-
- setReduceTaskNum(job, config, cubeName, nCuboidLevel);
-
- this.deletePath(job.getConfiguration(), output);
-
- return waitForCompletion(job);
- } catch (Exception e) {
- printUsage(options);
- log.error(e.getLocalizedMessage(), e);
- return 2;
- }
- }
-
- protected void setReduceTaskNum(Job job, KylinConfig config, String cubeName, int level) throws ClassNotFoundException, IOException, InterruptedException, JobException {
- Configuration jobConf = job.getConfiguration();
- KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-
- CubeDesc cubeDesc = CubeManager.getInstance(config).getCube(cubeName).getDescriptor();
-
- double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
- double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
-
- // total map input MB
- double totalMapInputMB = this.getTotalMapInputMB();
-
- // output / input ratio
- int preLevelCuboids, thisLevelCuboids;
- if (level == 0) { // base cuboid
- preLevelCuboids = thisLevelCuboids = 1;
- } else { // n-cuboid
- int[] allLevelCount = CuboidCLI.calculateAllLevelCount(cubeDesc);
- preLevelCuboids = allLevelCount[level - 1];
- thisLevelCuboids = allLevelCount[level];
- }
-
- // total reduce input MB
- double totalReduceInputMB = totalMapInputMB * thisLevelCuboids / preLevelCuboids;
-
- // number of reduce tasks
- int numReduceTasks = (int) Math.round(totalReduceInputMB / perReduceInputMB * reduceCountRatio);
-
- // adjust reducer number for cube which has DISTINCT_COUNT measures for
- // better performance
- if (cubeDesc.hasHolisticCountDistinctMeasures()) {
- numReduceTasks = numReduceTasks * 4;
- }
-
- // at least 1 reducer
- numReduceTasks = Math.max(1, numReduceTasks);
- // no more than 5000 reducer by default
- numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
-
- jobConf.setInt(MAPRED_REDUCE_TASKS, numReduceTasks);
-
- System.out.println("Having total map input MB " + Math.round(totalMapInputMB));
- System.out.println("Having level " + level + ", pre-level cuboids " + preLevelCuboids + ", this level cuboids " + thisLevelCuboids);
- System.out.println("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio);
- System.out.println("Setting " + MAPRED_REDUCE_TASKS + "=" + numReduceTasks);
- }
-
- /**
- * @param mapperClass
- * the mapperClass to set
- */
- @SuppressWarnings("rawtypes")
- public void setMapperClass(Class<? extends Mapper> mapperClass) {
- this.mapperClass = mapperClass;
- }
-
-}
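
setReduceTaskNum() above sizes the reduce phase from the map input volume and the cuboid fan-out at this level. Worked through with illustrative numbers (the per-reducer MB, count ratio, and reducer cap come from KylinConfig at runtime; the values below are assumptions, not Kylin's actual defaults):

    public class ReducerCountSketch {
        public static void main(String[] args) {
            double totalMapInputMB = 10240;                // assume 10 GB of map input
            int preLevelCuboids = 1, thisLevelCuboids = 8; // base cuboid -> 8 level-1 cuboids
            double perReduceInputMB = 500;                 // assumed config value
            double reduceCountRatio = 1.0;                 // assumed config value

            double totalReduceInputMB = totalMapInputMB * thisLevelCuboids / preLevelCuboids; // 81920
            int numReduceTasks = (int) Math.round(totalReduceInputMB / perReduceInputMB * reduceCountRatio); // 164
            numReduceTasks = Math.max(1, numReduceTasks);    // at least one reducer
            numReduceTasks = Math.min(5000, numReduceTasks); // default upper bound per the comment above
            System.out.println(numReduceTasks);              // prints 164
        }
    }
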
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cube/CuboidReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/CuboidReducer.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/CuboidReducer.java
deleted file mode 100644
index 50af652..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/CuboidReducer.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.job.hadoop.cube;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.cube.kv.RowConstants;
-import com.kylinolap.cube.measure.MeasureAggregators;
-import com.kylinolap.cube.measure.MeasureCodec;
-import com.kylinolap.job.constant.BatchConstants;
-import com.kylinolap.job.hadoop.AbstractHadoopJob;
-import com.kylinolap.metadata.model.cube.CubeDesc;
-import com.kylinolap.metadata.model.cube.MeasureDesc;
-
-/**
- * @author George Song (ysong1)
- *
- */
-public class CuboidReducer extends Reducer<Text, Text, Text, Text> {
-
- private static final Logger logger = LoggerFactory.getLogger(CuboidReducer.class);
-
- private String cubeName;
- private CubeDesc cubeDesc;
- private List<MeasureDesc> measuresDescs;
-
- private MeasureCodec codec;
- private MeasureAggregators aggs;
-
- private int counter;
- private Object[] input;
- private Object[] result;
-
- private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
- private Text outputValue = new Text();
-
- @Override
- protected void setup(Context context) throws IOException {
- cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
-
- KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration());
-
- cubeDesc = CubeManager.getInstance(config).getCube(cubeName).getDescriptor();
- measuresDescs = cubeDesc.getMeasures();
-
- codec = new MeasureCodec(measuresDescs);
- aggs = new MeasureAggregators(measuresDescs);
-
- input = new Object[measuresDescs.size()];
- result = new Object[measuresDescs.size()];
- }
-
- @Override
- public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
-
- aggs.reset();
-
- for (Text value : values) {
- codec.decode(value, input);
- aggs.aggregate(input);
- }
- aggs.collectStates(result);
-
- valueBuf.clear();
- codec.encode(result, valueBuf);
-
- outputValue.set(valueBuf.array(), 0, valueBuf.position());
- context.write(key, outputValue);
-
- counter++;
- if (counter % BatchConstants.COUNTER_MAX == 0) {
- logger.info("Handled " + counter + " records!");
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsCombiner.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsCombiner.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsCombiner.java
deleted file mode 100644
index 916dd9e..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsCombiner.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.job.hadoop.cube;
-
-import java.io.IOException;
-import java.util.HashSet;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Reducer;
-
-import com.kylinolap.common.util.ByteArray;
-
-/**
- * @author yangli9
- */
-public class FactDistinctColumnsCombiner extends Reducer<ShortWritable, Text, ShortWritable, Text> {
-
- private Text outputValue = new Text();
-
- @Override
- protected void setup(Context context) throws IOException {
- }
-
- @Override
- public void reduce(ShortWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
-
- HashSet<ByteArray> set = new HashSet<ByteArray>();
- for (Text textValue : values) {
- ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength()));
- set.add(value);
- }
-
- for (ByteArray value : set) {
- outputValue.set(value.data);
- context.write(key, outputValue);
- }
- }
-
-}
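
The combiner above deduplicates column values with a HashSet<ByteArray> rather than HashSet<byte[]>, because Java arrays use identity-based equals()/hashCode() and equal contents would never collide. A minimal stand-in for the com.kylinolap.common.util.ByteArray wrapper showing why value semantics are required:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class ByteArraySketch {
        final byte[] data;
        ByteArraySketch(byte[] data) { this.data = data; }
        @Override public boolean equals(Object o) {
            return o instanceof ByteArraySketch && Arrays.equals(data, ((ByteArraySketch) o).data);
        }
        @Override public int hashCode() { return Arrays.hashCode(data); }

        public static void main(String[] args) {
            Set<ByteArraySketch> set = new HashSet<>();
            set.add(new ByteArraySketch("a".getBytes()));
            set.add(new ByteArraySketch("a".getBytes())); // same content, different array
            System.out.println(set.size()); // prints 1; a raw byte[] set would print 2
        }
    }
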
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsJob.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsJob.java
deleted file mode 100644
index 556f690..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsJob.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.hadoop.cube;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.job.constant.BatchConstants;
-import com.kylinolap.job.hadoop.AbstractHadoopJob;
-
-/**
- * @author yangli9
- */
-public class FactDistinctColumnsJob extends AbstractHadoopJob {
- protected static final Logger log = LoggerFactory.getLogger(FactDistinctColumnsJob.class);
-
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
-
- try {
- options.addOption(OPTION_JOB_NAME);
- options.addOption(OPTION_CUBE_NAME);
- options.addOption(OPTION_INPUT_PATH);
- options.addOption(OPTION_INPUT_FORMAT);
- options.addOption(OPTION_OUTPUT_PATH);
- parseOptions(options, args);
-
- job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
- String cubeName = getOptionValue(OPTION_CUBE_NAME);
- Path input = new Path(getOptionValue(OPTION_INPUT_PATH));
- String inputFormat = getOptionValue(OPTION_INPUT_FORMAT);
- Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-
- // ----------------------------------------------------------------------------
-
- job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
- System.out.println("Starting: " + job.getJobName());
-
- setupMapInput(input, inputFormat);
- setupReduceOutput(output);
-
- // add metadata to distributed cache
- CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
- // CubeSegment seg = cubeMgr.getCube(cubeName).getTheOnlySegment();
- attachKylinPropsAndMetadata(cubeMgr.getCube(cubeName), job.getConfiguration());
-
- return waitForCompletion(job);
-
- } catch (Exception e) {
- printUsage(options);
- log.error(e.getLocalizedMessage(), e);
- return 2;
- }
-
- }
-
- private void setupMapInput(Path input, String inputFormat) throws IOException {
- FileInputFormat.setInputPaths(job, input);
-
- File jarFile = new File(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
- if (jarFile.exists()) {
- job.setJar(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
- } else {
- job.setJarByClass(this.getClass());
- }
-
- if ("text".equalsIgnoreCase(inputFormat) || "textinputformat".equalsIgnoreCase(inputFormat)) {
- job.setInputFormatClass(TextInputFormat.class);
- } else {
- job.setInputFormatClass(SequenceFileInputFormat.class);
- }
- job.setMapperClass(FactDistinctColumnsMapper.class);
- job.setCombinerClass(FactDistinctColumnsCombiner.class);
- job.setMapOutputKeyClass(ShortWritable.class);
- job.setMapOutputValueClass(Text.class);
- }
-
- private void setupReduceOutput(Path output) throws IOException {
- job.setReducerClass(FactDistinctColumnsReducer.class);
- job.setOutputFormatClass(SequenceFileOutputFormat.class);
- job.setOutputKeyClass(NullWritable.class);
- job.setOutputValueClass(Text.class);
-
- FileOutputFormat.setOutputPath(job, output);
- job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString());
-
- job.setNumReduceTasks(1);
-
- deletePath(job.getConfiguration(), output);
- }
-
- public static void main(String[] args) throws Exception {
- FactDistinctColumnsJob job = new FactDistinctColumnsJob();
- int exitCode = ToolRunner.run(job, args);
- System.exit(exitCode);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsMapper.java
deleted file mode 100644
index 236658d..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsMapper.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.job.hadoop.cube;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Mapper;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.cube.CubeInstance;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.cube.common.BytesSplitter;
-import com.kylinolap.cube.common.SplittedBytes;
-import com.kylinolap.cube.cuboid.Cuboid;
-import com.kylinolap.dict.DictionaryManager;
-import com.kylinolap.job.constant.BatchConstants;
-import com.kylinolap.job.hadoop.AbstractHadoopJob;
-import com.kylinolap.job.hadoop.hive.JoinedFlatTableDesc;
-import com.kylinolap.metadata.model.cube.CubeDesc;
-import com.kylinolap.metadata.model.cube.RowKeyDesc;
-import com.kylinolap.metadata.model.cube.TblColRef;
-
-/**
- * @author yangli9
- */
-public class FactDistinctColumnsMapper<KEYIN> extends Mapper<KEYIN, Text, ShortWritable, Text> {
-
- private String cubeName;
- private CubeInstance cube;
- private CubeDesc cubeDesc;
- private int[] factDictCols;
-
- private JoinedFlatTableDesc intermediateTableDesc;
- private String intermediateTableRowDelimiter;
- private byte byteRowDelimiter;
- private BytesSplitter bytesSplitter;
-
- private ShortWritable outputKey = new ShortWritable();
- private Text outputValue = new Text();
-
- @Override
- protected void setup(Context context) throws IOException {
- Configuration conf = context.getConfiguration();
- intermediateTableRowDelimiter = conf.get(BatchConstants.CFG_CUBE_INTERMEDIATE_TABLE_ROW_DELIMITER, Character.toString(BatchConstants.INTERMEDIATE_TABLE_ROW_DELIMITER));
- byteRowDelimiter = intermediateTableRowDelimiter.getBytes("UTF-8")[0];
- bytesSplitter = new BytesSplitter(200, 4096);
-
- KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(conf);
- cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
- cube = CubeManager.getInstance(config).getCube(cubeName);
- cubeDesc = cube.getDescriptor();
- intermediateTableDesc = new JoinedFlatTableDesc(cubeDesc, null);
-
- long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
- Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
- List<TblColRef> columns = baseCuboid.getColumns();
-
- ArrayList<Integer> factDictCols = new ArrayList<Integer>();
- RowKeyDesc rowkey = cubeDesc.getRowkey();
- DictionaryManager dictMgr = DictionaryManager.getInstance(config);
- for (int i = 0; i < columns.size(); i++) {
- TblColRef col = columns.get(i);
- if (rowkey.isUseDictionary(col) == false)
- continue;
-
- String scanTable = (String) dictMgr.decideSourceData(cubeDesc, col, null)[0];
- if (cubeDesc.isFactTable(scanTable)) {
- System.out.println(col + " -- " + i);
- factDictCols.add(i);
- }
- }
- this.factDictCols = new int[factDictCols.size()];
- for (int i = 0; i < factDictCols.size(); i++)
- this.factDictCols[i] = factDictCols.get(i);
- }
-
- @Override
- public void map(KEYIN key, Text value, Context context) throws IOException, InterruptedException {
-
- bytesSplitter.split(value.getBytes(), value.getLength(), byteRowDelimiter);
- SplittedBytes[] splitBuffers = bytesSplitter.getSplitBuffers();
-
- int[] flatTableIndexes = intermediateTableDesc.getRowKeyColumnIndexes();
- for (int i : factDictCols) {
- outputKey.set((short) i);
- SplittedBytes bytes = splitBuffers[flatTableIndexes[i]];
- outputValue.set(bytes.value, 0, bytes.length);
- System.out.println(i + " -- " + outputValue);
- context.write(outputKey, outputValue);
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsReducer.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsReducer.java
deleted file mode 100644
index e701781..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsReducer.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.job.hadoop.cube;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Reducer;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.common.util.ByteArray;
-import com.kylinolap.cube.CubeInstance;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.cube.cuboid.Cuboid;
-import com.kylinolap.job.constant.BatchConstants;
-import com.kylinolap.job.hadoop.AbstractHadoopJob;
-import com.kylinolap.metadata.model.cube.CubeDesc;
-import com.kylinolap.metadata.model.cube.TblColRef;
-
-/**
- * @author yangli9
- */
-public class FactDistinctColumnsReducer extends Reducer<ShortWritable, Text, NullWritable, Text> {
-
- private List<TblColRef> columnList = new ArrayList<TblColRef>();
-
- @Override
- protected void setup(Context context) throws IOException {
- Configuration conf = context.getConfiguration();
- KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(conf);
- String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
- CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
- CubeDesc cubeDesc = cube.getDescriptor();
-
- long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
- Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
- columnList = baseCuboid.getColumns();
- }
-
- @Override
- public void reduce(ShortWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
- TblColRef col = columnList.get(key.get());
-
- // ByteArray wraps byte[] to provide value-based equals/hashCode, so the set dedupes by content
- HashSet<ByteArray> set = new HashSet<ByteArray>();
- for (Text textValue : values) {
- ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength()));
- set.add(value);
- }
-
- Configuration conf = context.getConfiguration();
- FileSystem fs = FileSystem.get(conf);
- String outputPath = conf.get(BatchConstants.OUTPUT_PATH);
- // one output file per column, named after the column; each distinct value on its own line
- FSDataOutputStream out = fs.create(new Path(outputPath, col.getName()));
-
- try {
- for (ByteArray value : set) {
- out.write(value.data);
- out.write('\n');
- }
- } finally {
- out.close();
- }
-
- }
-
-}
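
A note on the reducer's deduplication: it works only because values are wrapped in
ByteArray, since a raw byte[] inherits identity-based equals/hashCode and a
HashSet<byte[]> would never collapse equal contents. A minimal sketch with a
hypothetical wrapper class:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class ByteArrayDedupSketch {
        // stand-in for Kylin's ByteArray: value semantics over byte[]
        static final class Bytes {
            final byte[] data;
            Bytes(byte[] data) { this.data = data; }
            @Override public boolean equals(Object o) {
                return o instanceof Bytes && Arrays.equals(data, ((Bytes) o).data);
            }
            @Override public int hashCode() { return Arrays.hashCode(data); }
        }

        public static void main(String[] args) {
            Set<Bytes> distinct = new HashSet<Bytes>();
            distinct.add(new Bytes("USA".getBytes()));
            distinct.add(new Bytes("USA".getBytes())); // same content, different array
            distinct.add(new Bytes("CN".getBytes()));
            System.out.println(distinct.size()); // prints 2, not 3
        }
    }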
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cube/KeyDistributionJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/KeyDistributionJob.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/KeyDistributionJob.java
deleted file mode 100644
index 0c14abb..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/KeyDistributionJob.java
+++ /dev/null
@@ -1,139 +0,0 @@
-package com.kylinolap.job.hadoop.cube;
-
-///*
-// * Copyright 2013-2014 eBay Software Foundation
-// *
-// * Licensed under the Apache License, Version 2.0 (the "License");
-// * you may not use this file except in compliance with the License.
-// * You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package com.kylinolap.index.cube;
-//
-//import org.apache.commons.cli.Options;
-//import org.apache.hadoop.fs.FileSystem;
-//import org.apache.hadoop.fs.Path;
-//import org.apache.hadoop.io.LongWritable;
-//import org.apache.hadoop.io.Text;
-//import org.apache.hadoop.mapreduce.Job;
-//import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-//import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-//import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-//import org.apache.hadoop.util.ToolRunner;
-//
-//import com.kylinolap.cube.CubeInstance;
-//import com.kylinolap.cube.CubeManager;
-//import com.kylinolap.cube.cuboid.Cuboid;
-//import com.kylinolap.cube.kv.AbstractRowKeyEncoder;
-//import com.kylinolap.cube.kv.RowKeyEncoder;
-//import com.kylinolap.index.AbstractHadoopJob;
-//import com.kylinolap.metadata.model.cube.CubeDesc;
-//
-///**
-// * @author xjiang
-// *
-// */
-//
-//public class KeyDistributionJob extends AbstractHadoopJob {
-//
-// public static final String JOB_TITLE = "Kylin Row Key Distribution Job";
-// public static final String KEY_HEADER_LENGTH = "key_header_length";
-// public static final String KEY_COLUMN_PERCENTAGE = "key_column_percentage";
-// public static final String KEY_SPLIT_NUMBER = "key_split_number";
-//
-// /* (non-Javadoc)
-// * @see org.apache.hadoop.util.Tool#run(java.lang.String[])
-// */
-// @Override
-// public int run(String[] args) throws Exception {
-// Options options = new Options();
-//
-// try {
-// options.addOption(OPTION_INPUT_PATH);
-// options.addOption(OPTION_OUTPUT_PATH);
-// options.addOption(OPTION_METADATA_URL);
-// options.addOption(OPTION_CUBE_NAME);
-// options.addOption(OPTION_KEY_COLUMN_PERCENTAGE);
-// options.addOption(OPTION_KEY_SPLIT_NUMBER);
-// parseOptions(options, args);
-//
-// // start job
-// String jobName = JOB_TITLE + getOptionsAsString();
-// System.out.println("Starting: " + jobName);
-// Job job = Job.getInstance(getConf(), jobName);
-//
-// // set job configuration - basic
-// job.setJarByClass(getClass());
-// addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
-//
-// Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-// FileOutputFormat.setOutputPath(job, output);
-// //job.getConfiguration().set("dfs.block.size", "67108864");
-//
-// // set job configuration - key prefix size & key split number
-// String keyColumnPercentage = getOptionValue(OPTION_KEY_COLUMN_PERCENTAGE);
-// job.getConfiguration().set(KEY_COLUMN_PERCENTAGE, keyColumnPercentage);
-// String metadataUrl = validateMetadataUrl(getOptionValue(OPTION_METADATA_URL));
-// String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
-// int keyHeaderLen = getKeyHeaderLength(metadataUrl, cubeName);
-// job.getConfiguration().set(KEY_HEADER_LENGTH, String.valueOf(keyHeaderLen));
-// job.getConfiguration().set(KEY_SPLIT_NUMBER, getOptionValue(OPTION_KEY_SPLIT_NUMBER));
-//
-// // Mapper
-// job.setInputFormatClass(SequenceFileInputFormat.class);
-// job.setMapperClass(KeyDistributionMapper.class);
-// job.setMapOutputKeyClass(Text.class);
-// job.setMapOutputValueClass(LongWritable.class);
-//
-// // Combiner, not needed any more as mapper now does the grouping
-// //job.setCombinerClass(KeyDistributionCombiner.class);
-//
-// // Reducer - only one
-// job.setReducerClass(KeyDistributionReducer.class);
-// // use sequence file as output
-// job.setOutputFormatClass(SequenceFileOutputFormat.class);
-// // key is text
-// job.setOutputKeyClass(Text.class);
-// // value is long
-// job.setOutputValueClass(LongWritable.class);
-// job.setNumReduceTasks(1);
-//
-// FileSystem fs = FileSystem.get(job.getConfiguration());
-// if (fs.exists(output))
-// fs.delete(output, true);
-//
-// return waitForCompletion(job);
-// } catch (Exception e) {
-// printUsage(options);
-// e.printStackTrace(System.err);
-// return 2;
-// }
-// }
-//
-// private int getKeyHeaderLength(String metadataUrl, String cubeName) {
-// CubeManager cubeMgr = CubeManager.getInstanceFromEnv(metadataUrl);
-// CubeInstance cubeInstance = cubeMgr.getCube(cubeName);
-// CubeDesc cubeDesc = cubeInstance.getDescriptor();
-// long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-// Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
-// RowKeyEncoder rowKeyEncoder =
-// (RowKeyEncoder) AbstractRowKeyEncoder.createInstance(cubeInstance.getTheOnlySegment(),
-// baseCuboid);
-//
-// return rowKeyEncoder.getHeaderLength();
-//
-// }
-//
-// public static void main(String[] args) throws Exception {
-// int exitCode = ToolRunner.run(new KeyDistributionJob(), args);
-// System.exit(exitCode);
-// }
-// }
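
One pattern worth keeping from this long-disabled job is how per-job parameters
reach the tasks: the driver stashes them in the Configuration and the mapper/reducer
read them back in setup(). A small sketch of that round trip using Configuration's
typed accessors (the original passed strings; the key names are the constant values
defined above):

    import org.apache.hadoop.conf.Configuration;

    public class ConfParamSketch {
        public static void main(String[] args) {
            // driver side: record the parameters on the job configuration
            Configuration conf = new Configuration();
            conf.setInt("key_header_length", 10);
            conf.setInt("key_split_number", 8);

            // task side (normally context.getConfiguration() in setup()):
            int headerLen = conf.getInt("key_header_length", 0);
            int splitNumber = conf.getInt("key_split_number", 1);
            System.out.println(headerLen + " / " + splitNumber);
        }
    }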
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cube/KeyDistributionMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/KeyDistributionMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/KeyDistributionMapper.java
deleted file mode 100644
index 037c3d9..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/KeyDistributionMapper.java
+++ /dev/null
@@ -1,104 +0,0 @@
-package com.kylinolap.job.hadoop.cube;
-
-///*
-// * Copyright 2013-2014 eBay Software Foundation
-// *
-// * Licensed under the Apache License, Version 2.0 (the "License");
-// * you may not use this file except in compliance with the License.
-// * You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package com.kylinolap.index.cube;
-//
-//import java.io.IOException;
-//
-//import org.apache.hadoop.io.LongWritable;
-//import org.apache.hadoop.io.Text;
-//import org.apache.hadoop.mapreduce.Mapper;
-//
-///**
-// * @author xjiang
-// *
-// */
-//public class KeyDistributionMapper extends Mapper<Text, Text, Text, LongWritable> {
-//
-// private int headerLength;
-//
-// private Text currentKey;
-// private long outputLong;
-// private Text outputKey;
-// private LongWritable outputValue;
-// private int columnPercentage;
-// private int allRowCount;
-//
-// @Override
-// protected void setup(Context context) throws IOException {
-// String percentStr = context.getConfiguration().get(KeyDistributionJob.KEY_COLUMN_PERCENTAGE);
-// this.columnPercentage = Integer.parseInt(percentStr);
-// if (this.columnPercentage <= 0 || this.columnPercentage >= 100) {
-// this.columnPercentage = 20; // fall back to a 20% prefix when the setting is out of range
-// }
-// String headerLenStr = context.getConfiguration().get(KeyDistributionJob.KEY_HEADER_LENGTH);
-// this.headerLength = Integer.parseInt(headerLenStr);
-//
-// currentKey = new Text();
-// outputLong = 0;
-// outputKey = new Text();
-// outputValue = new LongWritable(1);
-// allRowCount = 0;
-// }
-//
-// @Override
-// protected void cleanup(Context context) throws IOException, InterruptedException {
-// emit(context); // emit the last holding record
-//
-// byte[] zerokey = new byte[] { 0 };
-// outputKey.set(zerokey);
-// outputValue.set(allRowCount);
-// context.write(outputKey, outputValue);
-// }
-//
-// @Override
-// public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
-// byte[] bytes = key.getBytes();
-// int columnLength = bytes.length - this.headerLength;
-// int columnPrefixLen = columnLength * this.columnPercentage / 100;
-// if (columnPrefixLen == 0 && columnLength > 0) {
-// columnPrefixLen = 1;
-// }
-// if (columnPrefixLen > 0) {
-// currentKey.set(bytes, 0, this.headerLength + columnPrefixLen);
-// } else {
-// currentKey.set(bytes);
-// }
-//
-// allRowCount++;
-//
-// if (outputKey.getLength() == 0) { // first record
-// outputKey.set(currentKey);
-// outputLong = 1;
-// } else if (outputKey.equals(currentKey)) { // same key, note input is sorted
-// outputLong++;
-// } else { // the next key
-// emit(context);
-// outputKey.set(currentKey);
-// outputLong = 1;
-// }
-// }
-//
-// private void emit(Context context) throws IOException, InterruptedException {
-// if (outputLong == 0)
-// return;
-//
-// outputValue.set(outputLong);
-// context.write(outputKey, outputValue);
-// }
-// }
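
The heart of this mapper was run-length counting over sorted keys: truncate each key
to the rowkey header plus a fixed percentage of the column bytes, then count
consecutive equal prefixes, emitting a record whenever the prefix changes. The same
loop in plain Java, with made-up keys and String prefixes standing in for byte
slices:

    import java.util.Arrays;
    import java.util.List;

    public class PrefixRunCountSketch {
        public static void main(String[] args) {
            // input keys arrive sorted, as the mapper assumed; prefixLen stands in
            // for headerLength + columnPrefixLen
            List<String> sortedKeys = Arrays.asList("AAA1", "AAA2", "AAB7", "AAB8", "AAB9");
            int prefixLen = 3;

            String current = null;
            long run = 0;
            for (String key : sortedKeys) {
                String prefix = key.substring(0, prefixLen);
                if (prefix.equals(current)) {
                    run++;
                } else {
                    if (current != null) System.out.println(current + " -> " + run);
                    current = prefix;
                    run = 1;
                }
            }
            if (current != null) System.out.println(current + " -> " + run); // last run
        }
    }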
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cube/KeyDistributionReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/KeyDistributionReducer.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/KeyDistributionReducer.java
deleted file mode 100644
index e0edc20..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/KeyDistributionReducer.java
+++ /dev/null
@@ -1,93 +0,0 @@
-package com.kylinolap.job.hadoop.cube;
-
-///*
-// * Copyright 2013-2014 eBay Software Foundation
-// *
-// * Licensed under the Apache License, Version 2.0 (the "License");
-// * you may not use this file except in compliance with the License.
-// * You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package com.kylinolap.index.cube;
-//
-//import java.io.IOException;
-//
-//import org.apache.hadoop.io.LongWritable;
-//import org.apache.hadoop.io.Text;
-//import org.apache.hadoop.mapreduce.Reducer;
-//import org.apache.hadoop.util.StringUtils;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-//
-///**
-// * @author xjiang
-// *
-// */
-//public class KeyDistributionReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
-//
-// private static final Logger logger = LoggerFactory.getLogger(KeyDistributionReducer.class);
-//
-// private LongWritable outputValue;
-// private boolean isTotalCount;
-// private long totalCount;
-// private int splitNumber;
-// private long splitQuota;
-// private long splitRemain;
-//
-// @Override
-// protected void setup(Context context) throws IOException, InterruptedException {
-// String splitStr = context.getConfiguration().get(KeyDistributionJob.KEY_SPLIT_NUMBER);
-// splitNumber = Integer.parseInt(splitStr);
-// outputValue = new LongWritable();
-// isTotalCount = true;
-// totalCount = 0;
-// splitQuota = 0;
-// splitRemain = 0;
-// }
-//
-// @Override
-// protected void cleanup(Context context) throws IOException, InterruptedException {
-// logger.info("---------------");
-// long splitCount = splitQuota - splitRemain;
-// logger.info("Total Count = " + totalCount + ", Left Count = " + splitCount);
-// }
-//
-// @Override
-// public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException,
-// InterruptedException {
-//
-// // calculate split quota; the single zero-byte key the mapper emits in cleanup()
-// // sorts first, so the first reduce() call carries the total row count
-// if (isTotalCount) {
-// for (LongWritable count : values) {
-// totalCount += count.get();
-// }
-// splitQuota = totalCount / splitNumber;
-// splitRemain = splitQuota;
-// isTotalCount = false;
-// return;
-// }
-//
-// // output key when split quota is used up
-// for (LongWritable count : values) {
-// splitRemain -= count.get();
-// }
-// if (splitRemain <= 0) {
-// long splitCount = splitQuota - splitRemain;
-// String hexKey = StringUtils.byteToHexString(key.getBytes());
-// logger.info(hexKey + "\t\t" + splitCount);
-//
-// outputValue.set(splitCount);
-// context.write(key, outputValue);
-// splitRemain = splitQuota;
-// }
-//
-// }
-// }
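
The reducer then walked those (prefix, count) records in key order, subtracting each
count from a quota of totalCount / splitNumber and emitting the current key as a
split point whenever the quota ran out. The quota walk in isolation, with invented
numbers:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class SplitQuotaSketch {
        public static void main(String[] args) {
            // (key prefix -> row count) in sorted key order, after the total-count record
            Map<String, Long> runs = new LinkedHashMap<String, Long>();
            runs.put("AAA", 40L);
            runs.put("AAB", 35L);
            runs.put("AAC", 50L);
            runs.put("AAD", 30L);

            long totalCount = 155;
            int splitNumber = 3;
            long splitQuota = totalCount / splitNumber; // 51 rows per split
            long splitRemain = splitQuota;

            for (Map.Entry<String, Long> run : runs.entrySet()) {
                splitRemain -= run.getValue();
                if (splitRemain <= 0) { // quota exhausted: emit a split point
                    long rows = splitQuota - splitRemain;
                    System.out.println("split at " + run.getKey() + " after " + rows + " rows");
                    splitRemain = splitQuota;
                }
            }
        }
    }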
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cube/MergeCuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/MergeCuboidJob.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/MergeCuboidJob.java
deleted file mode 100644
index d97e135..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/MergeCuboidJob.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.hadoop.cube;
-
-import java.io.File;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.cube.CubeInstance;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.job.constant.BatchConstants;
-
-/**
- * @author ysong1
- */
-public class MergeCuboidJob extends CuboidJob {
-
- private static final Logger log = LoggerFactory.getLogger(MergeCuboidJob.class);
-
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
-
- try {
- options.addOption(OPTION_JOB_NAME);
- options.addOption(OPTION_CUBE_NAME);
- options.addOption(OPTION_SEGMENT_NAME);
- options.addOption(OPTION_INPUT_PATH);
- options.addOption(OPTION_OUTPUT_PATH);
- parseOptions(options, args);
-
- String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
- String segmentName = getOptionValue(OPTION_SEGMENT_NAME).toUpperCase();
- KylinConfig config = KylinConfig.getInstanceFromEnv();
- CubeManager cubeMgr = CubeManager.getInstance(config);
- CubeInstance cube = cubeMgr.getCube(cubeName);
- // CubeSegment cubeSeg = cubeMgr.findSegment(cube, segmentName);
-
- // start job
- String jobName = getOptionValue(OPTION_JOB_NAME);
- System.out.println("Starting: " + jobName);
- job = Job.getInstance(getConf(), jobName);
-
- // set job configuration - basic
- File jarFile = new File(config.getKylinJobJarPath());
- if (jarFile.exists()) {
- job.setJar(config.getKylinJobJarPath());
- } else {
- job.setJarByClass(this.getClass());
- }
-
- // setJobJar(job);
- addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
-
- Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
- FileOutputFormat.setOutputPath(job, output);
-
- // Mapper
- job.setInputFormatClass(SequenceFileInputFormat.class);
- job.setMapperClass(MergeCuboidMapper.class);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(Text.class);
-
- // Reducer - only one
- job.setReducerClass(CuboidReducer.class);
- job.setOutputFormatClass(SequenceFileOutputFormat.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
-
- // set job configuration
- job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
- job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
-
- // add metadata to distributed cache
- attachKylinPropsAndMetadata(cube, job.getConfiguration());
-
- setReduceTaskNum(job, config, cubeName, 0);
-
- this.deletePath(job.getConfiguration(), output);
-
- return waitForCompletion(job);
- } catch (Exception e) {
- printUsage(options);
- log.error(e.getLocalizedMessage(), e);
- return 2;
- }
- }
-
-}
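
MergeCuboidJob follows the standard MapReduce driver shape: pick a job jar, wire the
mapper and reducer, select sequence-file input/output, set key/value classes, submit.
Stripped of the Kylin-specific wiring, a minimal generic driver looks like this
(identity Mapper/Reducer, assuming sequence files of Text/Text pairs; not Kylin
code):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

    public class MinimalSequenceFileJob {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration(), "minimal sequence-file pass-through");
            job.setJarByClass(MinimalSequenceFileJob.class);

            // identity mapper/reducer; a real job plugs in classes like MergeCuboidMapper
            job.setMapperClass(Mapper.class);
            job.setReducerClass(Reducer.class);

            job.setInputFormatClass(SequenceFileInputFormat.class);
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }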