You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/09/09 10:45:29 UTC
incubator-kylin git commit: KYLIN-1007 Use CubeDesc.getMeasures() as
the order of measures on GridTable, as well as on MR KV
Repository: incubator-kylin
Updated Branches:
refs/heads/KYLIN-1007 [created] 5c7e1f1b2
KYLIN-1007 Use CubeDesc.getMeasures() as the order of measures on
GridTable, as well as on MR KV
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/5c7e1f1b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/5c7e1f1b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/5c7e1f1b
Branch: refs/heads/KYLIN-1007
Commit: 5c7e1f1b22cba2ad617f6ec32923d88839bbda3e
Parents: 2528cbf
Author: Li, Yang <ya...@ebay.com>
Authored: Mon Sep 7 18:00:07 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Wed Sep 9 16:18:14 2015 +0800
----------------------------------------------------------------------
.../gridtable/CuboidToGridTableMapping.java | 66 ++++++++++--------
.../cube/inmemcubing/DoggedCubeBuilder.java | 7 +-
.../cube/inmemcubing/InMemCubeBuilder.java | 33 ++-------
.../org/apache/kylin/cube/model/CubeDesc.java | 17 +++--
.../kylin/cube/model/HBaseColumnDesc.java | 37 +++++++---
.../org/apache/kylin/engine/mr/IMROutput.java | 8 +--
.../org/apache/kylin/engine/mr/IMROutput2.java | 22 ++++++
.../engine/mr/steps/InMemCuboidReducer.java | 15 +----
.../mr/steps/MergeCuboidFromStorageMapper.java | 21 ++----
.../engine/mr/steps/MergeCuboidMapper.java | 3 +-
.../storage/hbase/cube/v1/CubeStorageQuery.java | 4 +-
.../hbase/cube/v1/CubeTupleConverter.java | 2 +-
.../storage/hbase/steps/CubeHFileMapper.java | 3 +-
.../storage/hbase/steps/HBaseCuboidWriter.java | 18 ++---
.../storage/hbase/steps/HBaseMROutput2.java | 19 ++----
.../hbase/steps/InMemKeyValueCreator.java | 71 --------------------
.../storage/hbase/steps/KeyValueCreator.java | 32 ++-------
.../storage/hbase/steps/RowValueDecoder.java | 38 +++++++----
.../hbase/steps/RowValueDecoderTest.java | 6 +-
19 files changed, 167 insertions(+), 255 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5c7e1f1b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
index ab4efca..12eb5f8 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
@@ -1,5 +1,6 @@
package org.apache.kylin.cube.gridtable;
+import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.List;
@@ -59,29 +60,48 @@ public class CuboidToGridTableMapping {
nDimensions = gtColIdx;
assert nDimensions == cuboid.getColumns().size();
+ // column blocks of metrics
+ ArrayList<BitSet> metricsColBlocks = Lists.newArrayList();
+ for (HBaseColumnFamilyDesc familyDesc : cuboid.getCube().getHbaseMapping().getColumnFamily()) {
+ for (int i = 0; i < familyDesc.getColumns().length; i++) {
+ metricsColBlocks.add(new BitSet());
+ }
+ }
+
// metrics
metrics2gt = LinkedListMultimap.create();
- for (HBaseColumnFamilyDesc familyDesc : cuboid.getCube().getHbaseMapping().getColumnFamily()) {
- for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
- BitSet colBlock = new BitSet();
- for (MeasureDesc measure : hbaseColDesc.getMeasures()) {
- // Count distinct & holistic count distinct are equals() but different.
- // Ensure the holistic version if exists is always the first.
- FunctionDesc func = measure.getFunction();
- if (func.isHolisticCountDistinct()) {
- List<Integer> existing = metrics2gt.removeAll(func);
- metrics2gt.put(func, gtColIdx);
- metrics2gt.putAll(func, existing);
- } else {
- metrics2gt.put(func, gtColIdx);
+ for (MeasureDesc measure :cuboid.getCube().getMeasures()) {
+ // Count distinct & holistic count distinct are equals() but different.
+ // Ensure the holistic version if exists is always the first.
+ FunctionDesc func = measure.getFunction();
+ if (func.isHolisticCountDistinct()) {
+ List<Integer> existing = metrics2gt.removeAll(func);
+ metrics2gt.put(func, gtColIdx);
+ metrics2gt.putAll(func, existing);
+ } else {
+ metrics2gt.put(func, gtColIdx);
+ }
+
+ gtDataTypes.add(func.getReturnDataType());
+
+ // map to column block
+ int cbIdx = 0;
+ for (HBaseColumnFamilyDesc familyDesc : cuboid.getCube().getHbaseMapping().getColumnFamily()) {
+ for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
+ if (hbaseColDesc.containsMeasure(measure.getName())) {
+ metricsColBlocks.get(cbIdx).set(gtColIdx);
}
- gtDataTypes.add(func.getReturnDataType());
- colBlock.set(gtColIdx);
- gtColIdx++;
+ cbIdx++;
}
- gtColBlocks.add(new ImmutableBitSet(colBlock));
}
+
+ gtColIdx++;
}
+
+ for (BitSet set : metricsColBlocks) {
+ gtColBlocks.add(new ImmutableBitSet(set));
+ }
+
nMetrics = gtColIdx - nDimensions;
assert nMetrics == cuboid.getCube().getMeasures().size();
}
@@ -157,16 +177,4 @@ public class CuboidToGridTableMapping {
return result.isEmpty() ? Collections.<Integer, Integer> emptyMap() : result;
}
- public static MeasureDesc[] getMeasureSequenceOnGridTable(CubeDesc cube) {
- MeasureDesc[] result = new MeasureDesc[cube.getMeasures().size()];
- int i = 0;
- for (HBaseColumnFamilyDesc familyDesc : cube.getHbaseMapping().getColumnFamily()) {
- for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
- for (MeasureDesc m : hbaseColDesc.getMeasures()) {
- result[i++] = m;
- }
- }
- }
- return result;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5c7e1f1b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
index 30e1e95..d565ab5 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
@@ -32,14 +32,12 @@ import java.util.concurrent.TimeUnit;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.MemoryBudgetController;
-import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.metadata.measure.MeasureAggregators;
-import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -286,9 +284,8 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
ImmutableBitSet lastMetricsColumns;
Merger() {
- MeasureDesc[] measures = CuboidToGridTableMapping.getMeasureSequenceOnGridTable(cubeDesc);
- reuseAggrs = new MeasureAggregators(measures);
- reuseMetricsArray = new Object[measures.length];
+ reuseAggrs = new MeasureAggregators(cubeDesc.getMeasures());
+ reuseMetricsArray = new Object[cubeDesc.getMeasures().size()];
}
public void mergeAndOutput(List<SplitThread> splits, ICuboidWriter output) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5c7e1f1b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index 465dd99..662416e 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -38,8 +38,6 @@ import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.cube.gridtable.CubeGridTable;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.gridtable.GTAggregateScanner;
import org.apache.kylin.gridtable.GTBuilder;
@@ -75,7 +73,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
private final CubeJoinedFlatTableDesc intermediateTableDesc;
private final MeasureCodec measureCodec;
private final String[] metricsAggrFuncs;
- private final int[] hbaseMeasureRefIndex;
private final MeasureDesc[] measureDescs;
private final int measureCount;
@@ -97,35 +94,18 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
this.intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
this.measureCodec = new MeasureCodec(cubeDesc.getMeasures());
+ this.measureCount = cubeDesc.getMeasures().size();
+ this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]);
+
Map<String, Integer> measureIndexMap = Maps.newHashMap();
List<String> metricsAggrFuncsList = Lists.newArrayList();
- measureCount = cubeDesc.getMeasures().size();
-
- List<MeasureDesc> measureDescsList = Lists.newArrayList();
- hbaseMeasureRefIndex = new int[measureCount];
- int measureRef = 0;
- for (HBaseColumnFamilyDesc familyDesc : cubeDesc.getHbaseMapping().getColumnFamily()) {
- for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
- for (MeasureDesc measure : hbaseColDesc.getMeasures()) {
- for (int j = 0; j < measureCount; j++) {
- if (cubeDesc.getMeasures().get(j).equals(measure)) {
- measureDescsList.add(measure);
- hbaseMeasureRefIndex[measureRef] = j;
- break;
- }
- }
- measureRef++;
- }
- }
- }
for (int i = 0; i < measureCount; i++) {
- MeasureDesc measureDesc = measureDescsList.get(i);
+ MeasureDesc measureDesc = measureDescs[i];
metricsAggrFuncsList.add(measureDesc.getFunction().getExpression());
measureIndexMap.put(measureDesc.getName(), i);
}
this.metricsAggrFuncs = metricsAggrFuncsList.toArray(new String[metricsAggrFuncsList.size()]);
- this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]);
}
private GridTable newGridTableByCuboidID(long cuboidID) throws IOException {
@@ -605,8 +585,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
Object[] values = new Object[measureCount];
MeasureDesc measureDesc = null;
- for (int position = 0; position < hbaseMeasureRefIndex.length; position++) {
- int i = hbaseMeasureRefIndex[position];
+ for (int i = 0; i < measureCount; i++) {
measureDesc = measureDescs[i];
Object value = null;
@@ -635,7 +614,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
}
value = measureCodec.getSerializer(i).valueOf(result);
}
- values[position] = value;
+ values[i] = value;
}
return values;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5c7e1f1b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index ae49eb0..a273975 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -646,17 +646,24 @@ public class CubeDesc extends RootPersistentEntity {
if (measures == null || measures.size() == 0)
return;
- Map<String, MeasureDesc> measureCache = new HashMap<String, MeasureDesc>();
+ Map<String, MeasureDesc> measureLookup = new HashMap<String, MeasureDesc>();
for (MeasureDesc m : measures)
- measureCache.put(m.getName(), m);
+ measureLookup.put(m.getName(), m);
+ Map<String, Integer> measureIndexLookup = new HashMap<String, Integer>();
+ for (int i = 0; i < measures.size(); i++)
+ measureIndexLookup.put(measures.get(i).getName(), i);
for (HBaseColumnFamilyDesc cf : getHBaseMapping().getColumnFamily()) {
for (HBaseColumnDesc c : cf.getColumns()) {
- MeasureDesc[] measureDescs = new MeasureDesc[c.getMeasureRefs().length];
- for (int i = 0; i < c.getMeasureRefs().length; i++) {
- measureDescs[i] = measureCache.get(c.getMeasureRefs()[i]);
+ String[] colMeasureRefs = c.getMeasureRefs();
+ MeasureDesc[] measureDescs = new MeasureDesc[colMeasureRefs.length];
+ int[] measureIndex = new int[colMeasureRefs.length];
+ for (int i = 0; i < colMeasureRefs.length; i++) {
+ measureDescs[i] = measureLookup.get(colMeasureRefs[i]);
+ measureIndex[i] = measureIndexLookup.get(colMeasureRefs[i]);
}
c.setMeasures(measureDescs);
+ c.setMeasureIndex(measureIndex);
c.setColumnFamilyName(cf.getName());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5c7e1f1b/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseColumnDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseColumnDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseColumnDesc.java
index 9aa6727..e82ad53 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseColumnDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseColumnDesc.java
@@ -37,8 +37,9 @@ public class HBaseColumnDesc {
@JsonProperty("measure_refs")
private String[] measureRefs;
- // these two will be assemble at runtime
+ // these two will be assembled at runtime
private MeasureDesc[] measures;
+ private int[] measureIndex; // the index on CubeDesc.getMeasures()
private String columnFamilyName;
public String getQualifier() {
@@ -56,20 +57,19 @@ public class HBaseColumnDesc {
public void setMeasureRefs(String[] measureRefs) {
this.measureRefs = measureRefs;
}
+
+ public int[] getMeasureIndex() {
+ return measureIndex;
+ }
+
+ public void setMeasureIndex(int[] index) {
+ this.measureIndex = index;
+ }
public MeasureDesc[] getMeasures() {
return measures;
}
- public int findMeasureIndex(FunctionDesc function) {
- for (int i = 0; i < measures.length; i++) {
- if (measures[i].getFunction().equals(function)) {
- return i;
- }
- }
- return -1;
- }
-
public void setMeasures(MeasureDesc[] measures) {
this.measures = measures;
}
@@ -82,6 +82,23 @@ public class HBaseColumnDesc {
this.columnFamilyName = columnFamilyName;
}
+ public int findMeasure(FunctionDesc function) {
+ for (int i = 0; i < measures.length; i++) {
+ if (measures[i].getFunction().equals(function)) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ public boolean containsMeasure(String refName) {
+ for (String ref : measureRefs) {
+ if (ref.equals(refName))
+ return true;
+ }
+ return false;
+ }
+
@Override
public int hashCode() {
final int prime = 31;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5c7e1f1b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
index e6282f7..577a836 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
@@ -40,8 +40,8 @@ public interface IMROutput {
/**
* Add step that saves cuboid output from HDFS to storage.
*
- * The cuboid output is a directory of sequence files, where key takes format "CUBOID,D1,D2,..,Dn",
- * value takes format "M1,M2,..,Mm". CUBOID is 8 bytes cuboid ID; Dx is dimension value with
+ * The cuboid output is a directory of sequence files, where key is CUBOID+D1+D2+..+Dn,
+ * value is M1+M2+..+Mm. CUBOID is 8 bytes cuboid ID; Dx is dimension value with
* dictionary encoding; Mx is measure value serialization form.
*/
public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath);
@@ -66,8 +66,8 @@ public interface IMROutput {
/**
* Add step that saves cuboid output from HDFS to storage.
*
- * The cuboid output is a directory of sequence files, where key takes format "CUBOID,D1,D2,..,Dn",
- * value takes format "M1,M2,..,Mm". CUBOID is 8 bytes cuboid ID; Dx is dimension value with
+ * The cuboid output is a directory of sequence files, where key is CUBOID+D1+D2+..+Dn,
+ * value is M1+M2+..+Mm. CUBOID is 8 bytes cuboid ID; Dx is dimension value with
* dictionary encoding; Mx is measure value serialization form.
*/
public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5c7e1f1b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
index 456df82..b4aff5d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
@@ -27,6 +27,7 @@ public interface IMROutput2 {
*/
public interface IMRBatchCubingOutputSide2 {
+ /** Return an output format for Phase 3: Build Cube MR */
public IMRStorageOutputFormat getStorageOutputFormat();
/** Add step that executes after build dictionary and before build cube. */
@@ -45,13 +46,22 @@ public interface IMROutput2 {
public IMRStorageInputFormat getStorageInputFormat();
}
+ /** Read in a cube as input of merge. Configure the input file format of mapper. */
@SuppressWarnings("rawtypes")
public interface IMRStorageInputFormat {
+ /** Configure MR mapper class and input file format. */
public void configureInput(Class<? extends Mapper> mapper, Class<? extends WritableComparable> outputKeyClass, Class<? extends Writable> outputValueClass, Job job) throws IOException;
+ /** Given a mapper context, figure out which segment the mapper reads from. */
public CubeSegment findSourceSegment(Mapper.Context context, CubeInstance cubeInstance) throws IOException;
+ /**
+ * Read in a row of cuboid. Given the input KV, de-serialize back cuboid ID, dimensions, and measures.
+ *
+ * @return <code>ByteArrayWritable</code> is the cuboid ID (8 bytes) + dimension values in dictionary encoding
+ * <code>Object[]</code> is the measure values in order of <code>CubeDesc.getMeasures()</code>
+ */
public Pair<ByteArrayWritable, Object[]> parseMapperInput(Object inKey, Object inValue);
}
@@ -67,6 +77,7 @@ public interface IMROutput2 {
*/
public interface IMRBatchMergeOutputSide2 {
+ /** Return an output format for Phase 2: Merge Cube MR */
public IMRStorageOutputFormat getStorageOutputFormat();
/** Add step that executes after merge dictionary and before merge cube. */
@@ -79,10 +90,21 @@ public interface IMROutput2 {
public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow);
}
+ /** Write out a cube. Configure the output file format of reducer and do the actual K-V output. */
@SuppressWarnings("rawtypes")
public interface IMRStorageOutputFormat {
+
+ /** Configure MR reducer class and output file format. */
public void configureOutput(Class<? extends Reducer> reducer, String jobFlowId, Job job) throws IOException;
+ /**
+ * Write out a row of cuboid. Given the cuboid ID, dimensions, and measures, serialize in whatever
+ * way and output to reducer context.
+ *
+ * @param key The cuboid ID (8 bytes) + dimension values in dictionary encoding
+ * @param value The measure values in order of <code>CubeDesc.getMeasures()</code>
+ * @param context The reducer context output goes to
+ */
public void doReducerOutput(ByteArrayWritable key, Object[] value, Reducer.Context context) throws IOException, InterruptedException;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5c7e1f1b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
index 443ecd8..db254f6 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
@@ -8,8 +8,6 @@ import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
import org.apache.kylin.engine.mr.ByteArrayWritable;
import org.apache.kylin.engine.mr.IMROutput2.IMRStorageOutputFormat;
import org.apache.kylin.engine.mr.KylinReducer;
@@ -23,8 +21,6 @@ import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
-
/**
*/
public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArrayWritable, Object, Object> {
@@ -56,18 +52,9 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra
else
storageOutputFormat = MRUtil.getBatchCubingOutputSide2(cubeSeg).getStorageOutputFormat();
- List<MeasureDesc> measuresDescs = Lists.newArrayList();
- for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
- for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
- for (MeasureDesc measure : colDesc.getMeasures()) {
- measuresDescs.add(measure);
- }
- }
- }
-
+ List<MeasureDesc> measuresDescs = cubeDesc.getMeasures();
codec = new MeasureCodec(measuresDescs);
aggs = new MeasureAggregators(measuresDescs);
-
input = new Object[measuresDescs.size()];
result = new Object[measuresDescs.size()];
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5c7e1f1b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
index b57e484..fa575ca 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.List;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.BytesUtil;
@@ -35,8 +34,6 @@ import org.apache.kylin.cube.common.RowKeySplitter;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.dict.DictionaryManager;
import org.apache.kylin.engine.mr.ByteArrayWritable;
@@ -46,14 +43,12 @@ import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.metadata.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
/**
* @author shaoshi
@@ -111,22 +106,14 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By
mergedCubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
storageInputFormat = MRUtil.getBatchMergeInputSide2(mergedCubeSegment).getStorageInputFormat();
- newKeyBuf = new byte[256];// size will auto-grow
+ newKeyBuf = new byte[256]; // size will auto-grow
sourceCubeSegment = storageInputFormat.findSourceSegment(context, cube);
- logger.info(sourceCubeSegment.toString());
+ logger.info("Source cube segment: " + sourceCubeSegment);
- this.rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
+ rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
- List<MeasureDesc> measuresDescs = Lists.newArrayList();
- for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
- for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
- for (MeasureDesc measure : colDesc.getMeasures()) {
- measuresDescs.add(measure);
- }
- }
- }
- codec = new MeasureCodec(measuresDescs);
+ codec = new MeasureCodec(cubeDesc.getMeasures());
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5c7e1f1b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index 5327dc4..a09ffe1 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -57,8 +57,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
private CubeInstance cube;
private CubeDesc cubeDesc;
private CubeSegment mergedCubeSegment;
- private CubeSegment sourceCubeSegment;// Must be unique during a mapper's
- // life cycle
+ private CubeSegment sourceCubeSegment; // Must be unique during a mapper's life cycle
private Text outputKey = new Text();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5c7e1f1b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
index 0514b45..836f142 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
@@ -373,7 +373,7 @@ public class CubeStorageQuery implements ICachableStorageQuery {
int bestIndex = -1;
for (HBaseColumnDesc hbCol : hbCols) {
bestHBCol = hbCol;
- bestIndex = hbCol.findMeasureIndex(aggrFunc);
+ bestIndex = hbCol.findMeasure(aggrFunc);
MeasureDesc measure = hbCol.getMeasures()[bestIndex];
// criteria for holistic measure: Exact Aggregation && Exact Cuboid
if (measure.getFunction().isHolisticCountDistinct() && context.isExactAggregation()) {
@@ -387,7 +387,7 @@ public class CubeStorageQuery implements ICachableStorageQuery {
codec = new RowValueDecoder(bestHBCol);
codecMap.put(bestHBCol, codec);
}
- codec.setIndex(bestIndex);
+ codec.setProjectIndex(bestIndex);
}
return new ArrayList<RowValueDecoder>(codecMap.values());
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5c7e1f1b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
index 27f4f10..e569cbd 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
@@ -118,7 +118,7 @@ public class CubeTupleConverter {
// measures
for (int i = 0; i < rowValueDecoders.size(); i++) {
RowValueDecoder rowValueDecoder = rowValueDecoders.get(i);
- rowValueDecoder.decode(hbaseRow);
+ rowValueDecoder.decodeAndConvertJavaObj(hbaseRow);
Object[] measureValues = rowValueDecoder.getValues();
int[] measureIdx = metricsMeasureIdx[i];
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5c7e1f1b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java
index 65b56b9..9f97b0e 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java
@@ -79,8 +79,7 @@ public class CubeHFileMapper extends KylinMapper<Text, Text, ImmutableBytesWrita
KeyValue outputValue;
int n = keyValueCreators.size();
- if (n == 1 && keyValueCreators.get(0).isFullCopy) { // shortcut for
- // simple full copy
+ if (n == 1 && keyValueCreators.get(0).isFullCopy) { // shortcut for simple full copy
outputValue = keyValueCreators.get(0).create(key, value.getBytes(), 0, value.getLength());
context.write(outputKey, outputValue);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5c7e1f1b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
index d313233..1271070 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
@@ -45,7 +45,6 @@ import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.HBaseColumnDesc;
@@ -64,26 +63,26 @@ public final class HBaseCuboidWriter implements ICuboidWriter {
private static final int BATCH_PUT_THRESHOLD = 10000;
- private final List<InMemKeyValueCreator> keyValueCreators;
+ private final List<KeyValueCreator> keyValueCreators;
private final int nColumns;
private final HTableInterface hTable;
private final ByteBuffer byteBuffer;
private final CubeDesc cubeDesc;
+ private final Object[] measureValues;
private List<Put> puts = Lists.newArrayList();
public HBaseCuboidWriter(CubeDesc cubeDesc, HTableInterface hTable) {
this.keyValueCreators = Lists.newArrayList();
this.cubeDesc = cubeDesc;
- int startPosition = 0;
for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
- keyValueCreators.add(new InMemKeyValueCreator(colDesc, startPosition));
- startPosition += colDesc.getMeasures().length;
+ keyValueCreators.add(new KeyValueCreator(cubeDesc, colDesc));
}
}
this.nColumns = keyValueCreators.size();
this.hTable = hTable;
this.byteBuffer = ByteBuffer.allocate(1 << 20);
+ this.measureValues = new Object[cubeDesc.getMeasures().size()];
}
private byte[] copy(byte[] array, int offset, int length) {
@@ -106,10 +105,13 @@ public final class HBaseCuboidWriter implements ICuboidWriter {
@Override
public void write(long cuboidId, GTRecord record) throws IOException {
final ByteBuffer key = createKey(cuboidId, record);
- final CuboidToGridTableMapping mapping = new CuboidToGridTableMapping(Cuboid.findById(cubeDesc, cuboidId));
- final ImmutableBitSet bitSet = new ImmutableBitSet(mapping.getDimensionCount(), mapping.getColumnCount());
+ final Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId);
+ final int nDims = cuboid.getColumns().size();
+ final ImmutableBitSet bitSet = new ImmutableBitSet(nDims, nDims + cubeDesc.getMeasures().size());
+
for (int i = 0; i < nColumns; i++) {
- final KeyValue keyValue = keyValueCreators.get(i).create(key.array(), 0, key.position(), record.getValues(bitSet, new Object[bitSet.cardinality()]));
+ final Object[] values = record.getValues(bitSet, measureValues);
+ final KeyValue keyValue = keyValueCreators.get(i).create(key.array(), 0, key.position(), values);
final Put put = new Put(copy(key.array(), 0, key.position()));
byte[] family = copy(keyValue.getFamilyArray(), keyValue.getFamilyOffset(), keyValue.getFamilyLength());
byte[] qualifier = copy(keyValue.getQualifierArray(), keyValue.getQualifierOffset(), keyValue.getQualifierLength());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5c7e1f1b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
index 6cbcec3..1e414be 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
@@ -58,9 +58,6 @@ import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.IMROutput2;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.storage.hbase.steps.HBaseMRSteps;
-import org.apache.kylin.storage.hbase.steps.InMemKeyValueCreator;
-import org.apache.kylin.storage.hbase.steps.RowValueDecoder;
import com.google.common.collect.Lists;
@@ -196,14 +193,10 @@ public class HBaseMROutput2 implements IMROutput2 {
ImmutableBytesWritable key = (ImmutableBytesWritable) inKey;
parsedKey.set(key.get(), key.getOffset(), key.getLength());
- Result value = (Result) inValue;
- int position = 0;
+ Result hbaseRow = (Result) inValue;
for (int i = 0; i < rowValueDecoders.length; i++) {
- rowValueDecoders[i].decode(value, false);
- Object[] measureValues = rowValueDecoders[i].getValues();
- for (int j = 0; j < measureValues.length; j++) {
- parsedValue[position++] = measureValues[j];
- }
+ rowValueDecoders[i].decode(hbaseRow);
+ rowValueDecoders[i].loadCubeMeasureArray(parsedValue);
}
return parsedPair;
@@ -214,7 +207,7 @@ public class HBaseMROutput2 implements IMROutput2 {
private static class HBaseOutputFormat implements IMRStorageOutputFormat {
final CubeSegment seg;
- final List<InMemKeyValueCreator> keyValueCreators = Lists.newArrayList();
+ final List<KeyValueCreator> keyValueCreators = Lists.newArrayList();
final ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
public HBaseOutputFormat(CubeSegment seg) {
@@ -243,11 +236,9 @@ public class HBaseMROutput2 implements IMROutput2 {
@Override
public void doReducerOutput(ByteArrayWritable key, Object[] value, Reducer.Context context) throws IOException, InterruptedException {
if (keyValueCreators.size() == 0) {
- int startPosition = 0;
for (HBaseColumnFamilyDesc cfDesc : seg.getCubeDesc().getHBaseMapping().getColumnFamily()) {
for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
- keyValueCreators.add(new InMemKeyValueCreator(colDesc, startPosition));
- startPosition += colDesc.getMeasures().length;
+ keyValueCreators.add(new KeyValueCreator(seg.getCubeDesc(), colDesc));
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5c7e1f1b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/InMemKeyValueCreator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/InMemKeyValueCreator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/InMemKeyValueCreator.java
deleted file mode 100644
index d96dfcc..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/InMemKeyValueCreator.java
+++ /dev/null
@@ -1,71 +0,0 @@
-package org.apache.kylin.storage.hbase.steps;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.metadata.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.MeasureDesc;
-
-import com.google.common.collect.Lists;
-
-public class InMemKeyValueCreator {
- byte[] cfBytes;
- byte[] qBytes;
- long timestamp;
-
- MeasureCodec codec;
- Object[] colValues;
- ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-
- int startPosition = 0;
-
- public InMemKeyValueCreator(HBaseColumnDesc colDesc, int startPosition) {
-
- cfBytes = Bytes.toBytes(colDesc.getColumnFamilyName());
- qBytes = Bytes.toBytes(colDesc.getQualifier());
- timestamp = System.currentTimeMillis();
-
- List<MeasureDesc> measures = Lists.newArrayList();
- for (MeasureDesc measure : colDesc.getMeasures()) {
- measures.add(measure);
- }
- codec = new MeasureCodec(measures);
- colValues = new Object[measures.size()];
-
- this.startPosition = startPosition;
-
- }
-
- public KeyValue create(Text key, Object[] measureValues) {
- return create(key.getBytes(), 0, key.getLength(), measureValues);
- }
-
- public KeyValue create(byte[] keyBytes, int keyOffset, int keyLength, Object[] measureValues) {
- for (int i = 0; i < colValues.length; i++) {
- colValues[i] = measureValues[startPosition + i];
- }
-
- valueBuf.clear();
- codec.encode(colValues, valueBuf);
-
- return create(keyBytes, keyOffset, keyLength, valueBuf.array(), 0, valueBuf.position());
- }
-
- public KeyValue create(byte[] keyBytes, int keyOffset, int keyLength, byte[] value, int voffset, int vlen) {
- return new KeyValue(keyBytes, keyOffset, keyLength, //
- cfBytes, 0, cfBytes.length, //
- qBytes, 0, qBytes.length, //
- timestamp, KeyValue.Type.Put, //
- value, voffset, vlen);
- }
-
- public KeyValue create(Text key, byte[] value, int voffset, int vlen) {
- return create(key.getBytes(), 0, key.getLength(), value, voffset, vlen);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5c7e1f1b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java
index a24fe39..7f40259 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java
@@ -1,7 +1,6 @@
package org.apache.kylin.storage.hbase.steps;
import java.nio.ByteBuffer;
-import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hbase.KeyValue;
@@ -36,21 +35,14 @@ public class KeyValueCreator {
qBytes = Bytes.toBytes(colDesc.getQualifier());
timestamp = System.currentTimeMillis();
- List<MeasureDesc> measures = cubeDesc.getMeasures();
- String[] measureNames = getMeasureNames(cubeDesc);
- String[] refs = colDesc.getMeasureRefs();
-
- refIndex = new int[refs.length];
- refMeasures = new MeasureDesc[refs.length];
- for (int i = 0; i < refs.length; i++) {
- refIndex[i] = indexOf(measureNames, refs[i]);
- refMeasures[i] = measures.get(refIndex[i]);
- }
+ refIndex = colDesc.getMeasureIndex();
+ refMeasures = colDesc.getMeasures();
codec = new MeasureCodec(refMeasures);
- colValues = new Object[refs.length];
+ colValues = new Object[refMeasures.length];
isFullCopy = true;
+ List<MeasureDesc> measures = cubeDesc.getMeasures();
for (int i = 0; i < measures.size(); i++) {
if (refIndex.length <= i || refIndex[i] != i)
isFullCopy = false;
@@ -84,20 +76,4 @@ public class KeyValueCreator {
return create(key.getBytes(), 0, key.getLength(), value, voffset, vlen);
}
- private int indexOf(String[] measureNames, String ref) {
- for (int i = 0; i < measureNames.length; i++)
- if (measureNames[i].equalsIgnoreCase(ref))
- return i;
-
- throw new IllegalArgumentException("Measure '" + ref + "' not found in " + Arrays.toString(measureNames));
- }
-
- private String[] getMeasureNames(CubeDesc cubeDesc) {
- List<MeasureDesc> measures = cubeDesc.getMeasures();
- String[] result = new String[measures.size()];
- for (int i = 0; i < measures.size(); i++)
- result[i] = measures.get(i).getName();
- return result;
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5c7e1f1b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
index 9fd4e22..7c1a19f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
@@ -42,7 +42,7 @@ public class RowValueDecoder implements Cloneable {
private final MeasureCodec codec;
private final BitSet projectionIndex;
private final MeasureDesc[] measures;
- private Object[] values;
+ private final Object[] values;
public RowValueDecoder(HBaseColumnDesc hbaseColumn) {
this.hbaseColumn = hbaseColumn;
@@ -54,22 +54,26 @@ public class RowValueDecoder implements Cloneable {
this.values = new Object[measures.length];
}
- public void decode(Result hbaseRow) {
+ public void decodeAndConvertJavaObj(Result hbaseRow) {
decode(hbaseRow, true);
}
+
+ public void decode(Result hbaseRow) {
+ decode(hbaseRow, false);
+ }
- public void decode(Result hbaseRow, boolean convertToJavaObject) {
+ private void decode(Result hbaseRow, boolean convertToJavaObject) {
decode(hbaseRow.getValueAsByteBuffer(hbaseColumnFamily, hbaseColumnQualifier), convertToJavaObject);
}
- public void decode(byte[] bytes) {
- decode(bytes, true);
+ public void decodeAndConvertJavaObj(byte[] bytes) {
+ decode(ByteBuffer.wrap(bytes), true);
}
- public void decode(byte[] bytes, boolean convertToJavaObject) {
- decode(ByteBuffer.wrap(bytes), convertToJavaObject);
+ public void decode(byte[] bytes) {
+ decode(ByteBuffer.wrap(bytes), false);
}
-
+
private void decode(ByteBuffer buffer, boolean convertToJavaObject) {
codec.decode(buffer, values);
if (convertToJavaObject) {
@@ -90,18 +94,18 @@ public class RowValueDecoder implements Cloneable {
}
}
- public void setIndex(int bitIndex) {
+ public void setProjectIndex(int bitIndex) {
projectionIndex.set(bitIndex);
}
- public HBaseColumnDesc getHBaseColumn() {
- return hbaseColumn;
- }
-
public BitSet getProjectionIndex() {
return projectionIndex;
}
+ public HBaseColumnDesc getHBaseColumn() {
+ return hbaseColumn;
+ }
+
public Object[] getValues() {
return values;
}
@@ -109,6 +113,14 @@ public class RowValueDecoder implements Cloneable {
public MeasureDesc[] getMeasures() {
return measures;
}
+
+ // result is in order of <code>CubeDesc.getMeasures()</code>
+ public void loadCubeMeasureArray(Object result[]) {
+ int[] measureIndex = hbaseColumn.getMeasureIndex();
+ for (int i = 0; i < measureIndex.length; i++) {
+ result[measureIndex[i]] = values[i];
+ }
+ }
public boolean hasMemHungryCountDistinct() {
for (int i = projectionIndex.nextSetBit(0); i >= 0; i = projectionIndex.nextSetBit(i + 1)) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5c7e1f1b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java
index d62da80..ba79305 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java
@@ -73,11 +73,11 @@ public class RowValueDecoderTest extends LocalFileMetadataTestCase {
RowValueDecoder rowValueDecoder = new RowValueDecoder(hbaseCol);
for (MeasureDesc measure : cubeDesc.getMeasures()) {
FunctionDesc aggrFunc = measure.getFunction();
- int index = hbaseCol.findMeasureIndex(aggrFunc);
- rowValueDecoder.setIndex(index);
+ int index = hbaseCol.findMeasure(aggrFunc);
+ rowValueDecoder.setProjectIndex(index);
}
- rowValueDecoder.decode(valueBytes);
+ rowValueDecoder.decodeAndConvertJavaObj(valueBytes);
Object[] measureValues = rowValueDecoder.getValues();
//BigDecimal.ROUND_HALF_EVEN in BigDecimalSerializer
assertEquals("[333.1235, 333.1111, 333.2000, 2, 100]", Arrays.toString(measureValues));