Posted to commits@kylin.apache.org by sh...@apache.org on 2015/05/15 05:06:36 UTC
[24/50] [abbrv] incubator-kylin git commit: KYLIN-750 Merge cube segments from HBase table
KYLIN-750 Merge cube segments from HBase table
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/39ada70f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/39ada70f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/39ada70f
Branch: refs/heads/streaming-localdict
Commit: 39ada70f4732203fa599a1cf323f6ddecd73a8ff
Parents: 32ceeee
Author: Shao Feng, Shi <sh...@ebay.com>
Authored: Thu May 7 15:05:29 2015 +0800
Committer: Shao Feng, Shi <sh...@ebay.com>
Committed: Thu May 7 17:25:56 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/cube/kv/RowValueDecoder.java | 34 ++-
.../apache/kylin/job/cube/CubingJobBuilder.java | 56 ++++-
.../hadoop/cubev2/MergeCuboidFromHBaseJob.java | 123 ++++++++++
.../cubev2/MergeCuboidFromHBaseMapper.java | 246 +++++++++++++++++++
.../kylin/metadata/model/MeasureDesc.java | 2 +-
5 files changed, 440 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
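In short: when the in-memory cubing engine is used, segment merge no longer reads pre-merged cuboid files from HDFS. A new MR job, MergeCuboidFromHBaseJob, scans the source segments' HTables directly (one Scan per HTable), and its mapper re-encodes rowkeys against the merged segment's dictionaries before the existing HTable conversion steps run. The bulk-load step for in-memory builds also moves into addCubingSteps, and MeasureDesc.equals now compares names (case-insensitively) rather than ids. The sketch below only condenses the multi-table input setup from the job further down; names come from this diff and the surrounding scaffolding is omitted.

    // Condensed sketch: one Scan per merging segment's HTable, fed to a single TableMapper job.
    List<Scan> scans = new ArrayList<Scan>();
    for (String htable : mergingHTables) {
        Scan scan = new Scan();
        scan.setCaching(500);            // Scan's default of 1 is too small for MR jobs
        scan.setCacheBlocks(false);      // don't pollute the region servers' block cache
        scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(htable));
        scans.add(scan);
    }
    TableMapReduceUtil.initTableMapperJob(scans, MergeCuboidFromHBaseMapper.class,
            Text.class, Text.class, job);
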
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/39ada70f/cube/src/main/java/org/apache/kylin/cube/kv/RowValueDecoder.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/kv/RowValueDecoder.java b/cube/src/main/java/org/apache/kylin/cube/kv/RowValueDecoder.java
index ace347a..99c2485 100644
--- a/cube/src/main/java/org/apache/kylin/cube/kv/RowValueDecoder.java
+++ b/cube/src/main/java/org/apache/kylin/cube/kv/RowValueDecoder.java
@@ -18,10 +18,6 @@
package org.apache.kylin.cube.kv;
-import java.nio.ByteBuffer;
-import java.util.BitSet;
-import java.util.Collection;
-
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DoubleWritable;
@@ -33,17 +29,19 @@ import org.apache.kylin.metadata.measure.MeasureCodec;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.Collection;
+
/**
- *
* @author xjiang
- *
*/
public class RowValueDecoder implements Cloneable {
private final HBaseColumnDesc hbaseColumn;
private final byte[] hbaseColumnFamily;
private final byte[] hbaseColumnQualifier;
-
+
private final MeasureCodec codec;
private final BitSet projectionIndex;
private final MeasureDesc[] measures;
@@ -60,19 +58,29 @@ public class RowValueDecoder implements Cloneable {
}
public void decode(Result hbaseRow) {
- decode(hbaseRow.getValueAsByteBuffer(hbaseColumnFamily, hbaseColumnQualifier));
+ decode(hbaseRow, true);
+ }
+
+ public void decode(Result hbaseRow, boolean convertToJavaObject) {
+ decode(hbaseRow.getValueAsByteBuffer(hbaseColumnFamily, hbaseColumnQualifier), convertToJavaObject);
}
-
+
public void decode(byte[] bytes) {
- decode(ByteBuffer.wrap(bytes));
+ decode(bytes, true);
+ }
+
+ public void decode(byte[] bytes, boolean convertToJavaObject) {
+ decode(ByteBuffer.wrap(bytes), convertToJavaObject);
}
- private void decode(ByteBuffer buffer) {
+ private void decode(ByteBuffer buffer, boolean convertToJavaObject) {
codec.decode(buffer, values);
- convertToJavaObjects(values, values);
+ if (convertToJavaObject) {
+ convertToJavaObjects(values, values, convertToJavaObject);
+ }
}
- private void convertToJavaObjects(Object[] mapredObjs, Object[] results) {
+ private void convertToJavaObjects(Object[] mapredObjs, Object[] results, boolean convertToJavaObject) {
for (int i = 0; i < mapredObjs.length; i++) {
Object o = mapredObjs[i];
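
The new boolean overload lets a caller skip converting the decoded measure values into Java objects when they are only going to be re-serialized, which is exactly how the merge mapper below uses it. A minimal sketch of such a caller, assuming a column descriptor and an HBase Result are already at hand (this helper class is illustrative only, not part of the commit):

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.kylin.cube.kv.RowValueDecoder;
    import org.apache.kylin.cube.model.HBaseColumnDesc;
    import org.apache.kylin.metadata.measure.MeasureCodec;

    // Illustrative only -- not part of this commit.
    class RawMeasureReencoder {
        static void reencode(HBaseColumnDesc colDesc, Result row, MeasureCodec codec, ByteBuffer out) {
            RowValueDecoder decoder = new RowValueDecoder(colDesc);
            decoder.decode(row, false);           // false = skip conversion to Java objects
            Object[] measureValues = decoder.getValues();
            codec.encode(measureValues, out);     // re-serialize the raw measure objects
        }
    }
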
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/39ada70f/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
index 27613df..e4a0fae 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
@@ -32,6 +32,7 @@ import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.hadoop.cube.*;
import org.apache.kylin.job.hadoop.cubev2.InMemCuboidJob;
+import org.apache.kylin.job.hadoop.cubev2.MergeCuboidFromHBaseJob;
import org.apache.kylin.job.hadoop.dict.CreateDictionaryJob;
import org.apache.kylin.job.hadoop.hbase.BulkLoadJob;
import org.apache.kylin.job.hadoop.hbase.CreateHTableJob;
@@ -69,8 +70,6 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
if (this.inMemoryCubing()) {
- // bulk load step
- result.addTask(createBulkLoadStep(seg, result.getId()));
result.addTask(createUpdateCubeInfoAfterBuildStep(seg, intermediateHiveTableStepId, baseCuboidStepId, baseCuboidStepId, jobId));
} else {
// convert htable
@@ -103,16 +102,24 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge");
List<String> mergingSegmentIds = Lists.newArrayList();
List<String> mergingCuboidPaths = Lists.newArrayList();
+ List<String> mergingHTables = Lists.newArrayList();
for (CubeSegment merging : mergingSegments) {
mergingSegmentIds.add(merging.getUuid());
+ mergingHTables.add(merging.getStorageLocationIdentifier());
if (merging.equals(appendSegment))
mergingCuboidPaths.add(appendRootPath + "*");
else
mergingCuboidPaths.add(getPathToMerge(merging));
}
- // merge cuboid
- addMergeSteps(mergeSegment, mergingSegmentIds, mergingCuboidPaths, mergedRootPath, result);
+ if(this.inMemoryCubing()) {
+ // merge from HTable
+ addMergeFromHBaseSteps(mergeSegment, mergingSegmentIds, mergingHTables, mergedRootPath, result);
+ } else {
+ // merge cuboid
+ addMergeSteps(mergeSegment, mergingSegmentIds, mergingCuboidPaths, mergedRootPath, result);
+ }
+
// convert htable
AbstractExecutable convertCuboidToHfileStep = addHTableSteps(mergeSegment, mergedRootPath, result);
@@ -134,14 +141,20 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge");
List<String> mergingSegmentIds = Lists.newArrayList();
List<String> mergingCuboidPaths = Lists.newArrayList();
+ List<String> mergingHTables = Lists.newArrayList();
for (CubeSegment merging : mergingSegments) {
mergingSegmentIds.add(merging.getUuid());
mergingCuboidPaths.add(getPathToMerge(merging));
+ mergingHTables.add(merging.getStorageLocationIdentifier());
}
- // merge cuboid
- addMergeSteps(seg, mergingSegmentIds, mergingCuboidPaths, mergedCuboidPath, result);
-
+ if(this.inMemoryCubing()) {
+ // merge from HTable
+ addMergeFromHBaseSteps(seg, mergingSegmentIds, mergingHTables, mergedCuboidPath, result);
+ } else {
+ // merge cuboid
+ addMergeSteps(seg, mergingSegmentIds, mergingCuboidPaths, mergedCuboidPath, result);
+ }
// convert htable
AbstractExecutable convertCuboidToHfileStep = addHTableSteps(seg, mergedCuboidPath, result);
@@ -159,6 +172,15 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
result.addTask(createMergeCuboidDataStep(seg, formattedPath, mergedCuboidPath));
}
+
+ void addMergeFromHBaseSteps(CubeSegment seg, List<String> mergingSegmentIds, List<String> mergingHTables, String mergedCuboidPath, CubingJob result) {
+
+ result.addTask(createMergeDictionaryStep(seg, mergingSegmentIds));
+
+ String formattedTables = StringUtils.join(mergingHTables, ",");
+ result.addTask(createMergeCuboidDataFromHBaseStep(seg, formattedTables, mergedCuboidPath));
+ }
+
Pair<AbstractExecutable, AbstractExecutable> addCubingSteps(CubeSegment seg, String cuboidRootPath, CubingJob result) {
final int groupRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getNCuboidBuildLevels();
final int totalRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getRowKeyColumns().length;
@@ -190,6 +212,8 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
result.addTask(createCreateHTableStep(seg, jobId));
baseCuboidStep = createInMemCubingStep(seg, intermediateHiveTableLocation, intermediateHiveTableName, cuboidOutputTempPath, result.getId());
result.addTask(baseCuboidStep);
+ // bulk load step
+ result.addTask(createBulkLoadStep(seg, result.getId()));
}
return new Pair<AbstractExecutable, AbstractExecutable>(intermediateHiveTableStep, baseCuboidStep);
@@ -480,6 +504,24 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
return mergeCuboidDataStep;
}
+
+ private MapReduceExecutable createMergeCuboidDataFromHBaseStep(CubeSegment seg, String inputPath, String outputPath) {
+ MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
+ mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
+ StringBuilder cmd = new StringBuilder();
+
+ appendMapReduceParameters(cmd, seg);
+ appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+ appendExecCmdParameters(cmd, "segmentname", seg.getName());
+ appendExecCmdParameters(cmd, "input", inputPath);
+ appendExecCmdParameters(cmd, "output", outputPath);
+ appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
+
+ mergeCuboidDataStep.setMapReduceParams(cmd.toString());
+ mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidFromHBaseJob.class);
+ return mergeCuboidDataStep;
+ }
+
private UpdateCubeInfoAfterMergeStep createUpdateCubeInfoAfterMergeStep(CubeSegment seg, List<String> mergingSegmentIds, String convertToHFileStepId, String jobId) {
UpdateCubeInfoAfterMergeStep result = new UpdateCubeInfoAfterMergeStep();
result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
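
One wiring detail that is easy to miss: for the HBase-based merge, the MR step's "input" parameter is no longer an HDFS path but a comma-separated list of the source segments' HTable names, which MergeCuboidFromHBaseJob later splits into one Scan each. Condensed from addMergeFromHBaseSteps and createMergeCuboidDataFromHBaseStep above (the example table names are made up):

    // Each merging segment contributes its HTable name; the joined string becomes
    // the job's "input" parameter and is split again by StringSplitter in the job.
    List<String> mergingHTables = Lists.newArrayList();
    for (CubeSegment merging : mergingSegments) {
        mergingHTables.add(merging.getStorageLocationIdentifier());
    }
    String formattedTables = StringUtils.join(mergingHTables, ",");   // e.g. "KYLIN_AAA,KYLIN_BBB"
    appendExecCmdParameters(cmd, "input", formattedTables);
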
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/39ada70f/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseJob.java
new file mode 100644
index 0000000..07d16f8
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseJob.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.hadoop.cubev2;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.StringSplitter;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.job.hadoop.cube.CuboidJob;
+import org.apache.kylin.job.hadoop.cube.CuboidReducer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author shaoshi
+ */
+public class MergeCuboidFromHBaseJob extends CuboidJob {
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Options options = new Options();
+
+ try {
+ options.addOption(OPTION_JOB_NAME);
+ options.addOption(OPTION_CUBE_NAME);
+ options.addOption(OPTION_SEGMENT_NAME);
+ options.addOption(OPTION_INPUT_PATH);
+ options.addOption(OPTION_OUTPUT_PATH);
+ parseOptions(options, args);
+
+ String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
+ String segmentName = getOptionValue(OPTION_SEGMENT_NAME).toUpperCase();
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+ CubeManager cubeMgr = CubeManager.getInstance(config);
+ CubeInstance cube = cubeMgr.getCube(cubeName);
+
+
+ Configuration conf = this.getConf();
+ HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
+
+ // start job
+ String jobName = getOptionValue(OPTION_JOB_NAME);
+ System.out.println("Starting: " + jobName);
+ job = Job.getInstance(conf, jobName);
+
+ setJobClasspath(job);
+
+ Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+ FileOutputFormat.setOutputPath(job, output);
+
+
+ List<Scan> scans = new ArrayList<Scan>();
+ for (String htable : StringSplitter.split(getOptionValue(OPTION_INPUT_PATH), ",")) {
+ Scan scan = new Scan();
+ scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
+ scan.setCacheBlocks(false); // don't set to true for MR jobs
+ scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(htable));
+ scans.add(scan);
+ }
+
+ TableMapReduceUtil.initTableMapperJob(scans, MergeCuboidFromHBaseMapper.class, Text.class,
+ Text.class, job);
+
+ // Reducer - only one
+ job.setReducerClass(CuboidReducer.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
+
+ // set job configuration
+ job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+ job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
+
+ // add metadata to distributed cache
+ attachKylinPropsAndMetadata(cube, job.getConfiguration());
+
+ setReduceTaskNum(job, config, cubeName, 0);
+
+ this.deletePath(job.getConfiguration(), output);
+
+ return waitForCompletion(job);
+ } catch (
+ Exception e
+ )
+
+ {
+ logger.error("error in MergeCuboidFromHBaseJob", e);
+ printUsage(options);
+ throw e;
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/39ada70f/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseMapper.java
new file mode 100644
index 0000000..cc71351
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseMapper.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.hadoop.cubev2;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.mapreduce.TableSplit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.SplittedBytes;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.common.RowKeySplitter;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.kv.RowValueDecoder;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.job.hadoop.cube.KeyValueCreator;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * @author shaoshi
+ */
+public class MergeCuboidFromHBaseMapper extends TableMapper<Text, Text> {
+
+ private KylinConfig config;
+ private String cubeName;
+ private String segmentName;
+ private CubeManager cubeManager;
+ private CubeInstance cube;
+ private CubeDesc cubeDesc;
+ private CubeSegment mergedCubeSegment;
+ private CubeSegment sourceCubeSegment;// Must be unique during a mapper's
+ // life cycle
+
+ private Text outputKey = new Text();
+
+ private byte[] newKeyBuf;
+ private RowKeySplitter rowKeySplitter;
+
+ private HashMap<TblColRef, Boolean> dictsNeedMerging = new HashMap<TblColRef, Boolean>();
+
+ private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+
+ private Text outputValue = new Text();
+ private RowValueDecoder[] rowValueDecoders;
+ private boolean simpleFullCopy = false;
+ private Object[] result;
+ private MeasureCodec codec;
+ private int[][] hbaseColumnToMeasureMapping;
+
+ private Boolean checkNeedMerging(TblColRef col) throws IOException {
+ Boolean ret = dictsNeedMerging.get(col);
+ if (ret != null)
+ return ret;
+ else {
+ ret = cubeDesc.getRowkey().isUseDictionary(col);
+ if (ret) {
+ String dictTable = (String) DictionaryManager.getInstance(config).decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, null)[0];
+ ret = cubeDesc.getFactTable().equalsIgnoreCase(dictTable);
+ }
+ dictsNeedMerging.put(col, ret);
+ return ret;
+ }
+ }
+
+
+ private CubeSegment findSegmentWithHTable(String htable, CubeInstance cubeInstance) {
+ for (CubeSegment segment : cubeInstance.getSegments()) {
+ String segmentHtable = segment.getStorageLocationIdentifier();
+ if (segmentHtable != null && segmentHtable.equalsIgnoreCase(htable)) {
+ return segment;
+ }
+ }
+
+ throw new IllegalStateException("No merging segment's storage location identifier equals " + htable);
+
+ }
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ HadoopUtil.setCurrentConfiguration(context.getConfiguration());
+
+ cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
+ segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME).toUpperCase();
+
+ config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+ cubeManager = CubeManager.getInstance(config);
+ cube = cubeManager.getCube(cubeName);
+ cubeDesc = cube.getDescriptor();
+ mergedCubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+
+ newKeyBuf = new byte[256];// size will auto-grow
+
+ TableSplit currentSplit = (TableSplit) context.getInputSplit();
+ byte[] tableName = currentSplit.getTableName();
+ String htableName = Bytes.toString(tableName);
+ // decide which source segment
+ System.out.println("htable:" + htableName);
+ sourceCubeSegment = findSegmentWithHTable(htableName, cube);
+ System.out.println(sourceCubeSegment);
+
+ this.rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
+
+ List<RowValueDecoder> valueDecoderList = Lists.newArrayList();
+ List<KeyValueCreator> keyValueCreators = Lists.newArrayList();
+ for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
+ for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
+ valueDecoderList.add(new RowValueDecoder(colDesc));
+ keyValueCreators.add(new KeyValueCreator(cubeDesc, colDesc));
+ }
+ }
+
+ rowValueDecoders = valueDecoderList.toArray(new RowValueDecoder[valueDecoderList.size()]);
+
+ simpleFullCopy = (keyValueCreators.size() == 1 && keyValueCreators.get(0).isFullCopy);
+ result = new Object[cubeDesc.getMeasures().size()];
+ codec = new MeasureCodec(cubeDesc.getMeasures());
+
+ hbaseColumnToMeasureMapping = new int[rowValueDecoders.length][];
+
+ for (int i = 0; i < rowValueDecoders.length; i++) {
+ hbaseColumnToMeasureMapping[i] = new int[rowValueDecoders[i].getMeasures().length];
+ for (int j = 0; j < rowValueDecoders[i].getMeasures().length; j++) {
+ int positionInCubeMeasures = 0;
+ for (MeasureDesc m : cubeDesc.getMeasures()) {
+ if (m.equals(rowValueDecoders[i].getMeasures()[j])) {
+ hbaseColumnToMeasureMapping[i][j] = positionInCubeMeasures;
+ break;
+ }
+ positionInCubeMeasures++;
+ }
+ }
+ }
+ }
+
+ @Override
+ public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
+ long cuboidID = rowKeySplitter.split(key.get(), key.get().length);
+ Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidID);
+
+ SplittedBytes[] splittedByteses = rowKeySplitter.getSplitBuffers();
+ int bufOffset = 0;
+ BytesUtil.writeLong(cuboidID, newKeyBuf, bufOffset, RowConstants.ROWKEY_CUBOIDID_LEN);
+ bufOffset += RowConstants.ROWKEY_CUBOIDID_LEN;
+
+ for (int i = 0; i < cuboid.getColumns().size(); ++i) {
+ TblColRef col = cuboid.getColumns().get(i);
+
+ if (this.checkNeedMerging(col)) {
+ // if dictionary on fact table column, needs rewrite
+ DictionaryManager dictMgr = DictionaryManager.getInstance(config);
+ Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(col));
+ Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(col));
+
+ while (sourceDict.getSizeOfValue() > newKeyBuf.length - bufOffset || mergedDict.getSizeOfValue() > newKeyBuf.length - bufOffset) {
+ byte[] oldBuf = newKeyBuf;
+ newKeyBuf = new byte[2 * newKeyBuf.length];
+ System.arraycopy(oldBuf, 0, newKeyBuf, 0, oldBuf.length);
+ }
+
+ int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[i + 1].value, 0, splittedByteses[i + 1].length);
+ int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBuf, bufOffset);
+ int idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBuf, bufOffset, size);
+ BytesUtil.writeUnsigned(idInMergedDict, newKeyBuf, bufOffset, mergedDict.getSizeOfId());
+
+ bufOffset += mergedDict.getSizeOfId();
+ } else {
+ // keep as it is
+ while (splittedByteses[i + 1].length > newKeyBuf.length - bufOffset) {
+ byte[] oldBuf = newKeyBuf;
+ newKeyBuf = new byte[2 * newKeyBuf.length];
+ System.arraycopy(oldBuf, 0, newKeyBuf, 0, oldBuf.length);
+ }
+
+ System.arraycopy(splittedByteses[i + 1].value, 0, newKeyBuf, bufOffset, splittedByteses[i + 1].length);
+ bufOffset += splittedByteses[i + 1].length;
+ }
+ }
+ byte[] newKey = Arrays.copyOf(newKeyBuf, bufOffset);
+ outputKey.set(newKey, 0, newKey.length);
+
+ valueBuf.clear();
+ if (simpleFullCopy) {
+ // simple case, should only 1 hbase column and the bytes sequence is same
+ for (Cell cell : value.rawCells()) {
+ valueBuf.put(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+ }
+ } else {
+ // complex case, need re-arrange the bytes sequence
+ for (int i = 0; i < rowValueDecoders.length; i++) {
+ rowValueDecoders[i].decode(value, false);
+ Object[] measureValues = rowValueDecoders[i].getValues();
+
+ for (int j = 0; j < measureValues.length; j++) {
+ result[hbaseColumnToMeasureMapping[i][j]] = measureValues[j];
+ }
+ }
+ codec.encode(result, valueBuf);
+ }
+
+ outputValue.set(valueBuf.array(), 0, valueBuf.position());
+ context.write(outputKey, outputValue);
+ }
+
+
+}
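
A note on how the measure values are stitched back together: when the row layout spreads measures over more than one HBase column, each RowValueDecoder yields only its own column's measures, so setup() builds hbaseColumnToMeasureMapping to place every decoded value at its index in the cube-wide measure list. That lookup matches measures via MeasureDesc.equals, which is presumably why this commit also changes that method (below) to compare names rather than ids. A condensed sketch equivalent to the loop in setup() above, assuming cubeDesc.getMeasures() is a List:

    // For each HBase column i and measure j within it, remember the measure's
    // position in cubeDesc.getMeasures(); relies on MeasureDesc.equals().
    hbaseColumnToMeasureMapping = new int[rowValueDecoders.length][];
    for (int i = 0; i < rowValueDecoders.length; i++) {
        MeasureDesc[] colMeasures = rowValueDecoders[i].getMeasures();
        hbaseColumnToMeasureMapping[i] = new int[colMeasures.length];
        for (int j = 0; j < colMeasures.length; j++) {
            hbaseColumnToMeasureMapping[i][j] = cubeDesc.getMeasures().indexOf(colMeasures[j]);
        }
    }
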
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/39ada70f/metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java b/metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
index fd42d45..ee4a972 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
@@ -80,7 +80,7 @@ public class MeasureDesc {
MeasureDesc that = (MeasureDesc) o;
- if (id != that.id)
+ if (!name.equalsIgnoreCase(that.getName()))
return false;
return true;