Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/24 01:20:26 UTC
[18/28] incubator-kylin git commit: KYLIN-875 Split job module into 'core-job', 'engine-mr', 'source-hive', 'storage-hbase'. The old job remains as an assembly project.
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsCombiner.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsCombiner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsCombiner.java
new file mode 100644
index 0000000..e7dfb9a
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsCombiner.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.util.HashSet;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.engine.mr.KylinReducer;
+
+/**
+ * Combiner that locally deduplicates the distinct column values emitted by the
+ * fact distinct columns mapper, reducing the data shuffled to the reducer.
+ * HLL statistics records (negative keys) are passed through unchanged.
+ *
+ * @author yangli9
+ */
+public class FactDistinctColumnsCombiner extends KylinReducer<LongWritable, Text, LongWritable, Text> {
+
+ private Text outputValue = new Text();
+
+ @Override
+ protected void setup(Context context) throws IOException {
+ super.bindCurrentConfiguration(context.getConfiguration());
+ }
+
+ @Override
+ public void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+
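+ // keys >= 0 are column indices whose values are distinct column literals;
+ // negative keys are negated cuboid ids carrying serialized HLL registers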
+ if (key.get() >= 0) {
+ HashSet<ByteArray> set = new HashSet<ByteArray>();
+ for (Text textValue : values) {
+ ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength()));
+ set.add(value);
+ }
+
+ for (ByteArray value : set) {
+ outputValue.set(value.array(), value.offset(), value.length());
+ context.write(key, outputValue);
+ }
+ } else {
+ // for HLL, each key has only one value per mapper, so there is nothing to combine locally
+ Text textValue = values.iterator().next();
+ outputValue.set(textValue.getBytes(), 0, textValue.getLength());
+ context.write(key, outputValue);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
new file mode 100644
index 0000000..b51eb15
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class FactDistinctColumnsJob extends AbstractHadoopJob {
+ protected static final Logger log = LoggerFactory.getLogger(FactDistinctColumnsJob.class);
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Options options = new Options();
+
+ try {
+ options.addOption(OPTION_JOB_NAME);
+ options.addOption(OPTION_CUBE_NAME);
+ options.addOption(OPTION_OUTPUT_PATH);
+ options.addOption(OPTION_SEGMENT_NAME);
+ options.addOption(OPTION_STATISTICS_ENABLED);
+ options.addOption(OPTION_STATISTICS_OUTPUT);
+ options.addOption(OPTION_STATISTICS_SAMPLING_PERCENT);
+ parseOptions(options, args);
+
+ job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+ String cubeName = getOptionValue(OPTION_CUBE_NAME);
+ Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+
+ String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
+ String statisticsEnabled = getOptionValue(OPTION_STATISTICS_ENABLED);
+ String statisticsOutput = getOptionValue(OPTION_STATISTICS_OUTPUT);
+ String statisticsSamplingPercent = getOptionValue(OPTION_STATISTICS_SAMPLING_PERCENT);
+
+ // ----------------------------------------------------------------------------
+ // add metadata to distributed cache
+ CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+ CubeInstance cube = cubeMgr.getCube(cubeName);
+
+ job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+ job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
+ job.getConfiguration().set(BatchConstants.CFG_STATISTICS_ENABLED, statisticsEnabled);
+ job.getConfiguration().set(BatchConstants.CFG_STATISTICS_OUTPUT, statisticsOutput);
+ job.getConfiguration().set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, statisticsSamplingPercent);
+ log.info("Starting: " + job.getJobName());
+
+ setJobClasspath(job);
+
+ setupMapper(cube.getSegment(segmentName, SegmentStatusEnum.NEW));
+ setupReducer(output);
+
+ // CubeSegment seg = cubeMgr.getCube(cubeName).getTheOnlySegment();
+ attachKylinPropsAndMetadata(cube, job.getConfiguration());
+
+ return waitForCompletion(job);
+
+ } catch (Exception e) {
+ log.error("error in FactDistinctColumnsJob", e);
+ printUsage(options);
+ throw e;
+ }
+
+ }
+
+ private void setupMapper(CubeSegment cubeSeg) throws IOException {
+ IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
+ flatTableInputFormat.configureJob(job);
+
+ job.setMapperClass(FactDistinctHiveColumnsMapper.class);
+ job.setCombinerClass(FactDistinctColumnsCombiner.class);
+ job.setMapOutputKeyClass(LongWritable.class);
+ job.setMapOutputValueClass(Text.class);
+ }
+
+ private void setupReducer(Path output) throws IOException {
+ job.setReducerClass(FactDistinctColumnsReducer.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(Text.class);
+
+ FileOutputFormat.setOutputPath(job, output);
+ job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString());
+
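+ // a single reducer is required: FactDistinctColumnsReducer writes one file per
+ // column and collates the global cuboid statistics, so all keys must land in one task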
+ job.setNumReduceTasks(1);
+
+ deletePath(job.getConfiguration(), output);
+ }
+
+
+ public static void main(String[] args) throws Exception {
+ FactDistinctColumnsJob job = new FactDistinctColumnsJob();
+ int exitCode = ToolRunner.run(job, args);
+ System.exit(exitCode);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
new file mode 100644
index 0000000..592c45d
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
@@ -0,0 +1,89 @@
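+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+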
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.RowKeyDesc;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ */
+public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, LongWritable, Text> {
+
+ protected String cubeName;
+ protected CubeInstance cube;
+ protected CubeSegment cubeSeg;
+ protected CubeDesc cubeDesc;
+ protected long baseCuboidId;
+ protected List<TblColRef> columns;
+ protected ArrayList<Integer> factDictCols;
+ protected IMRTableInputFormat flatTableInputFormat;
+
+ protected LongWritable outputKey = new LongWritable();
+ protected Text outputValue = new Text();
+ protected int errorRecordCounter = 0;
+
+ @Override
+ protected void setup(Context context) throws IOException {
+ Configuration conf = context.getConfiguration();
+ bindCurrentConfiguration(conf);
+ KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+ cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
+ cube = CubeManager.getInstance(config).getCube(cubeName);
+ cubeSeg = cube.getSegment(conf.get(BatchConstants.CFG_CUBE_SEGMENT_NAME), SegmentStatusEnum.NEW);
+ cubeDesc = cube.getDescriptor();
+ baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+ columns = Cuboid.findById(cubeDesc, baseCuboidId).getColumns();
+
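+ // collect the indices of row key columns that are dictionary-encoded and whose
+ // dictionary source is the fact table; only those are extracted by this mapper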
+ factDictCols = new ArrayList<Integer>();
+ RowKeyDesc rowKey = cubeDesc.getRowkey();
+ DictionaryManager dictMgr = DictionaryManager.getInstance(config);
+ for (int i = 0; i < columns.size(); i++) {
+ TblColRef col = columns.get(i);
+ if (!rowKey.isUseDictionary(col))
+ continue;
+
+ String scanTable = (String) dictMgr.decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, null)[0];
+ if (cubeDesc.getModel().isFactTable(scanTable)) {
+ factDictCols.add(i);
+ }
+ }
+
+ flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
+ }
+
+ protected void handleErrorRecord(String[] record, Exception ex) throws IOException {
+
+ System.err.println("Insane record: " + Arrays.toString(record));
+ ex.printStackTrace(System.err);
+
+ errorRecordCounter++;
+ if (errorRecordCounter > BatchConstants.ERROR_RECORD_THRESHOLD) {
+ if (ex instanceof IOException)
+ throw (IOException) ex;
+ else if (ex instanceof RuntimeException)
+ throw (RuntimeException) ex;
+ else
+ throw new RuntimeException("", ex);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
new file mode 100644
index 0000000..08140e5
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.KylinReducer;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * @author yangli9
+ */
+public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text, NullWritable, Text> {
+
+ private List<TblColRef> columnList = new ArrayList<TblColRef>();
+ private boolean collectStatistics = false;
+ private String statisticsOutput = null;
+ private List<Long> baseCuboidRowCountInMappers;
+ protected Map<Long, HyperLogLogPlusCounter> cuboidHLLMap = null;
+ protected long baseCuboidId;
+ protected CubeDesc cubeDesc;
+ private long totalRowsBeforeMerge = 0;
+ private int SAMPLING_PERCENTAGE = 5;
+
+ @Override
+ protected void setup(Context context) throws IOException {
+ super.bindCurrentConfiguration(context.getConfiguration());
+
+ Configuration conf = context.getConfiguration();
+ KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+ String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
+ CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
+ cubeDesc = cube.getDescriptor();
+
+ baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+ Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
+ columnList = baseCuboid.getColumns();
+ collectStatistics = Boolean.parseBoolean(conf.get(BatchConstants.CFG_STATISTICS_ENABLED));
+ statisticsOutput = conf.get(BatchConstants.CFG_STATISTICS_OUTPUT);
+
+ if (collectStatistics) {
+ baseCuboidRowCountInMappers = Lists.newArrayList();
+ cuboidHLLMap = Maps.newHashMap();
+ SAMPLING_PERCENTAGE = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, "5"));
+ }
+ }
+
+ @Override
+ public void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+
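+ // keys >= 0 are column indices: write the merged distinct value set to one file per column;
+ // negative keys are negated cuboid ids: merge the per-mapper HLL sketches into a global estimate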
+ if (key.get() >= 0) {
+ TblColRef col = columnList.get((int) key.get());
+
+ HashSet<ByteArray> set = new HashSet<ByteArray>();
+ for (Text textValue : values) {
+ ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength()));
+ set.add(value);
+ }
+
+ Configuration conf = context.getConfiguration();
+ FileSystem fs = FileSystem.get(conf);
+ String outputPath = conf.get(BatchConstants.OUTPUT_PATH);
+ FSDataOutputStream out = fs.create(new Path(outputPath, col.getName()));
+
+ try {
+ for (ByteArray value : set) {
+ out.write(value.array(), value.offset(), value.length());
+ out.write('\n');
+ }
+ } finally {
+ out.close();
+ }
+ } else {
+ // negative keys carry HLL statistics; recover the cuboid id
+ long cuboidId = -key.get();
+
+ for (Text value : values) {
+ HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(14); // precision must match the mapper's
+ ByteArray byteArray = new ByteArray(Bytes.copy(value.getBytes(), 0, value.getLength()));
+ hll.readRegisters(byteArray.asBuffer());
+
+ totalRowsBeforeMerge += hll.getCountEstimate();
+
+ if (cuboidId == baseCuboidId) {
+ baseCuboidRowCountInMappers.add(hll.getCountEstimate());
+ }
+
+ if (cuboidHLLMap.get(cuboidId) != null) {
+ cuboidHLLMap.get(cuboidId).merge(hll);
+ } else {
+ cuboidHLLMap.put(cuboidId, hll);
+ }
+ }
+ }
+
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException, InterruptedException {
+
+ // output the HLL statistics
+ if (collectStatistics) {
+ writeMapperAndCuboidStatistics(context); // for human check
+ writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), cuboidHLLMap, SAMPLING_PERCENTAGE); // for CreateHTableJob
+ }
+ }
+
+ private void writeMapperAndCuboidStatistics(Context context) throws IOException {
+ Configuration conf = context.getConfiguration();
+ FileSystem fs = FileSystem.get(conf);
+ FSDataOutputStream out = fs.create(new Path(statisticsOutput, BatchConstants.CFG_STATISTICS_CUBE_ESTIMATION));
+
+ try {
+ String msg;
+
+ List<Long> allCuboids = new ArrayList<Long>();
+ allCuboids.addAll(cuboidHLLMap.keySet());
+ Collections.sort(allCuboids);
+
+ msg = "Total cuboid number: \t" + allCuboids.size();
+ writeLine(out, msg);
+ msg = "Samping percentage: \t" + SAMPING_PERCENTAGE;
+ writeLine(out, msg);
+
+ writeLine(out, "The following statistics are collected based sampling data.");
+ for (int i = 0; i < baseCuboidRowCountInMappers.size(); i++) {
+ if (baseCuboidRowCountInMappers.get(i) > 0) {
+ msg = "Base Cuboid in Mapper " + i + " row count: \t " + baseCuboidRowCountInMappers.get(i);
+ writeLine(out, msg);
+ }
+ }
+
+ long grandTotal = 0;
+ for (long i : allCuboids) {
+ grandTotal += cuboidHLLMap.get(i).getCountEstimate();
+ msg = "Cuboid " + i + " row count is: \t " + cuboidHLLMap.get(i).getCountEstimate();
+ writeLine(out, msg);
+ }
+
+ msg = "Sum of all the cuboid row counts (before merge) is: \t " + totalRowsBeforeMerge;
+ writeLine(out, msg);
+
+ msg = "After merge, the cube has row count: \t " + grandTotal;
+ writeLine(out, msg);
+
+ if (grandTotal > 0) {
+ msg = "The compaction factor is: \t" + totalRowsBeforeMerge / grandTotal;
+ writeLine(out, msg);
+ }
+
+ } finally {
+ out.close();
+ }
+ }
+
+ private void writeLine(FSDataOutputStream out, String msg) throws IOException {
+ out.write(msg.getBytes());
+ out.write('\n');
+
+ }
+
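+ // layout of the statistics sequence file: key 0 stores the sampling percentage;
+ // every other key is a cuboid id whose value holds that cuboid's serialized HLL registers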
+ public static void writeCuboidStatistics(Configuration conf, Path outputPath, Map<Long, HyperLogLogPlusCounter> cuboidHLLMap, int samplingPercentage) throws IOException {
+ Path seqFilePath = new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
+ SequenceFile.Writer writer = SequenceFile.createWriter(conf,
+ SequenceFile.Writer.file(seqFilePath), SequenceFile.Writer.keyClass(LongWritable.class),
+ SequenceFile.Writer.valueClass(BytesWritable.class));
+
+ List<Long> allCuboids = new ArrayList<Long>();
+ allCuboids.addAll(cuboidHLLMap.keySet());
+ Collections.sort(allCuboids);
+
+ // persist the sampling percentage with key 0
+ writer.append(new LongWritable(0L), new BytesWritable(Bytes.toBytes(samplingPercentage)));
+ ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+ try {
+ for (long i : allCuboids) {
+ valueBuf.clear();
+ cuboidHLLMap.get(i).writeRegisters(valueBuf);
+ valueBuf.flip();
+ writer.append(new LongWritable(i), new BytesWritable(valueBuf.array(), valueBuf.limit()));
+ }
+ } finally {
+ writer.close();
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
new file mode 100644
index 0000000..8ea7fd3
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+
+import com.google.common.collect.Lists;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+
+/**
+ * @author yangli9
+ */
+public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMapperBase<KEYIN, Object> {
+
+ private CubeJoinedFlatTableDesc intermediateTableDesc;
+
+ protected boolean collectStatistics = false;
+ protected CuboidScheduler cuboidScheduler = null;
+ protected int nRowKey;
+ private Integer[][] allCuboidsBitSet = null;
+ private HyperLogLogPlusCounter[] allCuboidsHLL = null;
+ private Long[] cuboidIds;
+ private HashFunction hf = null;
+ private int rowCount = 0;
+ private int SAMPLING_PERCENTAGE = 5;
+ private ByteArray[] rowHashCodes = null;
+
+ @Override
+ protected void setup(Context context) throws IOException {
+ super.setup(context);
+ intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
+ collectStatistics = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_ENABLED));
+ if (collectStatistics) {
+ SAMPLING_PERCENTAGE = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, "5"));
+ cuboidScheduler = new CuboidScheduler(cubeDesc);
+ nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
+
+ List<Long> cuboidIdList = Lists.newArrayList();
+ List<Integer[]> allCuboidsBitSetList = Lists.newArrayList();
+ addCuboidBitSet(baseCuboidId, allCuboidsBitSetList, cuboidIdList);
+
+ allCuboidsBitSet = allCuboidsBitSetList.toArray(new Integer[cuboidIdList.size()][]);
+ cuboidIds = cuboidIdList.toArray(new Long[cuboidIdList.size()]);
+
+ allCuboidsHLL = new HyperLogLogPlusCounter[cuboidIds.length];
+ for (int i = 0; i < cuboidIds.length; i++) {
+ allCuboidsHLL[i] = new HyperLogLogPlusCounter(14);
+ }
+
+ hf = Hashing.murmur3_32();
+ rowHashCodes = new ByteArray[nRowKey];
+ for (int i = 0; i < nRowKey; i++) {
+ rowHashCodes[i] = new ByteArray();
+ }
+ }
+ }
+
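+ // translate a cuboid id's bit mask into the indices of the row key columns it covers,
+ // then recurse into the spanning (child) cuboids so every cuboid gets a bit set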
+ private void addCuboidBitSet(long cuboidId, List<Integer[]> allCuboidsBitSet, List<Long> allCuboids) {
+ allCuboids.add(cuboidId);
+ BitSet bitSet = BitSet.valueOf(new long[] { cuboidId });
+ Integer[] indices = new Integer[bitSet.cardinality()];
+
+ long mask = Long.highestOneBit(baseCuboidId);
+ int position = 0;
+ for (int i = 0; i < nRowKey; i++) {
+ if ((mask & cuboidId) > 0) {
+ indices[position] = i;
+ position++;
+ }
+ mask = mask >> 1;
+ }
+
+ allCuboidsBitSet.add(indices);
+ Collection<Long> children = cuboidScheduler.getSpanningCuboid(cuboidId);
+ for (Long childId : children) {
+ addCuboidBitSet(childId, allCuboidsBitSet, allCuboids);
+ }
+ }
+
+ @Override
+ public void map(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
+ String[] row = flatTableInputFormat.parseMapperInput(record);
+ try {
+ for (int i : factDictCols) {
+ outputKey.set((long) i);
+ String fieldValue = row[intermediateTableDesc.getRowKeyColumnIndexes()[i]];
+ if (fieldValue == null)
+ continue;
+ byte[] bytes = Bytes.toBytes(fieldValue);
+ outputValue.set(bytes, 0, bytes.length);
+ context.write(outputKey, outputValue);
+ }
+ } catch (Exception ex) {
+ handleErrorRecord(row, ex);
+ }
+
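+ // sample the first SAMPLING_PERCENTAGE rows of every 100 for the cuboid statistics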
+ if (collectStatistics && rowCount < SAMPLING_PERCENTAGE) {
+ putRowKeyToHLL(row);
+ }
+
+ if (++rowCount == 100)
+ rowCount = 0;
+ }
+
+ private void putRowKeyToHLL(String[] row) {
+
+ // generate a hash for each row key column
+ for (int i = 0; i < nRowKey; i++) {
+ Hasher hc = hf.newHasher();
+ String colValue = row[intermediateTableDesc.getRowKeyColumnIndexes()[i]];
+ if (colValue != null) {
+ rowHashCodes[i].set(hc.putString(colValue).hash().asBytes());
+ } else {
+ rowHashCodes[i].set(hc.putInt(0).hash().asBytes());
+ }
+ }
+
+ // use the row key column hashes to derive a consolidated hash for each cuboid
+ for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
+ Hasher hc = hf.newHasher();
+ for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
+ hc.putBytes(rowHashCodes[allCuboidsBitSet[i][position]].array());
+ }
+
+ allCuboidsHLL[i].add(hc.hash().asBytes());
+ }
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException, InterruptedException {
+ if (collectStatistics) {
+ ByteBuffer hllBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+ // emit each cuboid's HLL to the reducer; the key is the negated cuboid id
+ HyperLogLogPlusCounter hll;
+ for (int i = 0; i < cuboidIds.length; i++) {
+ hll = allCuboidsHLL[i];
+ outputKey.set(-cuboidIds[i]);
+ hllBuf.clear();
+ hll.writeRegisters(hllBuf);
+ outputValue.set(hllBuf.array(), 0, hllBuf.position());
+ context.write(outputKey, outputValue);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
new file mode 100644
index 0000000..266debe
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.MRUtil;
+
+/**
+ * @author George Song (ysong1)
+ */
+public class HiveToBaseCuboidMapper<KEYIN> extends BaseCuboidMapperBase<KEYIN, Object> {
+
+ private IMRTableInputFormat flatTableInputFormat;
+
+ @Override
+ protected void setup(Context context) throws IOException {
+ super.setup(context);
+ flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSegment).getFlatTableInputFormat();
+ }
+
+ @Override
+ public void map(KEYIN key, Object value, Context context) throws IOException, InterruptedException {
+ counter++;
+ if (counter % BatchConstants.COUNTER_MAX == 0) {
+ logger.info("Handled " + counter + " records!");
+ }
+
+ try {
+ //put a record into the shared bytesSplitter
+ String[] row = flatTableInputFormat.parseMapperInput(value);
+ bytesSplitter.setBuffers(convertUTF8Bytes(row));
+ //take care of the data in bytesSplitter
+ outputKV(context);
+
+ } catch (Exception ex) {
+ handleErrorRecord(bytesSplitter, ex);
+ }
+ }
+
+ private byte[][] convertUTF8Bytes(String[] row) {
+ byte[][] result = new byte[row.length][];
+ for (int i = 0; i < row.length; i++) {
+ result[i] = row[i].getBytes(StandardCharsets.UTF_8);
+ }
+ return result;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
new file mode 100644
index 0000000..685243c
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.IMROutput2.IMRStorageOutputFormat;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class InMemCuboidJob extends AbstractHadoopJob {
+
+ protected static final Logger logger = LoggerFactory.getLogger(InMemCuboidJob.class);
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Options options = new Options();
+
+ try {
+ options.addOption(OPTION_JOB_NAME);
+ options.addOption(OPTION_JOB_FLOW_ID);
+ options.addOption(OPTION_CUBE_NAME);
+ options.addOption(OPTION_SEGMENT_NAME);
+ parseOptions(options, args);
+
+ String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
+ String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
+
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+ CubeManager cubeMgr = CubeManager.getInstance(config);
+ CubeInstance cube = cubeMgr.getCube(cubeName);
+ CubeSegment cubeSeg = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+
+ job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+ logger.info("Starting: " + job.getJobName());
+
+ setJobClasspath(job);
+
+ // add metadata to distributed cache
+ attachKylinPropsAndMetadata(cube, job.getConfiguration());
+
+ // set job configuration
+ job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+ job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
+ long timeout = 1000 * 60 * 60L; // 1 hour
+ job.getConfiguration().set("mapred.task.timeout", String.valueOf(timeout));
+
+ // set input
+ IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
+ flatTableInputFormat.configureJob(job);
+
+ // set mapper
+ job.setMapperClass(InMemCuboidMapper.class);
+ job.setMapOutputKeyClass(ByteArrayWritable.class);
+ job.setMapOutputValueClass(ByteArrayWritable.class);
+
+ // set output
+ IMRStorageOutputFormat storageOutputFormat = MRUtil.getBatchCubingOutputSide2(cubeSeg).getStorageOutputFormat();
+ storageOutputFormat.configureOutput(InMemCuboidReducer.class, getOptionValue(OPTION_JOB_FLOW_ID), job);
+
+ return waitForCompletion(job);
+ } catch (Exception e) {
+ logger.error("error in CuboidJob", e);
+ printUsage(options);
+ throw e;
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ InMemCuboidJob job = new InMemCuboidJob();
+ int exitCode = ToolRunner.run(job, args);
+ System.exit(exitCode);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
new file mode 100644
index 0000000..e9340de
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
@@ -0,0 +1,122 @@
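+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+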
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.DimensionDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import com.google.common.collect.Maps;
+
+/**
+ */
+public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArrayWritable, ByteArrayWritable> {
+
+ private static final Logger logger = LoggerFactory.getLogger(InMemCuboidMapper.class);
+ private CubeInstance cube;
+ private CubeDesc cubeDesc;
+ private CubeSegment cubeSegment;
+ private IMRTableInputFormat flatTableInputFormat;
+
+ private int counter;
+ private BlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(10000);
+ private Future<?> future;
+
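+ // map() produces parsed rows into the blocking queue; DoggedCubeBuilder consumes
+ // them on a single worker thread and writes the resulting cuboids via the context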
+ @Override
+ protected void setup(Context context) throws IOException {
+ super.bindCurrentConfiguration(context.getConfiguration());
+
+ Configuration conf = context.getConfiguration();
+
+ KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+ String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
+ cube = CubeManager.getInstance(config).getCube(cubeName);
+ cubeDesc = cube.getDescriptor();
+ String segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
+ cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+ flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSegment).getFlatTableInputFormat();
+
+ Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap();
+
+ for (DimensionDesc dim : cubeDesc.getDimensions()) {
+ // dictionary
+ for (TblColRef col : dim.getColumnRefs()) {
+ if (cubeDesc.getRowkey().isUseDictionary(col)) {
+ Dictionary<?> dict = cubeSegment.getDictionary(col);
+ if (dict == null) {
+ logger.warn("Dictionary for " + col + " was not found.");
+ }
+
+ dictionaryMap.put(col, cubeSegment.getDictionary(col));
+ }
+ }
+ }
+
+ DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cubeDesc, dictionaryMap);
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new MapContextGTRecordWriter(context, cubeDesc, cubeSegment)));
+
+ }
+
+ @Override
+ public void map(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
+ // put each row to the queue
+ String[] row = flatTableInputFormat.parseMapperInput(record);
+ List<String> rowAsList = Arrays.asList(row);
+
+ while (!future.isDone()) {
+ if (queue.offer(rowAsList, 1, TimeUnit.SECONDS)) {
+ counter++;
+ if (counter % BatchConstants.COUNTER_MAX == 0) {
+ logger.info("Handled " + counter + " records!");
+ }
+ break;
+ }
+ }
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException, InterruptedException {
+ logger.info("Totally handled " + counter + " records!");
+
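+ // offering an empty row list signals end-of-input to the cube builder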
+ while (!future.isDone()) {
+ if (queue.offer(new ArrayList<String>(0), 1, TimeUnit.SECONDS)) {
+ break;
+ }
+ }
+
+ try {
+ future.get();
+ } catch (Exception e) {
+ throw new IOException("Failed to build cube in mapper " + context.getTaskAttemptID().getTaskID().getId(), e);
+ }
+ queue.clear();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
new file mode 100644
index 0000000..004eb17
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
@@ -0,0 +1,95 @@
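+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+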
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.engine.mr.IMROutput2.IMRStorageOutputFormat;
+import org.apache.kylin.engine.mr.KylinReducer;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.measure.MeasureAggregators;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ */
+public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArrayWritable, Object, Object> {
+
+ private static final Logger logger = LoggerFactory.getLogger(InMemCuboidReducer.class);
+
+ private IMRStorageOutputFormat storageOutputFormat;
+ private MeasureCodec codec;
+ private MeasureAggregators aggs;
+
+ private int counter;
+ private Object[] input;
+ private Object[] result;
+
+ @Override
+ protected void setup(Context context) throws IOException {
+ super.bindCurrentConfiguration(context.getConfiguration());
+ KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+ String cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
+ String segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
+ boolean isMerge = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_IS_MERGE));
+
+ CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
+ CubeDesc cubeDesc = cube.getDescriptor();
+ CubeSegment cubeSeg = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+ if (isMerge)
+ storageOutputFormat = MRUtil.getBatchMergeOutputSide2(cubeSeg).getStorageOutputFormat();
+ else
+ storageOutputFormat = MRUtil.getBatchCubingOutputSide2(cubeSeg).getStorageOutputFormat();
+
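+ // collect measures in HBase column family order so the codec layout matches
+ // what the storage output format expects when serializing the aggregates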
+ List<MeasureDesc> measuresDescs = Lists.newArrayList();
+ for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
+ for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
+ for (MeasureDesc measure : colDesc.getMeasures()) {
+ measuresDescs.add(measure);
+ }
+ }
+ }
+
+ codec = new MeasureCodec(measuresDescs);
+ aggs = new MeasureAggregators(measuresDescs);
+
+ input = new Object[measuresDescs.size()];
+ result = new Object[measuresDescs.size()];
+ }
+
+ @Override
+ public void reduce(ByteArrayWritable key, Iterable<ByteArrayWritable> values, Context context) throws IOException, InterruptedException {
+
+ aggs.reset();
+
+ for (ByteArrayWritable value : values) {
+ codec.decode(value.asBuffer(), input);
+ aggs.aggregate(input);
+ }
+ aggs.collectStates(result);
+
+ storageOutputFormat.doReducerOutput(key, result, context);
+
+ counter++;
+ if (counter % BatchConstants.COUNTER_MAX == 0) {
+ logger.info("Handled " + counter + " records!");
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KeyDistributionJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KeyDistributionJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KeyDistributionJob.java
new file mode 100644
index 0000000..686f2b2
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KeyDistributionJob.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+///*
+// * Copyright 2013-2014 eBay Software Foundation
+// *
+// * Licensed under the Apache License, Version 2.0 (the "License");
+// * you may not use this file except in compliance with the License.
+// * You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.kylin.index.cube;
+//
+//import org.apache.commons.cli.Options;
+//import org.apache.hadoop.fs.FileSystem;
+//import org.apache.hadoop.fs.Path;
+//import org.apache.hadoop.io.LongWritable;
+//import org.apache.hadoop.io.Text;
+//import org.apache.hadoop.mapreduce.Job;
+//import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+//import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+//import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+//import org.apache.hadoop.util.ToolRunner;
+//
+//import org.apache.kylin.cube.CubeInstance;
+//import org.apache.kylin.cube.CubeManager;
+//import org.apache.kylin.cube.cuboid.Cuboid;
+//import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
+//import org.apache.kylin.cube.kv.RowKeyEncoder;
+//import org.apache.kylin.index.AbstractHadoopJob;
+//import org.apache.kylin.metadata.model.cube.CubeDesc;
+//
+///**
+// * @author xjiang
+// *
+// */
+//
+//public class KeyDistributionJob extends AbstractHadoopJob {
+//
+// public static final String JOB_TITLE = "Kylin Row Key Distribution Job";
+// public static final String KEY_HEADER_LENGTH = "key_header_length";
+// public static final String KEY_COLUMN_PERCENTAGE = "key_column_percentage";
+// public static final String KEY_SPLIT_NUMBER = "key_split_number";
+//
+// /* (non-Javadoc)
+// * @see org.apache.hadoop.util.Tool#run(java.lang.String[])
+// */
+// @Override
+// public int run(String[] args) throws Exception {
+// Options options = new Options();
+//
+// try {
+// options.addOption(OPTION_INPUT_PATH);
+// options.addOption(OPTION_OUTPUT_PATH);
+// options.addOption(OPTION_METADATA_URL);
+// options.addOption(OPTION_CUBE_NAME);
+// options.addOption(OPTION_KEY_COLUMN_PERCENTAGE);
+// options.addOption(OPTION_KEY_SPLIT_NUMBER);
+// parseOptions(options, args);
+//
+// // start job
+// String jobName = JOB_TITLE + getOptionsAsString();
+// System.out.println("Starting: " + jobName);
+// Job job = Job.getInstanceFromEnv(getConf(), jobName);
+//
+// // set job configuration - basic
+// setJobClasspath(job);
+// addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
+//
+// Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+// FileOutputFormat.setOutputPath(job, output);
+// //job.getConfiguration().set("dfs.block.size", "67108864");
+//
+// // set job configuration - key prefix size & key split number
+// String keyColumnPercentage = getOptionValue(OPTION_KEY_COLUMN_PERCENTAGE);
+// job.getConfiguration().set(KEY_COLUMN_PERCENTAGE, keyColumnPercentage);
+// String metadataUrl = validateMetadataUrl(getOptionValue(OPTION_METADATA_URL));
+// String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
+// int keyHeaderLen = getKeyHeaderLength(metadataUrl, cubeName);
+// job.getConfiguration().set(KEY_HEADER_LENGTH, String.valueOf(keyHeaderLen));
+// job.getConfiguration().set(KEY_SPLIT_NUMBER, getOptionValue(OPTION_KEY_SPLIT_NUMBER));
+//
+// // Mapper
+// job.setInputFormatClass(SequenceFileInputFormat.class);
+// job.setMapperClass(KeyDistributionMapper.class);
+// job.setMapOutputKeyClass(Text.class);
+// job.setMapOutputValueClass(LongWritable.class);
+//
+// // Combiner, not needed any more as mapper now does the groping
+// //job.setCombinerClass(KeyDistributionCombiner.class);
+//
+// // Reducer - only one
+// job.setReducerClass(KeyDistributionReducer.class);
+// // use sequence file as output
+// job.setOutputFormatClass(SequenceFileOutputFormat.class);
+// // key is text
+// job.setOutputKeyClass(Text.class);
+// // value is long
+// job.setOutputValueClass(LongWritable.class);
+// job.setNumReduceTasks(1);
+//
+// FileSystem fs = FileSystem.get(job.getConfiguration());
+// if (fs.exists(output))
+// fs.delete(output, true);
+//
+// return waitForCompletion(job);
+// } catch (Exception e) {
+// printUsage(options);
+// e.printStackTrace(System.err);
+// return 2;
+// }
+// }
+//
+// private int getKeyHeaderLength(String metadataUrl, String cubeName) {
+// CubeManager cubeMgr = CubeManager.getInstanceFromEnv(metadataUrl);
+// CubeInstance cubeInstance = cubeMgr.getCube(cubeName);
+// CubeDesc cubeDesc = cubeInstance.getDescriptor();
+// long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+// Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
+// RowKeyEncoder rowKeyEncoder =
+// (RowKeyEncoder) AbstractRowKeyEncoder.createInstance(cubeInstance.getTheOnlySegment(),
+// baseCuboid);
+//
+// return rowKeyEncoder.getHeaderLength();
+//
+// }
+//
+// public static void main(String[] args) throws Exception {
+// int exitCode = ToolRunner.run(new KeyDistributionJob(), args);
+// System.exit(exitCode);
+// }
+// }
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KeyDistributionMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KeyDistributionMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KeyDistributionMapper.java
new file mode 100644
index 0000000..e08a56b
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KeyDistributionMapper.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+///*
+// * Copyright 2013-2014 eBay Software Foundation
+// *
+// * Licensed under the Apache License, Version 2.0 (the "License");
+// * you may not use this file except in compliance with the License.
+// * You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.kylin.index.cube;
+//
+//import java.io.IOException;
+//
+//import org.apache.hadoop.io.LongWritable;
+//import org.apache.hadoop.io.Text;
+//import org.apache.hadoop.mapreduce.Mapper;
+//
+///**
+// * @author xjiang
+// *
+// */
+//public class KeyDistributionMapper extends KylinMapper<Text, Text, Text, LongWritable> {
+//
+// private int headerLength;
+//
+// private Text currentKey;
+// private long outputLong;
+// private Text outputKey;
+// private LongWritable outputValue;
+// private int columnPercentage;
+// private int allRowCount;
+//
+// @Override
+// protected void setup(Context context) throws IOException {
+// super.publishConfiguration(context.getConfiguration());
+
+// String percentStr = context.getConfiguration().get(KeyDistributionJob.KEY_COLUMN_PERCENTAGE);
+// this.columnPercentage = Integer.valueOf(percentStr).intValue();
+// if (this.columnPercentage <= 0 || this.columnPercentage >= 100) {
+// this.columnPercentage = 20;
+// }
+// String headerLenStr = context.getConfiguration().get(KeyDistributionJob.KEY_HEADER_LENGTH);
+// this.headerLength = Integer.valueOf(headerLenStr).intValue();
+//
+// currentKey = new Text();
+// outputLong = 0;
+// outputKey = new Text();
+// outputValue = new LongWritable(1);
+// allRowCount = 0;
+// }
+//
+// @Override
+// protected void cleanup(Context context) throws IOException, InterruptedException {
+// emit(context); // emit the last holding record
+//
+// byte[] zerokey = new byte[] { 0 };
+// outputKey.set(zerokey);
+// outputValue.set(allRowCount);
+// context.write(outputKey, outputValue);
+// }
+//
+// @Override
+// public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
+// byte[] bytes = key.getBytes();
+// int columnLength = bytes.length - this.headerLength;
+// int columnPrefixLen = columnLength * this.columnPercentage / 100;
+// if (columnPrefixLen == 0 && columnLength > 0) {
+// columnPrefixLen = 1;
+// }
+// if (columnPrefixLen > 0) {
+// currentKey.set(bytes, 0, this.headerLength + columnPrefixLen);
+// } else {
+// currentKey.set(bytes);
+// }
+//
+// allRowCount++;
+//
+// if (outputKey.getLength() == 0) { // first record
+// outputKey.set(currentKey);
+// outputLong = 1;
+// } else if (outputKey.equals(currentKey)) { // same key, note input is sorted
+// outputLong++;
+// } else { // the next key
+// emit(context);
+// outputKey.set(currentKey);
+// outputLong = 1;
+// }
+// }
+//
+// private void emit(Context context) throws IOException, InterruptedException {
+// if (outputLong == 0)
+// return;
+//
+// outputValue.set(outputLong);
+// context.write(outputKey, outputValue);
+// }
+// }
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KeyDistributionReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KeyDistributionReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KeyDistributionReducer.java
new file mode 100644
index 0000000..ed3f966
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KeyDistributionReducer.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+///*
+// * Copyright 2013-2014 eBay Software Foundation
+// *
+// * Licensed under the Apache License, Version 2.0 (the "License");
+// * you may not use this file except in compliance with the License.
+// * You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.kylin.index.cube;
+//
+//import java.io.IOException;
+//
+//import org.apache.hadoop.io.LongWritable;
+//import org.apache.hadoop.io.Text;
+//import org.apache.hadoop.mapreduce.Reducer;
+//import org.apache.hadoop.util.StringUtils;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//
+///**
+// * @author xjiang
+// *
+// */
+//public class KeyDistributionReducer extends KylinReducer<Text, LongWritable, Text, LongWritable> {
+//
+// private static final Logger logger = LoggerFactory.getLogger(KeyDistributionReducer.class);
+//
+// private LongWritable outputValue;
+// private boolean isTotalCount;
+// private long totalCount;
+// private int splitNumber;
+// private long splitQuota;
+// private long splitRemain;
+//
+// @Override
+// protected void setup(Context context) throws IOException, InterruptedException {
+// super.publishConfiguration(context.getConfiguration());
+//
+// String splitStr = context.getConfiguration().get(KeyDistributionJob.KEY_SPLIT_NUMBER);
+// splitNumber = Integer.valueOf(splitStr).intValue();
+// outputValue = new LongWritable();
+// isTotalCount = true;
+// totalCount = 0;
+// splitQuota = 0;
+// splitRemain = 0;
+// }
+//
+// @Override
+// protected void cleanup(Context context) throws IOException, InterruptedException {
+// logger.info("---------------");
+// long splitCount = splitQuota - splitRemain;
+// logger.info("Total Count = " + totalCount + ", Left Count = " + splitCount);
+// }
+//
+// @Override
+// public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException,
+// InterruptedException {
+//
+// // calculate split quota
+// if (isTotalCount) {
+// for (LongWritable count : values) {
+// totalCount += count.get();
+// }
+// splitQuota = totalCount / splitNumber;
+// splitRemain = splitQuota;
+// isTotalCount = false;
+// return;
+// }
+//
+// // output key when split quota is used up
+// for (LongWritable count : values) {
+// splitRemain -= count.get();
+// }
+// if (splitRemain <= 0) {
+// long splitCount = splitQuota - splitRemain;
+// String hexKey = StringUtils.byteToHexString(key.getBytes());
+// logger.info(hexKey + "\t\t" + splitCount);
+//
+// outputValue.set(splitCount);
+// context.write(key, outputValue);
+// splitRemain = splitQuota;
+// }
+//
+// }
+// }
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
new file mode 100644
index 0000000..4585dd1
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
@@ -0,0 +1,96 @@
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ */
+public class MapContextGTRecordWriter implements ICuboidWriter {
+
+ private static final Log logger = LogFactory.getLog(MapContextGTRecordWriter.class);
+ protected MapContext<?, ?, ByteArrayWritable, ByteArrayWritable> mapContext;
+ private Long lastCuboidId;
+ protected CubeSegment cubeSegment;
+ protected CubeDesc cubeDesc;
+
+ private int bytesLength;
+ private int dimensions;
+ private int measureCount;
+ private byte[] keyBuf;
+ private int[] measureColumnsIndex;
+ private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+ private ByteArrayWritable outputKey = new ByteArrayWritable();
+ private ByteArrayWritable outputValue = new ByteArrayWritable();
+ private long cuboidRowCount = 0;
+
+ public MapContextGTRecordWriter(MapContext<?, ?, ByteArrayWritable, ByteArrayWritable> mapContext, CubeDesc cubeDesc, CubeSegment cubeSegment) {
+ this.mapContext = mapContext;
+ this.cubeDesc = cubeDesc;
+ this.cubeSegment = cubeSegment;
+ this.measureCount = cubeDesc.getMeasures().size();
+ }
+
+ @Override
+ public void write(long cuboidId, GTRecord record) throws IOException {
+
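+ // rows for one cuboid arrive consecutively, so a change of cuboid id marks the start of a new cuboid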
+ if (lastCuboidId == null || !lastCuboidId.equals(cuboidId)) {
+ // output another cuboid
+ initVariables(cuboidId);
+ if (lastCuboidId != null) {
+ logger.info("Cuboid " + lastCuboidId + " has " + cuboidRowCount + " rows");
+ cuboidRowCount = 0;
+ }
+ lastCuboidId = cuboidId; // remember the current cuboid so buffers are rebuilt only on change
+ }
+
+ cuboidRowCount++;
+ int offSet = RowConstants.ROWKEY_CUBOIDID_LEN;
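+ // row key layout: the cuboid id header followed by each dimension's encoded value in column order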
+ for (int x = 0; x < dimensions; x++) {
+ System.arraycopy(record.get(x).array(), record.get(x).offset(), keyBuf, offSet, record.get(x).length());
+ offSet += record.get(x).length();
+ }
+
+ //output measures
+ valueBuf.clear();
+ record.exportColumns(measureColumnsIndex, valueBuf);
+
+ outputKey.set(keyBuf, 0, offSet);
+ outputValue.set(valueBuf.array(), 0, valueBuf.position());
+ try {
+ mapContext.write(outputKey, outputValue);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void initVariables(Long cuboidId) {
+ bytesLength = RowConstants.ROWKEY_CUBOIDID_LEN;
+ Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId);
+ for (TblColRef column : cuboid.getColumns()) {
+ bytesLength += cubeSegment.getColumnLength(column);
+ }
+
+ keyBuf = new byte[bytesLength];
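+ // the number of set bits in the cuboid id equals the number of dimensions in this cuboid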
+ dimensions = BitSet.valueOf(new long[]{cuboidId}).cardinality();
+ measureColumnsIndex = new int[measureCount];
+ for (int i = 0; i < measureCount; i++) {
+ measureColumnsIndex[i] = dimensions + i;
+ }
+
+ System.arraycopy(Bytes.toBytes(cuboidId), 0, keyBuf, 0, RowConstants.ROWKEY_CUBOIDID_LEN);
+ }
+}
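
For orientation, a minimal sketch of how a writer like this is driven (the driver below is illustrative and not part of this commit): the in-memory cube builder emits cuboids one at a time, so consecutive write() calls share a cuboid id until the next cuboid begins, which is why the writer re-initializes its buffers only when the id changes.

    // hypothetical driver, for illustration only
    ICuboidWriter writer = new MapContextGTRecordWriter(mapContext, cubeDesc, cubeSegment);
    for (long cuboidId : cuboidIdsInBuildOrder) {      // assumed build order
        for (GTRecord record : recordsOf(cuboidId)) {  // hypothetical helper
            writer.write(cuboidId, record);            // rows per cuboid are contiguous
        }
    }
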
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageJob.java
new file mode 100644
index 0000000..6525d93
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageJob.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.engine.mr.IMROutput2.IMRStorageInputFormat;
+import org.apache.kylin.engine.mr.IMROutput2.IMRStorageOutputFormat;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+
+/**
+ */
+public class MergeCuboidFromStorageJob extends CuboidJob {
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Options options = new Options();
+
+ try {
+ options.addOption(OPTION_JOB_NAME);
+ options.addOption(OPTION_JOB_FLOW_ID);
+ options.addOption(OPTION_CUBE_NAME);
+ options.addOption(OPTION_SEGMENT_NAME);
+ parseOptions(options, args);
+
+ String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
+ String segmentName = getOptionValue(OPTION_SEGMENT_NAME).toUpperCase();
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+ CubeManager cubeMgr = CubeManager.getInstance(config);
+ CubeInstance cube = cubeMgr.getCube(cubeName);
+ CubeSegment cubeSeg = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+
+ Configuration conf = this.getConf();
+
+ // start job
+ String jobName = getOptionValue(OPTION_JOB_NAME);
+ System.out.println("Starting: " + jobName);
+ job = Job.getInstance(conf, jobName);
+
+ setJobClasspath(job);
+
+ // add metadata to distributed cache
+ attachKylinPropsAndMetadata(cube, job.getConfiguration());
+
+ // set job configuration
+ job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+ job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
+ job.getConfiguration().set(BatchConstants.CFG_IS_MERGE, "true");
+
+ // configure mapper input
+ IMRStorageInputFormat storageInputFormat = MRUtil.getBatchMergeInputSide2(cubeSeg).getStorageInputFormat();
+ storageInputFormat.configureInput(MergeCuboidFromStorageMapper.class, ByteArrayWritable.class, ByteArrayWritable.class, job);
+
+ // configure reducer output
+ IMRStorageOutputFormat storageOutputFormat = MRUtil.getBatchMergeOutputSide2(cubeSeg).getStorageOutputFormat();
+ storageOutputFormat.configureOutput(InMemCuboidReducer.class, getOptionValue(OPTION_JOB_FLOW_ID), job);
+
+ return waitForCompletion(job);
+ } catch (Exception e) {
+ logger.error("error in MergeCuboidFromHBaseJob", e);
+ printUsage(options);
+ throw e;
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
new file mode 100644
index 0000000..2281432
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.SplittedBytes;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.common.RowKeySplitter;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.engine.mr.IMROutput2.IMRStorageInputFormat;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * @author shaoshi
+ */
+public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, ByteArrayWritable, ByteArrayWritable> {
+
+ private static final Logger logger = LoggerFactory.getLogger(MergeCuboidFromStorageMapper.class);
+
+ private KylinConfig config;
+ private String cubeName;
+ private String segmentName;
+ private CubeManager cubeManager;
+ private CubeInstance cube;
+ private CubeDesc cubeDesc;
+ private CubeSegment mergedCubeSegment;
+ private CubeSegment sourceCubeSegment; // Must be unique during a mapper's life cycle
+ private IMRStorageInputFormat storageInputFormat;
+
+ private ByteArrayWritable outputKey = new ByteArrayWritable();
+ private byte[] newKeyBuf;
+ private RowKeySplitter rowKeySplitter;
+
+ private HashMap<TblColRef, Boolean> dictsNeedMerging = new HashMap<TblColRef, Boolean>();
+
+ private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+ private MeasureCodec codec;
+ private ByteArrayWritable outputValue = new ByteArrayWritable();
+
+ private Boolean checkNeedMerging(TblColRef col) throws IOException {
+ Boolean ret = dictsNeedMerging.get(col);
+ if (ret != null)
+ return ret;
+ else {
+ ret = cubeDesc.getRowkey().isUseDictionary(col);
+ if (ret) {
+ String dictTable = (String) DictionaryManager.getInstance(config).decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, null)[0];
+ ret = cubeDesc.getFactTable().equalsIgnoreCase(dictTable);
+ }
+ dictsNeedMerging.put(col, ret);
+ return ret;
+ }
+ }
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ super.bindCurrentConfiguration(context.getConfiguration());
+ config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+ cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
+ segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME).toUpperCase();
+
+ cubeManager = CubeManager.getInstance(config);
+ cube = cubeManager.getCube(cubeName);
+ cubeDesc = cube.getDescriptor();
+ mergedCubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+ storageInputFormat = MRUtil.getBatchMergeInputSide2(mergedCubeSegment).getStorageInputFormat();
+
+ newKeyBuf = new byte[256]; // size will auto-grow
+
+ sourceCubeSegment = storageInputFormat.findSourceSegment(context, cube);
+ logger.info("Source cube segment: " + sourceCubeSegment);
+
+ this.rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
+
+ List<MeasureDesc> measuresDescs = Lists.newArrayList();
+ for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
+ for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
+ for (MeasureDesc measure : colDesc.getMeasures()) {
+ measuresDescs.add(measure);
+ }
+ }
+ }
+ codec = new MeasureCodec(measuresDescs);
+ }
+
+ @Override
+ public void map(Object inKey, Object inValue, Context context) throws IOException, InterruptedException {
+ Pair<ByteArrayWritable, Object[]> pair = storageInputFormat.parseMapperInput(inKey, inValue);
+ ByteArrayWritable key = pair.getFirst();
+ Object[] value = pair.getSecond();
+
+ Preconditions.checkState(key.offset() == 0);
+
+ long cuboidID = rowKeySplitter.split(key.array(), key.length());
+ Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidID);
+
+ SplittedBytes[] splittedByteses = rowKeySplitter.getSplitBuffers();
+ int bufOffset = 0;
+ BytesUtil.writeLong(cuboidID, newKeyBuf, bufOffset, RowConstants.ROWKEY_CUBOIDID_LEN);
+ bufOffset += RowConstants.ROWKEY_CUBOIDID_LEN;
+
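+ // rebuild the row key column by column; dictionary ids must be translated into the merged segment's dictionary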
+ for (int i = 0; i < cuboid.getColumns().size(); ++i) {
+ TblColRef col = cuboid.getColumns().get(i);
+
+ if (this.checkNeedMerging(col)) {
+ // if dictionary on fact table column, needs rewrite
+ DictionaryManager dictMgr = DictionaryManager.getInstance(config);
+ Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(col));
+ Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(col));
+
+ while (sourceDict.getSizeOfValue() > newKeyBuf.length - bufOffset || mergedDict.getSizeOfValue() > newKeyBuf.length - bufOffset) {
+ byte[] oldBuf = newKeyBuf;
+ newKeyBuf = new byte[2 * newKeyBuf.length];
+ System.arraycopy(oldBuf, 0, newKeyBuf, 0, oldBuf.length);
+ }
+
+ int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[i + 1].value, 0, splittedByteses[i + 1].length);
+
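+ // decode the source id to its value bytes, then look the value up in the merged dictionary (negative size means null)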
+ int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBuf, bufOffset);
+ int idInMergedDict;
+ if (size < 0) {
+ idInMergedDict = mergedDict.nullId();
+ } else {
+ idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBuf, bufOffset, size);
+ }
+ BytesUtil.writeUnsigned(idInMergedDict, newKeyBuf, bufOffset, mergedDict.getSizeOfId());
+
+ bufOffset += mergedDict.getSizeOfId();
+ } else {
+ // keep as it is
+ while (splittedByteses[i + 1].length > newKeyBuf.length - bufOffset) {
+ byte[] oldBuf = newKeyBuf;
+ newKeyBuf = new byte[2 * newKeyBuf.length];
+ System.arraycopy(oldBuf, 0, newKeyBuf, 0, oldBuf.length);
+ }
+
+ System.arraycopy(splittedByteses[i + 1].value, 0, newKeyBuf, bufOffset, splittedByteses[i + 1].length);
+ bufOffset += splittedByteses[i + 1].length;
+ }
+ }
+ byte[] newKey = Arrays.copyOf(newKeyBuf, bufOffset);
+ outputKey.set(newKey, 0, newKey.length);
+
+ valueBuf.clear();
+ codec.encode(value, valueBuf);
+ outputValue.set(valueBuf.array(), 0, valueBuf.position());
+
+ context.write(outputKey, outputValue);
+ }
+
+}
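
The heart of the mapper above is the per-column id translation. Isolated as a sketch, using only the Dictionary methods the mapper already calls:

    // illustrative helper, not part of this commit
    static int translateId(Dictionary<?> sourceDict, Dictionary<?> mergedDict,
                           int idInSourceDict, byte[] buf, int offset) {
        // decode the id into raw value bytes under the source dictionary
        int size = sourceDict.getValueBytesFromId(idInSourceDict, buf, offset);
        if (size < 0)
            return mergedDict.nullId(); // null maps to the merged dictionary's null id
        // re-encode the same value under the merged dictionary
        return mergedDict.getIdFromValueBytes(buf, offset, size);
    }
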
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
new file mode 100644
index 0000000..ceebfc1
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+
+/**
+ * @author ysong1
+ */
+public class MergeCuboidJob extends CuboidJob {
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Options options = new Options();
+
+ try {
+ options.addOption(OPTION_JOB_NAME);
+ options.addOption(OPTION_CUBE_NAME);
+ options.addOption(OPTION_SEGMENT_NAME);
+ options.addOption(OPTION_INPUT_PATH);
+ options.addOption(OPTION_OUTPUT_PATH);
+ parseOptions(options, args);
+
+ String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
+ String segmentName = getOptionValue(OPTION_SEGMENT_NAME).toUpperCase();
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+ CubeManager cubeMgr = CubeManager.getInstance(config);
+ CubeInstance cube = cubeMgr.getCube(cubeName);
+
+ // start job
+ String jobName = getOptionValue(OPTION_JOB_NAME);
+ System.out.println("Starting: " + jobName);
+ job = Job.getInstance(getConf(), jobName);
+
+ setJobClasspath(job);
+
+ // set inputs
+ addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
+
+ Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+ FileOutputFormat.setOutputPath(job, output);
+
+ // Mapper
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ job.setMapperClass(MergeCuboidMapper.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(Text.class);
+
+ // Reducer; the reduce task count is set by setReduceTaskNum below
+ job.setReducerClass(CuboidReducer.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
+
+ // set job configuration
+ job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+ job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
+
+ // add metadata to distributed cache
+ attachKylinPropsAndMetadata(cube, job.getConfiguration());
+
+ setReduceTaskNum(job, config, cubeName, 0);
+
+ this.deletePath(job.getConfiguration(), output);
+
+ return waitForCompletion(job);
+ } catch (Exception e) {
+ logger.error("error in MergeCuboidJob", e);
+ printUsage(options);
+ throw e;
+ }
+ }
+
+}
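
Like the other CuboidJob subclasses, this job is driven entirely by the command-line options registered in run(). A sketch of an invocation follows; the flag spellings are assumptions inferred from the OPTION_* constants, and the authoritative definitions live in AbstractHadoopJob:

    // illustrative only; option names and paths are placeholders
    String[] args = {
        "-jobname", "Kylin_Merge_Cuboid_Step",
        "-cubename", "my_cube",
        "-segmentname", "my_merged_segment",
        "-input", "/kylin/cuboid/seg1,/kylin/cuboid/seg2", // comma-separated, one path per segment
        "-output", "/kylin/cuboid/merged"
    };
    int exitCode = org.apache.hadoop.util.ToolRunner.run(new MergeCuboidJob(), args);
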